diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index c262fad8c3..56cfbee75d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -4,128 +4,100 @@ package akka.cluster +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ -import akka.util.duration._ -import com.typesafe.config._ +object LeaderElectionMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val forth = role("forth") -import java.net.InetSocketAddress + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster.auto-down = off + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec + +abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { + import LeaderElectionMultiJvmSpec._ + + override def initialParticipants = 4 + + val firstAddress = node(first).address + val myAddress = node(mySelf).address + + // sorted in the order used by the cluster + val roles = Seq(first, second, third, forth).sorted + + "A cluster of three nodes" must { + + "be able to 'elect' a single leader" in { + // make sure that the first cluster is started before other join + runOn(first) { + cluster + } + testConductor.enter("first-started") + + cluster.join(firstAddress) + awaitUpConvergence(numberOfMembers = 4) + cluster.isLeader must be(mySelf == roles.head) + testConductor.enter("after") + } + + def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { + val currentRoles = roles.drop(alreadyShutdown) + currentRoles.size must be >= (2) + + runOn(currentRoles.head) { + cluster.shutdown() + testConductor.enter("after-shutdown") + testConductor.enter("after-down") + } + + // runOn previously shutdown cluster nodes + if ((roles diff currentRoles).contains(mySelf)) { + testConductor.enter("after-shutdown") + testConductor.enter("after-down") + } + + // runOn remaining cluster nodes + if (currentRoles.tail.contains(mySelf)) { + + testConductor.enter("after-shutdown") + + runOn(currentRoles.last) { + // user marks the shutdown leader as DOWN + val leaderAddress = node(currentRoles.head).address + cluster.down(leaderAddress) + } + + testConductor.enter("after-down") + + awaitUpConvergence(currentRoles.size - 1) + val nextExpectedLeader = currentRoles.tail.head + cluster.isLeader must be(mySelf == nextExpectedLeader) + } + + testConductor.enter("after") + } + + "be able to 're-elect' a single leader after leader has left" in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0) + } + + "be able to 're-elect' a single leader after leader has left (again)" in { + shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) + } + } -class LeaderElectionSpec extends ClusterSpec with ImplicitSender { - val portPrefix = 5 - - var node1: Cluster = _ - var node2: Cluster = _ - var node3: Cluster = _ - - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - var system3: ActorSystemImpl = _ - - try { - "A cluster of three nodes" must { - - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node1 = Cluster(system1) - val address1 = node1.remoteAddress - - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d551 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node2 = Cluster(system2) - val address2 = node2.remoteAddress - - // ======= NODE 3 ======== - system3 = ActorSystem("system3", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d552 - cluster.node-to-join = "akka://system1@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node3 = Cluster(system3) - val address3 = node3.remoteAddress - - "be able to 'elect' a single leader" taggedAs LongRunningTest in { - - println("Give the system time to converge...") - awaitConvergence(node1 :: node2 :: node3 :: Nil) - - // check leader - node1.isLeader must be(true) - node2.isLeader must be(false) - node3.isLeader must be(false) - } - - "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { - - // shut down system1 - the leader - node1.shutdown() - system1.shutdown() - - // user marks node1 as DOWN - node2.down(address1) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node2 :: node3 :: Nil) - - // check leader - node2.isLeader must be(true) - node3.isLeader must be(false) - } - - "be able to 're-elect' a single leader after leader has left (again, leaving a single node)" taggedAs LongRunningTest in { - - // shut down system1 - the leader - node2.shutdown() - system2.shutdown() - - // user marks node2 as DOWN - node3.down(address2) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node3 :: Nil) - - // check leader - node3.isLeader must be(true) - } - } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) - } - - override def atTermination() { - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - - if (node3 ne null) node3.shutdown() - if (system3 ne null) system3.shutdown() - } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 873d819dbb..48f1d0b520 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -45,8 +45,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ */ def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) { nodesInCluster.length must not be (0) - import Member.addressOrdering - val expectedLeader = nodesInCluster.map(role ⇒ (role, node(role).address)).sortBy(_._2).head._1 + val expectedLeader = roleOfLeader(nodesInCluster) cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) } @@ -60,4 +59,21 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ awaitCond(cluster.convergence.isDefined, 10 seconds) } + def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { + nodesInCluster.length must not be (0) + nodesInCluster.sorted.head + } + + /** + * Sort the roles in the order used by the cluster. + */ + implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { + import Member.addressOrdering + def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address) + } + + def roleName(address: Address): Option[RoleName] = { + testConductor.getNodes.await.find(node(_).address == address) + } + } \ No newline at end of file