From 4e49b2c843ab08ddc08ee0ae98b9626b2a88eb02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 21 Jun 2012 13:16:35 +0200 Subject: [PATCH] Make MultiNodeSpec shut down the conductor after other nodes. See #2230 --- .../akka/remote/testconductor/Conductor.scala | 24 +++++++------ .../remote/testkit/MultiNodeSpecSpec.scala | 36 +++++++++++++++++++ .../remote/testconductor/BarrierSpec.scala | 20 ++++------- .../akka/remote/testkit/MultiNodeSpec.scala | 18 ++++++++-- .../test/scala/akka/testkit/AkkaSpec.scala | 3 ++ 5 files changed, 75 insertions(+), 26 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index b6265125b1..eba0fffe63 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -282,6 +282,8 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex import akka.actor.FSM._ import Controller._ + var roleName: RoleName = null + startWith(Initial, None) whenUnhandled { @@ -292,12 +294,15 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex } onTermination { - case _ ⇒ controller ! ClientDisconnected + case _ ⇒ + controller ! ClientDisconnected(roleName) + channel.close() } when(Initial, stateTimeout = 10 seconds) { case Event(Hello(name, addr), _) ⇒ - controller ! NodeInfo(RoleName(name), addr, self) + roleName = RoleName(name) + controller ! NodeInfo(roleName, addr, self) goto(Ready) case Event(x: NetworkOp, _) ⇒ log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x) @@ -334,10 +339,6 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex } initialize - - onTermination { - case _ ⇒ channel.close() - } } /** @@ -517,10 +518,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n) stay using d.copy(clients = clients + n) case Event(ClientDisconnected(name), d @ Data(clients, _, arrived, _)) ⇒ - if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect") - (clients find (_.name == name)) match { - case None ⇒ stay - case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + if (arrived.isEmpty) + stay using d.copy(clients = clients.filterNot(_.name == name)) + else { + (clients find (_.name == name)) match { + case None ⇒ stay + case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + } } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala new file mode 100644 index 0000000000..2a709a99a7 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.remote.testkit + +import akka.testkit.LongRunningTest + +object MultiNodeSpecMultiJvmSpec extends MultiNodeConfig { + commonConfig(debugConfig(on = false)) + + val node1 = role("node1") + val node2 = role("node2") + val node3 = role("node3") + val node4 = role("node4") +} + +class MultiNodeSpecSpecMultiJvmNode1 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode2 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode3 extends MultiNodeSpecSpec +class MultiNodeSpecSpecMultiJvmNode4 extends MultiNodeSpecSpec + +class MultiNodeSpecSpec extends MultiNodeSpec(MultiNodeSpecMultiJvmSpec) { + + import MultiNodeSpecMultiJvmSpec._ + + def initialParticipants = 4 + + "A MultiNodeSpec" must { + + "wait for all nodes to remove themselves before we shut the conductor down" taggedAs LongRunningTest in { + enterBarrier("startup") + // this test is empty here since it only exercises the shutdown code in the MultiNodeSpec + } + + } +} diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala index f418f4a717..8ff95d0831 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -59,14 +59,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { val b = getBarrier() b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters) b ! ClientDisconnected(B) - EventFilter[ClientLost](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil, null), A))) - EventFilter[BarrierEmpty](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot disconnect RoleName(a): no client to disconnect"))) + expectNoMsg(1 second) + b ! ClientDisconnected(A) + expectNoMsg(1 second) } "fail entering barrier when nobody registered" taggedAs TimingTest in { @@ -264,12 +259,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender { b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor) expectMsg(ToClient(Done)) b ! ClientDisconnected(B) - EventFilter[ClientLost](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } - EventFilter[BarrierEmpty](occurrences = 1) intercept { - b ! ClientDisconnected(A) - } + expectNoMsg(1 second) + b ! ClientDisconnected(A) + expectNoMsg(1 second) } "fail entering barrier when nobody registered" taggedAs TimingTest in { diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 4d65a2084e..9f88f9e1c8 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -7,12 +7,13 @@ import java.net.InetSocketAddress import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } -import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem } +import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem } import akka.dispatch.Await import akka.dispatch.Await.Awaitable import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.testkit.AkkaSpec -import akka.util.{ Timeout, NonFatal, Duration } +import akka.util.{ Timeout, NonFatal } +import akka.util.duration._ /** * Configure the role names and participants of the test, including configuration settings. @@ -261,4 +262,17 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: // useful to see which jvm is running which role log.info("Role [{}] started", myself.name) + // wait for all nodes to remove themselves before we shut the conductor down + final override def beforeShutdown() = { + if (selfIndex == 0) { + testConductor.removeNode(myself) + within(testConductor.Settings.BarrierTimeout.duration) { + awaitCond { + val nodes = testConductor.getNodes.await + nodes.size < 1 || (nodes.size == 1 && nodes.head == myself) + } + } + } + } + } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 424c913662..f9ee989e1c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -74,6 +74,7 @@ abstract class AkkaSpec(_system: ActorSystem) } final override def afterAll { + beforeShutdown() system.shutdown() try system.awaitTermination(5 seconds) catch { case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) @@ -83,6 +84,8 @@ abstract class AkkaSpec(_system: ActorSystem) protected def atStartup() {} + protected def beforeShutdown() {} + protected def atTermination() {} def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) {