diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala
index a3bcbaa882..5437d0daa0 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala
@@ -89,7 +89,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
"be able to parse 'akka.actor.deployment._' with all default values" in {
val service = "/user/service1"
- val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
+ val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@@ -103,13 +103,13 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
"use None deployment for undefined service" in {
val service = "/user/undefined"
- val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
+ val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be(None)
}
"be able to parse 'akka.actor.deployment._' with recipe" in {
val service = "/user/service3"
- val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
+ val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@@ -123,7 +123,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
"be able to parse 'akka.actor.deployment._' with number-of-instances=auto" in {
val service = "/user/service-auto"
- val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
+ val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@@ -186,7 +186,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
}
def assertRouting(expected: Routing, service: String) {
- val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
+ val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@@ -199,33 +199,5 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
}
- "be able to parse 'akka.actor.deployment._' with specified cluster nodes" ignore {
- val service = "/user/service-cluster1"
- val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig
- val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
- deployment must be('defined)
-
- deployment.get.scope match {
- case deploymentConfig.ClusterScope(remoteNodes, replication) ⇒
- remoteNodes must be(Seq(Node("wallace"), Node("gromit")))
- replication must be(Transient)
- case other ⇒ fail("Unexpected: " + other)
- }
- }
-
- "be able to parse 'akka.actor.deployment._' with specified cluster replication" ignore {
- val service = "/user/service-cluster2"
- val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig
- val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
- deployment must be('defined)
-
- deployment.get.scope match {
- case deploymentConfig.ClusterScope(remoteNodes, Replication(storage, strategy)) ⇒
- storage must be(TransactionLog)
- strategy must be(WriteBehind)
- case other ⇒ fail("Unexpected: " + other)
- }
- }
-
}
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
index fc6f684aba..e9f75dba9d 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
@@ -70,9 +70,9 @@ trait ActorRefProvider {
*/
def init(system: ActorSystemImpl): Unit
- private[akka] def deployer: Deployer
+ def deployer: Deployer
- private[akka] def scheduler: Scheduler
+ def scheduler: Scheduler
/**
* Actor factory with create-only semantics: will create an actor as
@@ -104,18 +104,16 @@ trait ActorRefProvider {
*/
def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef
- private[akka] def createDeathWatch(): DeathWatch
-
/**
* Create AskActorRef to hook up message send to recipient with Future receiver.
*/
- private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any]
+ def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any]
/**
* This Future is completed upon termination of this ActorRefProvider, which
* is usually initiated by stopping the guardian via ActorSystem.stop().
*/
- private[akka] def terminationFuture: Future[Unit]
+ def terminationFuture: Future[Unit]
}
/**
@@ -342,14 +340,21 @@ class LocalActorRefProvider(
val eventStream: EventStream,
val scheduler: Scheduler,
val deadLetters: InternalActorRef,
- val rootPath: ActorPath) extends ActorRefProvider {
+ val rootPath: ActorPath,
+ val deployer: Deployer) extends ActorRefProvider {
def this(_systemName: String,
settings: ActorSystem.Settings,
eventStream: EventStream,
scheduler: Scheduler,
deadLetters: InternalActorRef) =
- this(_systemName, settings, eventStream, scheduler, deadLetters, new RootActorPath(LocalAddress(_systemName)))
+ this(_systemName,
+ settings,
+ eventStream,
+ scheduler,
+ deadLetters,
+ new RootActorPath(LocalAddress(_systemName)),
+ new Deployer(settings))
// FIXME remove both
val nodename: String = "local"
@@ -357,8 +362,6 @@ class LocalActorRefProvider(
val log = Logging(eventStream, "LocalActorRefProvider")
- private[akka] val deployer: Deployer = new Deployer(settings, eventStream, nodename)
-
/*
* generate name for temporary actor refs
*/
@@ -481,7 +484,7 @@ class LocalActorRefProvider(
lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log)
- val deathWatch = createDeathWatch()
+ val deathWatch = new LocalDeathWatch
def init(_system: ActorSystemImpl) {
system = _system
@@ -524,13 +527,11 @@ class LocalActorRefProvider(
private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = {
val lookupPath = p.elements.mkString("/", "/", "")
- val deploy = deployer.instance.lookupDeployment(lookupPath)
+ val deploy = deployer.lookup(lookupPath)
r.adaptFromDeploy(deploy)
}
- private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch
-
- private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
+ def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
import akka.dispatch.DefaultPromise
(if (within == null) settings.ActorTimeout else within) match {
case t if t.duration.length <= 0 ⇒
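Note (review): deployer and scheduler are now public on ActorRefProvider, and LocalActorRefProvider takes its Deployer as a constructor argument instead of building one internally. A wiring sketch, assuming the non-Deployer arguments are already in scope:

    val deployer = new Deployer(settings)            // a RemoteDeployer can be passed instead
    val provider = new LocalActorRefProvider(
      systemName, settings, eventStream, scheduler, deadLetters,
      new RootActorPath(LocalAddress(systemName)),
      deployer)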
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index cbf0595065..dc8adf5a08 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -12,103 +12,38 @@ import akka.AkkaException
import akka.config.ConfigurationException
import akka.util.Duration
import akka.event.EventStream
-import com.typesafe.config.Config
-
-private[akka] trait ActorDeployer {
- def init(deployments: Seq[Deploy]): Unit
- def deploy(deployment: Deploy): Unit
- def lookupDeploymentFor(path: String): Option[Deploy]
- def lookupDeployment(path: String): Option[Deploy] = path match {
- case null | "" ⇒ None
- case s if s.startsWith("$") ⇒ None
- case some ⇒ lookupDeploymentFor(some)
- }
- def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_))
-}
+import com.typesafe.config._
/**
* Deployer maps actor paths to actor deployments.
*
* @author Jonas Bonér
*/
-class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, val nodename: String) extends ActorDeployer {
+class Deployer(val settings: ActorSystem.Settings) {
- val deploymentConfig = new DeploymentConfig(nodename)
- val log = Logging(eventStream, "Deployer")
+ import scala.collection.JavaConverters._
- val instance: ActorDeployer = {
- val deployer = new LocalDeployer()
- deployer.init(deploymentsInConfig)
- deployer
- }
+ private val deployments = new ConcurrentHashMap[String, Deploy]
+ private val config = settings.config.getConfig("akka.actor.deployment")
+ protected val default = config.getConfig("default")
+ config.root.asScala flatMap {
+ case ("default", _) ⇒ None
+ case (key, value: ConfigObject) ⇒ parseConfig(key, value.toConfig)
+ case _ ⇒ None
+ } foreach deploy
- def start(): Unit = instance.toString //Force evaluation
+ def lookup(path: String): Option[Deploy] = Option(deployments.get(path))
- def init(deployments: Seq[Deploy]) = instance.init(deployments)
+ def deploy(d: Deploy): Unit = deployments.put(d.path, d)
- def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
-
- def isLocal(deployment: Deploy): Boolean = deployment match {
- case Deploy(_, _, _, _, LocalScope) | Deploy(_, _, _, _, _: LocalScope) ⇒ true
- case _ ⇒ false
- }
-
- def isClustered(deployment: Deploy): Boolean = !isLocal(deployment)
-
- def isLocal(path: String): Boolean = isLocal(deploymentFor(path)) //TODO Should this throw exception if path not found?
-
- def isClustered(path: String): Boolean = !isLocal(path) //TODO Should this throw exception if path not found?
-
- /**
- * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound.
- */
- protected def deploymentFor(path: String): Deploy = {
- lookupDeploymentFor(path) match {
- case Some(deployment) ⇒ deployment
- case None ⇒ thrownNoDeploymentBoundException(path)
- }
- }
-
- def lookupDeploymentFor(path: String): Option[Deploy] =
- instance.lookupDeploymentFor(path)
-
- protected def deploymentsInConfig: List[Deploy] = {
- for (path ← pathsInConfig) yield lookupInConfig(path)
- }
-
- protected def pathsInConfig: List[String] = {
- def pathSubstring(path: String) = {
- val i = path.indexOf(".")
- if (i == -1) path else path.substring(0, i)
- }
-
- import scala.collection.JavaConverters._
- settings.config.getConfig("akka.actor.deployment").root.keySet.asScala
- .filterNot("default" ==)
- .map(path ⇒ pathSubstring(path))
- .toSet.toList // toSet to force uniqueness
- }
-
- /**
- * Lookup deployment in 'akka.conf' configuration file.
- */
- protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = {
- import scala.collection.JavaConverters._
+ protected def parseConfig(key: String, config: Config): Option[Deploy] = {
import akka.util.ReflectiveAccess.getClassFor
- val defaultDeploymentConfig = configuration.getConfig("akka.actor.deployment.default")
-
- // --------------------------------
- // akka.actor.deployment.<path>
- // --------------------------------
- val deploymentKey = "akka.actor.deployment." + path
- val deployment = configuration.getConfig(deploymentKey)
-
- val deploymentWithFallback = deployment.withFallback(defaultDeploymentConfig)
+ val deployment = config.withFallback(default)
// --------------------------------
// akka.actor.deployment.<path>.router
// --------------------------------
- val router: Routing = deploymentWithFallback.getString("router") match {
+ val router: Routing = deployment.getString("router") match {
case "round-robin" ⇒ RoundRobin
case "random" ⇒ Random
case "scatter-gather" ⇒ ScatterGather
@@ -125,11 +60,11 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
if (router == NoRouting) OneNrOfInstances
else {
def invalidNrOfInstances(wasValue: Any) = new ConfigurationException(
- "Config option [" + deploymentKey +
+ "Config option [akka.actor.deployment." + key +
".nr-of-instances] needs to be either [\"auto\"] or [1-N] - was [" +
wasValue + "]")
- deploymentWithFallback.getAnyRef("nr-of-instances").asInstanceOf[Any] match {
+ deployment.getAnyRef("nr-of-instances").asInstanceOf[Any] match {
case "auto" ⇒ AutoNrOfInstances
case 1 ⇒ OneNrOfInstances
case 0 ⇒ ZeroNrOfInstances
@@ -148,115 +83,15 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
// akka.actor.deployment.<path>.create-as
// --------------------------------
val recipe: Option[ActorRecipe] =
- deploymentWithFallback.getString("create-as.class") match {
+ deployment.getString("create-as.class") match {
case "" ⇒ None
case impl ⇒
val implementationClass = getClassFor[Actor](impl).fold(e ⇒ throw new ConfigurationException(
- "Config option [" + deploymentKey + ".create-as.class] load failed", e), identity)
+ "Config option [akka.actor.deployment." + key + ".create-as.class] load failed", e), identity)
Some(ActorRecipe(implementationClass))
}
- val clusterPreferredNodes = deploymentWithFallback.getStringList("cluster.preferred-nodes").asScala.toSeq
-
- // --------------------------------
- // akka.actor.deployment.<path>.cluster
- // --------------------------------
- // def parseCluster: Scope = {
- // def raiseHomeConfigError() = throw new ConfigurationException(
- // "Config option [" + deploymentKey +
- // ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:<hostname>', 'ip:<ip address>' or 'node:<node name>', was [" +
- // clusterPreferredNodes + "]")
- //
- // val remoteNodes = clusterPreferredNodes map { home ⇒
- // if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError()
- //
- // val tokenizer = new java.util.StringTokenizer(home, ":")
- // val protocol = tokenizer.nextElement
- // val address = tokenizer.nextElement.asInstanceOf[String]
- //
- // // TODO host and ip protocols?
- // protocol match {
- // case "node" ⇒ Node(address)
- // case _ ⇒ raiseHomeConfigError()
- // }
- // }
- // deploymentConfig.ClusterScope(remoteNodes, parseClusterReplication)
- // }
-
- // --------------------------------
- // akka.actor.deployment.<path>.cluster.replication
- // --------------------------------
- // def parseClusterReplication: ReplicationScheme = {
- // deployment.hasPath("cluster.replication") match {
- // case false ⇒ Transient
- // case true ⇒
- // val replicationConfigWithFallback = deploymentWithFallback.getConfig("cluster.replication")
- // val storage = replicationConfigWithFallback.getString("storage") match {
- // case "transaction-log" ⇒ TransactionLog
- // case "data-grid" ⇒ DataGrid
- // case unknown ⇒
- // throw new ConfigurationException("Config option [" + deploymentKey +
- // ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" +
- // unknown + "]")
- // }
- // val strategy = replicationConfigWithFallback.getString("strategy") match {
- // case "write-through" ⇒ WriteThrough
- // case "write-behind" ⇒ WriteBehind
- // case unknown ⇒
- // throw new ConfigurationException("Config option [" + deploymentKey +
- // ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
- // unknown + "]")
- // }
- // Replication(storage, strategy)
- // }
- // }
- //
- // val scope = (remoteNodes, clusterPreferredNodes) match {
- // case (Nil, Nil) ⇒
- // LocalScope
- // case (_, Nil) ⇒
- // // we have a 'remote' config section
- // parseRemote
- // case (Nil, _) ⇒
- // // we have a 'cluster' config section
- // parseCluster
- // case (_, _) ⇒ throw new ConfigurationException(
- // "Configuration for deployment ID [" + path + "] can not have both 'remote' and 'cluster' sections.")
- // }
-
- Deploy(path, recipe, router, nrOfInstances, LocalScope)
+ Some(Deploy(key, recipe, router, nrOfInstances, LocalScope))
}
- protected def throwDeploymentBoundException(deployment: Deploy): Nothing = {
- val e = new DeploymentAlreadyBoundException("Path [" + deployment.path + "] already bound to [" + deployment + "]")
- log.error(e, e.getMessage)
- throw e
- }
-
- protected def thrownNoDeploymentBoundException(path: String): Nothing = {
- val e = new NoDeploymentBoundException("Path [" + path + "] is not bound to a deployment")
- log.error(e, e.getMessage)
- throw e
- }
}
-
-/**
- * Simple local deployer, only for internal use.
- *
- * @author Jonas Bonér
- */
-class LocalDeployer extends ActorDeployer {
- private val deployments = new ConcurrentHashMap[String, Deploy]
-
- def init(deployments: Seq[Deploy]): Unit = deployments foreach deploy // deploy
-
- def shutdown(): Unit = deployments.clear() //TODO do something else/more?
-
- def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.path, deployment)
-
- def lookupDeploymentFor(path: String): Option[Deploy] = Option(deployments.get(path))
-}
-
-class DeploymentException private[akka] (message: String) extends AkkaException(message)
-class DeploymentAlreadyBoundException private[akka] (message: String) extends AkkaException(message)
-class NoDeploymentBoundException private[akka] (message: String) extends AkkaException(message)
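Note (review): the new Deployer walks akka.actor.deployment once at construction: every top-level key except "default" becomes a Deploy stored under that path, and lookup is just a map read. A usage sketch mirroring DeployerSpec (which lives in the akka.actor package, so the ActorSystemImpl cast compiles); names and values are illustrative:

    import akka.actor.{ ActorSystem, ActorSystemImpl }
    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.parseString("""
      akka.actor.deployment {
        /user/service1 {
          router = round-robin
          nr-of-instances = 3
        }
      }
      """)
    val system = ActorSystem("Example", config.withFallback(ConfigFactory.load()))
    val deployer = system.asInstanceOf[ActorSystemImpl].provider.deployer
    deployer.lookup("/user/service1")   // expected roughly: Some(Deploy("/user/service1", None, RoundRobin, NrOfInstances(3), LocalScope))
    deployer.lookup("/user/unknown")    // None: no deployment bound for this path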
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
index 2a39a44c12..56a02219f9 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
@@ -47,9 +47,15 @@ class RemoteActorRefProvider(
def systemGuardian = local.systemGuardian
def nodename = remoteSettings.NodeName
def clustername = remoteSettings.ClusterName
+ def terminationFuture = local.terminationFuture
+ def dispatcher = local.dispatcher
+
+ val deployer = new RemoteDeployer(settings)
val rootPath: ActorPath = RootActorPath(RemoteAddress(systemName, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port))
- private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath)
+
+ private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer)
+
private var serialization: Serialization = _
private var _remote: Remote = _
@@ -63,13 +69,6 @@ class RemoteActorRefProvider(
terminationFuture.onComplete(_ ⇒ remote.server.shutdown())
}
- private[akka] def terminationFuture = local.terminationFuture
-
- private[akka] def deployer: Deployer = new RemoteDeployer(settings, eventStream, nodename)
-
- def dispatcher = local.dispatcher
- def defaultTimeout = settings.ActorTimeout
-
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, name: String, systemService: Boolean): InternalActorRef =
if (systemService) local.actorOf(system, props, supervisor, name, systemService)
else {
@@ -80,105 +79,32 @@ class RemoteActorRefProvider(
p.headOption match {
case None ⇒ None
case Some("remote") ⇒ lookupRemotes(p.drop(2))
- case Some("user") ⇒ deployer.lookupDeploymentFor(p.drop(1).mkString("/", "/", ""))
+ case Some("user") ⇒ deployer.lookup(p.drop(1).mkString("/", "/", ""))
case Some(_) ⇒ None
}
}
val elems = path.elements
val deployment = (elems.head match {
- case "user" ⇒ deployer.lookupDeploymentFor(elems.drop(1).mkString("/", "/", ""))
- case _ ⇒ None
- }) orElse (elems.head match {
+ case "user" ⇒ deployer.lookup(elems.drop(1).mkString("/", "/", ""))
case "remote" ⇒ lookupRemotes(elems)
case _ ⇒ None
})
deployment match {
- case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, RemoteDeploymentConfig.RemoteScope(remoteAddresses))) ⇒
+ case Some(DeploymentConfig.Deploy(_, _, _, _, RemoteDeploymentConfig.RemoteScope(address))) ⇒
- // FIXME RK deployer shall only concern itself with placement of actors on remote nodes
- val address = remoteAddresses.head
- if (address == rootPath.address) local.actorOf(system, props, supervisor, name, true) // FIXME RK make non-system
+ if (address == rootPath.address) local.actorOf(system, props, supervisor, name)
else {
val rpath = RootActorPath(address) / "remote" / rootPath.address.hostPort / path.elements
useActorOnNode(rpath, props.creator, supervisor)
new RemoteActorRef(this, remote.server, rpath, supervisor, None)
}
- // def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
- //
- // //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
- //
- // if (isReplicaNode) {
- // // we are on one of the replica node for this remote actor
- // local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?)
- // } else {
- //
- // implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
- // implicit val timeout = system.settings.ActorTimeout
- //
- // // we are on the single "reference" node uses the remote actors on the replica nodes
- // val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match {
- // case RouterType.Direct ⇒
- // if (remoteAddresses.size != 1) throw new ConfigurationException(
- // "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
- // .format(name, remoteAddresses.mkString(", ")))
- // () ⇒ new DirectRouter
- //
- // case RouterType.Broadcast ⇒
- // if (remoteAddresses.size != 1) throw new ConfigurationException(
- // "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]"
- // .format(name, remoteAddresses.mkString(", ")))
- // () ⇒ new BroadcastRouter
- //
- // case RouterType.Random ⇒
- // if (remoteAddresses.size < 1) throw new ConfigurationException(
- // "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
- // .format(name, remoteAddresses.mkString(", ")))
- // () ⇒ new RandomRouter
- //
- // case RouterType.RoundRobin ⇒
- // if (remoteAddresses.size < 1) throw new ConfigurationException(
- // "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
- // .format(name, remoteAddresses.mkString(", ")))
- // () ⇒ new RoundRobinRouter
- //
- // case RouterType.ScatterGather ⇒
- // if (remoteAddresses.size < 1) throw new ConfigurationException(
- // "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
- // .format(name, remoteAddresses.mkString(", ")))
- // () ⇒ new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout)
- //
- // case RouterType.LeastCPU ⇒ sys.error("Router LeastCPU not supported yet")
- // case RouterType.LeastRAM ⇒ sys.error("Router LeastRAM not supported yet")
- // case RouterType.LeastMessages ⇒ sys.error("Router LeastMessages not supported yet")
- // case RouterType.Custom(implClass) ⇒ () ⇒ Routing.createCustomRouter(implClass)
- // }
- //
- // val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒
- // conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here
- // }
- //
- // val connectionManager = new RemoteConnectionManager(system, remote, connections)
- //
- // connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
- //
- // actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
- // }
- case deploy ⇒ local.actorOf(system, props, supervisor, name, systemService)
+ case _ ⇒ local.actorOf(system, props, supervisor, name, systemService)
}
}
- /**
- * Copied from LocalActorRefProvider...
- */
- // FIXME: implement supervision, ticket #1408
- // def actorOf(system: ActorSystem, props: RoutedProps, supervisor: InternalActorRef, name: String): InternalActorRef = {
- // if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router")
- // new RoutedActorRef(system, props, supervisor, name)
- // }
-
def actorFor(path: ActorPath): InternalActorRef = path.root match {
case `rootPath` ⇒ actorFor(rootGuardian, path.elements)
case RootActorPath(_: RemoteAddress, _) ⇒ new RemoteActorRef(this, remote.server, path, Nobody, None)
@@ -194,9 +120,7 @@ class RemoteActorRefProvider(
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
- // TODO remove me
- val optimizeLocal = new AtomicBoolean(true)
- def optimizeLocalScoped_?() = optimizeLocal.get
+ def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
/**
* Using (checking out) actor on a specific node.
@@ -220,37 +144,6 @@ class RemoteActorRefProvider(
// we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor
actorFor(RootActorPath(path.address) / "remote") ! command
}
-
- private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) {
- if (withACK) {
- try {
- val f = connection ? (command, remoteSettings.RemoteSystemDaemonAckTimeout)
- (try f.await.value catch { case _: FutureTimeoutException ⇒ None }) match {
- case Some(Right(receiver)) ⇒
- log.debug("Remote system command sent to [{}] successfully received", receiver)
-
- case Some(Left(cause)) ⇒
- log.error(cause, cause.toString)
- throw cause
-
- case None ⇒
- val error = new RemoteException("Remote system command to [%s] timed out".format(connection.path))
- log.error(error, error.toString)
- throw error
- }
- } catch {
- case e: Exception ⇒
- log.error(e, "Could not send remote system command to [{}] due to: {}", connection.path, e.toString)
- throw e
- }
- } else {
- connection ! command
- }
- }
-
- private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch, ticket ##1190
-
- private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
}
/**
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala
index c3db02e9f1..8e790991db 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala
@@ -11,46 +11,28 @@ import akka.config.ConfigurationException
object RemoteDeploymentConfig {
- case class RemoteScope(nodes: Iterable[RemoteAddress]) extends DeploymentConfig.Scope
+ case class RemoteScope(node: RemoteAddress) extends DeploymentConfig.Scope
}
-class RemoteDeployer(_settings: ActorSystem.Settings, _eventStream: EventStream, _nodename: String)
- extends Deployer(_settings, _eventStream, _nodename) {
+class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings) {
import RemoteDeploymentConfig._
- override protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = {
+ override protected def parseConfig(path: String, config: Config): Option[Deploy] = {
import scala.collection.JavaConverters._
import akka.util.ReflectiveAccess._
- val defaultDeploymentConfig = configuration.getConfig("akka.actor.deployment.default")
+ val deployment = config.withFallback(default)
- // --------------------------------
- // akka.actor.deployment.<path>
- // --------------------------------
- val deploymentKey = "akka.actor.deployment." + path
- val deployment = configuration.getConfig(deploymentKey)
+ val transform: Deploy ⇒ Deploy =
+ if (deployment.hasPath("remote")) deployment.getString("remote") match {
+ case RemoteAddressExtractor(r) ⇒ (d ⇒ d.copy(scope = RemoteScope(r)))
+ case _ ⇒ identity
+ }
+ else identity
- val deploymentWithFallback = deployment.withFallback(defaultDeploymentConfig)
-
- val remoteNodes = deploymentWithFallback.getStringList("remote.nodes").asScala.toSeq
-
- // --------------------------------
- // akka.actor.deployment.<path>.remote
- // --------------------------------
- def parseRemote: Scope = {
- def raiseRemoteNodeParsingError() = throw new ConfigurationException(
- "Config option [" + deploymentKey +
- ".remote.nodes] needs to be a list with elements on format \"<hostname>:<port>\", was [" + remoteNodes.mkString(", ") + "]")
-
- val remoteAddresses = remoteNodes map (RemoteAddress(_, settings.name))
-
- RemoteScope(remoteAddresses)
- }
-
- val local = super.lookupInConfig(path, configuration)
- if (remoteNodes.isEmpty) local else local.copy(scope = parseRemote)
+ super.parseConfig(path, config) map transform
}
}
\ No newline at end of file
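Note (review): RemoteScope now carries a single RemoteAddress, and RemoteDeployer.parseConfig reuses the base parse, rewriting the scope only when a 'remote' entry is present. A hedged sketch (the path is illustrative; compare RemoteDeployerSpec below):

    // Config sketch using the new single-node string, replacing remote.nodes = ["host:port", ...]:
    //   akka.actor.deployment {
    //     /user/echo.remote = "other_sys@somehost:2552"
    //   }
    val deployer = new RemoteDeployer(settings)   // settings assumed to carry the section above
    deployer.lookup("/user/echo")
    // expected scope: RemoteScope(RemoteAddress("other_sys", "somehost", 2552));
    // without a 'remote' entry, the Deploy produced by the base Deployer is returned unchanged.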
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala
index 3791924f19..21da080f87 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala
@@ -29,6 +29,19 @@ object RemoteAddress {
case RE(sys, host, Int(port)) ⇒ apply(if (sys != null) sys else defaultSystem, host, port)
case _ ⇒ throw new IllegalArgumentException(stringRep + " is not a valid remote address [system@host:port]")
}
+
+}
+
+object RemoteAddressExtractor {
+ def unapply(s: String): Option[RemoteAddress] = {
+ try {
+ val uri = new URI("akka://" + s)
+ if (uri.getScheme != "akka" || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1) None
+ else Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort))
+ } catch {
+ case _: URISyntaxException ⇒ None
+ }
+ }
}
case class RemoteAddress(system: String, host: String, ip: InetAddress, port: Int) extends Address {
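Note (review): RemoteAddressExtractor parses a "system@host:port" string by prefixing it with "akka://" and validating the resulting URI, yielding None for malformed input. A small usage sketch:

    // Sketch: matching a remote-node string with the new extractor.
    "remote_sys@localhost:12346" match {
      case RemoteAddressExtractor(addr) ⇒ addr   // RemoteAddress("remote_sys", "localhost", <resolved ip>, 12346)
      case other                        ⇒ sys.error("not a valid remote address: " + other)
    }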
diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala
index 80e7c122da..cb4b3c2f52 100644
--- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala
@@ -38,9 +38,9 @@ akka {
port = 12345
}
actor.deployment {
- /blub.remote.nodes = ["remote_sys@localhost:12346"]
- /looker/child.remote.nodes = ["remote_sys@localhost:12346"]
- /looker/child/grandchild.remote.nodes = ["RemoteCommunicationSpec@localhost:12345"]
+ /blub.remote = "remote_sys@localhost:12346"
+ /looker/child.remote = "remote_sys@localhost:12346"
+ /looker/child/grandchild.remote = "RemoteCommunicationSpec@localhost:12345"
}
}
""") with ImplicitSender {
diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala
index 3956d8fdab..c8daf3b13b 100644
--- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala
@@ -14,65 +14,10 @@ object RemoteDeployerSpec {
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.cluster.nodename = Whatever
akka.actor.deployment {
- /user/service1 {
- }
/user/service2 {
router = round-robin
nr-of-instances = 3
- remote {
- nodes = ["wallace:2552", "gromit:2552"]
- }
- }
- /user/service3 {
- create-as {
- class = "akka.actor.DeployerSpec$RecipeActor"
- }
- }
- /user/service-auto {
- router = round-robin
- nr-of-instances = auto
- }
- /user/service-direct {
- router = direct
- }
- /user/service-direct2 {
- router = direct
- # nr-of-instances ignored when router = direct
- nr-of-instances = 2
- }
- /user/service-round-robin {
- router = round-robin
- }
- /user/service-random {
- router = random
- }
- /user/service-scatter-gather {
- router = scatter-gather
- }
- /user/service-least-cpu {
- router = least-cpu
- }
- /user/service-least-ram {
- router = least-ram
- }
- /user/service-least-messages {
- router = least-messages
- }
- /user/service-custom {
- router = org.my.Custom
- }
- /user/service-cluster1 {
- cluster {
- preferred-nodes = ["node:wallace", "node:gromit"]
- }
- }
- /user/service-cluster2 {
- cluster {
- preferred-nodes = ["node:wallace", "node:gromit"]
- replication {
- strategy = write-behind
- }
- }
+ remote = "sys@wallace:2552"
}
}
""", ConfigParseOptions.defaults)
@@ -90,7 +35,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
"be able to parse 'akka.actor.deployment._' with specified remote nodes" in {
val service = "/user/service2"
- val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
+ val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@@ -99,8 +44,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
None,
RoundRobin,
NrOfInstances(3),
- RemoteScope(Seq(
- RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552))))))
+ RemoteScope(RemoteAddress("sys", "wallace", 2552)))))
}
}