TestConductor: convert to Akka Extension and add test

- make start-up synchronous and explicit for client and server
- server can be queried for actual port, client requires explicit port
- simple multi-jvm-test for verification of TestConductor barriers
This commit is contained in:
Roland 2012-05-03 20:48:27 +02:00
parent 6c786d20b8
commit 5cf0fa66f8
12 changed files with 1170 additions and 353 deletions

View file

@ -19,14 +19,20 @@ message Wrapper {
message Hello {
required string name = 1;
required string host = 2;
required int32 port = 3;
required Address address = 2;
}
message EnterBarrier {
required string name = 1;
}
message Address {
required string protocol = 1;
required string system = 2;
required string host = 3;
required int32 port = 4;
}
enum FailType {
Throttle = 1;
Disconnect = 2;
@ -40,9 +46,8 @@ enum Direction {
message InjectFailure {
required FailType failure = 1;
optional Direction direction = 2;
optional string host = 3;
optional int32 port = 4;
optional float rateMBit = 5;
optional int32 exitValue = 6;
optional Address address = 3;
optional float rateMBit = 6;
optional int32 exitValue = 7;
}

View file

@ -155,4 +155,25 @@ akka {
type = PinnedDispatcher
}
}
testconductor {
# Timeout for joining a barrier: this is the maximum time any participants
# waits for everybody else to join a named barrier.
barrier-timeout = 30s
# Timeout for interrogation of TestConductors Controller actor
query-timeout = 5s
# Default port to start the conductor on; 0 means <auto>
port = 0
# Hostname of the TestConductor server, used by the server to bind to the IP
# and by the client to connect to it.
host = localhost
# Name of the TestConductor client (for identification on the server e.g. for
# failure injection)
name = "noname"
}
}

View file

@ -56,13 +56,13 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
val server: NettyRemoteServer = try createServer() catch { case NonFatal(ex) shutdown(); throw ex }
/**
* Override this method to inject a subclass of NettyRemoteServer instead of
* Override this method to inject a subclass of NettyRemoteServer instead of
* the normal one, e.g. for altering the pipeline.
*/
protected def createServer(): NettyRemoteServer = new NettyRemoteServer(this)
/**
* Override this method to inject a subclass of RemoteClient instead of
* Override this method to inject a subclass of RemoteClient instead of
* the normal one, e.g. for altering the pipeline. Get this transports
* address from `this.address`.
*/

View file

@ -44,7 +44,7 @@ class NettyRemoteServer(val netty: NettyRemoteTransport) {
b.setOption("reuseAddress", true)
b
}
protected def makePipeline(): ChannelPipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, netty)
@volatile

View file

