porting to 2.0, making it compile: UNTESTED!
This commit is contained in:
parent
418b11d553
commit
6c786d20b8
5 changed files with 135 additions and 82 deletions
|
|
@ -31,9 +31,11 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
|
||||
val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName)
|
||||
|
||||
// TODO replace by system.scheduler
|
||||
val timer: HashedWheelTimer = new HashedWheelTimer(system.threadFactory)
|
||||
|
||||
val executor = new OrderedMemoryAwareThreadPoolExecutor(
|
||||
// TODO make configurable
|
||||
lazy val executor = new OrderedMemoryAwareThreadPoolExecutor(
|
||||
settings.ExecutionPoolSize,
|
||||
settings.MaxChannelMemorySize,
|
||||
settings.MaxTotalMemorySize,
|
||||
|
|
@ -41,6 +43,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
settings.ExecutionPoolKeepalive.unit,
|
||||
system.threadFactory)
|
||||
|
||||
// TODO make configurable/shareable with server socket factory
|
||||
val clientChannelFactory = new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(system.threadFactory),
|
||||
Executors.newCachedThreadPool(system.threadFactory))
|
||||
|
|
@ -50,9 +53,20 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
|
||||
override protected def useUntrustedMode = remoteSettings.UntrustedMode
|
||||
|
||||
val server = try new NettyRemoteServer(this) catch {
|
||||
case ex ⇒ shutdown(); throw ex
|
||||
}
|
||||
val server: NettyRemoteServer = try createServer() catch { case NonFatal(ex) ⇒ shutdown(); throw ex }
|
||||
|
||||
/**
|
||||
* Override this method to inject a subclass of NettyRemoteServer instead of
|
||||
* the normal one, e.g. for altering the pipeline.
|
||||
*/
|
||||
protected def createServer(): NettyRemoteServer = new NettyRemoteServer(this)
|
||||
|
||||
/**
|
||||
* Override this method to inject a subclass of RemoteClient instead of
|
||||
* the normal one, e.g. for altering the pipeline. Get this transport’s
|
||||
* address from `this.address`.
|
||||
*/
|
||||
protected def createClient(recipient: Address): RemoteClient = new ActiveRemoteClient(this, recipient, address)
|
||||
|
||||
// the address is set in start() or from the RemoteServerHandler, whichever comes first
|
||||
private val _address = new AtomicReference[Address]
|
||||
|
|
@ -121,7 +135,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
|
|||
//Recheck for addition, race between upgrades
|
||||
case Some(client) ⇒ client //If already populated by other writer
|
||||
case None ⇒ //Populate map
|
||||
val client = new ActiveRemoteClient(this, recipientAddress, address)
|
||||
val client = createClient(recipientAddress)
|
||||
client.connect()
|
||||
remoteClients += recipientAddress -> client
|
||||
client
|
||||
|
|
|
|||
|
|
@ -37,13 +37,15 @@ class NettyRemoteServer(val netty: NettyRemoteTransport) {
|
|||
|
||||
private val bootstrap = {
|
||||
val b = new ServerBootstrap(factory)
|
||||
b.setPipelineFactory(new RemoteServerPipelineFactory(openChannels, executionHandler, netty))
|
||||
b.setPipelineFactory(makePipeline())
|
||||
b.setOption("backlog", settings.Backlog)
|
||||
b.setOption("tcpNoDelay", true)
|
||||
b.setOption("child.keepAlive", true)
|
||||
b.setOption("reuseAddress", true)
|
||||
b
|
||||
}
|
||||
|
||||
protected def makePipeline(): ChannelPipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, netty)
|
||||
|
||||
@volatile
|
||||
private[akka] var channel: Channel = _
|
||||
|
|
|
|||
|
|
@ -3,24 +3,41 @@
|
|||
*/
|
||||
package akka.remote.testconductor
|
||||
|
||||
import akka.actor.{ Actor, ActorRef, LoggingFSM, Timeout, UntypedChannel }
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props }
|
||||
import RemoteConnection.getAddrString
|
||||
import akka.util.duration._
|
||||
import TestConductorProtocol._
|
||||
import akka.NoStackTrace
|
||||
import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.util.Timeout
|
||||
import akka.util.Duration
|
||||
import akka.util.duration._
|
||||
import akka.pattern.ask
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import akka.dispatch.Await
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.actor.PoisonPill
|
||||
import akka.event.Logging
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
object Conductor extends RunControl with FailureInject with BarrierSync {
|
||||
|
||||
val system = ActorSystem("conductor", ConfigFactory.load().getConfig("conductor"))
|
||||
|
||||
object Settings {
|
||||
val config = system.settings.config
|
||||
|
||||
implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("barrier-timeout"), MILLISECONDS))
|
||||
implicit val QueryTimeout = Timeout(Duration(config.getMilliseconds("query-timeout"), MILLISECONDS))
|
||||
}
|
||||
|
||||
import Controller._
|
||||
|
||||
private val controller = Actor.actorOf[Controller]
|
||||
private val controller = system.actorOf(Props[Controller], "controller")
|
||||
controller ! ClientConnected
|
||||
|
||||
override def enter(name: String*) {
|
||||
implicit val timeout = Timeout(30 seconds)
|
||||
name foreach (b ⇒ (controller ? EnterBarrier(b)).get)
|
||||
import Settings.BarrierTimeout
|
||||
name foreach (b ⇒ Await.result(controller ? EnterBarrier(b), Duration.Inf))
|
||||
}
|
||||
|
||||
override def throttle(node: String, target: String, direction: Direction, rateMBit: Float) {
|
||||
|
|
@ -47,7 +64,10 @@ object Conductor extends RunControl with FailureInject with BarrierSync {
|
|||
controller ! Terminate(node, -1)
|
||||
}
|
||||
|
||||
override def getNodes = (controller ? GetNodes).as[List[String]].get
|
||||
override def getNodes = {
|
||||
import Settings.QueryTimeout
|
||||
Await.result(controller ? GetNodes mapTo manifest[List[String]], Duration.Inf)
|
||||
}
|
||||
|
||||
override def removeNode(node: String) {
|
||||
controller ! Remove(node)
|
||||
|
|
@ -55,33 +75,33 @@ object Conductor extends RunControl with FailureInject with BarrierSync {
|
|||
|
||||
}
|
||||
|
||||
class ConductorHandler(controller: ActorRef) extends SimpleChannelUpstreamHandler {
|
||||
class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler {
|
||||
|
||||
var clients = Map[Channel, ActorRef]()
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val channel = event.getChannel
|
||||
EventHandler.debug(this, "connection from " + getAddrString(channel))
|
||||
val fsm = Actor.actorOf(new ServerFSM(controller, channel))
|
||||
log.debug("connection from {}", getAddrString(channel))
|
||||
val fsm = system.actorOf(Props(new ServerFSM(controller, channel)))
|
||||
clients += channel -> fsm
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val channel = event.getChannel
|
||||
EventHandler.debug(this, "disconnect from " + getAddrString(channel))
|
||||
log.debug("disconnect from {}", getAddrString(channel))
|
||||
val fsm = clients(channel)
|
||||
fsm.stop()
|
||||
fsm ! PoisonPill
|
||||
clients -= channel
|
||||
}
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
|
||||
val channel = event.getChannel
|
||||
EventHandler.debug(this, "message from " + getAddrString(channel) + ": " + event.getMessage)
|
||||
log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
|
||||
event.getMessage match {
|
||||
case msg: Wrapper if msg.getAllFields.size == 1 ⇒
|
||||
clients(channel) ! msg
|
||||
case msg ⇒
|
||||
EventHandler.info(this, "client " + getAddrString(channel) + " sent garbage '" + msg + "', disconnecting")
|
||||
log.info("client {} sent garbage '{}', disconnecting", getAddrString(channel), msg)
|
||||
channel.close()
|
||||
}
|
||||
}
|
||||
|
|
@ -104,35 +124,35 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi
|
|||
startWith(Initial, null)
|
||||
|
||||
when(Initial, stateTimeout = 10 seconds) {
|
||||
case Ev(msg: Wrapper) ⇒
|
||||
case Event(msg: Wrapper, _) ⇒
|
||||
if (msg.hasHello) {
|
||||
val hello = msg.getHello
|
||||
controller ! ClientConnected(hello.getName, hello.getHost, hello.getPort)
|
||||
goto(Ready)
|
||||
} else {
|
||||
EventHandler.warning(this, "client " + getAddrString(channel) + " sent no Hello in first message, disconnecting")
|
||||
log.warning("client {} sent no Hello in first message, disconnecting", getAddrString(channel))
|
||||
channel.close()
|
||||
stop()
|
||||
}
|
||||
case Ev(StateTimeout) ⇒
|
||||
EventHandler.info(this, "closing channel to " + getAddrString(channel) + " because of Hello timeout")
|
||||
case Event(StateTimeout, _) ⇒
|
||||
log.info("closing channel to {} because of Hello timeout", getAddrString(channel))
|
||||
channel.close()
|
||||
stop()
|
||||
}
|
||||
|
||||
when(Ready) {
|
||||
case Ev(msg: Wrapper) ⇒
|
||||
case Event(msg: Wrapper, _) ⇒
|
||||
if (msg.hasBarrier) {
|
||||
val barrier = msg.getBarrier
|
||||
controller ! EnterBarrier(barrier.getName)
|
||||
} else {
|
||||
EventHandler.warning(this, "client " + getAddrString(channel) + " sent unsupported message " + msg)
|
||||
log.warning("client {} sent unsupported message {}", getAddrString(channel), msg)
|
||||
}
|
||||
stay
|
||||
case Ev(Send(msg)) ⇒
|
||||
case Event(Send(msg), _) ⇒
|
||||
channel.write(msg)
|
||||
stay
|
||||
case Ev(EnterBarrier(name)) ⇒
|
||||
case Event(EnterBarrier(name), _) ⇒
|
||||
val barrier = TestConductorProtocol.EnterBarrier.newBuilder.setName(name).build
|
||||
channel.write(Wrapper.newBuilder.setBarrier(barrier).build)
|
||||
stay
|
||||
|
|
@ -152,18 +172,19 @@ object Controller {
|
|||
class Controller extends Actor {
|
||||
import Controller._
|
||||
|
||||
val host = System.getProperty("akka.testconductor.host", "localhost")
|
||||
val port = Integer.getInteger("akka.testconductor.port", 4545)
|
||||
val connection = RemoteConnection(Server, host, port, new ConductorHandler(self))
|
||||
val config = context.system.settings.config
|
||||
|
||||
val barrier = Actor.actorOf[BarrierCoordinator]
|
||||
val host = config.getString("akka.testconductor.host")
|
||||
val port = config.getInt("akka.testconductor.port")
|
||||
val connection = RemoteConnection(Server, host, port,
|
||||
new ConductorHandler(context.system, self, Logging(context.system, "ConductorHandler")))
|
||||
|
||||
val barrier = context.actorOf(Props[BarrierCoordinator], "barriers")
|
||||
var nodes = Map[String, NodeInfo]()
|
||||
|
||||
override def receive = Actor.loggable(this) {
|
||||
override def receive = {
|
||||
case ClientConnected(name, host, port) ⇒
|
||||
self.channel match {
|
||||
case ref: ActorRef ⇒ nodes += name -> NodeInfo(name, host, port, ref)
|
||||
}
|
||||
nodes += name -> NodeInfo(name, host, port, sender)
|
||||
barrier forward ClientConnected
|
||||
case ClientConnected ⇒
|
||||
barrier forward ClientConnected
|
||||
|
|
@ -202,7 +223,7 @@ class Controller extends Actor {
|
|||
// TODO: properly remove node from BarrierCoordinator
|
||||
// case Remove(node) =>
|
||||
// nodes -= node
|
||||
case GetNodes ⇒ self reply nodes.keys
|
||||
case GetNodes ⇒ sender ! nodes.keys
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -211,7 +232,7 @@ object BarrierCoordinator {
|
|||
case object Idle extends State
|
||||
case object Waiting extends State
|
||||
|
||||
case class Data(clients: Int, barrier: String, arrived: List[UntypedChannel])
|
||||
case class Data(clients: Int, barrier: String, arrived: List[ActorRef])
|
||||
class BarrierTimeoutException(msg: String) extends RuntimeException(msg) with NoStackTrace
|
||||
}
|
||||
|
||||
|
|
@ -225,7 +246,7 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
|
|||
when(Idle) {
|
||||
case Event(EnterBarrier(name), Data(num, _, _)) ⇒
|
||||
if (num == 0) throw new IllegalStateException("no client expected yet")
|
||||
goto(Waiting) using Data(num, name, self.channel :: Nil)
|
||||
goto(Waiting) using Data(num, name, sender :: Nil)
|
||||
case Event(ClientConnected, d @ Data(num, _, _)) ⇒
|
||||
stay using d.copy(clients = num + 1)
|
||||
case Event(ClientDisconnected, d @ Data(num, _, _)) ⇒
|
||||
|
|
@ -241,7 +262,7 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
|
|||
when(Waiting) {
|
||||
case Event(e @ EnterBarrier(name), d @ Data(num, barrier, arrived)) ⇒
|
||||
if (name != barrier) throw new IllegalStateException("trying enter barrier '" + name + "' while barrier '" + barrier + "' is active")
|
||||
val together = self.channel :: arrived
|
||||
val together = sender :: arrived
|
||||
if (together.size == num) {
|
||||
together foreach (_ ! e)
|
||||
goto(Idle) using Data(num, "", Nil)
|
||||
|
|
@ -254,7 +275,7 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State,
|
|||
val expected = num - 1
|
||||
if (arrived.size == expected) {
|
||||
val e = EnterBarrier(barrier)
|
||||
self.channel :: arrived foreach (_ ! e)
|
||||
sender :: arrived foreach (_ ! e)
|
||||
goto(Idle) using Data(expected, "", Nil)
|
||||
} else {
|
||||
stay using d.copy(clients = expected)
|
||||
|
|
|
|||
|
|
@ -4,9 +4,7 @@
|
|||
package akka.remote.testconductor
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import scala.collection.immutable.Queue
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer
|
||||
import org.jboss.netty.channel.ChannelState.BOUND
|
||||
import org.jboss.netty.channel.ChannelState.OPEN
|
||||
|
|
@ -17,34 +15,34 @@ import org.jboss.netty.channel.ChannelHandlerContext
|
|||
import org.jboss.netty.channel.ChannelStateEvent
|
||||
import org.jboss.netty.channel.ChannelUpstreamHandler
|
||||
import org.jboss.netty.channel.MessageEvent
|
||||
|
||||
import akka.actor.FSM
|
||||
import akka.actor.Actor
|
||||
import akka.util.duration.doubleToDurationDouble
|
||||
import akka.util.Index
|
||||
import akka.util.RemoteAddress
|
||||
import akka.actor.Address
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Props
|
||||
|
||||
object NetworkFailureInjector {
|
||||
|
||||
val channels = new Index[RemoteAddress, Channel]()
|
||||
val channels = new Index[Address, Channel](16, (c1, c2) => c1 compareTo c2)
|
||||
|
||||
def close(remote: RemoteAddress): Unit = {
|
||||
val set = channels.remove(remote)
|
||||
def close(remote: Address): Unit = {
|
||||
// channels will be cleaned up by the handler
|
||||
set foreach (_.close())
|
||||
for (chs <- channels.remove(remote); c <- chs) c.close()
|
||||
}
|
||||
}
|
||||
|
||||
class NetworkFailureInjector extends ChannelUpstreamHandler with ChannelDownstreamHandler {
|
||||
class NetworkFailureInjector(system: ActorSystem) extends ChannelUpstreamHandler with ChannelDownstreamHandler {
|
||||
|
||||
import NetworkFailureInjector._
|
||||
|
||||
// local cache of remote address
|
||||
private var remote: Option[RemoteAddress] = None
|
||||
private var remote: Option[Address] = None
|
||||
|
||||
// everything goes via these Throttle actors to enable easy steering
|
||||
private val sender = Actor.actorOf(new Throttle(_.sendDownstream(_)))
|
||||
private val receiver = Actor.actorOf(new Throttle(_.sendUpstream(_)))
|
||||
private val sender = system.actorOf(Props(new Throttle(_.sendDownstream(_))))
|
||||
private val receiver = system.actorOf(Props(new Throttle(_.sendUpstream(_))))
|
||||
|
||||
/*
|
||||
* State, Data and Messages for the internal Throttle actor
|
||||
|
|
@ -135,7 +133,7 @@ class NetworkFailureInjector extends ChannelUpstreamHandler with ChannelDownstre
|
|||
case null ⇒
|
||||
remote = remote flatMap { a ⇒ channels.remove(a, state.getChannel); None }
|
||||
case a: InetSocketAddress ⇒
|
||||
val addr = RemoteAddress(a)
|
||||
val addr = Address("akka", "XXX", a.getHostName, a.getPort)
|
||||
channels.put(addr, state.getChannel)
|
||||
remote = Some(addr)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,25 +3,42 @@
|
|||
*/
|
||||
package akka.remote.testconductor
|
||||
|
||||
import akka.actor.{ Actor, ActorRef, LoggingFSM, Timeout, UntypedChannel }
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props }
|
||||
import RemoteConnection.getAddrString
|
||||
import akka.util.duration._
|
||||
import TestConductorProtocol._
|
||||
import akka.NoStackTrace
|
||||
import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent }
|
||||
import com.eaio.uuid.UUID
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.util.Timeout
|
||||
import akka.util.Duration
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import akka.pattern.ask
|
||||
import akka.dispatch.Await
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.actor.Status
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.actor.PoisonPill
|
||||
import akka.event.Logging
|
||||
|
||||
object Player extends BarrierSync {
|
||||
|
||||
private val server = Actor.actorOf[ClientFSM]
|
||||
val system = ActorSystem("Player", ConfigFactory.load().getConfig("player"))
|
||||
|
||||
object Settings {
|
||||
val config = system.settings.config
|
||||
|
||||
implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("barrier-timeout"), MILLISECONDS))
|
||||
}
|
||||
|
||||
private val server = system.actorOf(Props[ClientFSM], "client")
|
||||
|
||||
override def enter(name: String*) {
|
||||
EventHandler.debug(this, "entering barriers " + name.mkString("(", ", ", ")"))
|
||||
implicit val timeout = Timeout(30 seconds)
|
||||
system.log.debug("entering barriers " + name.mkString("(", ", ", ")"))
|
||||
name foreach { b ⇒
|
||||
(server ? EnterBarrier(b)).get
|
||||
EventHandler.debug(this, "passed barrier " + b)
|
||||
import Settings.BarrierTimeout
|
||||
Await.result(server ? EnterBarrier(b), Duration.Inf)
|
||||
system.log.debug("passed barrier {}", b)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -31,7 +48,7 @@ object ClientFSM {
|
|||
case object Connecting extends State
|
||||
case object Connected extends State
|
||||
|
||||
case class Data(channel: Channel, msg: Either[List[ClientOp], (String, UntypedChannel)])
|
||||
case class Data(channel: Channel, msg: Either[List[ClientOp], (String, ActorRef)])
|
||||
|
||||
class ConnectionFailure(msg: String) extends RuntimeException(msg) with NoStackTrace
|
||||
case object Disconnected
|
||||
|
|
@ -39,14 +56,16 @@ object ClientFSM {
|
|||
|
||||
class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
|
||||
import ClientFSM._
|
||||
import akka.actor.FSM._
|
||||
|
||||
val name = System.getProperty("akka.testconductor.name", (new UUID).toString)
|
||||
val host = System.getProperty("akka.testconductor.host", "localhost")
|
||||
val port = Integer.getInteger("akka.testconductor.port", 4545)
|
||||
val handler = new PlayerHandler(self)
|
||||
val config = context.system.settings.config
|
||||
|
||||
val myself = Actor.remote.address
|
||||
val name = config.getString("akka.testconductor.name")
|
||||
val host = config.getString("akka.testconductor.host")
|
||||
val port = config.getInt("akka.testconductor.port")
|
||||
val handler = new PlayerHandler(self, Logging(context.system, "PlayerHandler"))
|
||||
|
||||
val myself = "XXX"
|
||||
val myport = 12345
|
||||
|
||||
startWith(Connecting, Data(RemoteConnection(Client, host, port, handler), Left(Nil)))
|
||||
|
||||
|
|
@ -54,7 +73,7 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
|
|||
case Event(msg: ClientOp, Data(channel, Left(msgs))) ⇒
|
||||
stay using Data(channel, Left(msg :: msgs))
|
||||
case Event(Connected, Data(channel, Left(msgs))) ⇒
|
||||
val hello = Hello.newBuilder.setName(name).setHost(myself.getAddress.getHostAddress).setPort(myself.getPort).build
|
||||
val hello = Hello.newBuilder.setName(name).setHost(myself).setPort(myport).build
|
||||
channel.write(Wrapper.newBuilder.setHello(hello).build)
|
||||
msgs.reverse foreach sendMsg(channel)
|
||||
goto(Connected) using Data(channel, Left(Nil))
|
||||
|
|
@ -62,23 +81,23 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
|
|||
// System.exit(1)
|
||||
stop
|
||||
case Event(StateTimeout, _) ⇒
|
||||
EventHandler.error(this, "connect timeout to TestConductor")
|
||||
log.error("connect timeout to TestConductor")
|
||||
// System.exit(1)
|
||||
stop
|
||||
}
|
||||
|
||||
when(Connected) {
|
||||
case Event(Disconnected, _) ⇒
|
||||
EventHandler.info(this, "disconnected from TestConductor")
|
||||
log.info("disconnected from TestConductor")
|
||||
throw new ConnectionFailure("disconnect")
|
||||
case Event(msg: EnterBarrier, Data(channel, _)) ⇒
|
||||
sendMsg(channel)(msg)
|
||||
stay using Data(channel, Right((msg.name, self.channel)))
|
||||
stay using Data(channel, Right((msg.name, sender)))
|
||||
case Event(msg: Wrapper, Data(channel, Right((barrier, sender)))) if msg.getAllFields.size == 1 ⇒
|
||||
if (msg.hasBarrier) {
|
||||
val b = msg.getBarrier.getName
|
||||
if (b != barrier) {
|
||||
sender.sendException(new RuntimeException("wrong barrier " + b + " received while waiting for " + barrier))
|
||||
sender ! Status.Failure(new RuntimeException("wrong barrier " + b + " received while waiting for " + barrier))
|
||||
} else {
|
||||
sender ! b
|
||||
}
|
||||
|
|
@ -101,31 +120,30 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
|
|||
|
||||
}
|
||||
|
||||
class PlayerHandler(fsm: ActorRef) extends SimpleChannelUpstreamHandler {
|
||||
class PlayerHandler(fsm: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler {
|
||||
|
||||
import ClientFSM._
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val channel = event.getChannel
|
||||
EventHandler.debug(this, "connected to " + getAddrString(channel))
|
||||
while (!fsm.isRunning) Thread.sleep(100)
|
||||
log.debug("connected to {}", getAddrString(channel))
|
||||
fsm ! Connected
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val channel = event.getChannel
|
||||
EventHandler.debug(this, "disconnected from " + getAddrString(channel))
|
||||
fsm.stop()
|
||||
log.debug("disconnected from {}", getAddrString(channel))
|
||||
fsm ! PoisonPill
|
||||
}
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
|
||||
val channel = event.getChannel
|
||||
EventHandler.debug(this, "message from " + getAddrString(channel) + ": " + event.getMessage)
|
||||
log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
|
||||
event.getMessage match {
|
||||
case msg: Wrapper if msg.getAllFields.size == 1 ⇒
|
||||
fsm ! msg
|
||||
case msg ⇒
|
||||
EventHandler.info(this, "server " + getAddrString(channel) + " sent garbage '" + msg + "', disconnecting")
|
||||
log.info("server {} sent garbage '{}', disconnecting", getAddrString(channel), msg)
|
||||
channel.close()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue