incorporate review comments into TestConductor work

- protect all internal API using private[akka] and ScalaDoc
- remove package object which was after a previous refactoring only used
  from a single place anyway
- document all public API methods, add brief description how failure
  injector works
- include remoteTests in the top-level aggregate project
This commit is contained in:
Roland 2012-05-22 15:19:45 +02:00
parent e825a8ac4f
commit 508d8f70a5
10 changed files with 188 additions and 82 deletions

View file

@ -50,7 +50,7 @@ trait Conductor { this: TestConductorExt ⇒
private var _controller: ActorRef = _
private def controller: ActorRef = _controller match {
case null throw new RuntimeException("TestConductorServer was not started")
case null throw new IllegalStateException("TestConductorServer was not started")
case x x
}
@ -169,10 +169,11 @@ trait Conductor { this: TestConductorExt ⇒
*
* @param node is the symbolic name of the node which is to be affected
*/
def kill(node: RoleName): Future[Done] = {
import Settings.QueryTimeout
controller ? Terminate(node, -1) mapTo
}
// TODO: uncomment (and implement in Controller) if really needed
// def kill(node: RoleName): Future[Done] = {
// import Settings.QueryTimeout
// controller ? Terminate(node, -1) mapTo
// }
/**
* Obtain the list of remote host names currently registered.
@ -201,8 +202,10 @@ trait Conductor { this: TestConductorExt ⇒
* This handler is installed at the end of the controllers netty pipeline. Its only
* purpose is to dispatch incoming messages to the right ServerFSM actor. There is
* one shared instance of this class for all connections accepted by one Controller.
*
* INTERNAL API.
*/
class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler {
private[akka] class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler {
val clients = new ConcurrentHashMap[Channel, ActorRef]()
@ -235,7 +238,10 @@ class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAd
}
object ServerFSM {
/**
* INTERNAL API.
*/
private[akka] object ServerFSM {
sealed trait State
case object Initial extends State
case object Ready extends State
@ -253,8 +259,10 @@ object ServerFSM {
* [[akka.remote.testconductor.Done]] message, and there can be only one such
* request outstanding at a given time (i.e. a Send fails if the previous has
* not yet been acknowledged).
*
* INTERNAL API.
*/
class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor with LoggingFSM[ServerFSM.State, Option[ActorRef]] {
private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor with LoggingFSM[ServerFSM.State, Option[ActorRef]] {
import ServerFSM._
import akka.actor.FSM._
import Controller._
@ -317,7 +325,10 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
}
}
object Controller {
/**
* INTERNAL API.
*/
private[akka] object Controller {
case class ClientDisconnected(name: RoleName)
case object GetNodes
case object GetSockAddr
@ -329,8 +340,10 @@ object Controller {
* This controls test execution by managing barriers (delegated to
* [[akka.remote.testconductor.BarrierCoordinator]], its child) and allowing
* network and other failures to be injected at the test nodes.
*
* INTERNAL API.
*/
class Controller(private var initialParticipants: Int, controllerPort: InetSocketAddress) extends Actor {
private[akka] class Controller(private var initialParticipants: Int, controllerPort: InetSocketAddress) extends Actor {
import Controller._
import BarrierCoordinator._
@ -418,7 +431,10 @@ class Controller(private var initialParticipants: Int, controllerPort: InetSocke
}
}
object BarrierCoordinator {
/**
* INTERNAL API.
*/
private[akka] object BarrierCoordinator {
sealed trait State
case object Idle extends State
case object Waiting extends State
@ -447,8 +463,10 @@ object BarrierCoordinator {
* EnterBarrier return message. In case of planned removals, this may just happen
* earlier, in case of failures the current barrier (and all subsequent ones) will
* be failed by sending BarrierFailed responses.
*
* INTERNAL API.
*/
class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] {
private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] {
import BarrierCoordinator._
import akka.actor.FSM._
import Controller._

View file

@ -13,44 +13,59 @@ import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
case class RoleName(name: String)
case class ToClient(msg: ClientOp with NetworkOp)
case class ToServer(msg: ServerOp with NetworkOp)
private[akka] case class ToClient(msg: ClientOp with NetworkOp)
private[akka] case class ToServer(msg: ServerOp with NetworkOp)
sealed trait ClientOp // messages sent to from Conductor to Player
sealed trait ServerOp // messages sent to from Player to Conductor
sealed trait CommandOp // messages sent from TestConductorExt to Conductor
sealed trait NetworkOp // messages sent over the wire
sealed trait UnconfirmedClientOp extends ClientOp // unconfirmed messages going to the Player
sealed trait ConfirmedClientOp extends ClientOp
private[akka] sealed trait ClientOp // messages sent to from Conductor to Player
private[akka] sealed trait ServerOp // messages sent to from Player to Conductor
private[akka] sealed trait CommandOp // messages sent from TestConductorExt to Conductor
private[akka] sealed trait NetworkOp // messages sent over the wire
private[akka] sealed trait UnconfirmedClientOp extends ClientOp // unconfirmed messages going to the Player
private[akka] sealed trait ConfirmedClientOp extends ClientOp
/**
* First message of connection sets names straight.
*/
case class Hello(name: String, addr: Address) extends NetworkOp
private[akka] case class Hello(name: String, addr: Address) extends NetworkOp
case class EnterBarrier(name: String) extends ServerOp with NetworkOp
case class BarrierResult(name: String, success: Boolean) extends UnconfirmedClientOp with NetworkOp
private[akka] case class EnterBarrier(name: String) extends ServerOp with NetworkOp
private[akka] case class BarrierResult(name: String, success: Boolean) extends UnconfirmedClientOp with NetworkOp
case class Throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Float) extends CommandOp
case class ThrottleMsg(target: Address, direction: Direction, rateMBit: Float) extends ConfirmedClientOp with NetworkOp
private[akka] case class Throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Float) extends CommandOp
private[akka] case class ThrottleMsg(target: Address, direction: Direction, rateMBit: Float) extends ConfirmedClientOp with NetworkOp
case class Disconnect(node: RoleName, target: RoleName, abort: Boolean) extends CommandOp
case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp
private[akka] case class Disconnect(node: RoleName, target: RoleName, abort: Boolean) extends CommandOp
private[akka] case class DisconnectMsg(target: Address, abort: Boolean) extends ConfirmedClientOp with NetworkOp
case class Terminate(node: RoleName, exitValueOrKill: Int) extends CommandOp
case class TerminateMsg(exitValue: Int) extends ConfirmedClientOp with NetworkOp
private[akka] case class Terminate(node: RoleName, exitValueOrKill: Int) extends CommandOp
private[akka] case class TerminateMsg(exitValue: Int) extends ConfirmedClientOp with NetworkOp
case class GetAddress(node: RoleName) extends ServerOp with NetworkOp
case class AddressReply(node: RoleName, addr: Address) extends UnconfirmedClientOp with NetworkOp
private[akka] case class GetAddress(node: RoleName) extends ServerOp with NetworkOp
private[akka] case class AddressReply(node: RoleName, addr: Address) extends UnconfirmedClientOp with NetworkOp
abstract class Done extends ServerOp with UnconfirmedClientOp with NetworkOp
case object Done extends Done {
private[akka] abstract class Done extends ServerOp with UnconfirmedClientOp with NetworkOp
private[akka] case object Done extends Done {
def getInstance: Done = this
}
case class Remove(node: RoleName) extends CommandOp
private[akka] case class Remove(node: RoleName) extends CommandOp
private[akka] class MsgEncoder extends OneToOneEncoder {
implicit def address2proto(addr: Address): TCP.Address =
TCP.Address.newBuilder
.setProtocol(addr.protocol)
.setSystem(addr.system)
.setHost(addr.host.get)
.setPort(addr.port.get)
.build
implicit def direction2proto(dir: Direction): TCP.Direction = dir match {
case Direction.Send TCP.Direction.Send
case Direction.Receive TCP.Direction.Receive
case Direction.Both TCP.Direction.Both
}
class MsgEncoder extends OneToOneEncoder {
def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
case x: NetworkOp
val w = TCP.Wrapper.newBuilder
@ -81,7 +96,17 @@ class MsgEncoder extends OneToOneEncoder {
}
}
class MsgDecoder extends OneToOneDecoder {
private[akka] class MsgDecoder extends OneToOneDecoder {
implicit def address2scala(addr: TCP.Address): Address =
Address(addr.getProtocol, addr.getSystem, addr.getHost, addr.getPort)
implicit def direction2scala(dir: TCP.Direction): Direction = dir match {
case TCP.Direction.Send Direction.Send
case TCP.Direction.Receive Direction.Receive
case TCP.Direction.Both Direction.Both
}
def decode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
case w: TCP.Wrapper if w.getAllFields.size == 1
if (w.hasHello) {

View file

@ -32,6 +32,9 @@ object TestConductor extends ExtensionKey[TestConductorExt] {
* [[akka.remote.testconductor.Player]] roles inside an Akka
* [[akka.actor.Extension]]. Please follow the aforementioned links for
* more information.
*
* <b>This extension requires the `akka.actor.provider`
* to be a [[akka.remote.RemoteActorRefProvider]].</b>
*/
class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player {
@ -47,9 +50,22 @@ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with C
val PacketSplitThreshold = Duration(config.getMilliseconds("akka.testconductor.packet-split-threshold"), MILLISECONDS)
}
/**
* Remote transport used by the actor ref provider.
*/
val transport = system.provider.asInstanceOf[RemoteActorRefProvider].transport
/**
* Transport address of this Netty-like remote transport.
*/
val address = transport.address
val failureInjectors = new ConcurrentHashMap[Address, FailureInjector]
/**
* INTERNAL API.
*
* [[akka.remote.testconductor.FailureInjector]]s register themselves here so that
* failures can be injected.
*/
private[akka] val failureInjectors = new ConcurrentHashMap[Address, FailureInjector]
}

View file

@ -31,7 +31,10 @@ import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelFuture
case class FailureInjector(sender: ActorRef, receiver: ActorRef) {
/**
* INTERNAL API.
*/
private[akka] case class FailureInjector(sender: ActorRef, receiver: ActorRef) {
def refs(dir: Direction) = dir match {
case Direction.Send Seq(sender)
case Direction.Receive Seq(receiver)
@ -39,12 +42,27 @@ case class FailureInjector(sender: ActorRef, receiver: ActorRef) {
}
}
object NetworkFailureInjector {
/**
* INTERNAL API.
*/
private[akka] object NetworkFailureInjector {
case class SetRate(rateMBit: Float)
case class Disconnect(abort: Boolean)
}
class NetworkFailureInjector(system: ActorSystem) extends SimpleChannelHandler {
/**
* Brief overview: all network traffic passes through the `sender`/`receiver` FSMs, which can
* pass through requests immediately, drop them or throttle to a desired rate. The FSMs are
* registered in the TestConductorExt.failureInjectors so that settings can be applied from
* the ClientFSMs.
*
* I found that simply forwarding events using ctx.sendUpstream/sendDownstream does not work,
* it deadlocks and gives strange errors; in the end I just trusted the Netty docs which
* recommend to prefer `Channels.write()` and `Channels.fireMessageReceived()`.
*
* INTERNAL API.
*/
private[akka] class NetworkFailureInjector(system: ActorSystem) extends SimpleChannelHandler {
val log = Logging(system, "FailureInjector")

View file

@ -94,7 +94,10 @@ trait Player { this: TestConductorExt ⇒
}
}
object ClientFSM {
/**
* INTERNAL API.
*/
private[akka] object ClientFSM {
sealed trait State
case object Connecting extends State
case object AwaitDone extends State
@ -116,8 +119,10 @@ object ClientFSM {
* done the same. After that, it will pass barrier requests to and from the
* coordinator and react to the [[akka.remote.testconductor.Conductor]]s
* requests for failure injection.
*
* INTERNAL API.
*/
class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
import ClientFSM._
val settings = TestConductor().Settings
@ -236,8 +241,10 @@ class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) extends Actor
/**
* This handler only forwards messages received from the conductor to the [[akka.remote.testconductor.ClientFSM]].
*
* INTERNAL API.
*/
class PlayerHandler(
private[akka] class PlayerHandler(
server: InetSocketAddress,
private var reconnects: Int,
backoff: Duration,

View file

@ -13,7 +13,10 @@ import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutExceptio
import java.net.InetSocketAddress
import java.util.concurrent.Executors
class TestConductorPipelineFactory(handler: ChannelUpstreamHandler) extends ChannelPipelineFactory {
/**
* INTERNAL API.
*/
private[akka] class TestConductorPipelineFactory(handler: ChannelUpstreamHandler) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val encap = List(new LengthFieldPrepender(4), new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4))
val proto = List(new ProtobufEncoder, new ProtobufDecoder(TestConductorProtocol.Wrapper.getDefaultInstance))
@ -22,11 +25,23 @@ class TestConductorPipelineFactory(handler: ChannelUpstreamHandler) extends Chan
}
}
sealed trait Role
case object Client extends Role
case object Server extends Role
/**
* INTERNAL API.
*/
private[akka] sealed trait Role
/**
* INTERNAL API.
*/
private[akka] case object Client extends Role
/**
* INTERNAL API.
*/
private[akka] case object Server extends Role
object RemoteConnection {
/**
* INTERNAL API.
*/
private[akka] object RemoteConnection {
def apply(role: Role, sockaddr: InetSocketAddress, handler: ChannelUpstreamHandler): Channel = {
role match {
case Client

View file

@ -10,7 +10,10 @@ import akka.remote.RemoteActorRefProvider
import org.jboss.netty.channel.ChannelHandler
import org.jboss.netty.channel.ChannelPipelineFactory
class TestConductorTransport(_remoteSettings: RemoteSettings, _system: ActorSystemImpl, _provider: RemoteActorRefProvider)
/**
* INTERNAL API.
*/
private[akka] class TestConductorTransport(_remoteSettings: RemoteSettings, _system: ActorSystemImpl, _provider: RemoteActorRefProvider)
extends NettyRemoteTransport(_remoteSettings, _system, _provider) {
override def createPipeline(endpoint: ChannelHandler, withTimeout: Boolean): ChannelPipelineFactory =

View file

@ -1,31 +0,0 @@
package akka.remote
import akka.actor.Address
import testconductor.{ TestConductorProtocol TCP }
package object testconductor {
implicit def address2proto(addr: Address): TCP.Address =
TCP.Address.newBuilder
.setProtocol(addr.protocol)
.setSystem(addr.system)
.setHost(addr.host.get)
.setPort(addr.port.get)
.build
implicit def address2scala(addr: TCP.Address): Address =
Address(addr.getProtocol, addr.getSystem, addr.getHost, addr.getPort)
implicit def direction2proto(dir: Direction): TCP.Direction = dir match {
case Direction.Send TCP.Direction.Send
case Direction.Receive TCP.Direction.Receive
case Direction.Both TCP.Direction.Both
}
implicit def direction2scala(dir: TCP.Direction): Direction = dir match {
case TCP.Direction.Send Direction.Send
case TCP.Direction.Receive Direction.Receive
case TCP.Direction.Both Direction.Both
}
}

View file

@ -46,13 +46,31 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
Executors.newCachedThreadPool(system.threadFactory),
Executors.newCachedThreadPool(system.threadFactory))
/**
* Backing scaffolding for the default implementation of NettyRemoteSupport.createPipeline.
*/
object PipelineFactory {
/**
* Construct a StaticChannelPipeline from a sequence of handlers; to be used
* in implementations of ChannelPipelineFactory.
*/
def apply(handlers: Seq[ChannelHandler]): StaticChannelPipeline = new StaticChannelPipeline(handlers: _*)
/**
* Constructs the NettyRemoteTransport default pipeline with the give head handler, which
* is taken by-name to allow it not to be shared across pipelines.
*
* @param withTimeout determines whether an IdleStateHandler shall be included
*/
def apply(endpoint: Seq[ChannelHandler], withTimeout: Boolean): ChannelPipelineFactory =
new ChannelPipelineFactory {
def getPipeline = apply(defaultStack(withTimeout) ++ endpoint)
}
/**
* Construct a default protocol stack, excluding the head handler (i.e. the one which
* actually dispatches the received messages to the local target actors).
*/
def defaultStack(withTimeout: Boolean): Seq[ChannelHandler] =
(if (withTimeout) timeout :: Nil else Nil) :::
msgFormat :::
@ -60,17 +78,28 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
executionHandler ::
Nil
/**
* Construct an IdleStateHandler which uses [[akka.remote.netty.NettyRemoteTransport]].timer.
*/
def timeout = new IdleStateHandler(timer,
settings.ReadTimeout.toSeconds.toInt,
settings.WriteTimeout.toSeconds.toInt,
settings.AllTimeout.toSeconds.toInt)
/**
* Construct frame&protobuf encoder/decoder.
*/
def msgFormat = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4) ::
new LengthFieldPrepender(4) ::
new RemoteMessageDecoder ::
new RemoteMessageEncoder(NettyRemoteTransport.this) ::
Nil
/**
* Construct an ExecutionHandler which is used to ensure that message dispatch does not
* happen on a netty thread (that could be bad if re-sending over the network for
* remote-deployed actors).
*/
val executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(
settings.ExecutionPoolSize,
settings.MaxChannelMemorySize,
@ -79,6 +108,11 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
settings.ExecutionPoolKeepalive.unit,
system.threadFactory))
/**
* Construct and authentication handler which uses the SecureCookie to somewhat
* protect the TCP port from unauthorized use (dont rely on it too much, though,
* as this is NOT a cryptographic feature).
*/
def authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil
}
@ -98,7 +132,8 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
/**
* Override this method to inject a subclass of NettyRemoteServer instead of
* the normal one, e.g. for inserting security hooks.
* the normal one, e.g. for inserting security hooks. If this method throws
* an exception, the transport will shut itself down and re-throw.
*/
protected def createServer(): NettyRemoteServer = new NettyRemoteServer(this)

View file

@ -32,7 +32,7 @@ object AkkaBuild extends Build {
Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id)
),
aggregate = Seq(actor, testkit, actorTests, remote, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, actorMigration, samples, tutorials, docs)
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, actorMigration, samples, tutorials, docs)
)
lazy val actor = Project(