Remote DeathWatch (1): daemon communication

This commit is contained in:
Roland 2011-12-26 18:23:55 +01:00
parent 17125af571
commit 5e0ed18056
5 changed files with 74 additions and 1532 deletions

View file

@ -11,8 +11,6 @@ import akka.util.Timeout
import akka.config.ConfigurationException
import akka.event.{ DeathWatch, Logging }
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import com.google.protobuf.ByteString
import akka.event.EventStream
import akka.dispatch.Promise
@ -147,21 +145,8 @@ class RemoteActorRefProvider(
def useActorOnNode(path: ActorPath, actorFactory: () Actor, supervisor: ActorRef) {
log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path)
val actorFactoryBytes =
remote.serialization.serialize(actorFactory) match {
case Left(error) throw error
case Right(bytes) if (remoteSettings.ShouldCompressData) LZF.compress(bytes) else bytes
}
val command = RemoteSystemDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorPath(path.toString)
.setPayload(ByteString.copyFrom(actorFactoryBytes))
.setSupervisor(supervisor.path.toString)
.build()
// we dont wait for the ACK, because the remote end will process this command before any other message to the new actor
actorFor(RootActorPath(path.address) / "remote") ! command
actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(actorFactory, path.toString, supervisor)
}
}
@ -217,3 +202,25 @@ private[akka] class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}
class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch {
def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match {
case r: RemoteActorRef
val ret = local.subscribe(watcher, watched)
provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched)
ret
case l: LocalActorRef
local.subscribe(watcher, watched)
case _
provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass)
false
}
def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched)
def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher)
def publish(event: Terminated): Unit = local.publish(event)
}