diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala
index dc8ee18e2b..6ccf9b4557 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala
@@ -24,6 +24,8 @@ import akka.actor.ActorIdentity
 import akka.actor.Identify
 import akka.actor.ActorRef
 import akka.remote.RemoteWatcher
+import akka.actor.ActorSystem
+import akka.cluster.MultiNodeClusterSpec.EndActor
 
 object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig {
   val first = role("first")
@@ -39,6 +41,7 @@ object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig {
   class Hello extends Actor {
     def receive = Actor.emptyBehavior
   }
+
 }
 
 class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec
@@ -206,6 +209,14 @@ abstract class ClusterDeathWatchSpec
     }
 
     "be able to shutdown system when using remote deployed actor on node that crash" taggedAs LongRunningTest in within(20 seconds) {
+      // fourth actor system will be shutdown, not part of testConductor any more
+      // so we can't use barriers to synchronize with it
+      val firstAddress = address(first)
+      runOn(first) {
+        system.actorOf(Props(classOf[EndActor], testActor, None), "end")
+      }
+      enterBarrier("end-actor-created")
+
       runOn(fourth) {
         val hello = system.actorOf(Props[Hello], "hello")
         hello.isInstanceOf[RemoteActorRef] must be(true)
@@ -231,14 +242,32 @@ abstract class ClusterDeathWatchSpec
             fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout,
               system.asInstanceOf[ActorSystemImpl].printTree))
         }
+
+        // signal to the first node that fourth is done
+        val endSystem = ActorSystem("EndSystem", system.settings.config)
+        try {
+          val endProbe = TestProbe()(endSystem)
+          val endActor = endSystem.actorOf(Props(classOf[EndActor], endProbe.ref, Some(firstAddress)), "end")
+          endActor ! EndActor.SendEnd
+          endProbe.expectMsg(EndActor.EndAck)
+
+        } finally {
+          endSystem.shutdown()
+          endSystem.awaitTermination(10 seconds)
+        }
+        // no barrier here, because it is not part of testConductor roles any more
+
       }
 
       runOn(first, second, third, fifth) {
         enterBarrier("hello-deployed")
         enterBarrier("first-unavailable")
+
+        // don't end the test until the fourth is done
         runOn(first) {
           // fourth system will be shutdown, remove to not participate in barriers any more
           testConductor.removeNode(fourth)
+          expectMsg(EndActor.End)
         }
 
         enterBarrier("after-4")
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index 51eab3fea5..2268267275 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -18,6 +18,9 @@ import scala.concurrent.duration._
 import scala.collection.immutable
 import java.util.concurrent.ConcurrentHashMap
 import akka.remote.DefaultFailureDetectorRegistry
+import akka.actor.ActorRef
+import akka.actor.Actor
+import akka.actor.RootActorPath
 
 object MultiNodeClusterSpec {
 
@@ -48,6 +51,28 @@ object MultiNodeClusterSpec {
     single-expect-default = 5 s
     }
     """)
+
+  // sometimes we need to coordinate test shutdown with messages instead of barriers
+  object EndActor {
+    case object SendEnd
+    case object End
+    case object EndAck
+  }
+
+  class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
+    import EndActor._
+    def receive = {
+      case SendEnd ⇒
+        target foreach { t ⇒
+          context.actorSelection(RootActorPath(t) / self.path.elements) ! End
+        }
+      case End ⇒
+        testActor forward End
+        sender ! EndAck
+      case EndAck ⇒
+        testActor forward EndAck
+    }
+  }
 }
 
 trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec ⇒
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala
index e33428cce4..673afbcd1c 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala
@@ -18,6 +18,7 @@ import akka.actor.Actor
 import akka.actor.ActorRef
 import akka.actor.Props
 import akka.actor.RootActorPath
+import akka.cluster.MultiNodeClusterSpec.EndActor
 
 object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig {
   val first = role("first")
@@ -37,9 +38,6 @@ object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig {
 
   testTransport(on = true)
 
-  class EndActor(testActor: ActorRef) extends Actor {
-    def receive = { case msg ⇒ testActor forward msg }
-  }
 }
 
 class UnreachableNodeJoinsAgainMultiJvmNode1 extends UnreachableNodeJoinsAgainSpec
@@ -147,7 +145,7 @@ abstract class UnreachableNodeJoinsAgainSpec
       // so we can't use barriers to synchronize with it
       val masterAddress = address(master)
       runOn(master) {
-        system.actorOf(Props(classOf[EndActor], testActor), "end")
+        system.actorOf(Props(classOf[EndActor], testActor, None), "end")
       }
       enterBarrier("end-actor-created")
 
@@ -186,7 +184,13 @@ abstract class UnreachableNodeJoinsAgainSpec
           awaitAssert(Cluster(freshSystem).readView.members.size must be(expectedNumberOfMembers))
           awaitAssert(clusterView.members.map(_.status) must be(Set(MemberStatus.Up)))
         }
-        freshSystem.actorSelection(RootActorPath(master) / "user" / "end") ! "done"
+
+        // signal to master node that victim is done
+        val endProbe = TestProbe()(freshSystem)
+        val endActor = freshSystem.actorOf(Props(classOf[EndActor], endProbe.ref, Some(masterAddress)), "end")
+        endActor ! EndActor.SendEnd
+        endProbe.expectMsg(EndActor.EndAck)
+
       } finally {
         freshSystem.shutdown()
         freshSystem.awaitTermination(10 seconds)
@@ -198,7 +202,7 @@ abstract class UnreachableNodeJoinsAgainSpec
       awaitMembersUp(expectedNumberOfMembers)
       // don't end the test until the freshSystem is done
       runOn(master) {
-        expectMsg("done")
+        expectMsg(EndActor.End)
      }
       endBarrier()