diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala new file mode 100644 index 0000000000..54f744a6c8 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object LeaderElectionMultiJvmSpec extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster.auto-down = off + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec + +abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { + import LeaderElectionMultiJvmSpec._ + + override def initialParticipants = 5 + + lazy val firstAddress = node(first).address + + // sorted in the order used by the cluster + lazy val roles = Seq(first, second, third, fourth).sorted + + "A cluster of four nodes" must { + + "be able to 'elect' a single leader" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + + if (mySelf != controller) { + cluster.join(firstAddress) + awaitUpConvergence(numberOfMembers = roles.size) + cluster.isLeader must be(mySelf == roles.head) + } + testConductor.enter("after") + } + + def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { + val currentRoles = roles.drop(alreadyShutdown) + currentRoles.size must be >= (2) + val leader = currentRoles.head + val aUser = currentRoles.last + + mySelf match { + + case `controller` ⇒ + testConductor.enter("before-shutdown") + testConductor.shutdown(leader, 0) + testConductor.removeNode(leader) + testConductor.enter("after-shutdown", "after-down", "completed") + + case `leader` ⇒ + testConductor.enter("before-shutdown") + // this node will be shutdown by the controller and doesn't participate in more barriers + + case `aUser` ⇒ + val leaderAddress = node(leader).address + testConductor.enter("before-shutdown", "after-shutdown") + // user marks the shutdown leader as DOWN + cluster.down(leaderAddress) + testConductor.enter("after-down", "completed") + + case _ if currentRoles.tail.contains(mySelf) ⇒ + // remaining cluster nodes, not shutdown + testConductor.enter("before-shutdown", "after-shutdown", "after-down") + + awaitUpConvergence(currentRoles.size - 1) + val nextExpectedLeader = currentRoles.tail.head + cluster.isLeader must be(mySelf == nextExpectedLeader) + + testConductor.enter("completed") + + } + + } + + "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) + } + + "be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 873d819dbb..48f1d0b520 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -45,8 +45,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ */ def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) { nodesInCluster.length must not be (0) - import Member.addressOrdering - val expectedLeader = nodesInCluster.map(role ⇒ (role, node(role).address)).sortBy(_._2).head._1 + val expectedLeader = roleOfLeader(nodesInCluster) cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) } @@ -60,4 +59,21 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ awaitCond(cluster.convergence.isDefined, 10 seconds) } + def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { + nodesInCluster.length must not be (0) + nodesInCluster.sorted.head + } + + /** + * Sort the roles in the order used by the cluster. + */ + implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { + import Member.addressOrdering + def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address) + } + + def roleName(address: Address): Option[RoleName] = { + testConductor.getNodes.await.find(node(_).address == address) + } + } \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala deleted file mode 100644 index c262fad8c3..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ -import akka.util.duration._ - -import com.typesafe.config._ - -import java.net.InetSocketAddress - -class LeaderElectionSpec extends ClusterSpec with ImplicitSender { - val portPrefix = 5 - - var node1: Cluster = _ - var node2: Cluster = _ - var node3: Cluster = _ - - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - var system3: ActorSystemImpl = _ - - try { - "A cluster of three nodes" must { - - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node1 = Cluster(system1) - val address1 = node1.remoteAddress - - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d551 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node2 = Cluster(system2) - val address2 = node2.remoteAddress - - // ======= NODE 3 ======== - system3 = ActorSystem("system3", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d552 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node3 = Cluster(system3) - val address3 = node3.remoteAddress - - "be able to 'elect' a single leader" taggedAs LongRunningTest in { - - println("Give the system time to converge...") - awaitConvergence(node1 :: node2 :: node3 :: Nil) - - // check leader - node1.isLeader must be(true) - node2.isLeader must be(false) - node3.isLeader must be(false) - } - - "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { - - // shut down system1 - the leader - node1.shutdown() - system1.shutdown() - - // user marks node1 as DOWN - node2.down(address1) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node2 :: node3 :: Nil) - - // check leader - node2.isLeader must be(true) - node3.isLeader must be(false) - } - - "be able to 're-elect' a single leader after leader has left (again, leaving a single node)" taggedAs LongRunningTest in { - - // shut down system1 - the leader - node2.shutdown() - system2.shutdown() - - // user marks node2 as DOWN - node3.down(address2) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node3 :: Nil) - - // check leader - node3.isLeader must be(true) - } - } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) - } - - override def atTermination() { - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - - if (node3 ne null) node3.shutdown() - if (system3 ne null) system3.shutdown() - } -}