diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index d4700b9ba0..ce954e41de 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -13,7 +13,7 @@ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, MessageSerializer, RemoteRequestProtocolIdFactory} -import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.serialization.{Serializer, BinaryString} import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard} import RemoteActorSerialization._ @@ -1253,6 +1253,15 @@ class LocalActorRef private[akka]( } else message } +/** + * System messages for RemoteActorRef. + * + * @author Jonas Bonér + */ +object RemoteActorSystemMessage { + val Stop = BinaryString("RemoteActorRef:stop") +} + /** * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. * This reference is network-aware (remembers its origin) and immutable. @@ -1263,6 +1272,7 @@ private[akka] case class RemoteActorRef private[akka] ( uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader]) // uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef { extends ActorRef { + _uuid = uuuid timeout = _timeout @@ -1291,6 +1301,7 @@ private[akka] case class RemoteActorRef private[akka] ( def stop: Unit = { _isRunning = false _isShutDown = true + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } /** diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index cb0829704d..071fdf0c12 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.config -import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.actor.{ActorRef, UntypedActorRef} import se.scalablesolutions.akka.dispatch.MessageDispatcher sealed abstract class FaultHandlingStrategy @@ -25,11 +25,15 @@ object ScalaConfig { case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + def this(actorRef: UntypedActorRef, lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) = + this(actorRef.actorRef, lifeCycle, _remoteAddress) val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) } object Supervise { def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) + def apply(actorRef: UntypedActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) + def apply(actorRef: UntypedActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress)) } @@ -215,6 +219,9 @@ object JavaConfig { def newSupervised(actorRef: ActorRef) = se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) + + def newSupervised(actorRef: UntypedActorRef) = + se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) } } diff --git a/akka-core/src/main/scala/remote/MessageSerializer.scala b/akka-core/src/main/scala/remote/MessageSerializer.scala index 24269c7f8e..8ef6f5d590 100644 --- a/akka-core/src/main/scala/remote/MessageSerializer.scala +++ b/akka-core/src/main/scala/remote/MessageSerializer.scala @@ -25,7 +25,6 @@ object MessageSerializer extends Logging { } def deserialize(messageProtocol: MessageProtocol): Any = { - log.debug("scheme = " + messageProtocol.getSerializationScheme) messageProtocol.getSerializationScheme match { case SerializationSchemeType.JAVA => unbox(SERIALIZER_JAVA.fromBinary(messageProtocol.getMessage.toByteArray, None)) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index c1bd574c3e..fefb1521ed 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -44,9 +44,16 @@ object RemoteRequestProtocolIdFactory { * Life-cycle events for RemoteClient. */ sealed trait RemoteClientLifeCycleEvent -case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent -case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent -case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientError( + @BeanProperty val cause: Throwable, + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientDisconnected( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent +case class RemoteClientConnected( + @BeanProperty val host: String, + @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent class RemoteClientException private[akka](message: String) extends RuntimeException(message) @@ -259,23 +266,23 @@ class RemoteClientPipelineFactory( remoteAddress: SocketAddress, timer: HashedWheelTimer, client: RemoteClient) extends ChannelPipelineFactory { - def getPipeline: ChannelPipeline = { - def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) + def getPipeline: ChannelPipeline = { + def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*) val engine = RemoteServerSslContext.client.createSSLEngine() engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible? engine.setUseClientMode(true) - val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() + val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join() val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder)) - case _ => (join(),join()) + val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match { + case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) + case _ => (join(), join()) } val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 89c0a6e437..ec33bd8600 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -309,10 +309,10 @@ object RemoteServerSslContext { //val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509") //val store = KeyStore.getInstance("JKS") val s = SSLContext.getInstance(protocol) - s.init(null,null,null) + s.init(null, null, null) val c = SSLContext.getInstance(protocol) - c.init(null,null,null) - (c,s) + c.init(null, null, null) + (c, s) } } @@ -429,25 +429,29 @@ class RemoteServerHandler( if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) else None - if (request.getIsOneWay) actorRef.!(message)(sender) - else { - try { - val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef] - val result = if (resultOrNone.isDefined) resultOrNone.get else null + message match { // first match on system messages + case RemoteActorSystemMessage.Stop => actorRef.stop + case _ => // then match on user defined messages + if (request.getIsOneWay) actorRef.!(message)(sender) + else { + try { + val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef] + val result = if (resultOrNone.isDefined) resultOrNone.get else null - log.debug("Returning result from actor invocation [%s]", result) - val replyBuilder = RemoteReplyProtocol.newBuilder - .setId(request.getId) - .setMessage(MessageSerializer.serialize(result)) - .setIsSuccessful(true) - .setIsActor(true) + log.debug("Returning result from actor invocation [%s]", result) + val replyBuilder = RemoteReplyProtocol.newBuilder + .setId(request.getId) + .setMessage(MessageSerializer.serialize(result)) + .setIsSuccessful(true) + .setIsActor(true) - if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) + channel.write(replyBuilder.build) - } catch { - case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) - } + } catch { + case e: Throwable => channel.write(createErrorReplyMessage(e, request, true)) + } + } } }