Make MultiNodeSpec shut down the conductor after other nodes. See #2230
This commit is contained in:
parent
91268365c1
commit
4e49b2c843
5 changed files with 75 additions and 26 deletions
|
|
@ -282,6 +282,8 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
|
|||
import akka.actor.FSM._
|
||||
import Controller._
|
||||
|
||||
var roleName: RoleName = null
|
||||
|
||||
startWith(Initial, None)
|
||||
|
||||
whenUnhandled {
|
||||
|
|
@ -292,12 +294,15 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
|
|||
}
|
||||
|
||||
onTermination {
|
||||
case _ ⇒ controller ! ClientDisconnected
|
||||
case _ ⇒
|
||||
controller ! ClientDisconnected(roleName)
|
||||
channel.close()
|
||||
}
|
||||
|
||||
when(Initial, stateTimeout = 10 seconds) {
|
||||
case Event(Hello(name, addr), _) ⇒
|
||||
controller ! NodeInfo(RoleName(name), addr, self)
|
||||
roleName = RoleName(name)
|
||||
controller ! NodeInfo(roleName, addr, self)
|
||||
goto(Ready)
|
||||
case Event(x: NetworkOp, _) ⇒
|
||||
log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x)
|
||||
|
|
@ -334,10 +339,6 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
|
|||
}
|
||||
|
||||
initialize
|
||||
|
||||
onTermination {
|
||||
case _ ⇒ channel.close()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -517,10 +518,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
|
|||
if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n)
|
||||
stay using d.copy(clients = clients + n)
|
||||
case Event(ClientDisconnected(name), d @ Data(clients, _, arrived, _)) ⇒
|
||||
if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect")
|
||||
(clients find (_.name == name)) match {
|
||||
case None ⇒ stay
|
||||
case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name)
|
||||
if (arrived.isEmpty)
|
||||
stay using d.copy(clients = clients.filterNot(_.name == name))
|
||||
else {
|
||||
(clients find (_.name == name)) match {
|
||||
case None ⇒ stay
|
||||
case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.remote.testkit
|
||||
|
||||
import akka.testkit.LongRunningTest
|
||||
|
||||
object MultiNodeSpecMultiJvmSpec extends MultiNodeConfig {
|
||||
commonConfig(debugConfig(on = false))
|
||||
|
||||
val node1 = role("node1")
|
||||
val node2 = role("node2")
|
||||
val node3 = role("node3")
|
||||
val node4 = role("node4")
|
||||
}
|
||||
|
||||
class MultiNodeSpecSpecMultiJvmNode1 extends MultiNodeSpecSpec
|
||||
class MultiNodeSpecSpecMultiJvmNode2 extends MultiNodeSpecSpec
|
||||
class MultiNodeSpecSpecMultiJvmNode3 extends MultiNodeSpecSpec
|
||||
class MultiNodeSpecSpecMultiJvmNode4 extends MultiNodeSpecSpec
|
||||
|
||||
class MultiNodeSpecSpec extends MultiNodeSpec(MultiNodeSpecMultiJvmSpec) {
|
||||
|
||||
import MultiNodeSpecMultiJvmSpec._
|
||||
|
||||
def initialParticipants = 4
|
||||
|
||||
"A MultiNodeSpec" must {
|
||||
|
||||
"wait for all nodes to remove themselves before we shut the conductor down" taggedAs LongRunningTest in {
|
||||
enterBarrier("startup")
|
||||
// this test is empty here since it only exercises the shutdown code in the MultiNodeSpec
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -59,14 +59,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
|
|||
val b = getBarrier()
|
||||
b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters)
|
||||
b ! ClientDisconnected(B)
|
||||
EventFilter[ClientLost](occurrences = 1) intercept {
|
||||
b ! ClientDisconnected(A)
|
||||
}
|
||||
expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil, null), A)))
|
||||
EventFilter[BarrierEmpty](occurrences = 1) intercept {
|
||||
b ! ClientDisconnected(A)
|
||||
}
|
||||
expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot disconnect RoleName(a): no client to disconnect")))
|
||||
expectNoMsg(1 second)
|
||||
b ! ClientDisconnected(A)
|
||||
expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
"fail entering barrier when nobody registered" taggedAs TimingTest in {
|
||||
|
|
@ -264,12 +259,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
|
|||
b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor)
|
||||
expectMsg(ToClient(Done))
|
||||
b ! ClientDisconnected(B)
|
||||
EventFilter[ClientLost](occurrences = 1) intercept {
|
||||
b ! ClientDisconnected(A)
|
||||
}
|
||||
EventFilter[BarrierEmpty](occurrences = 1) intercept {
|
||||
b ! ClientDisconnected(A)
|
||||
}
|
||||
expectNoMsg(1 second)
|
||||
b ! ClientDisconnected(A)
|
||||
expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
"fail entering barrier when nobody registered" taggedAs TimingTest in {
|
||||
|
|
|
|||
|
|
@ -7,12 +7,13 @@ import java.net.InetSocketAddress
|
|||
|
||||
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
|
||||
|
||||
import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem }
|
||||
import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem }
|
||||
import akka.dispatch.Await
|
||||
import akka.dispatch.Await.Awaitable
|
||||
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.util.{ Timeout, NonFatal, Duration }
|
||||
import akka.util.{ Timeout, NonFatal }
|
||||
import akka.util.duration._
|
||||
|
||||
/**
|
||||
* Configure the role names and participants of the test, including configuration settings.
|
||||
|
|
@ -261,4 +262,17 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
|
|||
// useful to see which jvm is running which role
|
||||
log.info("Role [{}] started", myself.name)
|
||||
|
||||
// wait for all nodes to remove themselves before we shut the conductor down
|
||||
final override def beforeShutdown() = {
|
||||
if (selfIndex == 0) {
|
||||
testConductor.removeNode(myself)
|
||||
within(testConductor.Settings.BarrierTimeout.duration) {
|
||||
awaitCond {
|
||||
val nodes = testConductor.getNodes.await
|
||||
nodes.size < 1 || (nodes.size == 1 && nodes.head == myself)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,6 +74,7 @@ abstract class AkkaSpec(_system: ActorSystem)
|
|||
}
|
||||
|
||||
final override def afterAll {
|
||||
beforeShutdown()
|
||||
system.shutdown()
|
||||
try system.awaitTermination(5 seconds) catch {
|
||||
case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
|
||||
|
|
@ -83,6 +84,8 @@ abstract class AkkaSpec(_system: ActorSystem)
|
|||
|
||||
protected def atStartup() {}
|
||||
|
||||
protected def beforeShutdown() {}
|
||||
|
||||
protected def atTermination() {}
|
||||
|
||||
def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue