Fixed #305. Invoking 'stop' on client-managed remote actors does not shut down remote instance (but only local)

This commit is contained in:
Jonas Bonér 2010-08-12 15:36:05 +02:00
parent 59069fcf32
commit 5605895ea6
5 changed files with 59 additions and 31 deletions

View file

@ -13,7 +13,7 @@ import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.{TransactionManagement, TransactionSetAbortedException}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, MessageSerializer, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.serialization.{Serializer, BinaryString}
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard}
import RemoteActorSerialization._
@ -1253,6 +1253,15 @@ class LocalActorRef private[akka](
} else message
}
/**
* System messages for RemoteActorRef.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteActorSystemMessage {
val Stop = BinaryString("RemoteActorRef:stop")
}
/**
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
* This reference is network-aware (remembers its origin) and immutable.
@ -1263,6 +1272,7 @@ private[akka] case class RemoteActorRef private[akka] (
uuuid: String, val className: String, val hostname: String, val port: Int, _timeout: Long, loader: Option[ClassLoader])
// uuid: String, className: String, hostname: String, port: Int, timeOut: Long, isOnRemoteHost: Boolean) extends ActorRef {
extends ActorRef {
_uuid = uuuid
timeout = _timeout
@ -1291,6 +1301,7 @@ private[akka] case class RemoteActorRef private[akka] (
def stop: Unit = {
_isRunning = false
_isShutDown = true
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}
/**

View file

@ -4,7 +4,7 @@
package se.scalablesolutions.akka.config
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.{ActorRef, UntypedActorRef}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
sealed abstract class FaultHandlingStrategy
@ -25,11 +25,15 @@ object ScalaConfig {
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server {
def this(actorRef: UntypedActorRef, lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) =
this(actorRef.actorRef, lifeCycle, _remoteAddress)
val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress)
}
object Supervise {
def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress)
def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null)
def apply(actorRef: UntypedActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress)
def apply(actorRef: UntypedActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null)
def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress))
}
@ -215,6 +219,9 @@ object JavaConfig {
def newSupervised(actorRef: ActorRef) =
se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform)
def newSupervised(actorRef: UntypedActorRef) =
se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform)
}
}

View file

@ -25,7 +25,6 @@ object MessageSerializer extends Logging {
}
def deserialize(messageProtocol: MessageProtocol): Any = {
log.debug("scheme = " + messageProtocol.getSerializationScheme)
messageProtocol.getSerializationScheme match {
case SerializationSchemeType.JAVA =>
unbox(SERIALIZER_JAVA.fromBinary(messageProtocol.getMessage.toByteArray, None))

View file

@ -44,9 +44,16 @@ object RemoteRequestProtocolIdFactory {
* Life-cycle events for RemoteClient.
*/
sealed trait RemoteClientLifeCycleEvent
case class RemoteClientError(@BeanProperty val cause: Throwable, @BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(@BeanProperty val host: String, @BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientError(
@BeanProperty val cause: Throwable,
@BeanProperty val host: String,
@BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(
@BeanProperty val host: String,
@BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(
@BeanProperty val host: String,
@BeanProperty val port: Int) extends RemoteClientLifeCycleEvent
class RemoteClientException private[akka](message: String) extends RuntimeException(message)
@ -259,23 +266,23 @@ class RemoteClientPipelineFactory(
remoteAddress: SocketAddress,
timer: HashedWheelTimer,
client: RemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*)
val engine = RemoteServerSslContext.client.createSSLEngine()
engine.setEnabledCipherSuites(engine.getSupportedCipherSuites) //TODO is this sensible?
engine.setUseClientMode(true)
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join()
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val(enc,dec) = RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)),join(new ZlibDecoder))
case _ => (join(),join())
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join())
}
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)

View file

@ -309,10 +309,10 @@ object RemoteServerSslContext {
//val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509")
//val store = KeyStore.getInstance("JKS")
val s = SSLContext.getInstance(protocol)
s.init(null,null,null)
s.init(null, null, null)
val c = SSLContext.getInstance(protocol)
c.init(null,null,null)
(c,s)
c.init(null, null, null)
(c, s)
}
}
@ -429,6 +429,9 @@ class RemoteServerHandler(
if (request.hasSender) Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None
message match { // first match on system messages
case RemoteActorSystemMessage.Stop => actorRef.stop
case _ => // then match on user defined messages
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
try {
@ -450,6 +453,7 @@ class RemoteServerHandler(
}
}
}
}
private def dispatchToTypedActor(request: RemoteRequestProtocol, channel: Channel) = {
val actorInfo = request.getActorInfo