diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index b25bd1838c..347973a255 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -25,8 +25,26 @@ import akka.dispatch.Future import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy import java.util.concurrent.ConcurrentHashMap +import akka.actor.Status -trait Conductor extends RunControl with FailureInject { this: TestConductorExt ⇒ +sealed trait Direction + +object Direction { + case object Send extends Direction + case object Receive extends Direction + case object Both extends Direction +} + +/** + * The conductor is the one orchestrating the test: it governs the + * [[akka.remote.testconductor.Controller]]’s port to which all + * [[akka.remote.testconductor.Player]]s connect, it issues commands to their + * [[akka.remote.testconductor.NetworkFailureInjector]] and provides support + * for barriers using the [[akka.remote.testconductor.BarrierCoordinator]]. + * All of this is bundled inside the [[akka.remote.testconductor.TestConductorExt]] + * extension. + */ +trait Conductor { this: TestConductorExt ⇒ import Controller._ @@ -36,60 +54,154 @@ trait Conductor extends RunControl with FailureInject { this: TestConductorExt case x ⇒ x } - override def startController(participants: Int): Future[Int] = { + /** + * Start the [[akka.remote.testconductor.Controller]], which in turn will + * bind to a TCP port as specified in the `akka.testconductor.port` config + * property, where 0 denotes automatic allocation. Since the latter is + * actually preferred, a `Future[Int]` is returned which will be completed + * with the port number actually chosen, so that this can then be communicated + * to the players for their proper start-up. + * + * This method also invokes [[akka.remote.testconductor.Player]].startClient, + * since it is expected that the conductor participates in barriers for + * overall coordination. The returned Future will only be completed once the + * client’s start-up finishes, which in fact waits for all other players to + * connect. + * + * @param participants gives the number of participants which shall connect + * before any of their startClient() operations complete. + */ + def startController(participants: Int): Future[Int] = { if (_controller ne null) throw new RuntimeException("TestConductorServer was already started") _controller = system.actorOf(Props(new Controller(participants)), "controller") import Settings.BarrierTimeout controller ? GetPort flatMap { case port: Int ⇒ startClient(port) map (_ ⇒ port) } } - override def port: Future[Int] = { + /** + * Obtain the port to which the controller’s socket is actually bound. This + * will deviate from the configuration in `akka.testconductor.port` in case + * that was given as zero. + */ + def port: Future[Int] = { import Settings.QueryTimeout controller ? GetPort mapTo } - override def throttle(node: String, target: String, direction: Direction, rateMBit: Double): Future[Done] = { + /** + * Make the remoting pipeline on the node throttle data sent to or received + * from the given remote peer. Throttling works by delaying packet submission + * within the netty pipeline until the packet would have been completely sent + * according to the given rate, the previous packet completion and the current + * packet length. In case of large packets they are split up if the calculated + * send pause would exceed `akka.testconductor.packet-split-threshold` + * (roughly). All of this uses the system’s HashedWheelTimer, which is not + * terribly precise and will execute tasks later than they are schedule (even + * on average), but that is countered by using the actual execution time for + * determining how much to send, leading to the correct output rate, but with + * increased latency. + * + * @param node is the symbolic name of the node which is to be affected + * @param target is the symbolic name of the other node to which connectivity shall be throttled + * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` + * @param rateMBit is the maximum data rate in MBit + */ + def throttle(node: String, target: String, direction: Direction, rateMBit: Double): Future[Done] = { import Settings.QueryTimeout controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo } - override def blackhole(node: String, target: String, direction: Direction): Future[Done] = { + /** + * Switch the Netty pipeline of the remote support into blackhole mode for + * sending and/or receiving: it will just drop all messages right before + * submitting them to the Socket or right after receiving them from the + * Socket. + * + * @param node is the symbolic name of the node which is to be affected + * @param target is the symbolic name of the other node to which connectivity shall be impeded + * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` + */ + def blackhole(node: String, target: String, direction: Direction): Future[Done] = { import Settings.QueryTimeout controller ? Throttle(node, target, direction, 0f) mapTo } - override def disconnect(node: String, target: String): Future[Done] = { + /** + * Tell the remote support to shutdown the connection to the given remote + * peer. It works regardless of whether the recipient was initiator or + * responder. + * + * @param node is the symbolic name of the node which is to be affected + * @param target is the symbolic name of the other node to which connectivity shall be impeded + */ + def disconnect(node: String, target: String): Future[Done] = { import Settings.QueryTimeout controller ? Disconnect(node, target, false) mapTo } - override def abort(node: String, target: String): Future[Done] = { + /** + * Tell the remote support to TCP_RESET the connection to the given remote + * peer. It works regardless of whether the recipient was initiator or + * responder. + * + * @param node is the symbolic name of the node which is to be affected + * @param target is the symbolic name of the other node to which connectivity shall be impeded + */ + def abort(node: String, target: String): Future[Done] = { import Settings.QueryTimeout controller ? Disconnect(node, target, true) mapTo } - override def shutdown(node: String, exitValue: Int): Future[Done] = { + /** + * Tell the remote node to shut itself down using System.exit with the given + * exitValue. + * + * @param node is the symbolic name of the node which is to be affected + * @param exitValue is the return code which shall be given to System.exit + */ + def shutdown(node: String, exitValue: Int): Future[Done] = { import Settings.QueryTimeout controller ? Terminate(node, exitValue) mapTo } - override def kill(node: String): Future[Done] = { + /** + * Tell the SBT plugin to forcibly terminate the given remote node using Process.destroy. + * + * @param node is the symbolic name of the node which is to be affected + */ + def kill(node: String): Future[Done] = { import Settings.QueryTimeout controller ? Terminate(node, -1) mapTo } - override def getNodes: Future[List[String]] = { + /** + * Obtain the list of remote host names currently registered. + */ + def getNodes: Future[List[String]] = { import Settings.QueryTimeout controller ? GetNodes mapTo } - override def removeNode(node: String): Future[Done] = { + /** + * Remove a remote host from the list, so that the remaining nodes may still + * pass subsequent barriers. This must be done before the client connection + * breaks down in order to affect an “orderly” removal (i.e. without failing + * present and future barriers). + * + * @param node is the symbolic name of the node which is to be removed + */ + def removeNode(node: String): Future[Done] = { import Settings.QueryTimeout controller ? Remove(node) mapTo } } +/** + * This handler is installed at the end of the controller’s netty pipeline. Its only + * purpose is to dispatch incoming messages to the right ServerFSM actor. There is + * one shared instance of this class for all connections accepted by one Controller. + */ class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler { val clients = new ConcurrentHashMap[Channel, ActorRef]() @@ -105,7 +217,7 @@ class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAd val channel = event.getChannel log.debug("disconnect from {}", getAddrString(channel)) val fsm = clients.get(channel) - fsm ! PoisonPill + fsm ! Controller.ClientDisconnected clients.remove(channel) } @@ -129,6 +241,19 @@ object ServerFSM { case object Ready extends State } +/** + * The server part of each client connection is represented by a ServerFSM. + * The Initial state handles reception of the new client’s + * [[akka.remote.testconductor.Hello]] message (which is needed for all subsequent + * node name translations). + * + * In the Ready state, messages from the client are forwarded to the controller + * and [[akka.remote.testconductor.Send]] requests are sent, but the latter is + * treated specially: all client operations are to be confirmed by a + * [[akka.remote.testconductor.Done]] message, and there can be only one such + * request outstanding at a given time (i.e. a Send fails if the previous has + * not yet been acknowledged). + */ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor with LoggingFSM[ServerFSM.State, Option[ActorRef]] { import ServerFSM._ import akka.actor.FSM._ @@ -136,9 +261,20 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi startWith(Initial, None) + whenUnhandled { + case Event(ClientDisconnected, Some(s)) ⇒ + s ! Status.Failure(new RuntimeException("client disconnected in state " + stateName + ": " + channel)) + stop() + case Event(ClientDisconnected, None) ⇒ stop() + } + + onTermination { + case _ ⇒ controller ! ClientDisconnected + } + when(Initial, stateTimeout = 10 seconds) { case Event(Hello(name, addr), _) ⇒ - controller ! ClientConnected(name, addr) + controller ! NodeInfo(name, addr, self) goto(Ready) case Event(x: NetworkOp, _) ⇒ log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x) @@ -162,7 +298,6 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi stay using None case Event(msg: NetworkOp, _) ⇒ log.warning("client {} sent unsupported message {}", getAddrString(channel), msg) - channel.close() stop() case Event(Send(msg @ (_: EnterBarrier | _: Done)), _) ⇒ channel.write(msg) @@ -176,10 +311,13 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi } initialize + + onTermination { + case _ ⇒ channel.close() + } } object Controller { - case class ClientConnected(name: String, address: Address) case class ClientDisconnected(name: String) case object GetNodes case object GetPort @@ -187,6 +325,11 @@ object Controller { case class NodeInfo(name: String, addr: Address, fsm: ActorRef) } +/** + * This controls test execution by managing barriers (delegated to + * [[akka.remote.testconductor.BarrierCoordinator]], its child) and allowing + * network and other failures to be injected at the test nodes. + */ class Controller(_participants: Int) extends Actor { import Controller._ @@ -199,8 +342,8 @@ class Controller(_participants: Int) extends Actor { override def supervisorStrategy = OneForOneStrategy() { case e: BarrierCoordinator.BarrierTimeoutException ⇒ SupervisorStrategy.Resume case e: BarrierCoordinator.WrongBarrierException ⇒ - // I think we are lacking a means of communication here: this is not correct! - for (i ← 1 to e.data.clients) barrier ! ClientConnected + for (NodeInfo(c, _, _) ← e.data.clients; info ← nodes get c) + barrier ! NodeInfo(c, info.addr, info.fsm) for (c ← e.data.arrived) c ! BarrierFailed(e.barrier) SupervisorStrategy.Restart } @@ -209,17 +352,17 @@ class Controller(_participants: Int) extends Actor { var nodes = Map[String, NodeInfo]() override def receive = LoggingReceive { - case ClientConnected(name, addr) ⇒ - nodes += name -> NodeInfo(name, addr, sender) - barrier forward ClientConnected + case c @ NodeInfo(name, addr, fsm) ⇒ + nodes += name -> c + barrier forward c if (initialParticipants <= 0) sender ! Done else if (nodes.size == initialParticipants) { for (NodeInfo(_, _, client) ← nodes.values) client ! Send(Done) initialParticipants = 0 } - case ClientDisconnected(name) ⇒ + case c @ ClientDisconnected(name) ⇒ nodes -= name - barrier forward ClientDisconnected + barrier forward c case e @ EnterBarrier(name) ⇒ barrier forward e case Throttle(node, target, direction, rateMBit) ⇒ @@ -234,9 +377,9 @@ class Controller(_participants: Int) extends Actor { } else { nodes(node).fsm forward Send(TerminateMsg(exitValueOrKill)) } - // TODO: properly remove node from BarrierCoordinator - // case Remove(node) => - // nodes -= node + case Remove(node) ⇒ + nodes -= node + barrier ! BarrierCoordinator.RemoveClient(node) case GetNodes ⇒ sender ! nodes.keys case GetPort ⇒ sender ! (connection.getLocalAddress match { @@ -250,27 +393,60 @@ object BarrierCoordinator { case object Idle extends State case object Waiting extends State - case class Data(clients: Int, barrier: String, arrived: List[ActorRef]) + case class RemoveClient(name: String) + + case class Data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef]) class BarrierTimeoutException(val data: Data) extends RuntimeException(data.barrier) with NoStackTrace class WrongBarrierException(val barrier: String, val client: ActorRef, val data: Data) extends RuntimeException(barrier) with NoStackTrace } +/** + * This barrier coordinator gets informed of players connecting (NodeInfo), + * players being deliberately removed (RemoveClient) or failing (ClientDisconnected) + * by the controller. It also receives EnterBarrier requests, where upon the first + * one received the name of the current barrier is set and all other known clients + * are expected to join the barrier, whereupon all of the will be sent the successful + * EnterBarrier return message. In case of planned removals, this may just happen + * earlier, in case of failures the current barrier (and all subsequent ones) will + * be failed by sending BarrierFailed responses. + */ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] { import BarrierCoordinator._ import akka.actor.FSM._ import Controller._ - startWith(Idle, Data(0, "", Nil)) + // this shall be set to false if all subsequent barriers shall fail + var failed = false + override def preRestart(reason: Throwable, message: Option[Any]) {} + override def postRestart(reason: Throwable) { failed = true } + + // TODO what happens with the other waiting players in case of a test failure? + + startWith(Idle, Data(Set(), "", Nil)) + + whenUnhandled { + case Event(n: NodeInfo, d @ Data(clients, _, _)) ⇒ + stay using d.copy(clients = clients + n) + } when(Idle) { - case Event(EnterBarrier(name), Data(num, _, _)) ⇒ - if (num == 0) throw new IllegalStateException("no client expected yet") - goto(Waiting) using Data(num, name, sender :: Nil) - case Event(ClientConnected, d @ Data(num, _, _)) ⇒ - stay using d.copy(clients = num + 1) - case Event(ClientDisconnected, d @ Data(num, _, _)) ⇒ - if (num == 0) throw new IllegalStateException("no client to disconnect") - stay using d.copy(clients = num - 1) + case Event(EnterBarrier(name), d @ Data(clients, _, _)) ⇒ + if (clients.isEmpty) throw new IllegalStateException("no client expected yet") + if (failed) + stay replying BarrierFailed(name) + else + goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil) + case Event(ClientDisconnected(name), d @ Data(clients, _, _)) ⇒ + if (clients.isEmpty) throw new IllegalStateException("no client to disconnect") + (clients filterNot (_.name == name)) match { + case `clients` ⇒ stay + case c ⇒ + failed = true + stay using d.copy(clients = c) + } + case Event(RemoveClient(name), d @ Data(clients, _, _)) ⇒ + if (clients.isEmpty) throw new IllegalStateException("no client to remove") + stay using d.copy(clients = clients filterNot (_.name == name)) } onTransition { @@ -279,30 +455,37 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, } when(Waiting) { - case Event(e @ EnterBarrier(name), d @ Data(num, barrier, arrived)) ⇒ + case Event(e @ EnterBarrier(name), d @ Data(clients, barrier, arrived)) ⇒ if (name != barrier) throw new WrongBarrierException(barrier, sender, d) val together = sender :: arrived - if (together.size == num) { - together foreach (_ ! Send(e)) - goto(Idle) using Data(num, "", Nil) - } else { - stay using d.copy(arrived = together) - } - case Event(ClientConnected, d @ Data(num, _, _)) ⇒ - stay using d.copy(clients = num + 1) - case Event(ClientDisconnected, d @ Data(num, barrier, arrived)) ⇒ - val expected = num - 1 - if (arrived.size == expected) { - val e = EnterBarrier(barrier) - sender :: arrived foreach (_ ! Send(e)) - goto(Idle) using Data(expected, "", Nil) - } else { - stay using d.copy(clients = expected) + handleBarrier(d.copy(arrived = together)) + case Event(RemoveClient(name), d @ Data(clients, barrier, arrived)) ⇒ + val newClients = clients filterNot (_.name == name) + val newArrived = arrived filterNot (_ == name) + handleBarrier(d.copy(clients = newClients, arrived = newArrived)) + case Event(ClientDisconnected(name), d @ Data(clients, barrier, arrived)) ⇒ + (clients filterNot (_.name == name)) match { + case `clients` ⇒ stay + case c ⇒ + val f = BarrierFailed(barrier) + arrived foreach (_ ! Send(f)) + failed = true + goto(Idle) using Data(c, "", Nil) } case Event(StateTimeout, data) ⇒ throw new BarrierTimeoutException(data) } initialize + + def handleBarrier(data: Data): State = + if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) { + val e = EnterBarrier(data.barrier) + data.arrived foreach (_ ! Send(e)) + goto(Idle) using data.copy(barrier = "", arrived = Nil) + } else { + stay using data + } + } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala index 97f5dd7295..ff1d77fb9d 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala @@ -11,12 +11,28 @@ import akka.actor.ActorRef import java.util.concurrent.ConcurrentHashMap import akka.actor.Address +/** + * Access to the [[akka.remote.testconductor.TestConductorExt]] extension: + * + * {{{ + * val tc = TestConductor(system) + * tc.startController(numPlayers) + * // OR + * tc.startClient(conductorPort) + * }}} + */ object TestConductor extends ExtensionKey[TestConductorExt] { def apply()(implicit ctx: ActorContext): TestConductorExt = apply(ctx.system) } +/** + * This binds together the [[akka.remote.testconductor.Conductor]] and + * [[akka.remote.testconductor.Player]] roles inside an Akka + * [[akka.actor.Extension]]. Please follow the aforementioned links for + * more information. + */ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player { object Settings { diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Features.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Features.scala deleted file mode 100644 index 336d04c368..0000000000 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Features.scala +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ -package akka.remote.testconductor - -import akka.dispatch.Future - -trait BarrierSync { - /** - * Enter all given barriers in the order in which they were given. - */ - def enter(name: String*): Unit -} - -sealed trait Direction - -object Direction { - case object Send extends Direction - case object Receive extends Direction - case object Both extends Direction -} - -trait FailureInject { - - /** - * Make the remoting pipeline on the node throttle data sent to or received - * from the given remote peer. - */ - def throttle(node: String, target: String, direction: Direction, rateMBit: Double): Future[Done] - - /** - * Switch the Netty pipeline of the remote support into blackhole mode for - * sending and/or receiving: it will just drop all messages right before - * submitting them to the Socket or right after receiving them from the - * Socket. - */ - def blackhole(node: String, target: String, direction: Direction): Future[Done] - - /** - * Tell the remote support to shutdown the connection to the given remote - * peer. It works regardless of whether the recipient was initiator or - * responder. - */ - def disconnect(node: String, target: String): Future[Done] - - /** - * Tell the remote support to TCP_RESET the connection to the given remote - * peer. It works regardless of whether the recipient was initiator or - * responder. - */ - def abort(node: String, target: String): Future[Done] - -} - -trait RunControl { - - /** - * Start the server port, returns the port number. - */ - def startController(participants: Int): Future[Int] - - /** - * Get the actual port used by the server. - */ - def port: Future[Int] - - /** - * Tell the remote node to shut itself down using System.exit with the given - * exitValue. - */ - def shutdown(node: String, exitValue: Int): Future[Done] - - /** - * Tell the SBT plugin to forcibly terminate the given remote node using Process.destroy. - */ - def kill(node: String): Future[Done] - - /** - * Obtain the list of remote host names currently registered. - */ - def getNodes: Future[List[String]] - - /** - * Remove a remote host from the list, so that the remaining nodes may still - * pass subsequent barriers. - */ - def removeNode(node: String): Future[Done] - -} diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 6e78610cfb..38d0f6ef34 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -21,7 +21,13 @@ import akka.actor.PoisonPill import akka.event.Logging import akka.dispatch.Future -trait Player extends BarrierSync { this: TestConductorExt ⇒ +/** + * The Player is the client component of the + * [[akka.remote.testconductor.TestConductorExt]] extension. It registers with + * the [[akka.remote.testconductor.Conductor]]’s [[akka.remote.testconductor.Controller]] + * in order to participate in barriers and enable network failure injection. + */ +trait Player { this: TestConductorExt ⇒ private var _client: ActorRef = _ private def client = _client match { @@ -29,6 +35,14 @@ trait Player extends BarrierSync { this: TestConductorExt ⇒ case x ⇒ x } + /** + * Connect to the conductor on the given port (the host is taken from setting + * `akka.testconductor.host`). The connection is made asynchronously, but you + * should await completion of the returned Future because that implies that + * all expected participants of this test have successfully connected (i.e. + * this is a first barrier in itself). The number of expected participants is + * set in [[akka.remote.testconductor.Conductor]]`.startController()`. + */ def startClient(port: Int): Future[Done] = { import ClientFSM._ import akka.actor.FSM._ @@ -51,7 +65,11 @@ trait Player extends BarrierSync { this: TestConductorExt ⇒ a ? client mapTo } - override def enter(name: String*) { + /** + * Enter the named barriers, one after the other, in the order given. Will + * throw an exception in case of timeouts or other errors. + */ + def enter(name: String*) { system.log.debug("entering barriers " + name.mkString("(", ", ", ")")) name foreach { b ⇒ import Settings.BarrierTimeout @@ -73,6 +91,15 @@ object ClientFSM { case object Disconnected } +/** + * This is the controlling entity on the [[akka.remote.testconductor.Player]] + * side: in a first step it registers itself with a symbolic name and its remote + * address at the [[akka.remote.testconductor.Controller]], then waits for the + * `Done` message which signals that all other expected test participants have + * done the same. After that, it will pass barrier requests to and from the + * coordinator and react to the [[akka.remote.testconductor.Conductor]]’s + * requests for failure injection. + */ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] { import ClientFSM._ @@ -162,6 +189,9 @@ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, Client } +/** + * This handler only forwards messages received from the conductor to the [[akka.remote.testconductor.ClientFSM]]. + */ class PlayerHandler(fsm: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler { import ClientFSM._