diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index a3481e1903..6391200613 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -7,7 +7,6 @@ package akka.actor import akka.testkit.AkkaSpec import akka.util.duration._ import DeploymentConfig._ -import akka.remote.RemoteAddress import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions @@ -108,21 +107,6 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { deployment must be(None) } - "be able to parse 'akka.actor.deployment._' with specified remote nodes" in { - val service = "/user/service2" - val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) - deployment must be('defined) - - deployment must be(Some( - Deploy( - service, - None, - RoundRobin, - NrOfInstances(3), - RemoteScope(Seq( - RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552)))))) - } - "be able to parse 'akka.actor.deployment._' with recipe" in { val service = "/user/service3" val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) @@ -215,7 +199,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { } - "be able to parse 'akka.actor.deployment._' with specified cluster nodes" in { + "be able to parse 'akka.actor.deployment._' with specified cluster nodes" ignore { val service = "/user/service-cluster1" val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) @@ -229,7 +213,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { } } - "be able to parse 'akka.actor.deployment._' with specified cluster replication" in { + "be able to parse 'akka.actor.deployment._' with specified cluster replication" ignore { val service = "/user/service-cluster2" val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8da17f13ea..ff41b5c2be 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -8,7 +8,6 @@ import DeploymentConfig._ import akka.dispatch._ import akka.routing._ import akka.util.Duration -import akka.remote.RemoteSupport import akka.japi.{ Creator, Procedure } import akka.serialization.{ Serializer, Serialization } import akka.event.Logging.Debug diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 648e671c50..654b8141b8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -9,8 +9,6 @@ import akka.util._ import scala.collection.immutable.Stack import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.serialization.Serialization -import java.net.InetSocketAddress -import akka.remote.RemoteAddress import java.util.concurrent.TimeUnit import akka.event.EventStream import akka.event.DeathWatch @@ -50,7 +48,6 @@ import scala.annotation.tailrec */ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable { scalaRef: InternalActorRef ⇒ - // Only mutable for RemoteServer in order to maintain identity across nodes /** * Returns the path for this actor (from this actor up to the root actor). @@ -190,7 +187,7 @@ private[akka] case object Nobody extends MinimalActorRef { * * @author Jonas Bonér */ -class LocalActorRef private[akka] ( +private[akka] class LocalActorRef private[akka] ( system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d68a1349f0..bcb9e872e0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -13,9 +13,7 @@ import akka.config.ConfigurationException import akka.dispatch._ import akka.routing._ import akka.AkkaException -import com.eaio.uuid.UUID import akka.util.{ Duration, Switch, Helpers } -import akka.remote.RemoteAddress import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import akka.event._ import akka.event.Logging.Error._ @@ -342,9 +340,15 @@ class LocalActorRefProvider( val settings: ActorSystem.Settings, val eventStream: EventStream, val scheduler: Scheduler, - val deadLetters: InternalActorRef) extends ActorRefProvider { + val deadLetters: InternalActorRef, + val rootPath: ActorPath) extends ActorRefProvider { - val rootPath: ActorPath = new RootActorPath(LocalAddress(_systemName)) + def this(_systemName: String, + settings: ActorSystem.Settings, + eventStream: EventStream, + scheduler: Scheduler, + deadLetters: InternalActorRef) = + this(_systemName, settings, eventStream, scheduler, deadLetters, new RootActorPath(LocalAddress(_systemName))) // FIXME remove both val nodename: String = "local" diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 7159c15ad6..d5f697523a 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -11,21 +11,19 @@ import akka.actor.DeploymentConfig._ import akka.AkkaException import akka.config.ConfigurationException import akka.util.Duration -import java.net.InetSocketAddress -import akka.remote.RemoteAddress import akka.event.EventStream import com.typesafe.config.Config -trait ActorDeployer { - private[akka] def init(deployments: Seq[Deploy]): Unit - private[akka] def deploy(deployment: Deploy): Unit - private[akka] def lookupDeploymentFor(path: String): Option[Deploy] +private[akka] trait ActorDeployer { + def init(deployments: Seq[Deploy]): Unit + def deploy(deployment: Deploy): Unit + def lookupDeploymentFor(path: String): Option[Deploy] def lookupDeployment(path: String): Option[Deploy] = path match { case null | "" ⇒ None case s if s.startsWith("$") ⇒ None case some ⇒ lookupDeploymentFor(some) } - private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_)) + def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_)) } /** @@ -46,7 +44,7 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, def start(): Unit = instance.toString //Force evaluation - private[akka] def init(deployments: Seq[Deploy]) = instance.init(deployments) + def init(deployments: Seq[Deploy]) = instance.init(deployments) def deploy(deployment: Deploy): Unit = instance.deploy(deployment) @@ -64,21 +62,21 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, /** * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound. */ - private[akka] def deploymentFor(path: String): Deploy = { + protected def deploymentFor(path: String): Deploy = { lookupDeploymentFor(path) match { case Some(deployment) ⇒ deployment case None ⇒ thrownNoDeploymentBoundException(path) } } - private[akka] def lookupDeploymentFor(path: String): Option[Deploy] = + def lookupDeploymentFor(path: String): Option[Deploy] = instance.lookupDeploymentFor(path) - private[akka] def deploymentsInConfig: List[Deploy] = { + protected def deploymentsInConfig: List[Deploy] = { for (path ← pathsInConfig) yield lookupInConfig(path) } - private[akka] def pathsInConfig: List[String] = { + protected def pathsInConfig: List[String] = { def pathSubstring(path: String) = { val i = path.indexOf(".") if (i == -1) path else path.substring(0, i) @@ -94,7 +92,7 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, /** * Lookup deployment in 'akka.conf' configuration file. */ - private[akka] def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = { + protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = { import scala.collection.JavaConverters._ import akka.util.ReflectiveAccess.getClassFor @@ -159,108 +157,84 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, Some(ActorRecipe(implementationClass)) } - val remoteNodes = deploymentWithFallback.getStringList("remote.nodes").asScala.toSeq val clusterPreferredNodes = deploymentWithFallback.getStringList("cluster.preferred-nodes").asScala.toSeq - // -------------------------------- - // akka.actor.deployment..remote - // -------------------------------- - def parseRemote: Scope = { - def raiseRemoteNodeParsingError() = throw new ConfigurationException( - "Config option [" + deploymentKey + - ".remote.nodes] needs to be a list with elements on format \":\", was [" + remoteNodes.mkString(", ") + "]") - - val remoteAddresses = remoteNodes map { node ⇒ - val tokenizer = new java.util.StringTokenizer(node, ":") - val hostname = tokenizer.nextElement.toString - if ((hostname eq null) || (hostname == "")) raiseRemoteNodeParsingError() - val port = try tokenizer.nextElement.toString.toInt catch { - case e: Exception ⇒ raiseRemoteNodeParsingError() - } - if (port == 0) raiseRemoteNodeParsingError() - - RemoteAddress(settings.name, hostname, port) - } - - RemoteScope(remoteAddresses) - } - // -------------------------------- // akka.actor.deployment..cluster // -------------------------------- - def parseCluster: Scope = { - def raiseHomeConfigError() = throw new ConfigurationException( - "Config option [" + deploymentKey + - ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:', 'ip:' or 'node:', was [" + - clusterPreferredNodes + "]") - - val remoteNodes = clusterPreferredNodes map { home ⇒ - if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError() - - val tokenizer = new java.util.StringTokenizer(home, ":") - val protocol = tokenizer.nextElement - val address = tokenizer.nextElement.asInstanceOf[String] - - // TODO host and ip protocols? - protocol match { - case "node" ⇒ Node(address) - case _ ⇒ raiseHomeConfigError() - } - } - deploymentConfig.ClusterScope(remoteNodes, parseClusterReplication) - } + // def parseCluster: Scope = { + // def raiseHomeConfigError() = throw new ConfigurationException( + // "Config option [" + deploymentKey + + // ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:', 'ip:' or 'node:', was [" + + // clusterPreferredNodes + "]") + // + // val remoteNodes = clusterPreferredNodes map { home ⇒ + // if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError() + // + // val tokenizer = new java.util.StringTokenizer(home, ":") + // val protocol = tokenizer.nextElement + // val address = tokenizer.nextElement.asInstanceOf[String] + // + // // TODO host and ip protocols? + // protocol match { + // case "node" ⇒ Node(address) + // case _ ⇒ raiseHomeConfigError() + // } + // } + // deploymentConfig.ClusterScope(remoteNodes, parseClusterReplication) + // } // -------------------------------- // akka.actor.deployment..cluster.replication // -------------------------------- - def parseClusterReplication: ReplicationScheme = { - deployment.hasPath("cluster.replication") match { - case false ⇒ Transient - case true ⇒ - val replicationConfigWithFallback = deploymentWithFallback.getConfig("cluster.replication") - val storage = replicationConfigWithFallback.getString("storage") match { - case "transaction-log" ⇒ TransactionLog - case "data-grid" ⇒ DataGrid - case unknown ⇒ - throw new ConfigurationException("Config option [" + deploymentKey + - ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + - unknown + "]") - } - val strategy = replicationConfigWithFallback.getString("strategy") match { - case "write-through" ⇒ WriteThrough - case "write-behind" ⇒ WriteBehind - case unknown ⇒ - throw new ConfigurationException("Config option [" + deploymentKey + - ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + - unknown + "]") - } - Replication(storage, strategy) - } - } + // def parseClusterReplication: ReplicationScheme = { + // deployment.hasPath("cluster.replication") match { + // case false ⇒ Transient + // case true ⇒ + // val replicationConfigWithFallback = deploymentWithFallback.getConfig("cluster.replication") + // val storage = replicationConfigWithFallback.getString("storage") match { + // case "transaction-log" ⇒ TransactionLog + // case "data-grid" ⇒ DataGrid + // case unknown ⇒ + // throw new ConfigurationException("Config option [" + deploymentKey + + // ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + + // unknown + "]") + // } + // val strategy = replicationConfigWithFallback.getString("strategy") match { + // case "write-through" ⇒ WriteThrough + // case "write-behind" ⇒ WriteBehind + // case unknown ⇒ + // throw new ConfigurationException("Config option [" + deploymentKey + + // ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + + // unknown + "]") + // } + // Replication(storage, strategy) + // } + // } + // + // val scope = (remoteNodes, clusterPreferredNodes) match { + // case (Nil, Nil) ⇒ + // LocalScope + // case (_, Nil) ⇒ + // // we have a 'remote' config section + // parseRemote + // case (Nil, _) ⇒ + // // we have a 'cluster' config section + // parseCluster + // case (_, _) ⇒ throw new ConfigurationException( + // "Configuration for deployment ID [" + path + "] can not have both 'remote' and 'cluster' sections.") + // } - val scope = (remoteNodes, clusterPreferredNodes) match { - case (Nil, Nil) ⇒ - LocalScope - case (_, Nil) ⇒ - // we have a 'remote' config section - parseRemote - case (Nil, _) ⇒ - // we have a 'cluster' config section - parseCluster - case (_, _) ⇒ throw new ConfigurationException( - "Configuration for deployment ID [" + path + "] can not have both 'remote' and 'cluster' sections.") - } - - Deploy(path, recipe, router, nrOfInstances, scope) + Deploy(path, recipe, router, nrOfInstances, LocalScope) } - private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = { + protected def throwDeploymentBoundException(deployment: Deploy): Nothing = { val e = new DeploymentAlreadyBoundException("Path [" + deployment.path + "] already bound to [" + deployment + "]") log.error(e, e.getMessage) throw e } - private[akka] def thrownNoDeploymentBoundException(path: String): Nothing = { + protected def thrownNoDeploymentBoundException(path: String): Nothing = { val e = new NoDeploymentBoundException("Path [" + path + "] is not bound to a deployment") log.error(e, e.getMessage) throw e @@ -275,13 +249,13 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, class LocalDeployer extends ActorDeployer { private val deployments = new ConcurrentHashMap[String, Deploy] - private[akka] def init(deployments: Seq[Deploy]): Unit = deployments foreach deploy // deploy + def init(deployments: Seq[Deploy]): Unit = deployments foreach deploy // deploy - private[akka] def shutdown(): Unit = deployments.clear() //TODO do something else/more? + def shutdown(): Unit = deployments.clear() //TODO do something else/more? - private[akka] def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.path, deployment) + def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.path, deployment) - private[akka] def lookupDeploymentFor(path: String): Option[Deploy] = Option(deployments.get(path)) + def lookupDeploymentFor(path: String): Option[Deploy] = Option(deployments.get(path)) } class DeploymentException private[akka] (message: String) extends AkkaException(message) diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala index 9a3d934f01..edeec3a656 100644 --- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala +++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala @@ -6,7 +6,6 @@ package akka.actor import akka.util.Duration import akka.routing.RouterType -import akka.remote.RemoteAddress object DeploymentConfig { @@ -52,7 +51,7 @@ object DeploymentConfig { // -------------------------------- // --- Scope // -------------------------------- - sealed trait Scope + trait Scope // For Java API case class LocalScope() extends Scope @@ -60,8 +59,6 @@ object DeploymentConfig { // For Scala API case object LocalScope extends Scope - case class RemoteScope(nodes: Iterable[RemoteAddress]) extends Scope - // -------------------------------- // --- Home // -------------------------------- diff --git a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala index 6e45a50cad..cf7dd3fda5 100644 --- a/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala +++ b/akka-actor/src/main/scala/akka/routing/ConnectionManager.scala @@ -9,8 +9,6 @@ import akka.actor._ import scala.annotation.tailrec import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } -import java.net.InetSocketAddress -import akka.remote.RemoteAddress import collection.JavaConverters /** @@ -69,16 +67,6 @@ trait ConnectionManager { * @param ref the dead */ def remove(deadRef: ActorRef) - - /** - * Creates a new connection (ActorRef) if it didn't exist. Atomically. - */ - def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef - - /** - * Fails over connections from one address to another. - */ - def failOver(from: RemoteAddress, to: RemoteAddress) } /** @@ -125,10 +113,4 @@ class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends Con if (!state.compareAndSet(oldState, newState)) remove(ref) } } - - def failOver(from: RemoteAddress, to: RemoteAddress) {} // do nothing here - - def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { - throw new UnsupportedOperationException("Not supported") - } } diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 20a047952f..cc52403f9f 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -161,7 +161,7 @@ class Gossiper(remote: Remote) { node ← oldAvailableNodes if connectionManager.connectionFor(node).isEmpty } { - val connectionFactory = () ⇒ RemoteActorRef(remote.system.provider, remote.server, gossipingNode, remote.remoteDaemon.path, None) + val connectionFactory = () ⇒ new RemoteActorRef(remote.system.provider, remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, None) connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 0d5d5f7838..f4954e919f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -62,19 +62,22 @@ class RemoteActorRefProvider( val remoteAddress = RemoteAddress(system.name, remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port) new RootActorPath(remoteAddress) } - private lazy val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters) - private[akka] lazy val remote = new Remote(system, nodename) + private lazy val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath) + private[akka] lazy val remote = { + val r = new Remote(system, nodename) + terminationFuture.onComplete(_ ⇒ r.server.shutdown()) + r + } private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) def init(_system: ActorSystemImpl) { system = _system local.init(_system) - terminationFuture.onComplete(_ ⇒ remote.server.shutdown()) } private[akka] def terminationFuture = local.terminationFuture - private[akka] def deployer: Deployer = local.deployer + private[akka] def deployer: Deployer = new RemoteDeployer(settings, eventStream, nodename) def dispatcher = local.dispatcher def defaultTimeout = settings.ActorTimeout @@ -89,7 +92,7 @@ class RemoteActorRefProvider( case null ⇒ val actor: InternalActorRef = try { deployer.lookupDeploymentFor(path.toString) match { - case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒ + case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, RemoteDeploymentConfig.RemoteScope(remoteAddresses))) ⇒ def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress } @@ -142,8 +145,7 @@ class RemoteActorRefProvider( } val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) ⇒ - val remoteAddress = RemoteAddress(system.name, a.host, a.port) - conns + (remoteAddress -> RemoteActorRef(remote.system.provider, remote.server, remoteAddress, path, None)) + conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here } val connectionManager = new RemoteConnectionManager(system, remote, connections) @@ -180,8 +182,19 @@ class RemoteActorRefProvider( new RoutedActorRef(system, props, supervisor, name) } - def actorFor(path: ActorPath): InternalActorRef = local.actorFor(path) - def actorFor(ref: InternalActorRef, path: String): InternalActorRef = local.actorFor(ref, path) + def actorFor(path: ActorPath): InternalActorRef = path.root match { + case `rootPath` ⇒ actorFor(rootGuardian, path.elements) + case RootActorPath(_: RemoteAddress, _) ⇒ new RemoteActorRef(this, remote.server, path, None) + case _ ⇒ local.actorFor(path) + } + + def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { + case RemoteActorPath(address, elems) ⇒ + if (address == rootPath.address) actorFor(rootGuardian, elems) + else new RemoteActorRef(this, remote.server, new RootActorPath(address) / elems, None) + case _ ⇒ local.actorFor(ref, path) + } + def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) // TODO remove me @@ -257,11 +270,10 @@ class RemoteActorRefProvider( * * @author Jonas Bonér */ -private[akka] case class RemoteActorRef private[akka] ( +private[akka] class RemoteActorRef private[akka] ( provider: ActorRefProvider, remote: RemoteSupport, - remoteAddress: RemoteAddress, - path: ActorPath, + val path: ActorPath, loader: Option[ClassLoader]) extends InternalActorRef { @@ -276,7 +288,7 @@ private[akka] case class RemoteActorRef private[akka] ( def sendSystemMessage(message: SystemMessage): Unit = throw new UnsupportedOperationException("Not supported for RemoteActorRef") - override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), remoteAddress, this, loader) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader) override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout) @@ -288,7 +300,7 @@ private[akka] case class RemoteActorRef private[akka] ( synchronized { if (running) { running = false - remote.send(new Terminate(), None, remoteAddress, this, loader) + remote.send(new Terminate(), None, this, loader) } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 7b739b6199..d0b7a863d4 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -149,5 +149,5 @@ class RemoteConnectionManager( } private[remote] def newConnection(remoteAddress: RemoteAddress, actorPath: ActorPath) = - RemoteActorRef(remote.system.provider, remote.server, remoteAddress, actorPath, None) + new RemoteActorRef(remote.system.provider, remote.server, actorPath, None) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala new file mode 100644 index 0000000000..487ad683a3 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.remote + +import akka.actor._ +import akka.actor.DeploymentConfig._ +import akka.event.EventStream +import com.typesafe.config._ +import akka.config.ConfigurationException + +object RemoteDeploymentConfig { + + case class RemoteScope(nodes: Iterable[RemoteAddress]) extends DeploymentConfig.Scope + +} + +class RemoteDeployer(_settings: ActorSystem.Settings, _eventStream: EventStream, _nodename: String) + extends Deployer(_settings, _eventStream, _nodename) { + + import RemoteDeploymentConfig._ + + override protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = { + import scala.collection.JavaConverters._ + import akka.util.ReflectiveAccess._ + + val defaultDeploymentConfig = configuration.getConfig("akka.actor.deployment.default") + + // -------------------------------- + // akka.actor.deployment. + // -------------------------------- + val deploymentKey = "akka.actor.deployment." + path + val deployment = configuration.getConfig(deploymentKey) + + val deploymentWithFallback = deployment.withFallback(defaultDeploymentConfig) + + val remoteNodes = deploymentWithFallback.getStringList("remote.nodes").asScala.toSeq + + // -------------------------------- + // akka.actor.deployment..remote + // -------------------------------- + def parseRemote: Scope = { + def raiseRemoteNodeParsingError() = throw new ConfigurationException( + "Config option [" + deploymentKey + + ".remote.nodes] needs to be a list with elements on format \":\", was [" + remoteNodes.mkString(", ") + "]") + + val remoteAddresses = remoteNodes map { node ⇒ + val tokenizer = new java.util.StringTokenizer(node, ":") + val hostname = tokenizer.nextElement.toString + if ((hostname eq null) || (hostname == "")) raiseRemoteNodeParsingError() + val port = try tokenizer.nextElement.toString.toInt catch { + case e: Exception ⇒ raiseRemoteNodeParsingError() + } + if (port == 0) raiseRemoteNodeParsingError() + + RemoteAddress(settings.name, hostname, port) + } + + RemoteScope(remoteAddresses) + } + + val local = super.lookupInConfig(path, configuration) + if (remoteNodes.isEmpty) local else local.copy(scope = parseRemote) + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala similarity index 98% rename from akka-actor/src/main/scala/akka/remote/RemoteInterface.scala rename to akka-remote/src/main/scala/akka/remote/RemoteInterface.scala index 3639f056e8..3791924f19 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala @@ -175,8 +175,7 @@ abstract class RemoteSupport(val system: ActorSystem) { protected[akka] def send(message: Any, senderOption: Option[ActorRef], - remoteAddress: RemoteAddress, - recipient: ActorRef, + recipient: RemoteActorRef, loader: Option[ClassLoader]): Unit protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = system.eventStream.publish(message) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b412fcdf3e..8ceb56b16b 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -367,10 +367,11 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot protected[akka] def send( message: Any, senderOption: Option[ActorRef], - recipientAddress: RemoteAddress, - recipient: ActorRef, + recipient: RemoteActorRef, loader: Option[ClassLoader]): Unit = { + val recipientAddress = recipient.path.address.asInstanceOf[RemoteAddress] + clientsLock.readLock.lock try { val client = remoteClients.get(recipientAddress) match { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala new file mode 100644 index 0000000000..3956d8fdab --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.remote + +import akka.testkit._ +import akka.actor._ +import com.typesafe.config._ +import akka.actor.DeploymentConfig._ +import akka.remote.RemoteDeploymentConfig.RemoteScope + +object RemoteDeployerSpec { + val deployerConf = ConfigFactory.parseString(""" + akka.actor.provider = "akka.remote.RemoteActorRefProvider" + akka.cluster.nodename = Whatever + akka.actor.deployment { + /user/service1 { + } + /user/service2 { + router = round-robin + nr-of-instances = 3 + remote { + nodes = ["wallace:2552", "gromit:2552"] + } + } + /user/service3 { + create-as { + class = "akka.actor.DeployerSpec$RecipeActor" + } + } + /user/service-auto { + router = round-robin + nr-of-instances = auto + } + /user/service-direct { + router = direct + } + /user/service-direct2 { + router = direct + # nr-of-instances ignored when router = direct + nr-of-instances = 2 + } + /user/service-round-robin { + router = round-robin + } + /user/service-random { + router = random + } + /user/service-scatter-gather { + router = scatter-gather + } + /user/service-least-cpu { + router = least-cpu + } + /user/service-least-ram { + router = least-ram + } + /user/service-least-messages { + router = least-messages + } + /user/service-custom { + router = org.my.Custom + } + /user/service-cluster1 { + cluster { + preferred-nodes = ["node:wallace", "node:gromit"] + } + } + /user/service-cluster2 { + cluster { + preferred-nodes = ["node:wallace", "node:gromit"] + replication { + strategy = write-behind + } + } + } + } + """, ConfigParseOptions.defaults) + + class RecipeActor extends Actor { + def receive = { case _ ⇒ } + } + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { + + "A RemoteDeployer" must { + + "be able to parse 'akka.actor.deployment._' with specified remote nodes" in { + val service = "/user/service2" + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) + deployment must be('defined) + + deployment must be(Some( + Deploy( + service, + None, + RoundRobin, + NrOfInstances(3), + RemoteScope(Seq( + RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552)))))) + } + + } + +} \ No newline at end of file