simplify structure of Deployer

- remove unused ActorDeployer trait
- put everything in one class with simple initialization structure and
  one protected method to override for adaptations
- adapt RemoteDeployer accordingly
- change “remote” key to directly contain the single remote address,
  since there is nothing else to configure
- adapt test cases accordingly
This commit is contained in:
Roland 2011-12-09 20:19:59 +01:00
parent 8540c70f18
commit 4f643eaa1b
8 changed files with 85 additions and 445 deletions

View file

@ -89,7 +89,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
"be able to parse 'akka.actor.deployment._' with all default values" in {
val service = "/user/service1"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@ -103,13 +103,13 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
"use None deployment for undefined service" in {
val service = "/user/undefined"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be(None)
}
"be able to parse 'akka.actor.deployment._' with recipe" in {
val service = "/user/service3"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@ -123,7 +123,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
"be able to parse 'akka.actor.deployment._' with number-of-instances=auto" in {
val service = "/user/service-auto"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@ -186,7 +186,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
}
def assertRouting(expected: Routing, service: String) {
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@ -199,33 +199,5 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
}
"be able to parse 'akka.actor.deployment._' with specified cluster nodes" ignore {
val service = "/user/service-cluster1"
val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
deployment must be('defined)
deployment.get.scope match {
case deploymentConfig.ClusterScope(remoteNodes, replication)
remoteNodes must be(Seq(Node("wallace"), Node("gromit")))
replication must be(Transient)
case other fail("Unexpected: " + other)
}
}
"be able to parse 'akka.actor.deployment._' with specified cluster replication" ignore {
val service = "/user/service-cluster2"
val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
deployment must be('defined)
deployment.get.scope match {
case deploymentConfig.ClusterScope(remoteNodes, Replication(storage, strategy))
storage must be(TransactionLog)
strategy must be(WriteBehind)
case other fail("Unexpected: " + other)
}
}
}
}

View file

@ -70,9 +70,9 @@ trait ActorRefProvider {
*/
def init(system: ActorSystemImpl): Unit
private[akka] def deployer: Deployer
def deployer: Deployer
private[akka] def scheduler: Scheduler
def scheduler: Scheduler
/**
* Actor factory with create-only semantics: will create an actor as
@ -104,18 +104,16 @@ trait ActorRefProvider {
*/
def actorFor(ref: InternalActorRef, p: Iterable[String]): InternalActorRef
private[akka] def createDeathWatch(): DeathWatch
/**
* Create AskActorRef to hook up message send to recipient with Future receiver.
*/
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any]
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any]
/**
* This Future is completed upon termination of this ActorRefProvider, which
* is usually initiated by stopping the guardian via ActorSystem.stop().
*/
private[akka] def terminationFuture: Future[Unit]
def terminationFuture: Future[Unit]
}
/**
@ -342,14 +340,21 @@ class LocalActorRefProvider(
val eventStream: EventStream,
val scheduler: Scheduler,
val deadLetters: InternalActorRef,
val rootPath: ActorPath) extends ActorRefProvider {
val rootPath: ActorPath,
val deployer: Deployer) extends ActorRefProvider {
def this(_systemName: String,
settings: ActorSystem.Settings,
eventStream: EventStream,
scheduler: Scheduler,
deadLetters: InternalActorRef) =
this(_systemName, settings, eventStream, scheduler, deadLetters, new RootActorPath(LocalAddress(_systemName)))
this(_systemName,
settings,
eventStream,
scheduler,
deadLetters,
new RootActorPath(LocalAddress(_systemName)),
new Deployer(settings))
// FIXME remove both
val nodename: String = "local"
@ -357,8 +362,6 @@ class LocalActorRefProvider(
val log = Logging(eventStream, "LocalActorRefProvider")
private[akka] val deployer: Deployer = new Deployer(settings, eventStream, nodename)
/*
* generate name for temporary actor refs
*/
@ -481,7 +484,7 @@ class LocalActorRefProvider(
lazy val tempContainer = new VirtualPathContainer(tempNode, rootGuardian, log)
val deathWatch = createDeathWatch()
val deathWatch = new LocalDeathWatch
def init(_system: ActorSystemImpl) {
system = _system
@ -524,13 +527,11 @@ class LocalActorRefProvider(
private def adaptFromDeploy(r: RouterConfig, p: ActorPath): RouterConfig = {
val lookupPath = p.elements.mkString("/", "/", "")
val deploy = deployer.instance.lookupDeployment(lookupPath)
val deploy = deployer.lookup(lookupPath)
r.adaptFromDeploy(deploy)
}
private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = {
import akka.dispatch.DefaultPromise
(if (within == null) settings.ActorTimeout else within) match {
case t if t.duration.length <= 0

View file

@ -12,103 +12,38 @@ import akka.AkkaException
import akka.config.ConfigurationException
import akka.util.Duration
import akka.event.EventStream
import com.typesafe.config.Config
private[akka] trait ActorDeployer {
def init(deployments: Seq[Deploy]): Unit
def deploy(deployment: Deploy): Unit
def lookupDeploymentFor(path: String): Option[Deploy]
def lookupDeployment(path: String): Option[Deploy] = path match {
case null | "" None
case s if s.startsWith("$") None
case some lookupDeploymentFor(some)
}
def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_))
}
import com.typesafe.config._
/**
* Deployer maps actor paths to actor deployments.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, val nodename: String) extends ActorDeployer {
class Deployer(val settings: ActorSystem.Settings) {
val deploymentConfig = new DeploymentConfig(nodename)
val log = Logging(eventStream, "Deployer")
import scala.collection.JavaConverters._
val instance: ActorDeployer = {
val deployer = new LocalDeployer()
deployer.init(deploymentsInConfig)
deployer
}
private val deployments = new ConcurrentHashMap[String, Deploy]
private val config = settings.config.getConfig("akka.actor.deployment")
protected val default = config.getConfig("default")
config.root.asScala flatMap {
case ("default", _) None
case (key, value: ConfigObject) parseConfig(key, value.toConfig)
case _ None
} foreach deploy
def start(): Unit = instance.toString //Force evaluation
def lookup(path: String): Option[Deploy] = Option(deployments.get(path))
def init(deployments: Seq[Deploy]) = instance.init(deployments)
def deploy(d: Deploy): Unit = deployments.put(d.path, d)
def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
def isLocal(deployment: Deploy): Boolean = deployment match {
case Deploy(_, _, _, _, LocalScope) | Deploy(_, _, _, _, _: LocalScope) true
case _ false
}
def isClustered(deployment: Deploy): Boolean = !isLocal(deployment)
def isLocal(path: String): Boolean = isLocal(deploymentFor(path)) //TODO Should this throw exception if path not found?
def isClustered(path: String): Boolean = !isLocal(path) //TODO Should this throw exception if path not found?
/**
* Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound.
*/
protected def deploymentFor(path: String): Deploy = {
lookupDeploymentFor(path) match {
case Some(deployment) deployment
case None thrownNoDeploymentBoundException(path)
}
}
def lookupDeploymentFor(path: String): Option[Deploy] =
instance.lookupDeploymentFor(path)
protected def deploymentsInConfig: List[Deploy] = {
for (path pathsInConfig) yield lookupInConfig(path)
}
protected def pathsInConfig: List[String] = {
def pathSubstring(path: String) = {
val i = path.indexOf(".")
if (i == -1) path else path.substring(0, i)
}
import scala.collection.JavaConverters._
settings.config.getConfig("akka.actor.deployment").root.keySet.asScala
.filterNot("default" ==)
.map(path pathSubstring(path))
.toSet.toList // toSet to force uniqueness
}
/**
* Lookup deployment in 'akka.conf' configuration file.
*/
protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = {
import scala.collection.JavaConverters._
protected def parseConfig(key: String, config: Config): Option[Deploy] = {
import akka.util.ReflectiveAccess.getClassFor
val defaultDeploymentConfig = configuration.getConfig("akka.actor.deployment.default")
// --------------------------------
// akka.actor.deployment.<path>
// --------------------------------
val deploymentKey = "akka.actor.deployment." + path
val deployment = configuration.getConfig(deploymentKey)
val deploymentWithFallback = deployment.withFallback(defaultDeploymentConfig)
val deployment = config.withFallback(default)
// --------------------------------
// akka.actor.deployment.<path>.router
// --------------------------------
val router: Routing = deploymentWithFallback.getString("router") match {
val router: Routing = deployment.getString("router") match {
case "round-robin" RoundRobin
case "random" Random
case "scatter-gather" ScatterGather
@ -125,11 +60,11 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
if (router == NoRouting) OneNrOfInstances
else {
def invalidNrOfInstances(wasValue: Any) = new ConfigurationException(
"Config option [" + deploymentKey +
"Config option [akka.actor.deployment." + key +
".nr-of-instances] needs to be either [\"auto\"] or [1-N] - was [" +
wasValue + "]")
deploymentWithFallback.getAnyRef("nr-of-instances").asInstanceOf[Any] match {
deployment.getAnyRef("nr-of-instances").asInstanceOf[Any] match {
case "auto" AutoNrOfInstances
case 1 OneNrOfInstances
case 0 ZeroNrOfInstances
@ -148,115 +83,15 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
// akka.actor.deployment.<path>.create-as
// --------------------------------
val recipe: Option[ActorRecipe] =
deploymentWithFallback.getString("create-as.class") match {
deployment.getString("create-as.class") match {
case "" None
case impl
val implementationClass = getClassFor[Actor](impl).fold(e throw new ConfigurationException(
"Config option [" + deploymentKey + ".create-as.class] load failed", e), identity)
"Config option [akka.actor.deployment." + key + ".create-as.class] load failed", e), identity)
Some(ActorRecipe(implementationClass))
}
val clusterPreferredNodes = deploymentWithFallback.getStringList("cluster.preferred-nodes").asScala.toSeq
// --------------------------------
// akka.actor.deployment.<path>.cluster
// --------------------------------
// def parseCluster: Scope = {
// def raiseHomeConfigError() = throw new ConfigurationException(
// "Config option [" + deploymentKey +
// ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:<hostname>', 'ip:<ip address>' or 'node:<node name>', was [" +
// clusterPreferredNodes + "]")
//
// val remoteNodes = clusterPreferredNodes map { home
// if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError()
//
// val tokenizer = new java.util.StringTokenizer(home, ":")
// val protocol = tokenizer.nextElement
// val address = tokenizer.nextElement.asInstanceOf[String]
//
// // TODO host and ip protocols?
// protocol match {
// case "node" Node(address)
// case _ raiseHomeConfigError()
// }
// }
// deploymentConfig.ClusterScope(remoteNodes, parseClusterReplication)
// }
// --------------------------------
// akka.actor.deployment.<path>.cluster.replication
// --------------------------------
// def parseClusterReplication: ReplicationScheme = {
// deployment.hasPath("cluster.replication") match {
// case false Transient
// case true
// val replicationConfigWithFallback = deploymentWithFallback.getConfig("cluster.replication")
// val storage = replicationConfigWithFallback.getString("storage") match {
// case "transaction-log" TransactionLog
// case "data-grid" DataGrid
// case unknown
// throw new ConfigurationException("Config option [" + deploymentKey +
// ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" +
// unknown + "]")
// }
// val strategy = replicationConfigWithFallback.getString("strategy") match {
// case "write-through" WriteThrough
// case "write-behind" WriteBehind
// case unknown
// throw new ConfigurationException("Config option [" + deploymentKey +
// ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
// unknown + "]")
// }
// Replication(storage, strategy)
// }
// }
//
// val scope = (remoteNodes, clusterPreferredNodes) match {
// case (Nil, Nil)
// LocalScope
// case (_, Nil)
// // we have a 'remote' config section
// parseRemote
// case (Nil, _)
// // we have a 'cluster' config section
// parseCluster
// case (_, _) throw new ConfigurationException(
// "Configuration for deployment ID [" + path + "] can not have both 'remote' and 'cluster' sections.")
// }
Deploy(path, recipe, router, nrOfInstances, LocalScope)
Some(Deploy(key, recipe, router, nrOfInstances, LocalScope))
}
protected def throwDeploymentBoundException(deployment: Deploy): Nothing = {
val e = new DeploymentAlreadyBoundException("Path [" + deployment.path + "] already bound to [" + deployment + "]")
log.error(e, e.getMessage)
throw e
}
protected def thrownNoDeploymentBoundException(path: String): Nothing = {
val e = new NoDeploymentBoundException("Path [" + path + "] is not bound to a deployment")
log.error(e, e.getMessage)
throw e
}
}
/**
* Simple local deployer, only for internal use.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class LocalDeployer extends ActorDeployer {
private val deployments = new ConcurrentHashMap[String, Deploy]
def init(deployments: Seq[Deploy]): Unit = deployments foreach deploy // deploy
def shutdown(): Unit = deployments.clear() //TODO do something else/more?
def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.path, deployment)
def lookupDeploymentFor(path: String): Option[Deploy] = Option(deployments.get(path))
}
class DeploymentException private[akka] (message: String) extends AkkaException(message)
class DeploymentAlreadyBoundException private[akka] (message: String) extends AkkaException(message)
class NoDeploymentBoundException private[akka] (message: String) extends AkkaException(message)

View file

@ -47,9 +47,15 @@ class RemoteActorRefProvider(
def systemGuardian = local.systemGuardian
def nodename = remoteSettings.NodeName
def clustername = remoteSettings.ClusterName
def terminationFuture = local.terminationFuture
def dispatcher = local.dispatcher
val deployer = new RemoteDeployer(settings)
val rootPath: ActorPath = RootActorPath(RemoteAddress(systemName, remoteSettings.serverSettings.Hostname, remoteSettings.serverSettings.Port))
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer)
private var serialization: Serialization = _
private var _remote: Remote = _
@ -63,13 +69,6 @@ class RemoteActorRefProvider(
terminationFuture.onComplete(_ remote.server.shutdown())
}
private[akka] def terminationFuture = local.terminationFuture
private[akka] def deployer: Deployer = new RemoteDeployer(settings, eventStream, nodename)
def dispatcher = local.dispatcher
def defaultTimeout = settings.ActorTimeout
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, name: String, systemService: Boolean): InternalActorRef =
if (systemService) local.actorOf(system, props, supervisor, name, systemService)
else {
@ -80,105 +79,32 @@ class RemoteActorRefProvider(
p.headOption match {
case None None
case Some("remote") lookupRemotes(p.drop(2))
case Some("user") deployer.lookupDeploymentFor(p.drop(1).mkString("/", "/", ""))
case Some("user") deployer.lookup(p.drop(1).mkString("/", "/", ""))
case Some(_) None
}
}
val elems = path.elements
val deployment = (elems.head match {
case "user" deployer.lookupDeploymentFor(elems.drop(1).mkString("/", "/", ""))
case _ None
}) orElse (elems.head match {
case "user" deployer.lookup(elems.drop(1).mkString("/", "/", ""))
case "remote" lookupRemotes(elems)
case _ None
})
deployment match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, RemoteDeploymentConfig.RemoteScope(remoteAddresses)))
case Some(DeploymentConfig.Deploy(_, _, _, _, RemoteDeploymentConfig.RemoteScope(address)))
// FIXME RK deployer shall only concern itself with placement of actors on remote nodes
val address = remoteAddresses.head
if (address == rootPath.address) local.actorOf(system, props, supervisor, name, true) // FIXME RK make non-system
if (address == rootPath.address) local.actorOf(system, props, supervisor, name)
else {
val rpath = RootActorPath(address) / "remote" / rootPath.address.hostPort / path.elements
useActorOnNode(rpath, props.creator, supervisor)
new RemoteActorRef(this, remote.server, rpath, supervisor, None)
}
// def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
//
// //system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
//
// if (isReplicaNode) {
// // we are on one of the replica node for this remote actor
// local.actorOf(system, props, supervisor, name, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create (is this fixed now?)
// } else {
//
// implicit val dispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher
// implicit val timeout = system.settings.ActorTimeout
//
// // we are on the single "reference" node uses the remote actors on the replica nodes
// val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
// case RouterType.Direct
// if (remoteAddresses.size != 1) throw new ConfigurationException(
// "Actor [%s] configured with Direct router must have exactly 1 remote node configured. Found [%s]"
// .format(name, remoteAddresses.mkString(", ")))
// () new DirectRouter
//
// case RouterType.Broadcast
// if (remoteAddresses.size != 1) throw new ConfigurationException(
// "Actor [%s] configured with Broadcast router must have exactly 1 remote node configured. Found [%s]"
// .format(name, remoteAddresses.mkString(", ")))
// () new BroadcastRouter
//
// case RouterType.Random
// if (remoteAddresses.size < 1) throw new ConfigurationException(
// "Actor [%s] configured with Random router must have at least 1 remote node configured. Found [%s]"
// .format(name, remoteAddresses.mkString(", ")))
// () new RandomRouter
//
// case RouterType.RoundRobin
// if (remoteAddresses.size < 1) throw new ConfigurationException(
// "Actor [%s] configured with RoundRobin router must have at least 1 remote node configured. Found [%s]"
// .format(name, remoteAddresses.mkString(", ")))
// () new RoundRobinRouter
//
// case RouterType.ScatterGather
// if (remoteAddresses.size < 1) throw new ConfigurationException(
// "Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
// .format(name, remoteAddresses.mkString(", ")))
// () new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout)
//
// case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
// case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
// case RouterType.LeastMessages sys.error("Router LeastMessages not supported yet")
// case RouterType.Custom(implClass) () Routing.createCustomRouter(implClass)
// }
//
// val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a)
// conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here
// }
//
// val connectionManager = new RemoteConnectionManager(system, remote, connections)
//
// connections.keys foreach { useActorOnNode(system, _, path.toString, props.creator) }
//
// actorOf(system, RoutedProps(routerFactory = routerFactory, connectionManager = connectionManager), supervisor, name)
// }
case deploy local.actorOf(system, props, supervisor, name, systemService)
case _ local.actorOf(system, props, supervisor, name, systemService)
}
}
/**
* Copied from LocalActorRefProvider...
*/
// FIXME: implement supervision, ticket #1408
// def actorOf(system: ActorSystem, props: RoutedProps, supervisor: InternalActorRef, name: String): InternalActorRef = {
// if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router")
// new RoutedActorRef(system, props, supervisor, name)
// }
def actorFor(path: ActorPath): InternalActorRef = path.root match {
case `rootPath` actorFor(rootGuardian, path.elements)
case RootActorPath(_: RemoteAddress, _) new RemoteActorRef(this, remote.server, path, Nobody, None)
@ -194,9 +120,7 @@ class RemoteActorRefProvider(
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
// TODO remove me
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
/**
* Using (checking out) actor on a specific node.
@ -220,37 +144,6 @@ class RemoteActorRefProvider(
// we dont wait for the ACK, because the remote end will process this command before any other message to the new actor
actorFor(RootActorPath(path.address) / "remote") ! command
}
private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) {
if (withACK) {
try {
val f = connection ? (command, remoteSettings.RemoteSystemDaemonAckTimeout)
(try f.await.value catch { case _: FutureTimeoutException None }) match {
case Some(Right(receiver))
log.debug("Remote system command sent to [{}] successfully received", receiver)
case Some(Left(cause))
log.error(cause, cause.toString)
throw cause
case None
val error = new RemoteException("Remote system command to [%s] timed out".format(connection.path))
log.error(error, error.toString)
throw error
}
} catch {
case e: Exception
log.error(e, "Could not send remote system command to [{}] due to: {}", connection.path, e.toString)
throw e
}
} else {
connection ! command
}
}
private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch, ticket ##1190
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
}
/**

View file

@ -11,46 +11,28 @@ import akka.config.ConfigurationException
object RemoteDeploymentConfig {
case class RemoteScope(nodes: Iterable[RemoteAddress]) extends DeploymentConfig.Scope
case class RemoteScope(node: RemoteAddress) extends DeploymentConfig.Scope
}
class RemoteDeployer(_settings: ActorSystem.Settings, _eventStream: EventStream, _nodename: String)
extends Deployer(_settings, _eventStream, _nodename) {
class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings) {
import RemoteDeploymentConfig._
override protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = {
override protected def parseConfig(path: String, config: Config): Option[Deploy] = {
import scala.collection.JavaConverters._
import akka.util.ReflectiveAccess._
val defaultDeploymentConfig = configuration.getConfig("akka.actor.deployment.default")
val deployment = config.withFallback(default)
// --------------------------------
// akka.actor.deployment.<path>
// --------------------------------
val deploymentKey = "akka.actor.deployment." + path
val deployment = configuration.getConfig(deploymentKey)
val transform: Deploy Deploy =
if (deployment.hasPath("remote")) deployment.getString("remote") match {
case RemoteAddressExtractor(r) (d d.copy(scope = RemoteScope(r)))
case _ identity
}
else identity
val deploymentWithFallback = deployment.withFallback(defaultDeploymentConfig)
val remoteNodes = deploymentWithFallback.getStringList("remote.nodes").asScala.toSeq
// --------------------------------
// akka.actor.deployment.<path>.remote
// --------------------------------
def parseRemote: Scope = {
def raiseRemoteNodeParsingError() = throw new ConfigurationException(
"Config option [" + deploymentKey +
".remote.nodes] needs to be a list with elements on format \"<hostname>:<port>\", was [" + remoteNodes.mkString(", ") + "]")
val remoteAddresses = remoteNodes map (RemoteAddress(_, settings.name))
RemoteScope(remoteAddresses)
}
val local = super.lookupInConfig(path, configuration)
if (remoteNodes.isEmpty) local else local.copy(scope = parseRemote)
super.parseConfig(path, config) map transform
}
}

View file

@ -29,6 +29,19 @@ object RemoteAddress {
case RE(sys, host, Int(port)) apply(if (sys != null) sys else defaultSystem, host, port)
case _ throw new IllegalArgumentException(stringRep + " is not a valid remote address [system@host:port]")
}
}
object RemoteAddressExtractor {
def unapply(s: String): Option[RemoteAddress] = {
try {
val uri = new URI("akka://" + s)
if (uri.getScheme != "akka" || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1) None
else Some(RemoteAddress(uri.getUserInfo, uri.getHost, uri.getPort))
} catch {
case _: URISyntaxException None
}
}
}
case class RemoteAddress(system: String, host: String, ip: InetAddress, port: Int) extends Address {

View file

@ -38,9 +38,9 @@ akka {
port = 12345
}
actor.deployment {
/blub.remote.nodes = ["remote_sys@localhost:12346"]
/looker/child.remote.nodes = ["remote_sys@localhost:12346"]
/looker/child/grandchild.remote.nodes = ["RemoteCommunicationSpec@localhost:12345"]
/blub.remote = "remote_sys@localhost:12346"
/looker/child.remote = "remote_sys@localhost:12346"
/looker/child/grandchild.remote = "RemoteCommunicationSpec@localhost:12345"
}
}
""") with ImplicitSender {

View file

@ -14,65 +14,10 @@ object RemoteDeployerSpec {
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.cluster.nodename = Whatever
akka.actor.deployment {
/user/service1 {
}
/user/service2 {
router = round-robin
nr-of-instances = 3
remote {
nodes = ["wallace:2552", "gromit:2552"]
}
}
/user/service3 {
create-as {
class = "akka.actor.DeployerSpec$RecipeActor"
}
}
/user/service-auto {
router = round-robin
nr-of-instances = auto
}
/user/service-direct {
router = direct
}
/user/service-direct2 {
router = direct
# nr-of-instances ignored when router = direct
nr-of-instances = 2
}
/user/service-round-robin {
router = round-robin
}
/user/service-random {
router = random
}
/user/service-scatter-gather {
router = scatter-gather
}
/user/service-least-cpu {
router = least-cpu
}
/user/service-least-ram {
router = least-ram
}
/user/service-least-messages {
router = least-messages
}
/user/service-custom {
router = org.my.Custom
}
/user/service-cluster1 {
cluster {
preferred-nodes = ["node:wallace", "node:gromit"]
}
}
/user/service-cluster2 {
cluster {
preferred-nodes = ["node:wallace", "node:gromit"]
replication {
strategy = write-behind
}
}
remote = "sys@wallace:2552"
}
}
""", ConfigParseOptions.defaults)
@ -90,7 +35,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
"be able to parse 'akka.actor.deployment._' with specified remote nodes" in {
val service = "/user/service2"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
deployment must be('defined)
deployment must be(Some(
@ -99,8 +44,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
None,
RoundRobin,
NrOfInstances(3),
RemoteScope(Seq(
RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552))))))
RemoteScope(RemoteAddress("sys", "wallace", 2552)))))
}
}