From 6c786d20b808b4b5989076223fcc7da0d27e71f9 Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 2 May 2012 21:56:26 +0200 Subject: [PATCH] porting to 2.0, making it compile: UNTESTED! --- .../remote/netty/NettyRemoteSupport.scala | 24 ++++- .../main/scala/akka/remote/netty/Server.scala | 4 +- .../akka/remote/testconductor/Conductor.scala | 93 ++++++++++++------- .../NetworkFailureInjector.scala | 24 +++-- .../akka/remote/testconductor/Player.scala | 72 ++++++++------ 5 files changed, 135 insertions(+), 82 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 8acd33c7fb..55e2d95636 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -31,9 +31,11 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) + // TODO replace by system.scheduler val timer: HashedWheelTimer = new HashedWheelTimer(system.threadFactory) - val executor = new OrderedMemoryAwareThreadPoolExecutor( + // TODO make configurable + lazy val executor = new OrderedMemoryAwareThreadPoolExecutor( settings.ExecutionPoolSize, settings.MaxChannelMemorySize, settings.MaxTotalMemorySize, @@ -41,6 +43,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor settings.ExecutionPoolKeepalive.unit, system.threadFactory) + // TODO make configurable/shareable with server socket factory val clientChannelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(system.threadFactory), Executors.newCachedThreadPool(system.threadFactory)) @@ -50,9 +53,20 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor override protected def useUntrustedMode = remoteSettings.UntrustedMode - val server = try new NettyRemoteServer(this) catch { - case ex ⇒ shutdown(); throw ex - } + val server: NettyRemoteServer = try createServer() catch { case NonFatal(ex) ⇒ shutdown(); throw ex } + + /** + * Override this method to inject a subclass of NettyRemoteServer instead of + * the normal one, e.g. for altering the pipeline. + */ + protected def createServer(): NettyRemoteServer = new NettyRemoteServer(this) + + /** + * Override this method to inject a subclass of RemoteClient instead of + * the normal one, e.g. for altering the pipeline. Get this transport’s + * address from `this.address`. + */ + protected def createClient(recipient: Address): RemoteClient = new ActiveRemoteClient(this, recipient, address) // the address is set in start() or from the RemoteServerHandler, whichever comes first private val _address = new AtomicReference[Address] @@ -121,7 +135,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map - val client = new ActiveRemoteClient(this, recipientAddress, address) + val client = createClient(recipientAddress) client.connect() remoteClients += recipientAddress -> client client diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 1f18b27c8c..97d3f194f3 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -37,13 +37,15 @@ class NettyRemoteServer(val netty: NettyRemoteTransport) { private val bootstrap = { val b = new ServerBootstrap(factory) - b.setPipelineFactory(new RemoteServerPipelineFactory(openChannels, executionHandler, netty)) + b.setPipelineFactory(makePipeline()) b.setOption("backlog", settings.Backlog) b.setOption("tcpNoDelay", true) b.setOption("child.keepAlive", true) b.setOption("reuseAddress", true) b } + + protected def makePipeline(): ChannelPipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, netty) @volatile private[akka] var channel: Channel = _ diff --git a/akka-remote/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote/src/main/scala/akka/remote/testconductor/Conductor.scala index 58a6a5f88e..3265fc8808 100644 --- a/akka-remote/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -3,24 +3,41 @@ */ package akka.remote.testconductor -import akka.actor.{ Actor, ActorRef, LoggingFSM, Timeout, UntypedChannel } -import akka.event.EventHandler +import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props } import RemoteConnection.getAddrString -import akka.util.duration._ import TestConductorProtocol._ -import akka.NoStackTrace import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent } +import com.typesafe.config.ConfigFactory +import akka.util.Timeout +import akka.util.Duration +import akka.util.duration._ +import akka.pattern.ask +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.dispatch.Await +import akka.event.LoggingAdapter +import akka.actor.PoisonPill +import akka.event.Logging +import scala.util.control.NoStackTrace object Conductor extends RunControl with FailureInject with BarrierSync { + val system = ActorSystem("conductor", ConfigFactory.load().getConfig("conductor")) + + object Settings { + val config = system.settings.config + + implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("barrier-timeout"), MILLISECONDS)) + implicit val QueryTimeout = Timeout(Duration(config.getMilliseconds("query-timeout"), MILLISECONDS)) + } + import Controller._ - private val controller = Actor.actorOf[Controller] + private val controller = system.actorOf(Props[Controller], "controller") controller ! ClientConnected override def enter(name: String*) { - implicit val timeout = Timeout(30 seconds) - name foreach (b ⇒ (controller ? EnterBarrier(b)).get) + import Settings.BarrierTimeout + name foreach (b ⇒ Await.result(controller ? EnterBarrier(b), Duration.Inf)) } override def throttle(node: String, target: String, direction: Direction, rateMBit: Float) { @@ -47,7 +64,10 @@ object Conductor extends RunControl with FailureInject with BarrierSync { controller ! Terminate(node, -1) } - override def getNodes = (controller ? GetNodes).as[List[String]].get + override def getNodes = { + import Settings.QueryTimeout + Await.result(controller ? GetNodes mapTo manifest[List[String]], Duration.Inf) + } override def removeNode(node: String) { controller ! Remove(node) @@ -55,33 +75,33 @@ object Conductor extends RunControl with FailureInject with BarrierSync { } -class ConductorHandler(controller: ActorRef) extends SimpleChannelUpstreamHandler { +class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler { var clients = Map[Channel, ActorRef]() override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val channel = event.getChannel - EventHandler.debug(this, "connection from " + getAddrString(channel)) - val fsm = Actor.actorOf(new ServerFSM(controller, channel)) + log.debug("connection from {}", getAddrString(channel)) + val fsm = system.actorOf(Props(new ServerFSM(controller, channel))) clients += channel -> fsm } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val channel = event.getChannel - EventHandler.debug(this, "disconnect from " + getAddrString(channel)) + log.debug("disconnect from {}", getAddrString(channel)) val fsm = clients(channel) - fsm.stop() + fsm ! PoisonPill clients -= channel } override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { val channel = event.getChannel - EventHandler.debug(this, "message from " + getAddrString(channel) + ": " + event.getMessage) + log.debug("message from {}: {}", getAddrString(channel), event.getMessage) event.getMessage match { case msg: Wrapper if msg.getAllFields.size == 1 ⇒ clients(channel) ! msg case msg ⇒ - EventHandler.info(this, "client " + getAddrString(channel) + " sent garbage '" + msg + "', disconnecting") + log.info("client {} sent garbage '{}', disconnecting", getAddrString(channel), msg) channel.close() } } @@ -104,35 +124,35 @@ class ServerFSM(val controller: ActorRef, val channel: Channel) extends Actor wi startWith(Initial, null) when(Initial, stateTimeout = 10 seconds) { - case Ev(msg: Wrapper) ⇒ + case Event(msg: Wrapper, _) ⇒ if (msg.hasHello) { val hello = msg.getHello controller ! ClientConnected(hello.getName, hello.getHost, hello.getPort) goto(Ready) } else { - EventHandler.warning(this, "client " + getAddrString(channel) + " sent no Hello in first message, disconnecting") + log.warning("client {} sent no Hello in first message, disconnecting", getAddrString(channel)) channel.close() stop() } - case Ev(StateTimeout) ⇒ - EventHandler.info(this, "closing channel to " + getAddrString(channel) + " because of Hello timeout") + case Event(StateTimeout, _) ⇒ + log.info("closing channel to {} because of Hello timeout", getAddrString(channel)) channel.close() stop() } when(Ready) { - case Ev(msg: Wrapper) ⇒ + case Event(msg: Wrapper, _) ⇒ if (msg.hasBarrier) { val barrier = msg.getBarrier controller ! EnterBarrier(barrier.getName) } else { - EventHandler.warning(this, "client " + getAddrString(channel) + " sent unsupported message " + msg) + log.warning("client {} sent unsupported message {}", getAddrString(channel), msg) } stay - case Ev(Send(msg)) ⇒ + case Event(Send(msg), _) ⇒ channel.write(msg) stay - case Ev(EnterBarrier(name)) ⇒ + case Event(EnterBarrier(name), _) ⇒ val barrier = TestConductorProtocol.EnterBarrier.newBuilder.setName(name).build channel.write(Wrapper.newBuilder.setBarrier(barrier).build) stay @@ -152,18 +172,19 @@ object Controller { class Controller extends Actor { import Controller._ - val host = System.getProperty("akka.testconductor.host", "localhost") - val port = Integer.getInteger("akka.testconductor.port", 4545) - val connection = RemoteConnection(Server, host, port, new ConductorHandler(self)) + val config = context.system.settings.config - val barrier = Actor.actorOf[BarrierCoordinator] + val host = config.getString("akka.testconductor.host") + val port = config.getInt("akka.testconductor.port") + val connection = RemoteConnection(Server, host, port, + new ConductorHandler(context.system, self, Logging(context.system, "ConductorHandler"))) + + val barrier = context.actorOf(Props[BarrierCoordinator], "barriers") var nodes = Map[String, NodeInfo]() - override def receive = Actor.loggable(this) { + override def receive = { case ClientConnected(name, host, port) ⇒ - self.channel match { - case ref: ActorRef ⇒ nodes += name -> NodeInfo(name, host, port, ref) - } + nodes += name -> NodeInfo(name, host, port, sender) barrier forward ClientConnected case ClientConnected ⇒ barrier forward ClientConnected @@ -202,7 +223,7 @@ class Controller extends Actor { // TODO: properly remove node from BarrierCoordinator // case Remove(node) => // nodes -= node - case GetNodes ⇒ self reply nodes.keys + case GetNodes ⇒ sender ! nodes.keys } } @@ -211,7 +232,7 @@ object BarrierCoordinator { case object Idle extends State case object Waiting extends State - case class Data(clients: Int, barrier: String, arrived: List[UntypedChannel]) + case class Data(clients: Int, barrier: String, arrived: List[ActorRef]) class BarrierTimeoutException(msg: String) extends RuntimeException(msg) with NoStackTrace } @@ -225,7 +246,7 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, when(Idle) { case Event(EnterBarrier(name), Data(num, _, _)) ⇒ if (num == 0) throw new IllegalStateException("no client expected yet") - goto(Waiting) using Data(num, name, self.channel :: Nil) + goto(Waiting) using Data(num, name, sender :: Nil) case Event(ClientConnected, d @ Data(num, _, _)) ⇒ stay using d.copy(clients = num + 1) case Event(ClientDisconnected, d @ Data(num, _, _)) ⇒ @@ -241,7 +262,7 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, when(Waiting) { case Event(e @ EnterBarrier(name), d @ Data(num, barrier, arrived)) ⇒ if (name != barrier) throw new IllegalStateException("trying enter barrier '" + name + "' while barrier '" + barrier + "' is active") - val together = self.channel :: arrived + val together = sender :: arrived if (together.size == num) { together foreach (_ ! e) goto(Idle) using Data(num, "", Nil) @@ -254,7 +275,7 @@ class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoordinator.State, val expected = num - 1 if (arrived.size == expected) { val e = EnterBarrier(barrier) - self.channel :: arrived foreach (_ ! e) + sender :: arrived foreach (_ ! e) goto(Idle) using Data(expected, "", Nil) } else { stay using d.copy(clients = expected) diff --git a/akka-remote/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala b/akka-remote/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala index eec6a2cbf1..88102b5e86 100644 --- a/akka-remote/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala +++ b/akka-remote/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala @@ -4,9 +4,7 @@ package akka.remote.testconductor import java.net.InetSocketAddress - import scala.collection.immutable.Queue - import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.ChannelState.BOUND import org.jboss.netty.channel.ChannelState.OPEN @@ -17,34 +15,34 @@ import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ChannelUpstreamHandler import org.jboss.netty.channel.MessageEvent - import akka.actor.FSM import akka.actor.Actor import akka.util.duration.doubleToDurationDouble import akka.util.Index -import akka.util.RemoteAddress +import akka.actor.Address +import akka.actor.ActorSystem +import akka.actor.Props object NetworkFailureInjector { - val channels = new Index[RemoteAddress, Channel]() + val channels = new Index[Address, Channel](16, (c1, c2) => c1 compareTo c2) - def close(remote: RemoteAddress): Unit = { - val set = channels.remove(remote) + def close(remote: Address): Unit = { // channels will be cleaned up by the handler - set foreach (_.close()) + for (chs <- channels.remove(remote); c <- chs) c.close() } } -class NetworkFailureInjector extends ChannelUpstreamHandler with ChannelDownstreamHandler { +class NetworkFailureInjector(system: ActorSystem) extends ChannelUpstreamHandler with ChannelDownstreamHandler { import NetworkFailureInjector._ // local cache of remote address - private var remote: Option[RemoteAddress] = None + private var remote: Option[Address] = None // everything goes via these Throttle actors to enable easy steering - private val sender = Actor.actorOf(new Throttle(_.sendDownstream(_))) - private val receiver = Actor.actorOf(new Throttle(_.sendUpstream(_))) + private val sender = system.actorOf(Props(new Throttle(_.sendDownstream(_)))) + private val receiver = system.actorOf(Props(new Throttle(_.sendUpstream(_)))) /* * State, Data and Messages for the internal Throttle actor @@ -135,7 +133,7 @@ class NetworkFailureInjector extends ChannelUpstreamHandler with ChannelDownstre case null ⇒ remote = remote flatMap { a ⇒ channels.remove(a, state.getChannel); None } case a: InetSocketAddress ⇒ - val addr = RemoteAddress(a) + val addr = Address("akka", "XXX", a.getHostName, a.getPort) channels.put(addr, state.getChannel) remote = Some(addr) } diff --git a/akka-remote/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote/src/main/scala/akka/remote/testconductor/Player.scala index 16abe5bb27..029045394c 100644 --- a/akka-remote/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote/src/main/scala/akka/remote/testconductor/Player.scala @@ -3,25 +3,42 @@ */ package akka.remote.testconductor -import akka.actor.{ Actor, ActorRef, LoggingFSM, Timeout, UntypedChannel } -import akka.event.EventHandler +import akka.actor.{ Actor, ActorRef, ActorSystem, LoggingFSM, Props } import RemoteConnection.getAddrString import akka.util.duration._ import TestConductorProtocol._ -import akka.NoStackTrace import org.jboss.netty.channel.{ Channel, SimpleChannelUpstreamHandler, ChannelHandlerContext, ChannelStateEvent, MessageEvent } import com.eaio.uuid.UUID +import com.typesafe.config.ConfigFactory +import akka.util.Timeout +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.pattern.ask +import akka.dispatch.Await +import scala.util.control.NoStackTrace +import akka.actor.Status +import akka.event.LoggingAdapter +import akka.actor.PoisonPill +import akka.event.Logging object Player extends BarrierSync { - private val server = Actor.actorOf[ClientFSM] + val system = ActorSystem("Player", ConfigFactory.load().getConfig("player")) + + object Settings { + val config = system.settings.config + + implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("barrier-timeout"), MILLISECONDS)) + } + + private val server = system.actorOf(Props[ClientFSM], "client") override def enter(name: String*) { - EventHandler.debug(this, "entering barriers " + name.mkString("(", ", ", ")")) - implicit val timeout = Timeout(30 seconds) + system.log.debug("entering barriers " + name.mkString("(", ", ", ")")) name foreach { b ⇒ - (server ? EnterBarrier(b)).get - EventHandler.debug(this, "passed barrier " + b) + import Settings.BarrierTimeout + Await.result(server ? EnterBarrier(b), Duration.Inf) + system.log.debug("passed barrier {}", b) } } } @@ -31,7 +48,7 @@ object ClientFSM { case object Connecting extends State case object Connected extends State - case class Data(channel: Channel, msg: Either[List[ClientOp], (String, UntypedChannel)]) + case class Data(channel: Channel, msg: Either[List[ClientOp], (String, ActorRef)]) class ConnectionFailure(msg: String) extends RuntimeException(msg) with NoStackTrace case object Disconnected @@ -39,14 +56,16 @@ object ClientFSM { class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] { import ClientFSM._ - import akka.actor.FSM._ - val name = System.getProperty("akka.testconductor.name", (new UUID).toString) - val host = System.getProperty("akka.testconductor.host", "localhost") - val port = Integer.getInteger("akka.testconductor.port", 4545) - val handler = new PlayerHandler(self) + val config = context.system.settings.config - val myself = Actor.remote.address + val name = config.getString("akka.testconductor.name") + val host = config.getString("akka.testconductor.host") + val port = config.getInt("akka.testconductor.port") + val handler = new PlayerHandler(self, Logging(context.system, "PlayerHandler")) + + val myself = "XXX" + val myport = 12345 startWith(Connecting, Data(RemoteConnection(Client, host, port, handler), Left(Nil))) @@ -54,7 +73,7 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] { case Event(msg: ClientOp, Data(channel, Left(msgs))) ⇒ stay using Data(channel, Left(msg :: msgs)) case Event(Connected, Data(channel, Left(msgs))) ⇒ - val hello = Hello.newBuilder.setName(name).setHost(myself.getAddress.getHostAddress).setPort(myself.getPort).build + val hello = Hello.newBuilder.setName(name).setHost(myself).setPort(myport).build channel.write(Wrapper.newBuilder.setHello(hello).build) msgs.reverse foreach sendMsg(channel) goto(Connected) using Data(channel, Left(Nil)) @@ -62,23 +81,23 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] { // System.exit(1) stop case Event(StateTimeout, _) ⇒ - EventHandler.error(this, "connect timeout to TestConductor") + log.error("connect timeout to TestConductor") // System.exit(1) stop } when(Connected) { case Event(Disconnected, _) ⇒ - EventHandler.info(this, "disconnected from TestConductor") + log.info("disconnected from TestConductor") throw new ConnectionFailure("disconnect") case Event(msg: EnterBarrier, Data(channel, _)) ⇒ sendMsg(channel)(msg) - stay using Data(channel, Right((msg.name, self.channel))) + stay using Data(channel, Right((msg.name, sender))) case Event(msg: Wrapper, Data(channel, Right((barrier, sender)))) if msg.getAllFields.size == 1 ⇒ if (msg.hasBarrier) { val b = msg.getBarrier.getName if (b != barrier) { - sender.sendException(new RuntimeException("wrong barrier " + b + " received while waiting for " + barrier)) + sender ! Status.Failure(new RuntimeException("wrong barrier " + b + " received while waiting for " + barrier)) } else { sender ! b } @@ -101,31 +120,30 @@ class ClientFSM extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] { } -class PlayerHandler(fsm: ActorRef) extends SimpleChannelUpstreamHandler { +class PlayerHandler(fsm: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler { import ClientFSM._ override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val channel = event.getChannel - EventHandler.debug(this, "connected to " + getAddrString(channel)) - while (!fsm.isRunning) Thread.sleep(100) + log.debug("connected to {}", getAddrString(channel)) fsm ! Connected } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val channel = event.getChannel - EventHandler.debug(this, "disconnected from " + getAddrString(channel)) - fsm.stop() + log.debug("disconnected from {}", getAddrString(channel)) + fsm ! PoisonPill } override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { val channel = event.getChannel - EventHandler.debug(this, "message from " + getAddrString(channel) + ": " + event.getMessage) + log.debug("message from {}: {}", getAddrString(channel), event.getMessage) event.getMessage match { case msg: Wrapper if msg.getAllFields.size == 1 ⇒ fsm ! msg case msg ⇒ - EventHandler.info(this, "server " + getAddrString(channel) + " sent garbage '" + msg + "', disconnecting") + log.info("server {} sent garbage '{}', disconnecting", getAddrString(channel), msg) channel.close() } }