add some tests for BarrierCoordinator and Controller

This commit is contained in:
Roland 2012-05-11 11:31:44 +02:00
parent 160aa73066
commit 439f653427
5 changed files with 599 additions and 62 deletions

View file

@ -1,5 +1,5 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.testconductor
@ -100,7 +100,7 @@ trait Conductor { this: TestConductorExt ⇒
* on average), but that is countered by using the actual execution time for
* determining how much to send, leading to the correct output rate, but with
* increased latency.
*
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be throttled
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
@ -116,7 +116,7 @@ trait Conductor { this: TestConductorExt ⇒
* sending and/or receiving: it will just drop all messages right before
* submitting them to the Socket or right after receiving them from the
* Socket.
*
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
@ -130,7 +130,7 @@ trait Conductor { this: TestConductorExt ⇒
* Tell the remote support to shutdown the connection to the given remote
* peer. It works regardless of whether the recipient was initiator or
* responder.
*
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
*/
@ -143,7 +143,7 @@ trait Conductor { this: TestConductorExt ⇒
* Tell the remote support to TCP_RESET the connection to the given remote
* peer. It works regardless of whether the recipient was initiator or
* responder.
*
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
*/
@ -155,7 +155,7 @@ trait Conductor { this: TestConductorExt ⇒
/**
* Tell the remote node to shut itself down using System.exit with the given
* exitValue.
*
*
* @param node is the symbolic name of the node which is to be affected
* @param exitValue is the return code which shall be given to System.exit
*/
@ -166,7 +166,7 @@ trait Conductor { this: TestConductorExt ⇒
/**
* Tell the SBT plugin to forcibly terminate the given remote node using Process.destroy.
*
*
* @param node is the symbolic name of the node which is to be affected
*/
def kill(node: String): Future[Done] = {
@ -177,7 +177,7 @@ trait Conductor { this: TestConductorExt ⇒
/**
* Obtain the list of remote host names currently registered.
*/
def getNodes: Future[List[String]] = {
def getNodes: Future[Iterable[String]] = {
import Settings.QueryTimeout
controller ? GetNodes mapTo
}
@ -187,7 +187,7 @@ trait Conductor { this: TestConductorExt ⇒
* pass subsequent barriers. This must be done before the client connection
* breaks down in order to affect an orderly removal (i.e. without failing
* present and future barriers).
*
*
* @param node is the symbolic name of the node which is to be removed
*/
def removeNode(node: String): Future[Done] = {
@ -330,22 +330,32 @@ object Controller {
* [[akka.remote.testconductor.BarrierCoordinator]], its child) and allowing
* network and other failures to be injected at the test nodes.
*/
class Controller(_participants: Int) extends Actor {
class Controller(private var initialParticipants: Int) extends Actor {
import Controller._
var initialParticipants = _participants
import BarrierCoordinator._
val settings = TestConductor().Settings
val connection = RemoteConnection(Server, settings.host, settings.port,
new ConductorHandler(context.system, self, Logging(context.system, "ConductorHandler")))
/*
* Supervision of the BarrierCoordinator means to catch all his bad emotions
* and sometimes console him (BarrierEmpty, BarrierTimeout), sometimes tell
* him to hate the world (WrongBarrier, DuplicateNode, ClientLost). The latter shall help
* terminate broken tests as quickly as possible (i.e. without awaiting
* BarrierTimeouts in the players).
*/
override def supervisorStrategy = OneForOneStrategy() {
case e: BarrierCoordinator.BarrierTimeoutException SupervisorStrategy.Resume
case e: BarrierCoordinator.WrongBarrierException
for (NodeInfo(c, _, _) e.data.clients; info nodes get c)
barrier ! NodeInfo(c, info.addr, info.fsm)
for (c e.data.arrived) c ! BarrierFailed(e.barrier)
SupervisorStrategy.Restart
case BarrierTimeout(data) SupervisorStrategy.Resume
case BarrierEmpty(data, msg) SupervisorStrategy.Resume
case WrongBarrier(name, client, data) client ! Send(BarrierFailed(name)); failBarrier(data)
case ClientLost(data, node) failBarrier(data)
case DuplicateNode(data, node) failBarrier(data)
}
def failBarrier(data: Data): SupervisorStrategy.Directive = {
for (c data.arrived) c ! Send(BarrierFailed(data.barrier))
SupervisorStrategy.Restart
}
val barrier = context.actorOf(Props[BarrierCoordinator], "barriers")
@ -353,12 +363,20 @@ class Controller(_participants: Int) extends Actor {
override def receive = LoggingReceive {
case c @ NodeInfo(name, addr, fsm)
nodes += name -> c
barrier forward c
if (initialParticipants <= 0) sender ! Done
else if (nodes.size == initialParticipants) {
for (NodeInfo(_, _, client) nodes.values) client ! Send(Done)
initialParticipants = 0
if (nodes contains name) {
if (initialParticipants > 0) {
for (NodeInfo(_, _, client) nodes.values) client ! Send(BarrierFailed("initial startup"))
initialParticipants = 0
}
fsm ! Send(BarrierFailed("initial startup"))
} else {
nodes += name -> c
if (initialParticipants <= 0) fsm ! Send(Done)
else if (nodes.size == initialParticipants) {
for (NodeInfo(_, _, client) nodes.values) client ! Send(Done)
initialParticipants = 0
}
}
case c @ ClientDisconnected(name)
nodes -= name
@ -396,8 +414,16 @@ object BarrierCoordinator {
case class RemoveClient(name: String)
case class Data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef])
class BarrierTimeoutException(val data: Data) extends RuntimeException(data.barrier) with NoStackTrace
class WrongBarrierException(val barrier: String, val client: ActorRef, val data: Data) extends RuntimeException(barrier) with NoStackTrace
trait Printer { this: Product with Throwable with NoStackTrace
override def toString = productPrefix + productIterator.mkString("(", ", ", ")")
}
case class BarrierTimeout(data: Data) extends RuntimeException(data.barrier) with NoStackTrace with Printer
case class DuplicateNode(data: Data, node: Controller.NodeInfo) extends RuntimeException with NoStackTrace with Printer
case class WrongBarrier(barrier: String, client: ActorRef, data: Data) extends RuntimeException(barrier) with NoStackTrace with Printer
case class BarrierEmpty(data: Data, msg: String) extends RuntimeException(msg) with NoStackTrace with Printer
case class ClientLost(data: Data, client: String) extends RuntimeException with NoStackTrace with Printer
}
/**
@ -426,26 +452,28 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
whenUnhandled {
case Event(n: NodeInfo, d @ Data(clients, _, _))
if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n)
stay using d.copy(clients = clients + n)
case Event(ClientDisconnected(name), d @ Data(clients, _, arrived))
if (clients.isEmpty) throw BarrierEmpty(d, "no client to disconnect")
(clients find (_.name == name)) match {
case None stay
case Some(c) throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name)
}
}
when(Idle) {
case Event(EnterBarrier(name), d @ Data(clients, _, _))
if (clients.isEmpty) throw new IllegalStateException("no client expected yet")
case Event(e @ EnterBarrier(name), d @ Data(clients, _, _))
if (failed)
stay replying BarrierFailed(name)
stay replying Send(BarrierFailed(name))
else if (clients.map(_.fsm) == Set(sender))
stay replying Send(e)
else if (clients.find(_.fsm == sender).isEmpty)
stay replying Send(BarrierFailed(name))
else
goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil)
case Event(ClientDisconnected(name), d @ Data(clients, _, _))
if (clients.isEmpty) throw new IllegalStateException("no client to disconnect")
(clients filterNot (_.name == name)) match {
case `clients` stay
case c
failed = true
stay using d.copy(clients = c)
}
case Event(RemoveClient(name), d @ Data(clients, _, _))
if (clients.isEmpty) throw new IllegalStateException("no client to remove")
if (clients.isEmpty) throw BarrierEmpty(d, "no client to remove")
stay using d.copy(clients = clients filterNot (_.name == name))
}
@ -456,36 +484,33 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
when(Waiting) {
case Event(e @ EnterBarrier(name), d @ Data(clients, barrier, arrived))
if (name != barrier) throw new WrongBarrierException(barrier, sender, d)
if (name != barrier || clients.find(_.fsm == sender).isEmpty) throw WrongBarrier(name, sender, d)
val together = sender :: arrived
handleBarrier(d.copy(arrived = together))
case Event(RemoveClient(name), d @ Data(clients, barrier, arrived))
val newClients = clients filterNot (_.name == name)
val newArrived = arrived filterNot (_ == name)
handleBarrier(d.copy(clients = newClients, arrived = newArrived))
case Event(ClientDisconnected(name), d @ Data(clients, barrier, arrived))
(clients filterNot (_.name == name)) match {
case `clients` stay
case c
val f = BarrierFailed(barrier)
arrived foreach (_ ! Send(f))
failed = true
goto(Idle) using Data(c, "", Nil)
clients find (_.name == name) match {
case None stay
case Some(client)
handleBarrier(d.copy(clients = clients - client, arrived = arrived filterNot (_ == client.fsm)))
}
case Event(StateTimeout, data)
throw new BarrierTimeoutException(data)
throw BarrierTimeout(data)
}
initialize
def handleBarrier(data: Data): State =
if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) {
def handleBarrier(data: Data): State = {
log.debug("handleBarrier({})", data)
if (data.arrived.isEmpty) {
goto(Idle) using data.copy(barrier = "")
} else if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) {
val e = EnterBarrier(data.barrier)
data.arrived foreach (_ ! Send(e))
goto(Idle) using data.copy(barrier = "", arrived = Nil)
} else {
stay using data
}
}
}

View file

@ -13,7 +13,7 @@ import akka.actor.Address
/**
* Access to the [[akka.remote.testconductor.TestConductorExt]] extension:
*
*
* {{{
* val tc = TestConductor(system)
* tc.startController(numPlayers)

View file

@ -55,9 +55,9 @@ trait Player { this: TestConductorExt ⇒
def receive = {
case fsm: ActorRef waiting = sender; fsm ! SubscribeTransitionCallBack(self)
case Transition(_, Connecting, AwaitDone) // step 1, not there yet
case Transition(_, AwaitDone, Connected) waiting ! Done
case t: Transition[_] waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t))
case CurrentState(_, Connected) waiting ! Done
case Transition(_, AwaitDone, Connected) waiting ! Done; context stop self
case t: Transition[_] waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t)); context stop self
case CurrentState(_, Connected) waiting ! Done; context stop self
case _: CurrentState[_]
}
}))
@ -84,6 +84,7 @@ object ClientFSM {
case object Connecting extends State
case object AwaitDone extends State
case object Connected extends State
case object Failed extends State
case class Data(channel: Channel, barrier: Option[(String, ActorRef)])
@ -116,24 +117,24 @@ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, Client
channel.write(Hello(settings.name, TestConductor().address))
goto(AwaitDone)
case Event(_: ConnectionFailure, _)
// System.exit(1)
stop
goto(Failed)
case Event(StateTimeout, _)
log.error("connect timeout to TestConductor")
// System.exit(1)
stop
goto(Failed)
}
when(AwaitDone, stateTimeout = settings.BarrierTimeout.duration) {
case Event(Done, _)
log.debug("received Done: starting test")
goto(Connected)
case Event(msg: NetworkOp, _)
log.error("received {} instead of Done", msg)
goto(Failed)
case Event(msg: ClientOp, _)
stay replying Status.Failure(new IllegalStateException("not connected yet"))
case Event(StateTimeout, _)
log.error("connect timeout to TestConductor")
// System.exit(1)
stop
goto(Failed)
}
when(Connected) {
@ -180,6 +181,14 @@ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, Client
stay // needed because Java doesnt have Nothing
}
when(Failed) {
case Event(msg: ClientOp, _)
stay replying Status.Failure(new RuntimeException("cannot do " + msg + " while Failed"))
case Event(msg: NetworkOp, _)
log.warning("ignoring network message {} while Failed", msg)
stay
}
onTermination {
case StopEvent(_, _, Data(channel, _))
channel.close()

View file

@ -0,0 +1,465 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.testconductor
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.AddressFromURIString
import akka.actor.ActorRef
import akka.testkit.ImplicitSender
import akka.actor.Actor
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy
import akka.testkit.EventFilter
import akka.testkit.TestProbe
import akka.util.duration._
import akka.event.Logging
import org.scalatest.BeforeAndAfterEach
object BarrierSpec {
case class Failed(ref: ActorRef, thr: Throwable)
val config = """
akka.testconductor.barrier-timeout = 5s
akka.actor.provider = akka.remote.RemoteActorRefProvider
akka.remote.netty.port = 0
akka.actor.debug.fsm = on
akka.actor.debug.lifecycle = on
"""
}
class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with BeforeAndAfterEach {
import BarrierSpec._
import Controller._
import BarrierCoordinator._
override def afterEach {
system.eventStream.setLogLevel(Logging.WarningLevel)
}
"A BarrierCoordinator" must {
"register clients and remove them" in {
val b = getBarrier()
b ! NodeInfo("a", AddressFromURIString("akka://sys"), system.deadLetters)
b ! RemoveClient("b")
b ! RemoveClient("a")
EventFilter[BarrierEmpty](occurrences = 1) intercept {
b ! RemoveClient("a")
}
expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "no client to remove")))
}
"register clients and disconnect them" in {
val b = getBarrier()
b ! NodeInfo("a", AddressFromURIString("akka://sys"), system.deadLetters)
b ! ClientDisconnected("b")
EventFilter[ClientLost](occurrences = 1) intercept {
b ! ClientDisconnected("a")
}
expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil), "a")))
EventFilter[BarrierEmpty](occurrences = 1) intercept {
b ! ClientDisconnected("a")
}
expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "no client to disconnect")))
}
"fail entering barrier when nobody registered" in {
val b = getBarrier()
b ! EnterBarrier("b")
expectMsg(Send(BarrierFailed("b")))
}
"enter barrier" in {
val barrier = getBarrier()
val a, b = TestProbe()
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar"))
noMsg(a, b)
within(1 second) {
b.send(barrier, EnterBarrier("bar"))
a.expectMsg(Send(EnterBarrier("bar")))
b.expectMsg(Send(EnterBarrier("bar")))
}
}
"enter barrier with joining node" in {
val barrier = getBarrier()
val a, b, c = TestProbe()
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar"))
barrier ! NodeInfo("c", AddressFromURIString("akka://sys"), c.ref)
b.send(barrier, EnterBarrier("bar"))
noMsg(a, b, c)
within(1 second) {
c.send(barrier, EnterBarrier("bar"))
a.expectMsg(Send(EnterBarrier("bar")))
b.expectMsg(Send(EnterBarrier("bar")))
c.expectMsg(Send(EnterBarrier("bar")))
}
}
"enter barrier with leaving node" in {
val barrier = getBarrier()
val a, b, c = TestProbe()
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
barrier ! NodeInfo("c", AddressFromURIString("akka://sys"), c.ref)
a.send(barrier, EnterBarrier("bar"))
b.send(barrier, EnterBarrier("bar"))
barrier ! RemoveClient("a")
barrier ! ClientDisconnected("a")
noMsg(a, b, c)
b.within(1 second) {
barrier ! RemoveClient("c")
b.expectMsg(Send(EnterBarrier("bar")))
}
barrier ! ClientDisconnected("c")
expectNoMsg(1 second)
}
"leave barrier when last “arrived” is removed" in {
val barrier = getBarrier()
val a, b = TestProbe()
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar"))
barrier ! RemoveClient("a")
b.send(barrier, EnterBarrier("foo"))
b.expectMsg(Send(EnterBarrier("foo")))
}
"fail barrier with disconnecing node" in {
val barrier = getBarrier()
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
a.send(barrier, EnterBarrier("bar"))
EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected("b")
}
expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA), "bar", a.ref :: Nil), "b")))
}
"fail barrier with disconnecing node who already arrived" in {
val barrier = getBarrier()
val a, b, c = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
val nodeC = NodeInfo("c", AddressFromURIString("akka://sys"), c.ref)
barrier ! nodeA
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeC
a.send(barrier, EnterBarrier("bar"))
b.send(barrier, EnterBarrier("bar"))
EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected("b")
}
expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar", a.ref :: Nil), "b")))
}
"fail when entering wrong barrier" in {
val barrier = getBarrier()
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA
val nodeB = NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeB
a.send(barrier, EnterBarrier("bar"))
EventFilter[WrongBarrier](occurrences = 1) intercept {
b.send(barrier, EnterBarrier("foo"))
}
expectMsg(Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar", a.ref :: Nil))))
}
"fail barrier after first failure" in {
val barrier = getBarrier()
val a = TestProbe()
EventFilter[BarrierEmpty](occurrences = 1) intercept {
barrier ! RemoveClient("a")
}
expectMsg(Failed(barrier, BarrierEmpty(Data(Set(), "", Nil), "no client to remove")))
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
a.send(barrier, EnterBarrier("right"))
a.expectMsg(Send(BarrierFailed("right")))
}
"fail after barrier timeout" in {
val barrier = getBarrier()
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA
barrier ! nodeB
a.send(barrier, EnterBarrier("right"))
EventFilter[BarrierTimeout](occurrences = 1) intercept {
expectMsg(7 seconds, Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "right", a.ref :: Nil))))
}
}
"fail if a node registers twice" in {
val barrier = getBarrier()
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo("a", AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA
EventFilter[DuplicateNode](occurrences = 1) intercept {
barrier ! nodeB
}
expectMsg(Failed(barrier, DuplicateNode(Data(Set(nodeA), "", Nil), nodeB)))
}
"finally have no failure messages left" in {
expectNoMsg(1 second)
}
}
"A Controller with BarrierCoordinator" must {
"register clients and remove them" in {
val b = getController(1)
b ! NodeInfo("a", AddressFromURIString("akka://sys"), testActor)
expectMsg(Send(Done))
b ! Remove("b")
b ! Remove("a")
EventFilter[BarrierEmpty](occurrences = 1) intercept {
b ! Remove("a")
}
}
"register clients and disconnect them" in {
val b = getController(1)
b ! NodeInfo("a", AddressFromURIString("akka://sys"), testActor)
expectMsg(Send(Done))
b ! ClientDisconnected("b")
EventFilter[ClientLost](occurrences = 1) intercept {
b ! ClientDisconnected("a")
}
EventFilter[BarrierEmpty](occurrences = 1) intercept {
b ! ClientDisconnected("a")
}
}
"fail entering barrier when nobody registered" in {
val b = getController(0)
b ! EnterBarrier("b")
expectMsg(Send(BarrierFailed("b")))
}
"enter barrier" in {
val barrier = getController(2)
val a, b = TestProbe()
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(Send(Done))
b.expectMsg(Send(Done))
a.send(barrier, EnterBarrier("bar"))
noMsg(a, b)
within(1 second) {
b.send(barrier, EnterBarrier("bar"))
a.expectMsg(Send(EnterBarrier("bar")))
b.expectMsg(Send(EnterBarrier("bar")))
}
}
"enter barrier with joining node" in {
val barrier = getController(2)
val a, b, c = TestProbe()
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(Send(Done))
b.expectMsg(Send(Done))
a.send(barrier, EnterBarrier("bar"))
barrier ! NodeInfo("c", AddressFromURIString("akka://sys"), c.ref)
c.expectMsg(Send(Done))
b.send(barrier, EnterBarrier("bar"))
noMsg(a, b, c)
within(1 second) {
c.send(barrier, EnterBarrier("bar"))
a.expectMsg(Send(EnterBarrier("bar")))
b.expectMsg(Send(EnterBarrier("bar")))
c.expectMsg(Send(EnterBarrier("bar")))
}
}
"enter barrier with leaving node" in {
val barrier = getController(3)
val a, b, c = TestProbe()
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
barrier ! NodeInfo("c", AddressFromURIString("akka://sys"), c.ref)
a.expectMsg(Send(Done))
b.expectMsg(Send(Done))
c.expectMsg(Send(Done))
a.send(barrier, EnterBarrier("bar"))
b.send(barrier, EnterBarrier("bar"))
barrier ! Remove("a")
barrier ! ClientDisconnected("a")
noMsg(a, b, c)
b.within(1 second) {
barrier ! Remove("c")
b.expectMsg(Send(EnterBarrier("bar")))
}
barrier ! ClientDisconnected("c")
expectNoMsg(1 second)
}
"leave barrier when last “arrived” is removed" in {
val barrier = getController(2)
val a, b = TestProbe()
barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(Send(Done))
b.expectMsg(Send(Done))
a.send(barrier, EnterBarrier("bar"))
barrier ! Remove("a")
b.send(barrier, EnterBarrier("foo"))
b.expectMsg(Send(EnterBarrier("foo")))
}
"fail barrier with disconnecing node" in {
val barrier = getController(2)
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
a.expectMsg(Send(Done))
b.expectMsg(Send(Done))
a.send(barrier, EnterBarrier("bar"))
barrier ! ClientDisconnected("unknown")
noMsg(a)
EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected("b")
}
a.expectMsg(Send(BarrierFailed("bar")))
}
"fail barrier with disconnecing node who already arrived" in {
val barrier = getController(3)
val a, b, c = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
val nodeC = NodeInfo("c", AddressFromURIString("akka://sys"), c.ref)
barrier ! nodeA
barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeC
a.expectMsg(Send(Done))
b.expectMsg(Send(Done))
c.expectMsg(Send(Done))
a.send(barrier, EnterBarrier("bar"))
b.send(barrier, EnterBarrier("bar"))
EventFilter[ClientLost](occurrences = 1) intercept {
barrier ! ClientDisconnected("b")
}
a.expectMsg(Send(BarrierFailed("bar")))
}
"fail when entering wrong barrier" in {
val barrier = getController(2)
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
barrier ! nodeA
val nodeB = NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeB
a.expectMsg(Send(Done))
b.expectMsg(Send(Done))
a.send(barrier, EnterBarrier("bar"))
EventFilter[WrongBarrier](occurrences = 1) intercept {
b.send(barrier, EnterBarrier("foo"))
}
a.expectMsg(Send(BarrierFailed("bar")))
b.expectMsg(Send(BarrierFailed("foo")))
}
"not really fail after barrier timeout" in {
val barrier = getController(2)
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo("b", AddressFromURIString("akka://sys"), b.ref)
barrier ! nodeA
barrier ! nodeB
a.expectMsg(Send(Done))
b.expectMsg(Send(Done))
a.send(barrier, EnterBarrier("right"))
EventFilter[BarrierTimeout](occurrences = 1) intercept {
Thread.sleep(5000)
}
b.send(barrier, EnterBarrier("right"))
a.expectMsg(Send(EnterBarrier("right")))
b.expectMsg(Send(EnterBarrier("right")))
}
"fail if a node registers twice" in {
val controller = getController(2)
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo("a", AddressFromURIString("akka://sys"), b.ref)
controller ! nodeA
EventFilter[DuplicateNode](occurrences = 1) intercept {
controller ! nodeB
}
a.expectMsg(Send(BarrierFailed("initial startup")))
b.expectMsg(Send(BarrierFailed("initial startup")))
}
"fail subsequent barriers if a node registers twice" in {
val controller = getController(1)
val a, b = TestProbe()
val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref)
val nodeB = NodeInfo("a", AddressFromURIString("akka://sys"), b.ref)
controller ! nodeA
a.expectMsg(Send(Done))
EventFilter[DuplicateNode](occurrences = 1) intercept {
controller ! nodeB
b.expectMsg(Send(BarrierFailed("initial startup")))
}
a.send(controller, EnterBarrier("x"))
a.expectMsg(Send(BarrierFailed("x")))
}
"finally have no failure messages left" in {
expectNoMsg(1 second)
}
}
private def getController(participants: Int): ActorRef = {
system.actorOf(Props(new Actor {
val controller = context.actorOf(Props(new Controller(participants)))
controller ! GetPort
override def supervisorStrategy = OneForOneStrategy() {
case x testActor ! Failed(controller, x); SupervisorStrategy.Restart
}
def receive = {
case x: Int testActor ! controller
}
}))
expectMsgType[ActorRef]
}
/**
* Produce a BarrierCoordinator which is supervised with a strategy which
* forwards all failures to the testActor.
*/
private def getBarrier(): ActorRef = {
system.actorOf(Props(new Actor {
val barrier = context.actorOf(Props[BarrierCoordinator])
override def supervisorStrategy = OneForOneStrategy() {
case x testActor ! Failed(barrier, x); SupervisorStrategy.Restart
}
def receive = {
case _ sender ! barrier
}
})) ! ""
expectMsgType[ActorRef]
}
private def noMsg(probes: TestProbe*) {
expectNoMsg(1 second)
probes foreach (_.msgAvailable must be(false))
}
}

View file

@ -0,0 +1,38 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.testconductor
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.testkit.ImplicitSender
import akka.remote.testconductor.Controller.NodeInfo
import akka.actor.AddressFromURIString
object ControllerSpec {
val config = """
akka.testconductor.barrier-timeout = 5s
akka.actor.provider = akka.remote.RemoteActorRefProvider
akka.remote.netty.port = 0
akka.actor.debug.fsm = on
akka.actor.debug.lifecycle = on
"""
}
class ControllerSpec extends AkkaSpec(ControllerSpec.config) with ImplicitSender {
"A Controller" must {
"publish its nodes" in {
val c = system.actorOf(Props(new Controller(1)))
c ! NodeInfo("a", AddressFromURIString("akka://sys"), testActor)
expectMsg(Send(Done))
c ! NodeInfo("b", AddressFromURIString("akka://sys"), testActor)
expectMsg(Send(Done))
c ! Controller.GetNodes
expectMsgType[Iterable[String]].toSet must be(Set("a", "b"))
}
}
}