@ -18,26 +18,30 @@ import akka.event.LoggingAdapter
import akka.actor.PoisonPill
import akka.event.Logging
import scala.util.control.NoStackTrace
import akka.event.LoggingReceive
import akka.actor.Address
import java.net.InetSocketAddress
object Conductor extends RunControl with FailureInject with BarrierSync {
val system = ActorSystem("conductor", ConfigFactory.load().getConfig("conductor"))
object Settings {
val config = system.settings.config
implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("barrier-timeout"), MILLISECONDS))
implicit val QueryTimeout = Timeout(Duration(config.getMilliseconds("query-timeout"), MILLISECONDS))
}
trait Conductor extends RunControl with FailureInject { this: TestConductorExt
import Controller._
private val controller = system.actorOf(Props[Controller], "controller")
controller ! ClientConnected
private var _controller: ActorRef = _
private def controller: ActorRef = _controller match {
case null throw new RuntimeException("TestConductorServer was not started")
case x x
}
override def enter(name: String*) {
override def startController() {
if (_controller ne null) throw new RuntimeException("TestConductorServer was already started")
_controller = system.actorOf(Props[Controller], "controller")
import Settings.BarrierTimeout
name foreach (b Await.result(controller ? EnterBarrier(b), Duration.Inf))
startClient(Await.result(controller ? GetPort mapTo, Duration.Inf))
}
override def port: Int = {
import Settings.QueryTimeout
Await.result(controller ? GetPort mapTo, Duration.Inf)
}
override def throttle(node: String, target: String, direction: Direction, rateMBit: Float) {
@ -127,7 +131,7 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
case Event(msg: Wrapper, _)
if (msg.hasHello) {
val hello = msg.getHello
controller ! ClientConnected(hello.getName, hello.getHost, hello.getPort)
controller ! ClientConnected(hello.getName, hello.getAddress)
goto(Ready)
} else {
log.warning("client {} sent no Hello in first message, disconnecting", getAddrString(channel))
@ -162,29 +166,28 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
}
object Controller {
case class ClientConnected(name: String, host: String, port: Int)
case class ClientConnected(name: String, address: Address)
case class ClientDisconnected(name: String)
case object GetNodes
case object GetPort
case class NodeInfo(name: String, host: String, port: Int, fsm: ActorRef)
case class NodeInfo(name: String, addr: Address, fsm: ActorRef)
}
class Controller extends Actor {
import Controller._
val config = context.system.settings.config
val host = config.getString("akka.testconductor.host")
val port = config.getInt("akka.testconductor.port")
val connection = RemoteConnection(Server, host, port,
val settings = TestConductor().Settings
val connection = RemoteConnection(Server, settings.host, settings.port,
new ConductorHandler(context.system, self, Logging(context.system, "ConductorHandler")))
val barrier = context.actorOf(Props[BarrierCoordinator], "barriers")
var nodes = Map[String, NodeInfo]()
override def receive = {
case ClientConnected(name, host, port)
nodes += name -> NodeInfo(name, host, port, sender)
override def receive = LoggingReceive {
case "ready?" sender ! "yes"
case ClientConnected(name, addr)
nodes += name -> NodeInfo(name, addr, sender)
barrier forward ClientConnected
case ClientConnected
barrier forward ClientConnected
@ -199,8 +202,7 @@ class Controller extends Actor {
InjectFailure.newBuilder
.setFailure(FailType.Throttle)
.setDirection(TestConductorProtocol.Direction.valueOf(direction.toString))
.setHost(t.host)
.setPort(t.port)
.setAddress(t.addr)
.setRateMBit(rateMBit)
.build
nodes(node).fsm ! ServerFSM.Send(Wrapper.newBuilder.setFailure(throttle).build)
@ -209,8 +211,7 @@ class Controller extends Actor {
val disconnect =
InjectFailure.newBuilder
.setFailure(if (abort) FailType.Abort else FailType.Disconnect)
.setHost(t.host)
.setPort(t.port)
.setAddress(t.addr)
.build
nodes(node).fsm ! ServerFSM.Send(Wrapper.newBuilder.setFailure(disconnect).build)
case Terminate(node, exitValueOrKill)
@ -224,6 +225,10 @@ class Controller extends Actor {
// case Remove(node) =>
// nodes -= node
case GetNodes sender ! nodes.keys
case GetPort
sender ! (connection.getLocalAddress match {
case inet: InetSocketAddress inet.getPort
})
}
}

View file

@ -0,0 +1,31 @@
package akka.remote.testconductor
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ExtendedActorSystem
import akka.remote.RemoteActorRefProvider
import akka.actor.ActorContext
import akka.util.{ Duration, Timeout }
import java.util.concurrent.TimeUnit.MILLISECONDS
object TestConductor extends ExtensionKey[TestConductorExt] {
def apply()(implicit ctx: ActorContext): TestConductorExt = apply(ctx.system)
}
class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player {
object Settings {
val config = system.settings.config
implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("akka.testconductor.barrier-timeout"), MILLISECONDS))
implicit val QueryTimeout = Timeout(Duration(config.getMilliseconds("akka.testconductor.query-timeout"), MILLISECONDS))
val name = config.getString("akka.testconductor.name")
val host = config.getString("akka.testconductor.host")
val port = config.getInt("akka.testconductor.port")
}
val transport = system.provider.asInstanceOf[RemoteActorRefProvider].transport
val address = transport.address
}

View file

@ -49,6 +49,16 @@ trait FailureInject {
trait RunControl {
/**
* Start the server port.
*/
def startController(): Unit
/**
* Get the actual port used by the server.
*/
def port: Int
/**
* Tell the remote node to shut itself down using System.exit with the given
* exitValue.

View file

@ -25,11 +25,11 @@ import akka.actor.Props
object NetworkFailureInjector {
val channels = new Index[Address, Channel](16, (c1, c2) => c1 compareTo c2)
val channels = new Index[Address, Channel](16, (c1, c2) c1 compareTo c2)
def close(remote: Address): Unit = {
// channels will be cleaned up by the handler
for (chs <- channels.remove(remote); c <- chs) c.close()
for (chs channels.remove(remote); c chs) c.close()
}
}

View file

@ -21,23 +21,40 @@ import akka.event.LoggingAdapter
import akka.actor.PoisonPill
import akka.event.Logging
object Player extends BarrierSync {
trait Player extends BarrierSync { this: TestConductorExt
val system = ActorSystem("Player", ConfigFactory.load().getConfig("player"))
object Settings {
val config = system.settings.config
implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("barrier-timeout"), MILLISECONDS))
private var _client: ActorRef = _
private def client = _client match {
case null throw new IllegalStateException("TestConductor client not yet started")
case x x
}
private val server = system.actorOf(Props[ClientFSM], "client")
def startClient(port: Int) {
import ClientFSM._
import akka.actor.FSM._
import Settings.BarrierTimeout
if (_client ne null) throw new IllegalStateException("TestConductorClient already started")
_client = system.actorOf(Props(new ClientFSM(port)), "TestConductorClient")
val a = system.actorOf(Props(new Actor {
var waiting: ActorRef = _
def receive = {
case fsm: ActorRef waiting = sender; fsm ! SubscribeTransitionCallBack(self)
case Transition(_, Connecting, Connected) waiting ! "okay"
case t: Transition[_] waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t))
case CurrentState(_, Connected) waiting ! "okay"
case _: CurrentState[_]
}
}))
Await.result(a ? client, Duration.Inf)
}
override def enter(name: String*) {
system.log.debug("entering barriers " + name.mkString("(", ", ", ")"))
name foreach { b
import Settings.BarrierTimeout
Await.result(server ? EnterBarrier(b), Duration.Inf)
Await.result(client ? EnterBarrier(b), Duration.Inf)
system.log.debug("passed barrier {}", b)
}
}
@ -48,35 +65,28 @@ object ClientFSM {
case object Connecting extends State
case object Connected extends State
case class Data(channel: Channel, msg: Either[List[ClientOp], (String, ActorRef)])
case class Data(channel: Channel, barrier: Option[(String, ActorRef)])
class ConnectionFailure(msg: String) extends RuntimeException(msg) with NoStackTrace
case object Disconnected
}
class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
class ClientFSM(port: Int) extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
import ClientFSM._
val config = context.system.settings.config
val settings = TestConductor().Settings
val name = config.getString("akka.testconductor.name")
val host = config.getString("akka.testconductor.host")
val port = config.getInt("akka.testconductor.port")
val handler = new PlayerHandler(self, Logging(context.system, "PlayerHandler"))
val myself = "XXX"
val myport = 12345
startWith(Connecting, Data(RemoteConnection(Client, host, port, handler), Left(Nil)))
startWith(Connecting, Data(RemoteConnection(Client, settings.host, port, handler), None))
when(Connecting, stateTimeout = 10 seconds) {
case Event(msg: ClientOp, Data(channel, Left(msgs)))
stay using Data(channel, Left(msg :: msgs))
case Event(Connected, Data(channel, Left(msgs)))
val hello = Hello.newBuilder.setName(name).setHost(myself).setPort(myport).build
case Event(msg: ClientOp, _)
stay replying Status.Failure(new IllegalStateException("not connected yet"))
case Event(Connected, d @ Data(channel, _))
val hello = Hello.newBuilder.setName(settings.name).setAddress(TestConductor().address).build
channel.write(Wrapper.newBuilder.setHello(hello).build)
msgs.reverse foreach sendMsg(channel)
goto(Connected) using Data(channel, Left(Nil))
goto(Connected)
case Event(_: ConnectionFailure, _)
// System.exit(1)
stop
@ -92,8 +102,8 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
throw new ConnectionFailure("disconnect")
case Event(msg: EnterBarrier, Data(channel, _))
sendMsg(channel)(msg)
stay using Data(channel, Right((msg.name, sender)))
case Event(msg: Wrapper, Data(channel, Right((barrier, sender)))) if msg.getAllFields.size == 1
stay using Data(channel, Some(msg.name, sender))
case Event(msg: Wrapper, Data(channel, Some((barrier, sender)))) if msg.getAllFields.size == 1
if (msg.hasBarrier) {
val b = msg.getBarrier.getName
if (b != barrier) {
@ -102,7 +112,7 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
sender ! b
}
}
stay using Data(channel, Left(Nil))
stay using Data(channel, None)
}
onTermination {
@ -110,6 +120,8 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
channel.close()
}
initialize
private def sendMsg(channel: Channel)(msg: ClientOp) {
msg match {
case EnterBarrier(name)

View file

@ -0,0 +1,19 @@
package akka.remote
import akka.actor.Address
import testconductor.{ TestConductorProtocol TCP }
package object testconductor {
implicit def address2proto(addr: Address): TCP.Address =
TCP.Address.newBuilder
.setProtocol(addr.protocol)
.setSystem(addr.system)
.setHost(addr.host.get)
.setPort(addr.port.get)
.build
implicit def address2scala(addr: TCP.Address): Address =
Address(addr.getProtocol, addr.getSystem, addr.getHost, addr.getPort)
}

View file

@ -0,0 +1,52 @@
package akka.remote.testconductor
import akka.remote.AkkaRemoteSpec
import com.typesafe.config.ConfigFactory
import akka.remote.AbstractRemoteActorMultiJvmSpec
object TestConductorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec {
override def NrOfNodes = 2
override def commonConfig = ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.actor.provider = akka.remote.RemoteActorRefProvider
akka.actor.debug {
receive = on
fsm = on
}
akka.testconductor {
host = localhost
port = 4712
}
""")
def nameConfig(n: Int) = ConfigFactory.parseString("akka.testconductor.name = node" + n).withFallback(nodeConfigs(n))
}
import TestConductorMultiJvmSpec._
class TestConductorMultiJvmNode1 extends AkkaRemoteSpec(nameConfig(0)) {
val nodes = TestConductorMultiJvmSpec.NrOfNodes
"running a test" in {
val tc = TestConductor(system)
tc.startController()
barrier("start")
barrier("first")
tc.enter("begin")
barrier("end")
}
}
class TestConductorMultiJvmNode2 extends AkkaRemoteSpec(nameConfig(1)) {
val nodes = TestConductorMultiJvmSpec.NrOfNodes
"running a test" in {
barrier("start")
val tc = TestConductor(system)
tc.startClient(4712)
barrier("first")
tc.enter("begin")
barrier("end")
}
}