diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 6192f4d4d5..65bcf3167d 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -254,6 +254,8 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo def provider = remote.system.asInstanceOf[ActorSystemImpl].provider + def originalReceiver = input.getRecipient.getPath + lazy val sender: ActorRef = if (input.hasSender) provider.actorFor(provider.rootGuardian, input.getSender.getPath) else remote.system.deadLetters @@ -332,6 +334,12 @@ trait RemoteMarshallingOps { throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") case m ⇒ l.!(m)(remoteMessage.sender) } + case r: RemoteActorRef ⇒ + remoteMessage.originalReceiver match { + case RemoteActorPath(address, _) if address == remote.remoteDaemon.path.address ⇒ + r.!(remoteMessage.payload)(remoteMessage.sender) + case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) + } case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a68fa5e0e6..8d8fbc972e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -87,66 +87,66 @@ class RemoteActorRefProvider( new RemoteActorRef(this, remote.server, rpath, supervisor, None) } -// def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress } -// -// //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode)) -// -// if (isReplicaNode) { -// // we are on one of the replica node for this remote actor -// local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?) -// } else { -// -// implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher -// implicit val timeout = system.settings.ActorTimeout -// -// // we are on the single "reference" node uses the remote actors on the replica nodes -// val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { -// case RouterType.Direct ⇒ -// if (remoteAddresses.size != 1) throw new ConfigurationException( -// "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" -// .format(name, remoteAddresses.mkString(", "))) -// () ⇒ new DirectRouter -// -// case RouterType.Broadcast ⇒ -// if (remoteAddresses.size != 1) throw new ConfigurationException( -// "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]" -// .format(name, remoteAddresses.mkString(", "))) -// () ⇒ new BroadcastRouter -// -// case RouterType.Random ⇒ -// if (remoteAddresses.size < 1) throw new ConfigurationException( -// "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" -// .format(name, remoteAddresses.mkString(", "))) -// () ⇒ new RandomRouter -// -// case RouterType.RoundRobin ⇒ -// if (remoteAddresses.size < 1) throw new ConfigurationException( -// "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" -// .format(name, remoteAddresses.mkString(", "))) -// () ⇒ new RoundRobinRouter -// -// case RouterType.ScatterGather ⇒ -// if (remoteAddresses.size < 1) throw new ConfigurationException( -// "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" -// .format(name, remoteAddresses.mkString(", "))) -// () ⇒ new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout) -// -// case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") -// case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") -// case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") -// case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) -// } -// -// val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ -// conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here -// } -// -// val connectionManager = new RemoteConnectionManager(system, remote, connections) -// -// connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) } -// -// actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name) -// } + // def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress } + // + // //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode)) + // + // if (isReplicaNode) { + // // we are on one of the replica node for this remote actor + // local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?) + // } else { + // + // implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher + // implicit val timeout = system.settings.ActorTimeout + // + // // we are on the single "reference" node uses the remote actors on the replica nodes + // val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { + // case RouterType.Direct ⇒ + // if (remoteAddresses.size != 1) throw new ConfigurationException( + // "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" + // .format(name, remoteAddresses.mkString(", "))) + // () ⇒ new DirectRouter + // + // case RouterType.Broadcast ⇒ + // if (remoteAddresses.size != 1) throw new ConfigurationException( + // "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]" + // .format(name, remoteAddresses.mkString(", "))) + // () ⇒ new BroadcastRouter + // + // case RouterType.Random ⇒ + // if (remoteAddresses.size < 1) throw new ConfigurationException( + // "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" + // .format(name, remoteAddresses.mkString(", "))) + // () ⇒ new RandomRouter + // + // case RouterType.RoundRobin ⇒ + // if (remoteAddresses.size < 1) throw new ConfigurationException( + // "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" + // .format(name, remoteAddresses.mkString(", "))) + // () ⇒ new RoundRobinRouter + // + // case RouterType.ScatterGather ⇒ + // if (remoteAddresses.size < 1) throw new ConfigurationException( + // "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" + // .format(name, remoteAddresses.mkString(", "))) + // () ⇒ new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout) + // + // case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + // case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + // case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + // case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + // } + // + // val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ + // conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here + // } + // + // val connectionManager = new RemoteConnectionManager(system, remote, connections) + // + // connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) } + // + // actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name) + // } case deploy ⇒ local.actorOf(system, props, supervisor, name, systemService) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 9d53e75993..411c0811da 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -8,7 +8,25 @@ import akka.actor._ import com.typesafe.config._ object RemoteCommunicationSpec { - val echo = Props(ctx ⇒ { case x ⇒ ctx.sender ! x }) + class Echo extends Actor { + var target: ActorRef = context.system.deadLetters + + def receive = { + case (p: Props, n: String) ⇒ context.actorOf[Echo]("grandchild") + case ex: Exception ⇒ throw ex + case s: String ⇒ sender ! context.actorFor(s) + case x ⇒ target = sender; sender ! x + } + + override def preStart() {} + override def preRestart(cause: Throwable, msg: Option[Any]) { + target ! "preRestart" + } + override def postRestart(cause: Throwable) {} + override def postStop() { + target ! "postStop" + } + } } class RemoteCommunicationSpec extends AkkaSpec(""" @@ -21,9 +39,9 @@ akka { port = 12345 } actor.deployment { - /user/blub { - remote.nodes = ["remote_sys@localhost:12346"] - } + /user/blub.remote.nodes = ["remote_sys@localhost:12346"] + /user/looker/child.remote.nodes = ["remote_sys@localhost:12346"] + /user/looker/child/grandchild.remote.nodes = ["RemoteCommunicationSpec@localhost:12345"] } } """) with ImplicitSender { @@ -78,11 +96,39 @@ akka { }(other) } - "create children on remote node" in { - val r = system.actorOf(echo, "blub") + "create and supervise children on remote node" in { + val r = system.actorOf[Echo]("blub") r.path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteCommunicationSpec@localhost:12345/user/blub" r ! 42 expectMsg(42) + EventFilter[Exception]("crash", occurrences = 1).intercept { + r ! new Exception("crash") + }(other) + expectMsg("preRestart") + r ! 42 + expectMsg(42) + r.stop() + expectMsg("postStop") + } + + "look-up actors across node boundaries" in { + val l = system.actorOf(Props(new Actor { + def receive = { + case (p: Props, n: String) ⇒ sender ! context.actorOf(p, n) + case s: String ⇒ sender ! context.actorFor(s) + } + }), "looker") + l ! (Props[Echo], "child") + val r = expectMsgType[ActorRef] + r ! (Props[Echo], "grandchild") + val myref = system.actorFor(system / "looker" / "child" / "grandchild") + myref.isInstanceOf[RemoteActorRef] must be(true) + myref ! 43 + expectMsg(43) + val remref = lastSender + remref.isInstanceOf[LocalActorRef] must be(true) + (l ? "child/..").as[ActorRef].get must be theSameInstanceAs l + (system.actorFor(system / "looker" / "child") ? "..").as[ActorRef].get must be theSameInstanceAs l } }