From ac1ee9ae919d34252acf87f264b05e5b422bc8fe Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 27 Jan 2012 13:30:43 +0100 Subject: [PATCH] rework use of ClassLoaders, see #1736 --- .../main/scala/akka/actor/ActorSystem.scala | 14 ++++---- .../akka/dispatch/AbstractDispatcher.scala | 5 +-- .../akka/dispatch/ThreadPoolBuilder.scala | 2 ++ .../akka/serialization/Serialization.scala | 4 +-- .../akka/actor/mailbox/DurableMailbox.scala | 2 +- .../actor/mailbox/BSONSerialization.scala | 2 +- .../scala/akka/remote/MessageSerializer.scala | 4 +-- .../akka/remote/RemoteActorRefProvider.scala | 34 +++++++++---------- .../akka/remote/RemoteConnectionManager.scala | 2 +- .../scala/akka/remote/RemoteTransport.scala | 7 ++-- .../main/scala/akka/remote/netty/Client.scala | 5 ++- .../remote/netty/NettyRemoteSupport.scala | 28 ++++++--------- .../main/scala/akka/remote/netty/Server.scala | 12 +++---- 13 files changed, 54 insertions(+), 67 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 50b69b9ae8..81e0761bb0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -332,7 +332,9 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten import ActorSystem._ final val settings: Settings = new Settings(applicationConfig, name) - final val threadFactory: MonitorableThreadFactory = new MonitorableThreadFactory(name, settings.Daemonicity) + + final val threadFactory: MonitorableThreadFactory = + MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader)) def logConfiguration(): Unit = log.info(settings.toString) @@ -391,10 +393,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten classOf[EventStream] -> eventStream, classOf[Scheduler] -> scheduler) - val loader = Thread.currentThread.getContextClassLoader match { - case null ⇒ getClass.getClassLoader - case l ⇒ l - } + val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, loader) match { case Left(e) ⇒ throw e @@ -534,9 +533,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten private def loadExtensions() { import scala.collection.JavaConversions._ + val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse this.getClass.getClassLoader settings.config.getStringList("akka.extensions") foreach { fqcn ⇒ - import ReflectiveAccess._ - getObjectFor[AnyRef](fqcn).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match { + import ReflectiveAccess.{ getObjectFor, createInstance, noParams, noArgs } + getObjectFor[AnyRef](fqcn, loader).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match { case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()); case Right(p: ExtensionId[_]) ⇒ registerExtension(p); case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 29be04fe40..2f6b330cc8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -321,8 +321,9 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit BoundedMailbox(capacity, duration) } case fqcn ⇒ - val constructorSignature = Array[Class[_]](classOf[Config]) - ReflectiveAccess.createInstance[MailboxType](fqcn, constructorSignature, Array[AnyRef](config)) match { + val args = Seq(classOf[Config] -> config) + val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader + ReflectiveAccess.createInstance[MailboxType](fqcn, args, loader) match { case Right(instance) ⇒ instance case Left(exception) ⇒ throw new IllegalArgumentException( diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index f8927f667a..8998ccca03 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -159,6 +159,7 @@ object MonitorableThreadFactory { case class MonitorableThreadFactory(name: String, daemonic: Boolean, + contextClassLoader: Option[ClassLoader], exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing) extends ThreadFactory { protected val counter = new AtomicLong @@ -167,6 +168,7 @@ case class MonitorableThreadFactory(name: String, val t = new Thread(runnable, name + counter.incrementAndGet()) t.setUncaughtExceptionHandler(exceptionHandler) t.setDaemon(daemonic) + contextClassLoader foreach (t.setContextClassLoader(_)) t } } diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index f56862b2fb..750b2e5c35 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -81,10 +81,10 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { def deserialize(bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_]], - classLoader: Option[ClassLoader]): Either[Exception, AnyRef] = + classLoader: ClassLoader): Either[Exception, AnyRef] = try { currentSystem.withValue(system) { - Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, classLoader)) + Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, Some(classLoader))) } } catch { case e: Exception ⇒ Left(e) } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index e00520d92c..6638c380bf 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -44,7 +44,7 @@ trait DurableMessageSerialization { def deserializeActorRef(refProtocol: ActorRefProtocol): ActorRef = owner.system.actorFor(refProtocol.getPath) val durableMessage = RemoteMessageProtocol.parseFrom(bytes) - val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage) + val message = MessageSerializer.deserialize(owner.system, durableMessage.getMessage, getClass.getClassLoader) val sender = deserializeActorRef(durableMessage.getSender) new Envelope(message, sender)(owner.system) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala index 470993fdf3..217f46d6ec 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala @@ -67,7 +67,7 @@ class BSONSerializableMailbox(system: ActorSystem) extends SerializableBSONObjec val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument] system.log.debug("Deserializing a durable message from MongoDB: {}", doc) val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData) - val msg = MessageSerializer.deserialize(system, msgData) + val msg = MessageSerializer.deserialize(system, msgData, getClass.getClassLoader) val ownerPath = doc.as[String]("ownerPath") val senderPath = doc.as[String]("senderPath") val sender = systemImpl.actorFor(senderPath) diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 878023c064..5301f2bdd0 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -12,11 +12,11 @@ import akka.util.ReflectiveAccess object MessageSerializer { - def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = { + def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: ClassLoader): AnyRef = { val clazz = if (messageProtocol.hasMessageManifest) { Option(ReflectiveAccess.getClassFor[AnyRef]( messageProtocol.getMessageManifest.toStringUtf8, - classLoader.getOrElse(ReflectiveAccess.loader)) match { + classLoader) match { case Left(e) ⇒ throw e case Right(r) ⇒ r }) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index aefa770eaf..61b675fe55 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -82,16 +82,15 @@ class RemoteActorRefProvider( _transport = { val fqn = remoteSettings.RemoteTransport - // TODO check if this classloader is the right one; hint: this class was loaded by contextClassLoader if that was not null - ReflectiveAccess.createInstance[RemoteTransport]( - fqn, - Seq(classOf[RemoteSettings] -> remoteSettings, - classOf[ActorSystemImpl] -> system, - classOf[RemoteActorRefProvider] -> this), - getClass.getClassLoader) match { - case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) - case Right(remote) ⇒ remote - } + val args = Seq( + classOf[RemoteSettings] -> remoteSettings, + classOf[ActorSystemImpl] -> system, + classOf[RemoteActorRefProvider] -> this) + + ReflectiveAccess.createInstance[RemoteTransport](fqn, args, getClass.getClassLoader) match { + case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) + case Right(remote) ⇒ remote + } } _log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")") @@ -164,7 +163,7 @@ class RemoteActorRefProvider( else { val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements useActorOnNode(rpath, props.creator, supervisor) - new RemoteActorRef(this, transport, rpath, supervisor, None) + new RemoteActorRef(this, transport, rpath, supervisor) } case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment) @@ -174,12 +173,12 @@ class RemoteActorRefProvider( def actorFor(path: ActorPath): InternalActorRef = if (path.address == rootPath.address || path.address == transport.address) actorFor(rootGuardian, path.elements) - else new RemoteActorRef(this, transport, path, Nobody, None) + else new RemoteActorRef(this, transport, path, Nobody) def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case ActorPathExtractor(address, elems) ⇒ if (address == rootPath.address || address == transport.address) actorFor(rootGuardian, elems) - else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody, None) + else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody) case _ ⇒ local.actorFor(ref, path) } @@ -208,8 +207,7 @@ private[akka] class RemoteActorRef private[akka] ( val provider: RemoteActorRefProvider, remote: RemoteTransport, val path: ActorPath, - val getParent: InternalActorRef, - loader: Option[ClassLoader]) + val getParent: InternalActorRef) extends InternalActorRef with RemoteRef { def getChild(name: Iterator[String]): InternalActorRef = { @@ -217,7 +215,7 @@ private[akka] class RemoteActorRef private[akka] ( s.headOption match { case None ⇒ this case Some("..") ⇒ getParent getChild name - case _ ⇒ new RemoteActorRef(provider, remote, path / s, Nobody, loader) + case _ ⇒ new RemoteActorRef(provider, remote, path / s, Nobody) } } @@ -226,9 +224,9 @@ private[akka] class RemoteActorRef private[akka] ( def isTerminated: Boolean = !running - def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this, loader) + def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this) - override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this) def suspend(): Unit = sendSystemMessage(Suspend()) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 9cd974a274..fd2a9135d7 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -146,5 +146,5 @@ class RemoteConnectionManager( } private[remote] def newConnection(remoteAddress: Address, actorPath: ActorPath) = - new RemoteActorRef(remote, remote.transport, actorPath, Nobody, None) + new RemoteActorRef(remote, remote.transport, actorPath, Nobody) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 8d29111672..c2bc63457a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -217,8 +217,7 @@ abstract class RemoteTransport { protected[akka] def send(message: Any, senderOption: Option[ActorRef], - recipient: RemoteActorRef, - loader: Option[ClassLoader]): Unit + recipient: RemoteActorRef): Unit protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = { system.eventStream.publish(message) @@ -228,7 +227,7 @@ abstract class RemoteTransport { override def toString = address.toString } -class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, classLoader: Option[ClassLoader]) { +class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) { def originalReceiver = input.getRecipient.getPath @@ -238,7 +237,7 @@ class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, class lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver) - lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, classLoader) + lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, getClass.getClassLoader) override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index b72fd8b893..2d7d6218f7 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -100,8 +100,7 @@ abstract class RemoteClient private[akka] ( class ActiveRemoteClient private[akka] ( netty: NettyRemoteTransport, remoteAddress: Address, - localAddress: Address, - val loader: Option[ClassLoader] = None) + localAddress: Address) extends RemoteClient(netty, remoteAddress) { import netty.settings @@ -253,7 +252,7 @@ class ActiveRemoteClientHandler( } case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ - client.netty.receiveMessage(new RemoteMessage(arp.getMessage, client.netty.system, client.loader)) + client.netty.receiveMessage(new RemoteMessage(arp.getMessage, client.netty.system)) case other ⇒ throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.netty, client.remoteAddress) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 7ad10c92ff..28dbdb3df6 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,31 +4,24 @@ package akka.remote.netty -import java.net.{ UnknownHostException, InetAddress } import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.Executors + import scala.collection.mutable.HashMap + import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture } import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel.{ ChannelHandlerContext, ChannelFutureListener, ChannelFuture, Channel } +import org.jboss.netty.channel.{ ChannelHandlerContext, Channel } import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor import org.jboss.netty.util.HashedWheelTimer -import akka.actor.{ ActorSystemImpl, ActorRef, simpleName } + +import akka.actor.{ Address, ActorSystemImpl, ActorRef } import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol -import akka.remote.{ RemoteTransport, RemoteMarshallingOps, RemoteClientWriteFailed, RemoteClientException, RemoteClientError, RemoteActorRef } -import akka.util.Switch -import akka.AkkaException -import com.typesafe.config.Config -import akka.remote.RemoteSettings -import akka.actor.Address -import java.net.InetSocketAddress -import akka.remote.RemoteActorRefProvider -import akka.remote.RemoteActorRefProvider -import akka.event.LoggingAdapter +import akka.remote.{ RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef } /** * Provides the implementation of the Netty remote support @@ -38,7 +31,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) - val threadFactory = new MonitorableThreadFactory("NettyRemoteTransport", settings.Daemonic) + val threadFactory = new MonitorableThreadFactory("NettyRemoteTransport", settings.Daemonic, Some(getClass.getClassLoader)) val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory) val executor = new OrderedMemoryAwareThreadPoolExecutor( @@ -58,7 +51,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor override protected def useUntrustedMode = remoteSettings.UntrustedMode - val server = try new NettyRemoteServer(this, Some(getClass.getClassLoader)) catch { + val server = try new NettyRemoteServer(this) catch { case ex ⇒ shutdown(); throw ex } @@ -94,8 +87,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor protected[akka] def send( message: Any, senderOption: Option[ActorRef], - recipient: RemoteActorRef, - loader: Option[ClassLoader]): Unit = { + recipient: RemoteActorRef): Unit = { val recipientAddress = recipient.path.address @@ -112,7 +104,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map - val client = new ActiveRemoteClient(this, recipientAddress, address, loader) + val client = new ActiveRemoteClient(this, recipientAddress, address) client.connect() remoteClients += recipientAddress -> client client diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index f695042331..749e01d63f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -21,9 +21,7 @@ import java.net.InetAddress import akka.actor.ActorSystemImpl import org.jboss.netty.channel.ChannelLocal -class NettyRemoteServer( - val netty: NettyRemoteTransport, - val loader: Option[ClassLoader]) { +class NettyRemoteServer(val netty: NettyRemoteTransport) { import netty.settings @@ -40,7 +38,7 @@ class NettyRemoteServer( // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - val pipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, loader, netty) + val pipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, netty) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("backlog", settings.Backlog) bootstrap.setOption("tcpNoDelay", true) @@ -79,7 +77,6 @@ class NettyRemoteServer( class RemoteServerPipelineFactory( val openChannels: ChannelGroup, val executionHandler: ExecutionHandler, - val loader: Option[ClassLoader], val netty: NettyRemoteTransport) extends ChannelPipelineFactory { import netty.settings @@ -91,7 +88,7 @@ class RemoteServerPipelineFactory( val messageEnc = new RemoteMessageEncoder(netty) val authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil - val remoteServer = new RemoteServerHandler(openChannels, loader, netty) + val remoteServer = new RemoteServerHandler(openChannels, netty) val stages: List[ChannelHandler] = lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: authenticator ::: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } @@ -131,7 +128,6 @@ object ChannelLocalSystem extends ChannelLocal[ActorSystemImpl] { @ChannelHandler.Sharable class RemoteServerHandler( val openChannels: ChannelGroup, - val applicationLoader: Option[ClassLoader], val netty: NettyRemoteTransport) extends SimpleChannelUpstreamHandler { import netty.settings @@ -164,7 +160,7 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { event.getMessage match { case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ - netty.receiveMessage(new RemoteMessage(remote.getMessage, netty.system, applicationLoader)) + netty.receiveMessage(new RemoteMessage(remote.getMessage, netty.system)) case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ val instruction = remote.getInstruction