From 4f643eaa1bc84fd25f21a80169d56967e2e7d489 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 9 Dec 2011 20:19:59 +0100 Subject: [PATCH] simplify structure of Deployer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - remove unused ActorDeployer trait - put everything in one class with simple initialization structure and one protected method to override for adaptations - adapt RemoteDeployer accordingly - change “remote” key to directly contain the single remote address, since there is nothing else to configure - adapt test cases accordingly --- .../test/scala/akka/actor/DeployerSpec.scala | 38 +--- .../scala/akka/actor/ActorRefProvider.scala | 31 +-- .../src/main/scala/akka/actor/Deployer.scala | 207 ++---------------- .../akka/remote/RemoteActorRefProvider.scala | 133 ++--------- .../scala/akka/remote/RemoteDeployer.scala | 40 +--- .../scala/akka/remote/RemoteInterface.scala | 13 ++ .../akka/remote/RemoteCommunicationSpec.scala | 6 +- .../akka/remote/RemoteDeployerSpec.scala | 62 +----- 8 files changed, 85 insertions(+), 445 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index a3bcbaa882..5437d0daa0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -89,7 +89,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { "be able to parse 'akka.actor.deployment._' with all default values" in { val service = "/user/service1" - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be('defined) deployment must be(Some( @@ -103,13 +103,13 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { "use None deployment for undefined service" in { val service = "/user/undefined" - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be(None) } "be able to parse 'akka.actor.deployment._' with recipe" in { val service = "/user/service3" - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be('defined) deployment must be(Some( @@ -123,7 +123,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { "be able to parse 'akka.actor.deployment._' with number-of-instances=auto" in { val service = "/user/service-auto" - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be('defined) deployment must be(Some( @@ -186,7 +186,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { } def assertRouting(expected: Routing, service: String) { - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be('defined) deployment must be(Some( @@ -199,33 +199,5 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { } - "be able to parse 'akka.actor.deployment._' with specified cluster nodes" ignore { - val service = "/user/service-cluster1" - val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) - deployment must be('defined) - - deployment.get.scope match { - case deploymentConfig.ClusterScope(remoteNodes, replication) ⇒ - remoteNodes must be(Seq(Node("wallace"), Node("gromit"))) - replication must be(Transient) - case other ⇒ fail("Unexpected: " + other) - } - } - - "be able to parse 'akka.actor.deployment._' with specified cluster replication" ignore { - val service = "/user/service-cluster2" - val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) - deployment must be('defined) - - deployment.get.scope match { - case deploymentConfig.ClusterScope(remoteNodes, Replication(storage, strategy)) ⇒ - storage must be(TransactionLog) - strategy must be(WriteBehind) - case other ⇒ fail("Unexpected: " + other) - } - } - } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index fc6f684aba..e9f75dba9d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -70,9 +70,9 @@ trait ActorRefProvider { */ def init(system: ActorSystemImpl): Unit - private[akka] def deployer: Deployer + def deployer: Deployer - private[akka] def scheduler: Scheduler + def scheduler: Scheduler /** * Actor factory with create-only semantics: will create an actor as @@ -104,18 +104,16 @@ trait ActorRefProvider { */ def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef - private[akka] def createDeathWatch(): DeathWatch - /** * Create AskActorRef to hook up message send to recipient with Future receiver. */ - private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] + def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] /** * This Future is completed upon termination of this ActorRefProvider, which * is usually initiated by stopping the guardian via ActorSystem.stop(). */ - private[akka] def terminationFuture: Future[Unit] + def terminationFuture: Future[Unit] } /** @@ -342,14 +340,21 @@ class LocalActorRefProvider( val eventStream: EventStream, val scheduler: Scheduler, val deadLetters: InternalActorRef, - val rootPath: ActorPath) extends ActorRefProvider { + val rootPath: ActorPath, + val deployer: Deployer) extends ActorRefProvider { def this(_systemName: String, settings: ActorSystem.Settings, eventStream: EventStream, scheduler: Scheduler, deadLetters: InternalActorRef) = - this(_systemName, settings, eventStream, scheduler, deadLetters, new RootActorPath(LocalAddress(_systemName))) + this(_systemName, + settings, + eventStream, + scheduler, + deadLetters, + new RootActorPath(LocalAddress(_systemName)), + new Deployer(settings)) // FIXME remove both val nodename: String = "local" @@ -357,8 +362,6 @@ class LocalActorRefProvider( val log = Logging(eventStream, "LocalActorRefProvider") - private[akka] val deployer: Deployer = new Deployer(settings, eventStream, nodename) - /* * generate name for temporary actor refs */ @@ -481,7 +484,7 @@ class LocalActorRefProvider( lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log) - val deathWatch = createDeathWatch() + val deathWatch = new LocalDeathWatch def init(_system: ActorSystemImpl) { system = _system @@ -524,13 +527,11 @@ class LocalActorRefProvider( private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = { val lookupPath = p.elements.mkString("/", "/", "") - val deploy = deployer.instance.lookupDeployment(lookupPath) + val deploy = deployer.lookup(lookupPath) r.adaptFromDeploy(deploy) } - private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch - - private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { + def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = { import akka.dispatch.DefaultPromise (if (within == null) settings.ActorTimeout else within) match { case t if t.duration.length <= 0 ⇒ diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index cbf0595065..dc8adf5a08 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -12,103 +12,38 @@ import akka.AkkaException import akka.config.ConfigurationException import akka.util.Duration import akka.event.EventStream -import com.typesafe.config.Config - -private[akka] trait ActorDeployer { - def init(deployments: Seq[Deploy]): Unit - def deploy(deployment: Deploy): Unit - def lookupDeploymentFor(path: String): Option[Deploy] - def lookupDeployment(path: String): Option[Deploy] = path match { - case null | "" ⇒ None - case s if s.startsWith("$") ⇒ None - case some ⇒ lookupDeploymentFor(some) - } - def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_)) -} +import com.typesafe.config._ /** * Deployer maps actor paths to actor deployments. * * @author Jonas Bonér */ -class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, val nodename: String) extends ActorDeployer { +class Deployer(val settings: ActorSystem.Settings) { - val deploymentConfig = new DeploymentConfig(nodename) - val log = Logging(eventStream, "Deployer") + import scala.collection.JavaConverters._ - val instance: ActorDeployer = { - val deployer = new LocalDeployer() - deployer.init(deploymentsInConfig) - deployer - } + private val deployments = new ConcurrentHashMap[String, Deploy] + private val config = settings.config.getConfig("akka.actor.deployment") + protected val default = config.getConfig("default") + config.root.asScala flatMap { + case ("default", _) ⇒ None + case (key, value: ConfigObject) ⇒ parseConfig(key, value.toConfig) + case _ ⇒ None + } foreach deploy - def start(): Unit = instance.toString //Force evaluation + def lookup(path: String): Option[Deploy] = Option(deployments.get(path)) - def init(deployments: Seq[Deploy]) = instance.init(deployments) + def deploy(d: Deploy): Unit = deployments.put(d.path, d) - def deploy(deployment: Deploy): Unit = instance.deploy(deployment) - - def isLocal(deployment: Deploy): Boolean = deployment match { - case Deploy(_, _, _, _, LocalScope) | Deploy(_, _, _, _, _: LocalScope) ⇒ true - case _ ⇒ false - } - - def isClustered(deployment: Deploy): Boolean = !isLocal(deployment) - - def isLocal(path: String): Boolean = isLocal(deploymentFor(path)) //TODO Should this throw exception if path not found? - - def isClustered(path: String): Boolean = !isLocal(path) //TODO Should this throw exception if path not found? - - /** - * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound. - */ - protected def deploymentFor(path: String): Deploy = { - lookupDeploymentFor(path) match { - case Some(deployment) ⇒ deployment - case None ⇒ thrownNoDeploymentBoundException(path) - } - } - - def lookupDeploymentFor(path: String): Option[Deploy] = - instance.lookupDeploymentFor(path) - - protected def deploymentsInConfig: List[Deploy] = { - for (path ← pathsInConfig) yield lookupInConfig(path) - } - - protected def pathsInConfig: List[String] = { - def pathSubstring(path: String) = { - val i = path.indexOf(".") - if (i == -1) path else path.substring(0, i) - } - - import scala.collection.JavaConverters._ - settings.config.getConfig("akka.actor.deployment").root.keySet.asScala - .filterNot("default" ==) - .map(path ⇒ pathSubstring(path)) - .toSet.toList // toSet to force uniqueness - } - - /** - * Lookup deployment in 'akka.conf' configuration file. - */ - protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = { - import scala.collection.JavaConverters._ + protected def parseConfig(key: String, config: Config): Option[Deploy] = { import akka.util.ReflectiveAccess.getClassFor - val defaultDeploymentConfig = configuration.getConfig("akka.actor.deployment.default") - - // -------------------------------- - // akka.actor.deployment. - // -------------------------------- - val deploymentKey = "akka.actor.deployment." + path - val deployment = configuration.getConfig(deploymentKey) - - val deploymentWithFallback = deployment.withFallback(defaultDeploymentConfig) + val deployment = config.withFallback(default) // -------------------------------- // akka.actor.deployment..router // -------------------------------- - val router: Routing = deploymentWithFallback.getString("router") match { + val router: Routing = deployment.getString("router") match { case "round-robin" ⇒ RoundRobin case "random" ⇒ Random case "scatter-gather" ⇒ ScatterGather @@ -125,11 +60,11 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, if (router == NoRouting) OneNrOfInstances else { def invalidNrOfInstances(wasValue: Any) = new ConfigurationException( - "Config option [" + deploymentKey + + "Config option [akka.actor.deployment." + key + ".nr-of-instances] needs to be either [\"auto\"] or [1-N] - was [" + wasValue + "]") - deploymentWithFallback.getAnyRef("nr-of-instances").asInstanceOf[Any] match { + deployment.getAnyRef("nr-of-instances").asInstanceOf[Any] match { case "auto" ⇒ AutoNrOfInstances case 1 ⇒ OneNrOfInstances case 0 ⇒ ZeroNrOfInstances @@ -148,115 +83,15 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, // akka.actor.deployment..create-as // -------------------------------- val recipe: Option[ActorRecipe] = - deploymentWithFallback.getString("create-as.class") match { + deployment.getString("create-as.class") match { case "" ⇒ None case impl ⇒ val implementationClass = getClassFor[Actor](impl).fold(e ⇒ throw new ConfigurationException( - "Config option [" + deploymentKey + ".create-as.class] load failed", e), identity) + "Config option [akka.actor.deployment." + key + ".create-as.class] load failed", e), identity) Some(ActorRecipe(implementationClass)) } - val clusterPreferredNodes = deploymentWithFallback.getStringList("cluster.preferred-nodes").asScala.toSeq - - // -------------------------------- - // akka.actor.deployment..cluster - // -------------------------------- - // def parseCluster: Scope = { - // def raiseHomeConfigError() = throw new ConfigurationException( - // "Config option [" + deploymentKey + - // ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:', 'ip:' or 'node:', was [" + - // clusterPreferredNodes + "]") - // - // val remoteNodes = clusterPreferredNodes map { home ⇒ - // if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError() - // - // val tokenizer = new java.util.StringTokenizer(home, ":") - // val protocol = tokenizer.nextElement - // val address = tokenizer.nextElement.asInstanceOf[String] - // - // // TODO host and ip protocols? - // protocol match { - // case "node" ⇒ Node(address) - // case _ ⇒ raiseHomeConfigError() - // } - // } - // deploymentConfig.ClusterScope(remoteNodes, parseClusterReplication) - // } - - // -------------------------------- - // akka.actor.deployment..cluster.replication - // -------------------------------- - // def parseClusterReplication: ReplicationScheme = { - // deployment.hasPath("cluster.replication") match { - // case false ⇒ Transient - // case true ⇒ - // val replicationConfigWithFallback = deploymentWithFallback.getConfig("cluster.replication") - // val storage = replicationConfigWithFallback.getString("storage") match { - // case "transaction-log" ⇒ TransactionLog - // case "data-grid" ⇒ DataGrid - // case unknown ⇒ - // throw new ConfigurationException("Config option [" + deploymentKey + - // ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + - // unknown + "]") - // } - // val strategy = replicationConfigWithFallback.getString("strategy") match { - // case "write-through" ⇒ WriteThrough - // case "write-behind" ⇒ WriteBehind - // case unknown ⇒ - // throw new ConfigurationException("Config option [" + deploymentKey + - // ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + - // unknown + "]") - // } - // Replication(storage, strategy) - // } - // } - // - // val scope = (remoteNodes, clusterPreferredNodes) match { - // case (Nil, Nil) ⇒ - // LocalScope - // case (_, Nil) ⇒ - // // we have a 'remote' config section - // parseRemote - // case (Nil, _) ⇒ - // // we have a 'cluster' config section - // parseCluster - // case (_, _) ⇒ throw new ConfigurationException( - // "Configuration for deployment ID [" + path + "] can not have both 'remote' and 'cluster' sections.") - // } - - Deploy(path, recipe, router, nrOfInstances, LocalScope) + Some(Deploy(key, recipe, router, nrOfInstances, LocalScope)) } - protected def throwDeploymentBoundException(deployment: Deploy): Nothing = { - val e = new DeploymentAlreadyBoundException("Path [" + deployment.path + "] already bound to [" + deployment + "]") - log.error(e, e.getMessage) - throw e - } - - protected def thrownNoDeploymentBoundException(path: String): Nothing = { - val e = new NoDeploymentBoundException("Path [" + path + "] is not bound to a deployment") - log.error(e, e.getMessage) - throw e - } } - -/** - * Simple local deployer, only for internal use. - * - * @author Jonas Bonér - */ -class LocalDeployer extends ActorDeployer { - private val deployments = new ConcurrentHashMap[String, Deploy] - - def init(deployments: Seq[Deploy]): Unit = deployments foreach deploy // deploy - - def shutdown(): Unit = deployments.clear() //TODO do something else/more? - - def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.path, deployment) - - def lookupDeploymentFor(path: String): Option[Deploy] = Option(deployments.get(path)) -} - -class DeploymentException private[akka] (message: String) extends AkkaException(message) -class DeploymentAlreadyBoundException private[akka] (message: String) extends AkkaException(message) -class NoDeploymentBoundException private[akka] (message: String) extends AkkaException(message) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 2a39a44c12..56a02219f9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -47,9 +47,15 @@ class RemoteActorRefProvider( def systemGuardian = local.systemGuardian def nodename = remoteSettings.NodeName def clustername = remoteSettings.ClusterName + def terminationFuture = local.terminationFuture + def dispatcher = local.dispatcher + + val deployer = new RemoteDeployer(settings) val rootPath: ActorPath = RootActorPath(RemoteAddress(systemName, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port)) - private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath) + + private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) + private var serialization: Serialization = _ private var _remote: Remote = _ @@ -63,13 +69,6 @@ class RemoteActorRefProvider( terminationFuture.onComplete(_ ⇒ remote.server.shutdown()) } - private[akka] def terminationFuture = local.terminationFuture - - private[akka] def deployer: Deployer = new RemoteDeployer(settings, eventStream, nodename) - - def dispatcher = local.dispatcher - def defaultTimeout = settings.ActorTimeout - def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, name: String, systemService: Boolean): InternalActorRef = if (systemService) local.actorOf(system, props, supervisor, name, systemService) else { @@ -80,105 +79,32 @@ class RemoteActorRefProvider( p.headOption match { case None ⇒ None case Some("remote") ⇒ lookupRemotes(p.drop(2)) - case Some("user") ⇒ deployer.lookupDeploymentFor(p.drop(1).mkString("/", "/", "")) + case Some("user") ⇒ deployer.lookup(p.drop(1).mkString("/", "/", "")) case Some(_) ⇒ None } } val elems = path.elements val deployment = (elems.head match { - case "user" ⇒ deployer.lookupDeploymentFor(elems.drop(1).mkString("/", "/", "")) - case _ ⇒ None - }) orElse (elems.head match { + case "user" ⇒ deployer.lookup(elems.drop(1).mkString("/", "/", "")) case "remote" ⇒ lookupRemotes(elems) case _ ⇒ None }) deployment match { - case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, RemoteDeploymentConfig.RemoteScope(remoteAddresses))) ⇒ + case Some(DeploymentConfig.Deploy(_, _, _, _, RemoteDeploymentConfig.RemoteScope(address))) ⇒ - // FIXME RK deployer shall only concern itself with placement of actors on remote nodes - val address = remoteAddresses.head - if (address == rootPath.address) local.actorOf(system, props, supervisor, name, true) // FIXME RK make non-system + if (address == rootPath.address) local.actorOf(system, props, supervisor, name) else { val rpath = RootActorPath(address) / "remote" / rootPath.address.hostPort / path.elements useActorOnNode(rpath, props.creator, supervisor) new RemoteActorRef(this, remote.server, rpath, supervisor, None) } - // def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress } - // - // //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode)) - // - // if (isReplicaNode) { - // // we are on one of the replica node for this remote actor - // local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?) - // } else { - // - // implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher - // implicit val timeout = system.settings.ActorTimeout - // - // // we are on the single "reference" node uses the remote actors on the replica nodes - // val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { - // case RouterType.Direct ⇒ - // if (remoteAddresses.size != 1) throw new ConfigurationException( - // "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]" - // .format(name, remoteAddresses.mkString(", "))) - // () ⇒ new DirectRouter - // - // case RouterType.Broadcast ⇒ - // if (remoteAddresses.size != 1) throw new ConfigurationException( - // "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]" - // .format(name, remoteAddresses.mkString(", "))) - // () ⇒ new BroadcastRouter - // - // case RouterType.Random ⇒ - // if (remoteAddresses.size < 1) throw new ConfigurationException( - // "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]" - // .format(name, remoteAddresses.mkString(", "))) - // () ⇒ new RandomRouter - // - // case RouterType.RoundRobin ⇒ - // if (remoteAddresses.size < 1) throw new ConfigurationException( - // "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]" - // .format(name, remoteAddresses.mkString(", "))) - // () ⇒ new RoundRobinRouter - // - // case RouterType.ScatterGather ⇒ - // if (remoteAddresses.size < 1) throw new ConfigurationException( - // "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]" - // .format(name, remoteAddresses.mkString(", "))) - // () ⇒ new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout) - // - // case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") - // case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") - // case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") - // case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass) - // } - // - // val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ - // conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here - // } - // - // val connectionManager = new RemoteConnectionManager(system, remote, connections) - // - // connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) } - // - // actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name) - // } - case deploy ⇒ local.actorOf(system, props, supervisor, name, systemService) + case _ ⇒ local.actorOf(system, props, supervisor, name, systemService) } } - /** - * Copied from LocalActorRefProvider... - */ - // FIXME: implement supervision, ticket #1408 - // def actorOf(system: ActorSystem, props: RoutedProps, supervisor: InternalActorRef, name: String): InternalActorRef = { - // if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router") - // new RoutedActorRef(system, props, supervisor, name) - // } - def actorFor(path: ActorPath): InternalActorRef = path.root match { case `rootPath` ⇒ actorFor(rootGuardian, path.elements) case RootActorPath(_: RemoteAddress, _) ⇒ new RemoteActorRef(this, remote.server, path, Nobody, None) @@ -194,9 +120,7 @@ class RemoteActorRefProvider( def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) - // TODO remove me - val optimizeLocal = new AtomicBoolean(true) - def optimizeLocalScoped_?() = optimizeLocal.get + def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) /** * Using (checking out) actor on a specific node. @@ -220,37 +144,6 @@ class RemoteActorRefProvider( // we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor actorFor(RootActorPath(path.address) / "remote") ! command } - - private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) { - if (withACK) { - try { - val f = connection ? (command, remoteSettings.RemoteSystemDaemonAckTimeout) - (try f.await.value catch { case _: FutureTimeoutException ⇒ None }) match { - case Some(Right(receiver)) ⇒ - log.debug("Remote system command sent to [{}] successfully received", receiver) - - case Some(Left(cause)) ⇒ - log.error(cause, cause.toString) - throw cause - - case None ⇒ - val error = new RemoteException("Remote system command to [%s] timed out".format(connection.path)) - log.error(error, error.toString) - throw error - } - } catch { - case e: Exception ⇒ - log.error(e, "Could not send remote system command to [{}] due to: {}", connection.path, e.toString) - throw e - } - } else { - connection ! command - } - } - - private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch, ticket ##1190 - - private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) } /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index c3db02e9f1..8e790991db 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -11,46 +11,28 @@ import akka.config.ConfigurationException object RemoteDeploymentConfig { - case class RemoteScope(nodes: Iterable[RemoteAddress]) extends DeploymentConfig.Scope + case class RemoteScope(node: RemoteAddress) extends DeploymentConfig.Scope } -class RemoteDeployer(_settings: ActorSystem.Settings, _eventStream: EventStream, _nodename: String) - extends Deployer(_settings, _eventStream, _nodename) { +class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings) { import RemoteDeploymentConfig._ - override protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = { + override protected def parseConfig(path: String, config: Config): Option[Deploy] = { import scala.collection.JavaConverters._ import akka.util.ReflectiveAccess._ - val defaultDeploymentConfig = configuration.getConfig("akka.actor.deployment.default") + val deployment = config.withFallback(default) - // -------------------------------- - // akka.actor.deployment. - // -------------------------------- - val deploymentKey = "akka.actor.deployment." + path - val deployment = configuration.getConfig(deploymentKey) + val transform: Deploy ⇒ Deploy = + if (deployment.hasPath("remote")) deployment.getString("remote") match { + case RemoteAddressExtractor(r) ⇒ (d ⇒ d.copy(scope = RemoteScope(r))) + case _ ⇒ identity + } + else identity - val deploymentWithFallback = deployment.withFallback(defaultDeploymentConfig) - - val remoteNodes = deploymentWithFallback.getStringList("remote.nodes").asScala.toSeq - - // -------------------------------- - // akka.actor.deployment..remote - // -------------------------------- - def parseRemote: Scope = { - def raiseRemoteNodeParsingError() = throw new ConfigurationException( - "Config option [" + deploymentKey + - ".remote.nodes] needs to be a list with elements on format \":\", was [" + remoteNodes.mkString(", ") + "]") - - val remoteAddresses = remoteNodes map (RemoteAddress(_, settings.name)) - - RemoteScope(remoteAddresses) - } - - val local = super.lookupInConfig(path, configuration) - if (remoteNodes.isEmpty) local else local.copy(scope = parseRemote) + super.parseConfig(path, config) map transform } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala index 3791924f19..21da080f87 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala @@ -29,6 +29,19 @@ object RemoteAddress { case RE(sys, host, Int(port)) ⇒ apply(if (sys != null) sys else defaultSystem, host, port) case _ ⇒ throw new IllegalArgumentException(stringRep + " is not a valid remote address [system@host:port]") } + +} + +object RemoteAddressExtractor { + def unapply(s: String): Option[RemoteAddress] = { + try { + val uri = new URI("akka://" + s) + if (uri.getScheme != "akka" || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1) None + else Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort)) + } catch { + case _: URISyntaxException ⇒ None + } + } } case class RemoteAddress(system: String, host: String, ip: InetAddress, port: Int) extends Address { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 80e7c122da..cb4b3c2f52 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -38,9 +38,9 @@ akka { port = 12345 } actor.deployment { - /blub.remote.nodes = ["remote_sys@localhost:12346"] - /looker/child.remote.nodes = ["remote_sys@localhost:12346"] - /looker/child/grandchild.remote.nodes = ["RemoteCommunicationSpec@localhost:12345"] + /blub.remote = "remote_sys@localhost:12346" + /looker/child.remote = "remote_sys@localhost:12346" + /looker/child/grandchild.remote = "RemoteCommunicationSpec@localhost:12345" } } """) with ImplicitSender { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 3956d8fdab..c8daf3b13b 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -14,65 +14,10 @@ object RemoteDeployerSpec { akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.cluster.nodename = Whatever akka.actor.deployment { - /user/service1 { - } /user/service2 { router = round-robin nr-of-instances = 3 - remote { - nodes = ["wallace:2552", "gromit:2552"] - } - } - /user/service3 { - create-as { - class = "akka.actor.DeployerSpec$RecipeActor" - } - } - /user/service-auto { - router = round-robin - nr-of-instances = auto - } - /user/service-direct { - router = direct - } - /user/service-direct2 { - router = direct - # nr-of-instances ignored when router = direct - nr-of-instances = 2 - } - /user/service-round-robin { - router = round-robin - } - /user/service-random { - router = random - } - /user/service-scatter-gather { - router = scatter-gather - } - /user/service-least-cpu { - router = least-cpu - } - /user/service-least-ram { - router = least-ram - } - /user/service-least-messages { - router = least-messages - } - /user/service-custom { - router = org.my.Custom - } - /user/service-cluster1 { - cluster { - preferred-nodes = ["node:wallace", "node:gromit"] - } - } - /user/service-cluster2 { - cluster { - preferred-nodes = ["node:wallace", "node:gromit"] - replication { - strategy = write-behind - } - } + remote = "sys@wallace:2552" } } """, ConfigParseOptions.defaults) @@ -90,7 +35,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { "be able to parse 'akka.actor.deployment._' with specified remote nodes" in { val service = "/user/service2" - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be('defined) deployment must be(Some( @@ -99,8 +44,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { None, RoundRobin, NrOfInstances(3), - RemoteScope(Seq( - RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552)))))) + RemoteScope(RemoteAddress("sys", "wallace", 2552))))) } }