Merge pull request #107 from jboner/actor-path

Actor path: created pull request in Peter’s name
This commit is contained in:
Roland Kuhn 2011-11-10 06:11:12 -08:00
commit 3e166030ec
46 changed files with 749 additions and 529 deletions

View file

@ -51,7 +51,7 @@ class Remote(val app: AkkaApplication) {
val computeGridDispatcher = dispatcherFactory.newDispatcher("akka:compute-grid").build
private[remote] lazy val remoteDaemonSupervisor = app.actorOf(Props(
OneForOneStrategy(List(classOf[Exception]), None, None))) // is infinite restart what we want?
OneForOneStrategy(List(classOf[Exception]), None, None)), "akka-system-remote-supervisor") // is infinite restart what we want?
private[remote] lazy val remoteDaemon =
app.provider.actorOf(
@ -129,7 +129,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
def handleUse(message: RemoteSystemDaemonMessageProtocol) {
try {
if (message.hasActorAddress) {
if (message.hasActorPath) {
val actorFactoryBytes =
if (shouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray
@ -140,7 +140,15 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
case Right(instance) instance.asInstanceOf[() Actor]
}
app.actorOf(Props(creator = actorFactory), message.getActorAddress)
val actorPath = ActorPath(remote.app, message.getActorPath)
val parent = actorPath.parent.ref
if (parent.isDefined) {
app.provider.actorOf(Props(creator = actorFactory), parent.get, actorPath.name)
} else {
log.error("Parent actor does not exist, ignoring remote system daemon command [{}]", message)
}
} else {
log.error("Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [{}]", message)
}
@ -180,7 +188,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
Props(
context {
case f: Function0[_] try { f() } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) ! payloadFor(message, classOf[Function0[Unit]])
}
// FIXME: handle real remote supervision
@ -189,7 +197,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
Props(
context {
case f: Function0[_] try { sender ! f() } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) forward payloadFor(message, classOf[Function0[Any]])
}
// FIXME: handle real remote supervision
@ -198,7 +206,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
Props(
context {
case (fun: Function[_, _], param: Any) try { fun.asInstanceOf[Any Unit].apply(param) } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}
// FIXME: handle real remote supervision
@ -207,7 +215,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
Props(
context {
case (fun: Function[_, _], param: Any) try { sender ! fun.asInstanceOf[Any Any](param) } finally { context.self.stop() }
}).copy(dispatcher = computeGridDispatcher), app.guardian, Props.randomAddress, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}).copy(dispatcher = computeGridDispatcher), app.guardian, app.guardian.path / Props.randomName, systemService = true) forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}
def handleFailover(message: RemoteSystemDaemonMessageProtocol) {
@ -227,10 +235,11 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
lazy val sender: ActorRef =
if (input.hasSender)
remote.app.provider.deserialize(
SerializedActorRef(input.getSender.getAddress, input.getSender.getHost, input.getSender.getPort)).getOrElse(throw new IllegalStateException("OHNOES"))
SerializedActorRef(input.getSender.getHost, input.getSender.getPort, input.getSender.getPath)).getOrElse(throw new IllegalStateException("OHNOES"))
else
remote.app.deadLetters
lazy val recipient: ActorRef = remote.app.actorFor(input.getRecipient.getAddress).getOrElse(remote.app.deadLetters)
lazy val recipient: ActorRef = remote.app.actorFor(input.getRecipient.getPath).getOrElse(remote.app.deadLetters)
lazy val payload: Either[Throwable, AnyRef] =
if (input.hasException) Left(parseException())
@ -252,7 +261,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
}
}
override def toString = "RemoteMessage: " + recipient + "(" + input.getRecipient.getAddress + ") from " + sender
override def toString = "RemoteMessage: " + recipient + "(" + input.getRecipient.getPath + ") from " + sender
}
trait RemoteMarshallingOps {
@ -276,7 +285,7 @@ trait RemoteMarshallingOps {
*/
def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = {
val rep = app.provider.serialize(actor)
ActorRefProtocol.newBuilder.setAddress(rep.address).setHost(rep.hostname).setPort(rep.port).build
ActorRefProtocol.newBuilder.setHost(rep.hostname).setPort(rep.port).setPath(rep.path).build
}
def createRemoteMessageProtocolBuilder(