scaladoc for TestConductor

This commit is contained in:
Roland 2012-05-10 21:08:06 +02:00
parent d931a6e727
commit 160aa73066
4 changed files with 283 additions and 143 deletions

View file

@ -25,8 +25,26 @@ import akka.dispatch.Future
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy
import java.util.concurrent.ConcurrentHashMap
import akka.actor.Status
trait Conductor extends RunControl with FailureInject { this: TestConductorExt
/**
 * Direction of traffic on a link between two test nodes, used to select
 * which side of a connection a failure injection (throttle/blackhole)
 * applies to: messages sent by the node, received by it, or both.
 */
sealed trait Direction
object Direction {
case object Send extends Direction
case object Receive extends Direction
case object Both extends Direction
}
/**
* The conductor is the one orchestrating the test: it governs the
* [[akka.remote.testconductor.Controller]]'s port to which all
* [[akka.remote.testconductor.Player]]s connect, it issues commands to their
* [[akka.remote.testconductor.NetworkFailureInjector]] and provides support
* for barriers using the [[akka.remote.testconductor.BarrierCoordinator]].
* All of this is bundled inside the [[akka.remote.testconductor.TestConductorExt]]
* extension.
*/
trait Conductor { this: TestConductorExt
import Controller._
@ -36,60 +54,154 @@ trait Conductor extends RunControl with FailureInject { this: TestConductorExt
case x x
}
override def startController(participants: Int): Future[Int] = {
/**
* Start the [[akka.remote.testconductor.Controller]], which in turn will
* bind to a TCP port as specified in the `akka.testconductor.port` config
* property, where 0 denotes automatic allocation. Since the latter is
* actually preferred, a `Future[Int]` is returned which will be completed
* with the port number actually chosen, so that this can then be communicated
* to the players for their proper start-up.
*
* This method also invokes [[akka.remote.testconductor.Player]].startClient,
* since it is expected that the conductor participates in barriers for
* overall coordination. The returned Future will only be completed once the
* clients start-up finishes, which in fact waits for all other players to
* connect.
*
* @param participants gives the number of participants which shall connect
* before any of their startClient() operations complete.
*/
def startController(participants: Int): Future[Int] = {
if (_controller ne null) throw new RuntimeException("TestConductorServer was already started")
_controller = system.actorOf(Props(new Controller(participants)), "controller")
import Settings.BarrierTimeout
controller ? GetPort flatMap { case port: Int startClient(port) map (_ port) }
}
override def port: Future[Int] = {
/**
* Obtain the port to which the controller's socket is actually bound. This
* will deviate from the configuration in `akka.testconductor.port` in case
* that was given as zero.
*/
def port: Future[Int] = {
import Settings.QueryTimeout
controller ? GetPort mapTo
}
override def throttle(node: String, target: String, direction: Direction, rateMBit: Double): Future[Done] = {
/**
* Make the remoting pipeline on the node throttle data sent to or received
* from the given remote peer. Throttling works by delaying packet submission
* within the netty pipeline until the packet would have been completely sent
* according to the given rate, the previous packet completion and the current
* packet length. In case of large packets they are split up if the calculated
* send pause would exceed `akka.testconductor.packet-split-threshold`
* (roughly). All of this uses the system's HashedWheelTimer, which is not
* terribly precise and will execute tasks later than they are scheduled (even
* on average), but that is countered by using the actual execution time for
* determining how much to send, leading to the correct output rate, but with
* increased latency.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be throttled
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
* @param rateMBit is the maximum data rate in MBit
*/
def throttle(node: String, target: String, direction: Direction, rateMBit: Double): Future[Done] = {
import Settings.QueryTimeout
controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo
}
override def blackhole(node: String, target: String, direction: Direction): Future[Done] = {
/**
* Switch the Netty pipeline of the remote support into blackhole mode for
* sending and/or receiving: it will just drop all messages right before
* submitting them to the Socket or right after receiving them from the
* Socket.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
*/
def blackhole(node: String, target: String, direction: Direction): Future[Done] = {
import Settings.QueryTimeout
controller ? Throttle(node, target, direction, 0f) mapTo
}
override def disconnect(node: String, target: String): Future[Done] = {
/**
* Tell the remote support to shutdown the connection to the given remote
* peer. It works regardless of whether the recipient was initiator or
* responder.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
*/
def disconnect(node: String, target: String): Future[Done] = {
import Settings.QueryTimeout
controller ? Disconnect(node, target, false) mapTo
}
override def abort(node: String, target: String): Future[Done] = {
/**
* Tell the remote support to TCP_RESET the connection to the given remote
* peer. It works regardless of whether the recipient was initiator or
* responder.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
*/
def abort(node: String, target: String): Future[Done] = {
import Settings.QueryTimeout
controller ? Disconnect(node, target, true) mapTo
}
override def shutdown(node: String, exitValue: Int): Future[Done] = {
/**
* Tell the remote node to shut itself down using System.exit with the given
* exitValue.
*
* @param node is the symbolic name of the node which is to be affected
* @param exitValue is the return code which shall be given to System.exit
*/
def shutdown(node: String, exitValue: Int): Future[Done] = {
import Settings.QueryTimeout
controller ? Terminate(node, exitValue) mapTo
}
override def kill(node: String): Future[Done] = {
/**
* Tell the SBT plugin to forcibly terminate the given remote node using Process.destroy.
*
* @param node is the symbolic name of the node which is to be affected
*/
def kill(node: String): Future[Done] = {
import Settings.QueryTimeout
controller ? Terminate(node, -1) mapTo
}
override def getNodes: Future[List[String]] = {
/**
* Obtain the list of remote host names currently registered.
*/
def getNodes: Future[List[String]] = {
import Settings.QueryTimeout
controller ? GetNodes mapTo
}
override def removeNode(node: String): Future[Done] = {
/**
* Remove a remote host from the list, so that the remaining nodes may still
* pass subsequent barriers. This must be done before the client connection
* breaks down in order to affect an orderly removal (i.e. without failing
* present and future barriers).
*
* @param node is the symbolic name of the node which is to be removed
*/
def removeNode(node: String): Future[Done] = {
import Settings.QueryTimeout
controller ? Remove(node) mapTo
}
}
/**
* This handler is installed at the end of the controller's netty pipeline. Its only
* purpose is to dispatch incoming messages to the right ServerFSM actor. There is
* one shared instance of this class for all connections accepted by one Controller.
*/
class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler {
val clients = new ConcurrentHashMap[Channel, ActorRef]()
@ -105,7 +217,7 @@ class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAd
val channel = event.getChannel
log.debug("disconnect from {}", getAddrString(channel))
val fsm = clients.get(channel)
fsm ! PoisonPill
fsm ! Controller.ClientDisconnected
clients.remove(channel)
}
@ -129,6 +241,19 @@ object ServerFSM {
case object Ready extends State
}
/**
* The server part of each client connection is represented by a ServerFSM.
* The Initial state handles reception of the new client's
* [[akka.remote.testconductor.Hello]] message (which is needed for all subsequent
* node name translations).
*
* In the Ready state, messages from the client are forwarded to the controller
* and [[akka.remote.testconductor.Send]] requests are sent, but the latter is
* treated specially: all client operations are to be confirmed by a
* [[akka.remote.testconductor.Done]] message, and there can be only one such
* request outstanding at a given time (i.e. a Send fails if the previous has
* not yet been acknowledged).
*/
class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor with LoggingFSM[ServerFSM.State, Option[ActorRef]] {
import ServerFSM._
import akka.actor.FSM._
@ -136,9 +261,20 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
startWith(Initial, None)
whenUnhandled {
case Event(ClientDisconnected, Some(s))
s ! Status.Failure(new RuntimeException("client disconnected in state " + stateName + ": " + channel))
stop()
case Event(ClientDisconnected, None) stop()
}
onTermination {
case _ controller ! ClientDisconnected
}
when(Initial, stateTimeout = 10 seconds) {
case Event(Hello(name, addr), _)
controller ! ClientConnected(name, addr)
controller ! NodeInfo(name, addr, self)
goto(Ready)
case Event(x: NetworkOp, _)
log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x)
@ -162,7 +298,6 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
stay using None
case Event(msg: NetworkOp, _)
log.warning("client {} sent unsupported message {}", getAddrString(channel), msg)
channel.close()
stop()
case Event(Send(msg @ (_: EnterBarrier | _: Done)), _)
channel.write(msg)
@ -176,10 +311,13 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
}
initialize
onTermination {
case _ channel.close()
}
}
object Controller {
case class ClientConnected(name: String, address: Address)
case class ClientDisconnected(name: String)
case object GetNodes
case object GetPort
@ -187,6 +325,11 @@ object Controller {
case class NodeInfo(name: String, addr: Address, fsm: ActorRef)
}
/**
* This controls test execution by managing barriers (delegated to
* [[akka.remote.testconductor.BarrierCoordinator]], its child) and allowing
* network and other failures to be injected at the test nodes.
*/
class Controller(_participants: Int) extends Actor {
import Controller._
@ -199,8 +342,8 @@ class Controller(_participants: Int) extends Actor {
override def supervisorStrategy = OneForOneStrategy() {
case e: BarrierCoordinator.BarrierTimeoutException SupervisorStrategy.Resume
case e: BarrierCoordinator.WrongBarrierException
// I think we are lacking a means of communication here: this is not correct!
for (i 1 to e.data.clients) barrier ! ClientConnected
for (NodeInfo(c, _, _) e.data.clients; info nodes get c)
barrier ! NodeInfo(c, info.addr, info.fsm)
for (c e.data.arrived) c ! BarrierFailed(e.barrier)
SupervisorStrategy.Restart
}
@ -209,17 +352,17 @@ class Controller(_participants: Int) extends Actor {
var nodes = Map[String, NodeInfo]()
override def receive = LoggingReceive {
case ClientConnected(name, addr)
nodes += name -> NodeInfo(name, addr, sender)
barrier forward ClientConnected
case c @ NodeInfo(name, addr, fsm)
nodes += name -> c
barrier forward c
if (initialParticipants <= 0) sender ! Done
else if (nodes.size == initialParticipants) {
for (NodeInfo(_, _, client) nodes.values) client ! Send(Done)
initialParticipants = 0
}
case ClientDisconnected(name)
case c @ ClientDisconnected(name)
nodes -= name
barrier forward ClientDisconnected
barrier forward c
case e @ EnterBarrier(name)
barrier forward e
case Throttle(node, target, direction, rateMBit)
@ -234,9 +377,9 @@ class Controller(_participants: Int) extends Actor {
} else {
nodes(node).fsm forward Send(TerminateMsg(exitValueOrKill))
}
// TODO: properly remove node from BarrierCoordinator
// case Remove(node) =>
// nodes -= node
case Remove(node)
nodes -= node
barrier ! BarrierCoordinator.RemoveClient(node)
case GetNodes sender ! nodes.keys
case GetPort
sender ! (connection.getLocalAddress match {
@ -250,27 +393,60 @@ object BarrierCoordinator {
case object Idle extends State
case object Waiting extends State
case class Data(clients: Int, barrier: String, arrived: List[ActorRef])
case class RemoveClient(name: String)
case class Data(clients: Set[Controller.NodeInfo], barrier: String, arrived: List[ActorRef])
class BarrierTimeoutException(val data: Data) extends RuntimeException(data.barrier) with NoStackTrace
class WrongBarrierException(val barrier: String, val client: ActorRef, val data: Data) extends RuntimeException(barrier) with NoStackTrace
}
/**
* This barrier coordinator gets informed of players connecting (NodeInfo),
* players being deliberately removed (RemoveClient) or failing (ClientDisconnected)
* by the controller. It also receives EnterBarrier requests, where upon the first
* one received the name of the current barrier is set and all other known clients
* are expected to join the barrier, whereupon all of them will be sent the successful
* EnterBarrier return message. In case of planned removals, this may just happen
* earlier, in case of failures the current barrier (and all subsequent ones) will
* be failed by sending BarrierFailed responses.
*/
class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] {
import BarrierCoordinator._
import akka.actor.FSM._
import Controller._
startWith(Idle, Data(0, "", Nil))
// this shall be set to true if all subsequent barriers shall fail
var failed = false
override def preRestart(reason: Throwable, message: Option[Any]) {}
override def postRestart(reason: Throwable) { failed = true }
// TODO what happens with the other waiting players in case of a test failure?
startWith(Idle, Data(Set(), "", Nil))
whenUnhandled {
case Event(n: NodeInfo, d @ Data(clients, _, _))
stay using d.copy(clients = clients + n)
}
when(Idle) {
case Event(EnterBarrier(name), Data(num, _, _))
if (num == 0) throw new IllegalStateException("no client expected yet")
goto(Waiting) using Data(num, name, sender :: Nil)
case Event(ClientConnected, d @ Data(num, _, _))
stay using d.copy(clients = num + 1)
case Event(ClientDisconnected, d @ Data(num, _, _))
if (num == 0) throw new IllegalStateException("no client to disconnect")
stay using d.copy(clients = num - 1)
case Event(EnterBarrier(name), d @ Data(clients, _, _))
if (clients.isEmpty) throw new IllegalStateException("no client expected yet")
if (failed)
stay replying BarrierFailed(name)
else
goto(Waiting) using d.copy(barrier = name, arrived = sender :: Nil)
case Event(ClientDisconnected(name), d @ Data(clients, _, _))
if (clients.isEmpty) throw new IllegalStateException("no client to disconnect")
(clients filterNot (_.name == name)) match {
case `clients` stay
case c
failed = true
stay using d.copy(clients = c)
}
case Event(RemoveClient(name), d @ Data(clients, _, _))
if (clients.isEmpty) throw new IllegalStateException("no client to remove")
stay using d.copy(clients = clients filterNot (_.name == name))
}
onTransition {
@ -279,30 +455,37 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
}
when(Waiting) {
case Event(e @ EnterBarrier(name), d @ Data(num, barrier, arrived))
case Event(e @ EnterBarrier(name), d @ Data(clients, barrier, arrived))
if (name != barrier) throw new WrongBarrierException(barrier, sender, d)
val together = sender :: arrived
if (together.size == num) {
together foreach (_ ! Send(e))
goto(Idle) using Data(num, "", Nil)
} else {
stay using d.copy(arrived = together)
}
case Event(ClientConnected, d @ Data(num, _, _))
stay using d.copy(clients = num + 1)
case Event(ClientDisconnected, d @ Data(num, barrier, arrived))
val expected = num - 1
if (arrived.size == expected) {
val e = EnterBarrier(barrier)
sender :: arrived foreach (_ ! Send(e))
goto(Idle) using Data(expected, "", Nil)
} else {
stay using d.copy(clients = expected)
handleBarrier(d.copy(arrived = together))
case Event(RemoveClient(name), d @ Data(clients, barrier, arrived))
val newClients = clients filterNot (_.name == name)
val newArrived = arrived filterNot (_ == name)
handleBarrier(d.copy(clients = newClients, arrived = newArrived))
case Event(ClientDisconnected(name), d @ Data(clients, barrier, arrived))
(clients filterNot (_.name == name)) match {
case `clients` stay
case c
val f = BarrierFailed(barrier)
arrived foreach (_ ! Send(f))
failed = true
goto(Idle) using Data(c, "", Nil)
}
case Event(StateTimeout, data)
throw new BarrierTimeoutException(data)
}
initialize
def handleBarrier(data: Data): State =
if ((data.clients.map(_.fsm) -- data.arrived).isEmpty) {
val e = EnterBarrier(data.barrier)
data.arrived foreach (_ ! Send(e))
goto(Idle) using data.copy(barrier = "", arrived = Nil)
} else {
stay using data
}
}

View file

@ -11,12 +11,28 @@ import akka.actor.ActorRef
import java.util.concurrent.ConcurrentHashMap
import akka.actor.Address
/**
* Access to the [[akka.remote.testconductor.TestConductorExt]] extension:
*
* {{{
* val tc = TestConductor(system)
* tc.startController(numPlayers)
* // OR
* tc.startClient(conductorPort)
* }}}
*/
object TestConductor extends ExtensionKey[TestConductorExt] {
/**
 * Convenience lookup for use from within an actor: obtains the extension
 * from the actor system of the implicitly available `ActorContext`.
 */
def apply()(implicit ctx: ActorContext): TestConductorExt = apply(ctx.system)
}
/**
* This binds together the [[akka.remote.testconductor.Conductor]] and
* [[akka.remote.testconductor.Player]] roles inside an Akka
* [[akka.actor.Extension]]. Please follow the aforementioned links for
* more information.
*/
class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player {
object Settings {

View file

@ -1,89 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.testconductor
import akka.dispatch.Future
trait BarrierSync {
/**
 * Enter all given barriers in the order in which they were given.
 *
 * @param name the names of the barriers to enter, processed one after the other
 */
def enter(name: String*): Unit
}
/**
 * Direction of traffic on a link between two test nodes, used to select
 * which side of a connection a failure injection (throttle/blackhole)
 * applies to: messages sent by the node, received by it, or both.
 */
sealed trait Direction
object Direction {
case object Send extends Direction
case object Receive extends Direction
case object Both extends Direction
}
/**
 * Operations for injecting network failures between test nodes, addressed
 * by their symbolic names. Each operation completes asynchronously with a
 * `Done` acknowledgment.
 */
trait FailureInject {
/**
 * Make the remoting pipeline on the node throttle data sent to or received
 * from the given remote peer.
 *
 * @param node is the symbolic name of the node which is to be affected
 * @param target is the symbolic name of the other node to which connectivity shall be throttled
 * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
 * @param rateMBit is the maximum data rate in MBit
 */
def throttle(node: String, target: String, direction: Direction, rateMBit: Double): Future[Done]
/**
 * Switch the Netty pipeline of the remote support into blackhole mode for
 * sending and/or receiving: it will just drop all messages right before
 * submitting them to the Socket or right after receiving them from the
 * Socket.
 *
 * @param node is the symbolic name of the node which is to be affected
 * @param target is the symbolic name of the other node to which connectivity shall be impeded
 * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
 */
def blackhole(node: String, target: String, direction: Direction): Future[Done]
/**
 * Tell the remote support to shutdown the connection to the given remote
 * peer. It works regardless of whether the recipient was initiator or
 * responder.
 *
 * @param node is the symbolic name of the node which is to be affected
 * @param target is the symbolic name of the other node to which connectivity shall be impeded
 */
def disconnect(node: String, target: String): Future[Done]
/**
 * Tell the remote support to TCP_RESET the connection to the given remote
 * peer. It works regardless of whether the recipient was initiator or
 * responder.
 *
 * @param node is the symbolic name of the node which is to be affected
 * @param target is the symbolic name of the other node to which connectivity shall be impeded
 */
def abort(node: String, target: String): Future[Done]
}
/**
 * Operations for controlling the overall test run: starting the controller,
 * querying its port, and managing the life cycle of the participating nodes.
 */
trait RunControl {
/**
 * Start the server port, returns the port number.
 *
 * @param participants the number of clients expected to connect before start-up completes
 */
def startController(participants: Int): Future[Int]
/**
 * Get the actual port used by the server.
 */
def port: Future[Int]
/**
 * Tell the remote node to shut itself down using System.exit with the given
 * exitValue.
 *
 * @param node is the symbolic name of the node which is to be affected
 * @param exitValue is the return code which shall be given to System.exit
 */
def shutdown(node: String, exitValue: Int): Future[Done]
/**
 * Tell the SBT plugin to forcibly terminate the given remote node using Process.destroy.
 *
 * @param node is the symbolic name of the node which is to be affected
 */
def kill(node: String): Future[Done]
/**
 * Obtain the list of remote host names currently registered.
 */
def getNodes: Future[List[String]]
/**
 * Remove a remote host from the list, so that the remaining nodes may still
 * pass subsequent barriers.
 *
 * @param node is the symbolic name of the node which is to be removed
 */
def removeNode(node: String): Future[Done]
}

View file

@ -21,7 +21,13 @@ import akka.actor.PoisonPill
import akka.event.Logging
import akka.dispatch.Future
trait Player extends BarrierSync { this: TestConductorExt
/**
* The Player is the client component of the
* [[akka.remote.testconductor.TestConductorExt]] extension. It registers with
* the [[akka.remote.testconductor.Conductor]]'s [[akka.remote.testconductor.Controller]]
* in order to participate in barriers and enable network failure injection.
*/
trait Player { this: TestConductorExt
private var _client: ActorRef = _
private def client = _client match {
@ -29,6 +35,14 @@ trait Player extends BarrierSync { this: TestConductorExt ⇒
case x x
}
/**
* Connect to the conductor on the given port (the host is taken from setting
* `akka.testconductor.host`). The connection is made asynchronously, but you
* should await completion of the returned Future because that implies that
* all expected participants of this test have successfully connected (i.e.
* this is a first barrier in itself). The number of expected participants is
* set in [[akka.remote.testconductor.Conductor]]`.startController()`.
*/
def startClient(port: Int): Future[Done] = {
import ClientFSM._
import akka.actor.FSM._
@ -51,7 +65,11 @@ trait Player extends BarrierSync { this: TestConductorExt ⇒
a ? client mapTo
}
override def enter(name: String*) {
/**
* Enter the named barriers, one after the other, in the order given. Will
* throw an exception in case of timeouts or other errors.
*/
def enter(name: String*) {
system.log.debug("entering barriers " + name.mkString("(", ", ", ")"))
name foreach { b
import Settings.BarrierTimeout
@ -73,6 +91,15 @@ object ClientFSM {
case object Disconnected
}
/**
* This is the controlling entity on the [[akka.remote.testconductor.Player]]
* side: in a first step it registers itself with a symbolic name and its remote
* address at the [[akka.remote.testconductor.Controller]], then waits for the
* `Done` message which signals that all other expected test participants have
* done the same. After that, it will pass barrier requests to and from the
* coordinator and react to the [[akka.remote.testconductor.Conductor]]'s
* requests for failure injection.
*/
class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
import ClientFSM._
@ -162,6 +189,9 @@ class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, Client
}
/**
* This handler only forwards messages received from the conductor to the [[akka.remote.testconductor.ClientFSM]].
*/
class PlayerHandler(fsm: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler {
import ClientFSM._