diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala new file mode 100644 index 0000000000..8b9d5bd71b --- /dev/null +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerChaosSpec.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.contrib.pattern + +import language.postfixOps +import scala.collection.immutable +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.Props +import akka.actor.PoisonPill +import akka.actor.RootActorPath +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.Member +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.testkit.TestEvent._ +import akka.actor.Terminated + +object ClusterSingletonManagerChaosSpec extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + val sixth = role("sixth") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-join = off + akka.cluster.auto-down = on + """)) + + case object EchoStarted + /** + * The singleton actor + */ + class Echo(testActor: ActorRef) extends Actor { + testActor ! EchoStarted + + def receive = { + case _ ⇒ sender ! self + } + } +} + +class ClusterSingletonManagerChaosMultiJvmNode1 extends ClusterSingletonManagerChaosSpec +class ClusterSingletonManagerChaosMultiJvmNode2 extends ClusterSingletonManagerChaosSpec +class ClusterSingletonManagerChaosMultiJvmNode3 extends ClusterSingletonManagerChaosSpec +class ClusterSingletonManagerChaosMultiJvmNode4 extends ClusterSingletonManagerChaosSpec +class ClusterSingletonManagerChaosMultiJvmNode5 extends ClusterSingletonManagerChaosSpec +class ClusterSingletonManagerChaosMultiJvmNode6 extends ClusterSingletonManagerChaosSpec +class ClusterSingletonManagerChaosMultiJvmNode7 extends ClusterSingletonManagerChaosSpec + +class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonManagerChaosSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterSingletonManagerChaosSpec._ + + override def initialParticipants = roles.size + + // Sort the roles in the order used by the cluster. + lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = { + implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] { + import Member.addressOrdering + def compare(x: RoleName, y: RoleName) = + addressOrdering.compare(node(x).address, node(y).address) + } + roles.filterNot(_ == controller).toVector.sorted + } + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + createSingleton() + } + } + + def createSingleton(): ActorRef = { + system.actorOf(Props(new ClusterSingletonManager( + singletonProps = handOverData ⇒ Props(new Echo(testActor)), + singletonName = "echo", + terminationMessage = PoisonPill)), + name = "singleton") + } + + def crash(roles: RoleName*): Unit = { + runOn(controller) { + roles foreach { r ⇒ + log.info("Shutdown [{}]", node(r).address) + testConductor.shutdown(r, 0).await + } + } + } + + def echo(leader: RoleName): ActorRef = + system.actorFor(RootActorPath(node(leader).address) / "user" / "singleton" / "echo") + + def verify(leader: RoleName): Unit = { + enterBarrier("before-" + leader.name + "-verified") + runOn(leader) { + expectMsg(EchoStarted) + } + enterBarrier(leader.name + "-active") + + runOn(sortedClusterRoles.filterNot(_ == leader): _*) { + ignoreMsg { case EchoStarted ⇒ true } + echo(leader) ! "hello" + expectMsgType[ActorRef].path.address must be(node(leader).address) + ignoreNoMsg() + } + enterBarrier(leader.name + "-verified") + } + + "A ClusterSingletonManager in chaotic cluster" must { + + "startup 3 node cluster" in within(90 seconds) { + log.info("Sorted cluster nodes [{}]", sortedClusterRoles.map(node(_).address).mkString(", ")) + + join(sortedClusterRoles(5), sortedClusterRoles.last) + join(sortedClusterRoles(4), sortedClusterRoles.last) + join(sortedClusterRoles(3), sortedClusterRoles.last) + + verify(sortedClusterRoles(3)) + } + + "hand over when joining 3 more nodes" in within(90 seconds) { + join(sortedClusterRoles(2), sortedClusterRoles(3)) + join(sortedClusterRoles(1), sortedClusterRoles(4)) + join(sortedClusterRoles(0), sortedClusterRoles(5)) + + verify(sortedClusterRoles(0)) + } + + "take over when three leaders crash in 6 nodes cluster" in within(90 seconds) { + crash(sortedClusterRoles(0), sortedClusterRoles(1), sortedClusterRoles(2)) + verify(sortedClusterRoles(3)) + } + + } +} diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index 19c6a26a44..8e56c2d7b7 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -22,7 +22,6 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ -import akka.testkit.ImplicitSender import akka.testkit.TestEvent._ import akka.actor.Terminated @@ -43,8 +42,6 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { akka.cluster.auto-down = on """)) - testTransport(on = true) - object PointToPointChannel { case object RegisterConsumer case object UnregisterConsumer @@ -196,6 +193,13 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS def queue: ActorRef = system.actorFor(node(controller) / "user" / "queue") + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + createSingleton() + } + } + def createSingleton(): ActorRef = { //#create-singleton-manager system.actorOf(Props(new ClusterSingletonManager( @@ -255,58 +259,38 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS } enterBarrier("queue-started") - runOn(sortedClusterRoles(5)) { - Cluster(system) join node(sortedClusterRoles(5)).address - createSingleton() - } - + join(sortedClusterRoles.last, sortedClusterRoles.last) verify(sortedClusterRoles.last, msg = 1, expectedCurrent = 0) } "hand over when new leader joins to 1 node cluster" in within(15 seconds) { val newLeaderRole = sortedClusterRoles(4) - runOn(newLeaderRole) { - Cluster(system) join node(sortedClusterRoles.last).address - createSingleton() - } - + join(newLeaderRole, sortedClusterRoles.last) verify(newLeaderRole, msg = 2, expectedCurrent = 1) } "hand over when new leader joins to 2 nodes cluster" in within(15 seconds) { val newLeaderRole = sortedClusterRoles(3) - runOn(newLeaderRole) { - Cluster(system) join node(sortedClusterRoles.last).address - createSingleton() - } - + join(newLeaderRole, sortedClusterRoles.last) verify(newLeaderRole, msg = 3, expectedCurrent = 2) } - "hand over when adding three new potential leaders to 3 nodes cluster" in within(60 seconds) { - // this test will result in restart after retry timeout - // because the new leader will not know about the real previous leader and the - // previous leader sortedClusterRoles(3) will first think that sortedClusterRoles(2) - // is the new leader - runOn(controller) { - queue ! Reset - expectMsg(ResetOk) - } - runOn(sortedClusterRoles(2)) { - // previous leader - Cluster(system) join node(sortedClusterRoles(3)).address - createSingleton() - } - runOn(sortedClusterRoles(1)) { - Cluster(system) join node(sortedClusterRoles(4)).address - createSingleton() - } - runOn(sortedClusterRoles(0)) { - Cluster(system) join node(sortedClusterRoles(5)).address - createSingleton() - } + "hand over when new leader joins to 3 nodes cluster" in within(15 seconds) { + val newLeaderRole = sortedClusterRoles(2) + join(newLeaderRole, sortedClusterRoles.last) + verify(newLeaderRole, msg = 4, expectedCurrent = 3) + } - verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 0) + "hand over when new leader joins to 4 nodes cluster" in within(15 seconds) { + val newLeaderRole = sortedClusterRoles(1) + join(newLeaderRole, sortedClusterRoles.last) + verify(newLeaderRole, msg = 5, expectedCurrent = 4) + } + + "hand over when new leader joins to 5 nodes cluster" in within(15 seconds) { + val newLeaderRole = sortedClusterRoles(0) + join(newLeaderRole, sortedClusterRoles.last) + verify(newLeaderRole, msg = 6, expectedCurrent = 5) } "hand over when leader leaves in 6 nodes cluster " in within(30 seconds) { @@ -319,7 +303,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS } //#test-leave - verify(newLeaderRole, msg = 5, expectedCurrent = 4) + verify(newLeaderRole, msg = 7, expectedCurrent = 6) runOn(leaveRole) { val singleton = system.actorFor("/user/singleton") @@ -337,17 +321,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS enterBarrier("logs-muted") crash(sortedClusterRoles(1)) - verify(sortedClusterRoles(2), msg = 6, expectedCurrent = 0) + verify(sortedClusterRoles(2), msg = 8, expectedCurrent = 0) } "take over when two leaders crash in 3 nodes cluster" in within(45 seconds) { crash(sortedClusterRoles(2), sortedClusterRoles(3)) - verify(sortedClusterRoles(4), msg = 7, expectedCurrent = 0) + verify(sortedClusterRoles(4), msg = 9, expectedCurrent = 0) } "take over when leader crashes in 2 nodes cluster" in within(25 seconds) { crash(sortedClusterRoles(4)) - verify(sortedClusterRoles(5), msg = 6, expectedCurrent = 0) + verify(sortedClusterRoles(5), msg = 10, expectedCurrent = 0) } }