From 439f653427d4ad26d504b2b35633b2de9d421d8b Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 11 May 2012 11:31:44 +0200 Subject: [PATCH] add some tests for BarrierCoordinator and Controller --- .../akka/remote/testconductor/Conductor.scala | 129 +++-- .../akka/remote/testconductor/Extension.scala | 2 +- .../akka/remote/testconductor/Player.scala | 27 +- .../remote/testconductor/BarrierSpec.scala | 465 ++++++++++++++++++ .../remote/testconductor/ControllerSpec.scala | 38 ++ 5 files changed, 599 insertions(+), 62 deletions(-) create mode 100644 akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala create mode 100644 akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 347973a255..09a6faeeb0 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2011 Typesafe Inc. + * Copyright (C) 2009-2012 Typesafe Inc. */ package akka.remote.testconductor @@ -100,7 +100,7 @@ trait Conductor { this: TestConductorExt ⇒ * on average), but that is countered by using the actual execution time for * determining how much to send, leading to the correct output rate, but with * increased latency. - * + * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be throttled * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` @@ -116,7 +116,7 @@ trait Conductor { this: TestConductorExt ⇒ * sending and/or receiving: it will just drop all messages right before * submitting them to the Socket or right after receiving them from the * Socket. - * + * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be impeded * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` @@ -130,7 +130,7 @@ trait Conductor { this: TestConductorExt ⇒ * Tell the remote support to shutdown the connection to the given remote * peer. It works regardless of whether the recipient was initiator or * responder. - * + * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be impeded */ @@ -143,7 +143,7 @@ trait Conductor { this: TestConductorExt ⇒ * Tell the remote support to TCP_RESET the connection to the given remote * peer. It works regardless of whether the recipient was initiator or * responder. - * + * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be impeded */ @@ -155,7 +155,7 @@ trait Conductor { this: TestConductorExt ⇒ /** * Tell the remote node to shut itself down using System.exit with the given * exitValue. - * + * * @param node is the symbolic name of the node which is to be affected * @param exitValue is the return code which shall be given to System.exit */ @@ -166,7 +166,7 @@ trait Conductor { this: TestConductorExt ⇒ /** * Tell the SBT plugin to forcibly terminate the given remote node using Process.destroy. - * + * * @param node is the symbolic name of the node which is to be affected */ def kill(node: String): Future[Done] = { @@ -177,7 +177,7 @@ trait Conductor { this: TestConductorExt ⇒ /** * Obtain the list of remote host names currently registered. */ - def getNodes: Future[List[String]] = { + def getNodes: Future[Iterable[String]] = { import Settings.QueryTimeout controller ? GetNodes mapTo } @@ -187,7 +187,7 @@ trait Conductor { this: TestConductorExt ⇒ * pass subsequent barriers. This must be done before the client connection * breaks down in order to affect an “orderly” removal (i.e. without failing * present and future barriers). - * + * * @param node is the symbolic name of the node which is to be removed */ def removeNode(node: String): Future[Done] = { @@ -330,22 +330,32 @@ object Controller { * [[akka.remote.testconductor.BarrierCoordinator]], its child) and allowing * network and other failures to be injected at the test nodes. */ -class Controller(_participants: Int) extends Actor { +class Controller(private var initialParticipants: Int) extends Actor { import Controller._ - - var initialParticipants = _participants + import BarrierCoordinator._ val settings = TestConductor().Settings val connection = RemoteConnection(Server, settings.host, settings.port, new ConductorHandler(context.system, self, Logging(context.system, "ConductorHandler"))) + /* + * Supervision of the BarrierCoordinator means to catch all his bad emotions + * and sometimes console him (BarrierEmpty, BarrierTimeout), sometimes tell + * him to hate the world (WrongBarrier, DuplicateNode, ClientLost). The latter shall help + * terminate broken tests as quickly as possible (i.e. without awaiting + * BarrierTimeouts in the players). + */ override def supervisorStrategy = OneForOneStrategy() { - case e: BarrierCoordinator.BarrierTimeoutException ⇒ SupervisorStrategy.Resume - case e: BarrierCoordinator.WrongBarrierException ⇒ - for (NodeInfo(c, _, _) ← e.data.clients; info ← nodes get c) - barrier ! NodeInfo(c, info.addr, info.fsm) - for (c ← e.data.arrived) c ! BarrierFailed(e.barrier) - SupervisorStrategy.Restart + case BarrierTimeout(data) ⇒ SupervisorStrategy.Resume + case BarrierEmpty(data, msg) ⇒ SupervisorStrategy.Resume + case WrongBarrier(name, client, data) ⇒ client ! Send(BarrierFailed(name)); failBarrier(data) + case ClientLost(data, node) ⇒ failBarrier(data) + case DuplicateNode(data, node) ⇒ failBarrier(data) + } + + def failBarrier(data: Data): SupervisorStrategy.Directive = { + for (c ← data.arrived) c ! Send(BarrierFailed(data.barrier)) + SupervisorStrategy.Restart } val barrier = context.actorOf(Props[BarrierCoordinator], "barriers") @@ -353,12 +363,20 @@ class Controller(_participants: Int) extends Actor { override def receive = LoggingReceive { case c @ NodeInfo(name, addr, fsm) ⇒ - nodes += name -> c barrier forward c - if (initialParticipants <= 0) sender ! Done - else if (nodes.size == initialParticipants) { - for (NodeInfo(_, _, client) ← nodes.values) client ! Send(Done) - initialParticipants = 0 + if (nodes contains name) { + if (initialParticipants > 0) { + for (NodeInfo(_, _, client) ← nodes.values) client ! Send(BarrierFailed("initial startup")) + initialParticipants = 0 + } + fsm ! Send(BarrierFailed("initial startup")) + } else { + nodes += name -> c + if (initialParticipants <= 0) fsm ! Send(Done) + else if (nodes.size == initialParticipants) { + for (NodeInfo(_, _, client) ← nodes.values) client ! Send(Done) + initialParticipants = 0 + } } case c @ ClientDisconnected(name) ⇒ nodes -= name @@ -396,8 +414,16 @@ object BarrierCoordinator { case class RemoveClient(name: String) case class Data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef]) - class BarrierTimeoutException(val data: Data) extends RuntimeException(data.barrier) with NoStackTrace - class WrongBarrierException(val barrier: String, val client: ActorRef, val data: Data) extends RuntimeException(barrier) with NoStackTrace + + trait Printer { this: Product with Throwable with NoStackTrace ⇒ + override def toString = productPrefix + productIterator.mkString("(", ", ", ")") + } + + case class BarrierTimeout(data: Data) extends RuntimeException(data.barrier) with NoStackTrace with Printer + case class DuplicateNode(data: Data, node: Controller.NodeInfo) extends RuntimeException with NoStackTrace with Printer + case class WrongBarrier(barrier: String, client: ActorRef, data: Data) extends RuntimeException(barrier) with NoStackTrace with Printer + case class BarrierEmpty(data: Data, msg: String) extends RuntimeException(msg) with NoStackTrace with Printer + case class ClientLost(data: Data, client: String) extends RuntimeException with NoStackTrace with Printer } /** @@ -426,26 +452,28 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, whenUnhandled { case Event(n: NodeInfo, d @ Data(clients, _, _)) ⇒ + if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n) stay using d.copy(clients = clients + n) + case Event(ClientDisconnected(name), d @ Data(clients, _, arrived)) ⇒ + if (clients.isEmpty) throw BarrierEmpty(d, "no client to disconnect") + (clients find (_.name == name)) match { + case None ⇒ stay + case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name) + } } when(Idle) { - case Event(EnterBarrier(name), d @ Data(clients, _, _)) ⇒ - if (clients.isEmpty) throw new IllegalStateException("no client expected yet") + case Event(e @ EnterBarrier(name), d @ Data(clients, _, _)) ⇒ if (failed) - stay replying BarrierFailed(name) + stay replying Send(BarrierFailed(name)) + else if (clients.map(_.fsm) == Set(sender)) + stay replying Send(e) + else if (clients.find(_.fsm == sender).isEmpty) + stay replying Send(BarrierFailed(name)) else goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil) - case Event(ClientDisconnected(name), d @ Data(clients, _, _)) ⇒ - if (clients.isEmpty) throw new IllegalStateException("no client to disconnect") - (clients filterNot (_.name == name)) match { - case `clients` ⇒ stay - case c ⇒ - failed = true - stay using d.copy(clients = c) - } case Event(RemoveClient(name), d @ Data(clients, _, _)) ⇒ - if (clients.isEmpty) throw new IllegalStateException("no client to remove") + if (clients.isEmpty) throw BarrierEmpty(d, "no client to remove") stay using d.copy(clients = clients filterNot (_.name == name)) } @@ -456,36 +484,33 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, when(Waiting) { case Event(e @ EnterBarrier(name), d @ Data(clients, barrier, arrived)) ⇒ - if (name != barrier) throw new WrongBarrierException(barrier, sender, d) + if (name != barrier || clients.find(_.fsm == sender).isEmpty) throw WrongBarrier(name, sender, d) val together = sender :: arrived handleBarrier(d.copy(arrived = together)) case Event(RemoveClient(name), d @ Data(clients, barrier, arrived)) ⇒ - val newClients = clients filterNot (_.name == name) - val newArrived = arrived filterNot (_ == name) - handleBarrier(d.copy(clients = newClients, arrived = newArrived)) - case Event(ClientDisconnected(name), d @ Data(clients, barrier, arrived)) ⇒ - (clients filterNot (_.name == name)) match { - case `clients` ⇒ stay - case c ⇒ - val f = BarrierFailed(barrier) - arrived foreach (_ ! Send(f)) - failed = true - goto(Idle) using Data(c, "", Nil) + clients find (_.name == name) match { + case None ⇒ stay + case Some(client) ⇒ + handleBarrier(d.copy(clients = clients - client, arrived = arrived filterNot (_ == client.fsm))) } case Event(StateTimeout, data) ⇒ - throw new BarrierTimeoutException(data) + throw BarrierTimeout(data) } initialize - def handleBarrier(data: Data): State = - if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) { + def handleBarrier(data: Data): State = { + log.debug("handleBarrier({})", data) + if (data.arrived.isEmpty) { + goto(Idle) using data.copy(barrier = "") + } else if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) { val e = EnterBarrier(data.barrier) data.arrived foreach (_ ! Send(e)) goto(Idle) using data.copy(barrier = "", arrived = Nil) } else { stay using data } + } } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala index ff1d77fb9d..5d7826c60c 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala @@ -13,7 +13,7 @@ import akka.actor.Address /** * Access to the [[akka.remote.testconductor.TestConductorExt]] extension: - * + * * {{{ * val tc = TestConductor(system) * tc.startController(numPlayers) diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 38d0f6ef34..a82a090b23 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -55,9 +55,9 @@ trait Player { this: TestConductorExt ⇒ def receive = { case fsm: ActorRef ⇒ waiting = sender; fsm ! SubscribeTransitionCallBack(self) case Transition(_, Connecting, AwaitDone) ⇒ // step 1, not there yet - case Transition(_, AwaitDone, Connected) ⇒ waiting ! Done - case t: Transition[_] ⇒ waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t)) - case CurrentState(_, Connected) ⇒ waiting ! Done + case Transition(_, AwaitDone, Connected) ⇒ waiting ! Done; context stop self + case t: Transition[_] ⇒ waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t)); context stop self + case CurrentState(_, Connected) ⇒ waiting ! Done; context stop self case _: CurrentState[_] ⇒ } })) @@ -84,6 +84,7 @@ object ClientFSM { case object Connecting extends State case object AwaitDone extends State case object Connected extends State + case object Failed extends State case class Data(channel: Channel, barrier: Option[(String, ActorRef)]) @@ -116,24 +117,24 @@ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, Client channel.write(Hello(settings.name, TestConductor().address)) goto(AwaitDone) case Event(_: ConnectionFailure, _) ⇒ - // System.exit(1) - stop + goto(Failed) case Event(StateTimeout, _) ⇒ log.error("connect timeout to TestConductor") - // System.exit(1) - stop + goto(Failed) } when(AwaitDone, stateTimeout = settings.BarrierTimeout.duration) { case Event(Done, _) ⇒ log.debug("received Done: starting test") goto(Connected) + case Event(msg: NetworkOp, _) ⇒ + log.error("received {} instead of Done", msg) + goto(Failed) case Event(msg: ClientOp, _) ⇒ stay replying Status.Failure(new IllegalStateException("not connected yet")) case Event(StateTimeout, _) ⇒ log.error("connect timeout to TestConductor") - // System.exit(1) - stop + goto(Failed) } when(Connected) { @@ -180,6 +181,14 @@ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, Client stay // needed because Java doesn’t have Nothing } + when(Failed) { + case Event(msg: ClientOp, _) ⇒ + stay replying Status.Failure(new RuntimeException("cannot do " + msg + " while Failed")) + case Event(msg: NetworkOp, _) ⇒ + log.warning("ignoring network message {} while Failed", msg) + stay + } + onTermination { case StopEvent(_, _, Data(channel, _)) ⇒ channel.close() diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala new file mode 100644 index 0000000000..f0b668d1ed --- /dev/null +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/BarrierSpec.scala @@ -0,0 +1,465 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.remote.testconductor + +import akka.testkit.AkkaSpec +import akka.actor.Props +import akka.actor.AddressFromURIString +import akka.actor.ActorRef +import akka.testkit.ImplicitSender +import akka.actor.Actor +import akka.actor.OneForOneStrategy +import akka.actor.SupervisorStrategy +import akka.testkit.EventFilter +import akka.testkit.TestProbe +import akka.util.duration._ +import akka.event.Logging +import org.scalatest.BeforeAndAfterEach + +object BarrierSpec { + case class Failed(ref: ActorRef, thr: Throwable) + val config = """ + akka.testconductor.barrier-timeout = 5s + akka.actor.provider = akka.remote.RemoteActorRefProvider + akka.remote.netty.port = 0 + akka.actor.debug.fsm = on + akka.actor.debug.lifecycle = on + """ +} + +class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender with BeforeAndAfterEach { + + import BarrierSpec._ + import Controller._ + import BarrierCoordinator._ + + override def afterEach { + system.eventStream.setLogLevel(Logging.WarningLevel) + } + + "A BarrierCoordinator" must { + + "register clients and remove them" in { + val b = getBarrier() + b ! NodeInfo("a", AddressFromURIString("akka://sys"), system.deadLetters) + b ! RemoveClient("b") + b ! RemoveClient("a") + EventFilter[BarrierEmpty](occurrences = 1) intercept { + b ! RemoveClient("a") + } + expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "no client to remove"))) + } + + "register clients and disconnect them" in { + val b = getBarrier() + b ! NodeInfo("a", AddressFromURIString("akka://sys"), system.deadLetters) + b ! ClientDisconnected("b") + EventFilter[ClientLost](occurrences = 1) intercept { + b ! ClientDisconnected("a") + } + expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil), "a"))) + EventFilter[BarrierEmpty](occurrences = 1) intercept { + b ! ClientDisconnected("a") + } + expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil), "no client to disconnect"))) + } + + "fail entering barrier when nobody registered" in { + val b = getBarrier() + b ! EnterBarrier("b") + expectMsg(Send(BarrierFailed("b"))) + } + + "enter barrier" in { + val barrier = getBarrier() + val a, b = TestProbe() + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + a.send(barrier, EnterBarrier("bar")) + noMsg(a, b) + within(1 second) { + b.send(barrier, EnterBarrier("bar")) + a.expectMsg(Send(EnterBarrier("bar"))) + b.expectMsg(Send(EnterBarrier("bar"))) + } + } + + "enter barrier with joining node" in { + val barrier = getBarrier() + val a, b, c = TestProbe() + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + a.send(barrier, EnterBarrier("bar")) + barrier ! NodeInfo("c", AddressFromURIString("akka://sys"), c.ref) + b.send(barrier, EnterBarrier("bar")) + noMsg(a, b, c) + within(1 second) { + c.send(barrier, EnterBarrier("bar")) + a.expectMsg(Send(EnterBarrier("bar"))) + b.expectMsg(Send(EnterBarrier("bar"))) + c.expectMsg(Send(EnterBarrier("bar"))) + } + } + + "enter barrier with leaving node" in { + val barrier = getBarrier() + val a, b, c = TestProbe() + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + barrier ! NodeInfo("c", AddressFromURIString("akka://sys"), c.ref) + a.send(barrier, EnterBarrier("bar")) + b.send(barrier, EnterBarrier("bar")) + barrier ! RemoveClient("a") + barrier ! ClientDisconnected("a") + noMsg(a, b, c) + b.within(1 second) { + barrier ! RemoveClient("c") + b.expectMsg(Send(EnterBarrier("bar"))) + } + barrier ! ClientDisconnected("c") + expectNoMsg(1 second) + } + + "leave barrier when last “arrived” is removed" in { + val barrier = getBarrier() + val a, b = TestProbe() + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + a.send(barrier, EnterBarrier("bar")) + barrier ! RemoveClient("a") + b.send(barrier, EnterBarrier("foo")) + b.expectMsg(Send(EnterBarrier("foo"))) + } + + "fail barrier with disconnecing node" in { + val barrier = getBarrier() + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! nodeA + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + a.send(barrier, EnterBarrier("bar")) + EventFilter[ClientLost](occurrences = 1) intercept { + barrier ! ClientDisconnected("b") + } + expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA), "bar", a.ref :: Nil), "b"))) + } + + "fail barrier with disconnecing node who already arrived" in { + val barrier = getBarrier() + val a, b, c = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + val nodeC = NodeInfo("c", AddressFromURIString("akka://sys"), c.ref) + barrier ! nodeA + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + barrier ! nodeC + a.send(barrier, EnterBarrier("bar")) + b.send(barrier, EnterBarrier("bar")) + EventFilter[ClientLost](occurrences = 1) intercept { + barrier ! ClientDisconnected("b") + } + expectMsg(Failed(barrier, ClientLost(Data(Set(nodeA, nodeC), "bar", a.ref :: Nil), "b"))) + } + + "fail when entering wrong barrier" in { + val barrier = getBarrier() + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! nodeA + val nodeB = NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + barrier ! nodeB + a.send(barrier, EnterBarrier("bar")) + EventFilter[WrongBarrier](occurrences = 1) intercept { + b.send(barrier, EnterBarrier("foo")) + } + expectMsg(Failed(barrier, WrongBarrier("foo", b.ref, Data(Set(nodeA, nodeB), "bar", a.ref :: Nil)))) + } + + "fail barrier after first failure" in { + val barrier = getBarrier() + val a = TestProbe() + EventFilter[BarrierEmpty](occurrences = 1) intercept { + barrier ! RemoveClient("a") + } + expectMsg(Failed(barrier, BarrierEmpty(Data(Set(), "", Nil), "no client to remove"))) + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + a.send(barrier, EnterBarrier("right")) + a.expectMsg(Send(BarrierFailed("right"))) + } + + "fail after barrier timeout" in { + val barrier = getBarrier() + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + val nodeB = NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + barrier ! nodeA + barrier ! nodeB + a.send(barrier, EnterBarrier("right")) + EventFilter[BarrierTimeout](occurrences = 1) intercept { + expectMsg(7 seconds, Failed(barrier, BarrierTimeout(Data(Set(nodeA, nodeB), "right", a.ref :: Nil)))) + } + } + + "fail if a node registers twice" in { + val barrier = getBarrier() + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + val nodeB = NodeInfo("a", AddressFromURIString("akka://sys"), b.ref) + barrier ! nodeA + EventFilter[DuplicateNode](occurrences = 1) intercept { + barrier ! nodeB + } + expectMsg(Failed(barrier, DuplicateNode(Data(Set(nodeA), "", Nil), nodeB))) + } + + "finally have no failure messages left" in { + expectNoMsg(1 second) + } + + } + + "A Controller with BarrierCoordinator" must { + + "register clients and remove them" in { + val b = getController(1) + b ! NodeInfo("a", AddressFromURIString("akka://sys"), testActor) + expectMsg(Send(Done)) + b ! Remove("b") + b ! Remove("a") + EventFilter[BarrierEmpty](occurrences = 1) intercept { + b ! Remove("a") + } + } + + "register clients and disconnect them" in { + val b = getController(1) + b ! NodeInfo("a", AddressFromURIString("akka://sys"), testActor) + expectMsg(Send(Done)) + b ! ClientDisconnected("b") + EventFilter[ClientLost](occurrences = 1) intercept { + b ! ClientDisconnected("a") + } + EventFilter[BarrierEmpty](occurrences = 1) intercept { + b ! ClientDisconnected("a") + } + } + + "fail entering barrier when nobody registered" in { + val b = getController(0) + b ! EnterBarrier("b") + expectMsg(Send(BarrierFailed("b"))) + } + + "enter barrier" in { + val barrier = getController(2) + val a, b = TestProbe() + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + a.expectMsg(Send(Done)) + b.expectMsg(Send(Done)) + a.send(barrier, EnterBarrier("bar")) + noMsg(a, b) + within(1 second) { + b.send(barrier, EnterBarrier("bar")) + a.expectMsg(Send(EnterBarrier("bar"))) + b.expectMsg(Send(EnterBarrier("bar"))) + } + } + + "enter barrier with joining node" in { + val barrier = getController(2) + val a, b, c = TestProbe() + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + a.expectMsg(Send(Done)) + b.expectMsg(Send(Done)) + a.send(barrier, EnterBarrier("bar")) + barrier ! NodeInfo("c", AddressFromURIString("akka://sys"), c.ref) + c.expectMsg(Send(Done)) + b.send(barrier, EnterBarrier("bar")) + noMsg(a, b, c) + within(1 second) { + c.send(barrier, EnterBarrier("bar")) + a.expectMsg(Send(EnterBarrier("bar"))) + b.expectMsg(Send(EnterBarrier("bar"))) + c.expectMsg(Send(EnterBarrier("bar"))) + } + } + + "enter barrier with leaving node" in { + val barrier = getController(3) + val a, b, c = TestProbe() + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + barrier ! NodeInfo("c", AddressFromURIString("akka://sys"), c.ref) + a.expectMsg(Send(Done)) + b.expectMsg(Send(Done)) + c.expectMsg(Send(Done)) + a.send(barrier, EnterBarrier("bar")) + b.send(barrier, EnterBarrier("bar")) + barrier ! Remove("a") + barrier ! ClientDisconnected("a") + noMsg(a, b, c) + b.within(1 second) { + barrier ! Remove("c") + b.expectMsg(Send(EnterBarrier("bar"))) + } + barrier ! ClientDisconnected("c") + expectNoMsg(1 second) + } + + "leave barrier when last “arrived” is removed" in { + val barrier = getController(2) + val a, b = TestProbe() + barrier ! NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + a.expectMsg(Send(Done)) + b.expectMsg(Send(Done)) + a.send(barrier, EnterBarrier("bar")) + barrier ! Remove("a") + b.send(barrier, EnterBarrier("foo")) + b.expectMsg(Send(EnterBarrier("foo"))) + } + + "fail barrier with disconnecing node" in { + val barrier = getController(2) + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! nodeA + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + a.expectMsg(Send(Done)) + b.expectMsg(Send(Done)) + a.send(barrier, EnterBarrier("bar")) + barrier ! ClientDisconnected("unknown") + noMsg(a) + EventFilter[ClientLost](occurrences = 1) intercept { + barrier ! ClientDisconnected("b") + } + a.expectMsg(Send(BarrierFailed("bar"))) + } + + "fail barrier with disconnecing node who already arrived" in { + val barrier = getController(3) + val a, b, c = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + val nodeC = NodeInfo("c", AddressFromURIString("akka://sys"), c.ref) + barrier ! nodeA + barrier ! NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + barrier ! nodeC + a.expectMsg(Send(Done)) + b.expectMsg(Send(Done)) + c.expectMsg(Send(Done)) + a.send(barrier, EnterBarrier("bar")) + b.send(barrier, EnterBarrier("bar")) + EventFilter[ClientLost](occurrences = 1) intercept { + barrier ! ClientDisconnected("b") + } + a.expectMsg(Send(BarrierFailed("bar"))) + } + + "fail when entering wrong barrier" in { + val barrier = getController(2) + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + barrier ! nodeA + val nodeB = NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + barrier ! nodeB + a.expectMsg(Send(Done)) + b.expectMsg(Send(Done)) + a.send(barrier, EnterBarrier("bar")) + EventFilter[WrongBarrier](occurrences = 1) intercept { + b.send(barrier, EnterBarrier("foo")) + } + a.expectMsg(Send(BarrierFailed("bar"))) + b.expectMsg(Send(BarrierFailed("foo"))) + } + + "not really fail after barrier timeout" in { + val barrier = getController(2) + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + val nodeB = NodeInfo("b", AddressFromURIString("akka://sys"), b.ref) + barrier ! nodeA + barrier ! nodeB + a.expectMsg(Send(Done)) + b.expectMsg(Send(Done)) + a.send(barrier, EnterBarrier("right")) + EventFilter[BarrierTimeout](occurrences = 1) intercept { + Thread.sleep(5000) + } + b.send(barrier, EnterBarrier("right")) + a.expectMsg(Send(EnterBarrier("right"))) + b.expectMsg(Send(EnterBarrier("right"))) + } + + "fail if a node registers twice" in { + val controller = getController(2) + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + val nodeB = NodeInfo("a", AddressFromURIString("akka://sys"), b.ref) + controller ! nodeA + EventFilter[DuplicateNode](occurrences = 1) intercept { + controller ! nodeB + } + a.expectMsg(Send(BarrierFailed("initial startup"))) + b.expectMsg(Send(BarrierFailed("initial startup"))) + } + + "fail subsequent barriers if a node registers twice" in { + val controller = getController(1) + val a, b = TestProbe() + val nodeA = NodeInfo("a", AddressFromURIString("akka://sys"), a.ref) + val nodeB = NodeInfo("a", AddressFromURIString("akka://sys"), b.ref) + controller ! nodeA + a.expectMsg(Send(Done)) + EventFilter[DuplicateNode](occurrences = 1) intercept { + controller ! nodeB + b.expectMsg(Send(BarrierFailed("initial startup"))) + } + a.send(controller, EnterBarrier("x")) + a.expectMsg(Send(BarrierFailed("x"))) + } + + "finally have no failure messages left" in { + expectNoMsg(1 second) + } + + } + + private def getController(participants: Int): ActorRef = { + system.actorOf(Props(new Actor { + val controller = context.actorOf(Props(new Controller(participants))) + controller ! GetPort + override def supervisorStrategy = OneForOneStrategy() { + case x ⇒ testActor ! Failed(controller, x); SupervisorStrategy.Restart + } + def receive = { + case x: Int ⇒ testActor ! controller + } + })) + expectMsgType[ActorRef] + } + + /** + * Produce a BarrierCoordinator which is supervised with a strategy which + * forwards all failures to the testActor. + */ + private def getBarrier(): ActorRef = { + system.actorOf(Props(new Actor { + val barrier = context.actorOf(Props[BarrierCoordinator]) + override def supervisorStrategy = OneForOneStrategy() { + case x ⇒ testActor ! Failed(barrier, x); SupervisorStrategy.Restart + } + def receive = { + case _ ⇒ sender ! barrier + } + })) ! "" + expectMsgType[ActorRef] + } + + private def noMsg(probes: TestProbe*) { + expectNoMsg(1 second) + probes foreach (_.msgAvailable must be(false)) + } + +} \ No newline at end of file diff --git a/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala new file mode 100644 index 0000000000..db0e3cfe69 --- /dev/null +++ b/akka-remote-tests/src/test/scala/akka/remote/testconductor/ControllerSpec.scala @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.remote.testconductor + +import akka.testkit.AkkaSpec +import akka.actor.Props +import akka.testkit.ImplicitSender +import akka.remote.testconductor.Controller.NodeInfo +import akka.actor.AddressFromURIString + +object ControllerSpec { + val config = """ + akka.testconductor.barrier-timeout = 5s + akka.actor.provider = akka.remote.RemoteActorRefProvider + akka.remote.netty.port = 0 + akka.actor.debug.fsm = on + akka.actor.debug.lifecycle = on + """ +} + +class ControllerSpec extends AkkaSpec(ControllerSpec.config) with ImplicitSender { + + "A Controller" must { + + "publish its nodes" in { + val c = system.actorOf(Props(new Controller(1))) + c ! NodeInfo("a", AddressFromURIString("akka://sys"), testActor) + expectMsg(Send(Done)) + c ! NodeInfo("b", AddressFromURIString("akka://sys"), testActor) + expectMsg(Send(Done)) + c ! Controller.GetNodes + expectMsgType[Iterable[String]].toSet must be(Set("a", "b")) + } + + } + +} \ No newline at end of file