From 65868d7c96f3bfabc64b44c7073e93fe13fb39a8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 18 Oct 2011 14:21:48 +0200 Subject: [PATCH] Making sure that the RemoteActorRefProvider delegates systemServices down to the LocalActorRefProvider --- .../akka/remote/RemoteActorRefProvider.scala | 156 +++++++++--------- 1 file changed, 79 insertions(+), 77 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index af89a719e5..2663abbf35 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -46,93 +46,95 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider def actorOf(props: Props, address: String): ActorRef = actorOf(props, address, false) - def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = { - val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout? + def actorOf(props: Props, address: String, systemService: Boolean): ActorRef = + if (systemService) local.actorOf(props, address, systemService) + else { + val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout? - actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future - case null ⇒ - val actor: ActorRef = try { - app.deployer.lookupDeploymentFor(address) match { - case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ + actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future + case null ⇒ + val actor: ActorRef = try { + app.deployer.lookupDeploymentFor(address) match { + case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ - val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { - case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector - case FailureDetectorType.RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector - case FailureDetectorType.BannagePeriod(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan) - case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) - } - - val thisHostname = remote.address.getHostName - val thisPort = remote.address.getPort - - def isReplicaNode: Boolean = remoteAddresses exists { some ⇒ some.hostname == thisHostname && some.port == thisPort } - - if (isReplicaNode) { - // we are on one of the replica node for this remote actor - new LocalActorRef(app, props, address, false) - } else { - - // we are on the single "reference" node uses the remote actors on the replica nodes - val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - case RouterType.Direct ⇒ - if (remoteAddresses.size != 1) throw new ConfigurationException( - "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new DirectRouter - - case RouterType.Random ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new RandomRouter - - case RouterType.RoundRobin ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new RoundRobinRouter - - case RouterType.ScatterGather ⇒ - if (remoteAddresses.size < 1) throw new ConfigurationException( - "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" - .format(address, remoteAddresses.mkString(", "))) - () ⇒ new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout) - - case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") - case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") - case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") - case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + val failureDetector = DeploymentConfig.failureDetectorTypeFor(failureDetectorType) match { + case FailureDetectorType.NoOp ⇒ new NoOpFailureDetector + case FailureDetectorType.RemoveConnectionOnFirstFailure ⇒ new RemoveConnectionOnFirstFailureFailureDetector + case FailureDetectorType.BannagePeriod(timeToBan) ⇒ new BannagePeriodFailureDetector(timeToBan) + case FailureDetectorType.Custom(implClass) ⇒ FailureDetector.createCustomFailureDetector(implClass) } - val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ - val inetAddr = new InetSocketAddress(a.hostname, a.port) - conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None)) + val thisHostname = remote.address.getHostName + val thisPort = remote.address.getPort + + def isReplicaNode: Boolean = remoteAddresses exists { some ⇒ some.hostname == thisHostname && some.port == thisPort } + + if (isReplicaNode) { + // we are on one of the replica node for this remote actor + new LocalActorRef(app, props, address, false) + } else { + + // we are on the single "reference" node uses the remote actors on the replica nodes + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { + case RouterType.Direct ⇒ + if (remoteAddresses.size != 1) throw new ConfigurationException( + "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new DirectRouter + + case RouterType.Random ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new RandomRouter + + case RouterType.RoundRobin ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new RoundRobinRouter + + case RouterType.ScatterGather ⇒ + if (remoteAddresses.size < 1) throw new ConfigurationException( + "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" + .format(address, remoteAddresses.mkString(", "))) + () ⇒ new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout) + + case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") + case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") + case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") + case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) + } + + val connections = (Map.empty[InetSocketAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ + val inetAddr = new InetSocketAddress(a.hostname, a.port) + conns + (inetAddr -> RemoteActorRef(remote.server, inetAddr, address, None)) + } + + val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector) + + connections.keys foreach { useActorOnNode(_, address, props.creator) } + + actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address) } - val connectionManager = new RemoteConnectionManager(app, remote, connections, failureDetector) - - connections.keys foreach { useActorOnNode(_, address, props.creator) } - - actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), address) - } - - case deploy ⇒ local.actorOf(props, address, systemService) + case deploy ⇒ local.actorOf(props, address, systemService) + } + } catch { + case e: Exception ⇒ + newFuture completeWithException e // so the other threads gets notified of error + throw e } - } catch { - case e: Exception ⇒ - newFuture completeWithException e // so the other threads gets notified of error - throw e - } - // actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later + // actor foreach app.registry.register // only for ActorRegistry backward compat, will be removed later - newFuture completeWithResult actor - actors.replace(address, newFuture, actor) - actor - case actor: ActorRef ⇒ actor - case future: Future[_] ⇒ future.get.asInstanceOf[ActorRef] + newFuture completeWithResult actor + actors.replace(address, newFuture, actor) + actor + case actor: ActorRef ⇒ actor + case future: Future[_] ⇒ future.get.asInstanceOf[ActorRef] + } } - } /** * Copied from LocalActorRefProvider...