remove references to Remote* from akka-actor

- moved RemoteInterface.scala into akka-remote, not needed anymore since
  the plugin is loaded as ActorRefProvider now
- removed some old unused imports
- split out remote aspects from Deployer & DeployerSpec into
  RemoteDeployer & RemoteDeployerSpec (the latter in akka-remote)
- created a ticket for cleaning up the rest, mostly remove all
  occurrences of “remote” and “cluster” from akka-actor in this way

All of this was triggered by wanting to:
- change the signature of RemoteSupport.send to require a RemoteActorRef
  recipient
This commit is contained in:
Roland 2011-12-07 16:29:12 +01:00
parent 9a74bcafb3
commit 25e23a3378
14 changed files with 294 additions and 171 deletions

View file

@ -7,7 +7,6 @@ package akka.actor
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.util.duration._ import akka.util.duration._
import DeploymentConfig._ import DeploymentConfig._
import akka.remote.RemoteAddress
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
@ -108,21 +107,6 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
deployment must be(None) deployment must be(None)
} }
"be able to parse 'akka.actor.deployment._' with specified remote nodes" in {
val service = "/user/service2"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
deployment must be('defined)
deployment must be(Some(
Deploy(
service,
None,
RoundRobin,
NrOfInstances(3),
RemoteScope(Seq(
RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552))))))
}
"be able to parse 'akka.actor.deployment._' with recipe" in { "be able to parse 'akka.actor.deployment._' with recipe" in {
val service = "/user/service3" val service = "/user/service3"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
@ -215,7 +199,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
} }
"be able to parse 'akka.actor.deployment._' with specified cluster nodes" in { "be able to parse 'akka.actor.deployment._' with specified cluster nodes" ignore {
val service = "/user/service-cluster1" val service = "/user/service-cluster1"
val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
@ -229,7 +213,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
} }
} }
"be able to parse 'akka.actor.deployment._' with specified cluster replication" in { "be able to parse 'akka.actor.deployment._' with specified cluster replication" ignore {
val service = "/user/service-cluster2" val service = "/user/service-cluster2"
val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig val deploymentConfig = system.asInstanceOf[ActorSystemImpl].provider.deployer.deploymentConfig
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service) val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)

View file

@ -8,7 +8,6 @@ import DeploymentConfig._
import akka.dispatch._ import akka.dispatch._
import akka.routing._ import akka.routing._
import akka.util.Duration import akka.util.Duration
import akka.remote.RemoteSupport
import akka.japi.{ Creator, Procedure } import akka.japi.{ Creator, Procedure }
import akka.serialization.{ Serializer, Serialization } import akka.serialization.{ Serializer, Serialization }
import akka.event.Logging.Debug import akka.event.Logging.Debug

View file

@ -9,8 +9,6 @@ import akka.util._
import scala.collection.immutable.Stack import scala.collection.immutable.Stack
import java.lang.{ UnsupportedOperationException, IllegalStateException } import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.serialization.Serialization import akka.serialization.Serialization
import java.net.InetSocketAddress
import akka.remote.RemoteAddress
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.event.EventStream import akka.event.EventStream
import akka.event.DeathWatch import akka.event.DeathWatch
@ -50,7 +48,6 @@ import scala.annotation.tailrec
*/ */
abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable { abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
scalaRef: InternalActorRef scalaRef: InternalActorRef
// Only mutable for RemoteServer in order to maintain identity across nodes
/** /**
* Returns the path for this actor (from this actor up to the root actor). * Returns the path for this actor (from this actor up to the root actor).
@ -190,7 +187,7 @@ private[akka] case object Nobody extends MinimalActorRef {
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class LocalActorRef private[akka] ( private[akka] class LocalActorRef private[akka] (
system: ActorSystemImpl, system: ActorSystemImpl,
_props: Props, _props: Props,
_supervisor: InternalActorRef, _supervisor: InternalActorRef,

View file

@ -13,9 +13,7 @@ import akka.config.ConfigurationException
import akka.dispatch._ import akka.dispatch._
import akka.routing._ import akka.routing._
import akka.AkkaException import akka.AkkaException
import com.eaio.uuid.UUID
import akka.util.{ Duration, Switch, Helpers } import akka.util.{ Duration, Switch, Helpers }
import akka.remote.RemoteAddress
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
import akka.event._ import akka.event._
import akka.event.Logging.Error._ import akka.event.Logging.Error._
@ -342,9 +340,15 @@ class LocalActorRefProvider(
val settings: ActorSystem.Settings, val settings: ActorSystem.Settings,
val eventStream: EventStream, val eventStream: EventStream,
val scheduler: Scheduler, val scheduler: Scheduler,
val deadLetters: InternalActorRef) extends ActorRefProvider { val deadLetters: InternalActorRef,
val rootPath: ActorPath) extends ActorRefProvider {
val rootPath: ActorPath = new RootActorPath(LocalAddress(_systemName)) def this(_systemName: String,
settings: ActorSystem.Settings,
eventStream: EventStream,
scheduler: Scheduler,
deadLetters: InternalActorRef) =
this(_systemName, settings, eventStream, scheduler, deadLetters, new RootActorPath(LocalAddress(_systemName)))
// FIXME remove both // FIXME remove both
val nodename: String = "local" val nodename: String = "local"

View file

@ -11,21 +11,19 @@ import akka.actor.DeploymentConfig._
import akka.AkkaException import akka.AkkaException
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.util.Duration import akka.util.Duration
import java.net.InetSocketAddress
import akka.remote.RemoteAddress
import akka.event.EventStream import akka.event.EventStream
import com.typesafe.config.Config import com.typesafe.config.Config
trait ActorDeployer { private[akka] trait ActorDeployer {
private[akka] def init(deployments: Seq[Deploy]): Unit def init(deployments: Seq[Deploy]): Unit
private[akka] def deploy(deployment: Deploy): Unit def deploy(deployment: Deploy): Unit
private[akka] def lookupDeploymentFor(path: String): Option[Deploy] def lookupDeploymentFor(path: String): Option[Deploy]
def lookupDeployment(path: String): Option[Deploy] = path match { def lookupDeployment(path: String): Option[Deploy] = path match {
case null | "" None case null | "" None
case s if s.startsWith("$") None case s if s.startsWith("$") None
case some lookupDeploymentFor(some) case some lookupDeploymentFor(some)
} }
private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_)) def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_))
} }
/** /**
@ -46,7 +44,7 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
def start(): Unit = instance.toString //Force evaluation def start(): Unit = instance.toString //Force evaluation
private[akka] def init(deployments: Seq[Deploy]) = instance.init(deployments) def init(deployments: Seq[Deploy]) = instance.init(deployments)
def deploy(deployment: Deploy): Unit = instance.deploy(deployment) def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
@ -64,21 +62,21 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
/** /**
* Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound. * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound.
*/ */
private[akka] def deploymentFor(path: String): Deploy = { protected def deploymentFor(path: String): Deploy = {
lookupDeploymentFor(path) match { lookupDeploymentFor(path) match {
case Some(deployment) deployment case Some(deployment) deployment
case None thrownNoDeploymentBoundException(path) case None thrownNoDeploymentBoundException(path)
} }
} }
private[akka] def lookupDeploymentFor(path: String): Option[Deploy] = def lookupDeploymentFor(path: String): Option[Deploy] =
instance.lookupDeploymentFor(path) instance.lookupDeploymentFor(path)
private[akka] def deploymentsInConfig: List[Deploy] = { protected def deploymentsInConfig: List[Deploy] = {
for (path pathsInConfig) yield lookupInConfig(path) for (path pathsInConfig) yield lookupInConfig(path)
} }
private[akka] def pathsInConfig: List[String] = { protected def pathsInConfig: List[String] = {
def pathSubstring(path: String) = { def pathSubstring(path: String) = {
val i = path.indexOf(".") val i = path.indexOf(".")
if (i == -1) path else path.substring(0, i) if (i == -1) path else path.substring(0, i)
@ -94,7 +92,7 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
/** /**
* Lookup deployment in 'akka.conf' configuration file. * Lookup deployment in 'akka.conf' configuration file.
*/ */
private[akka] def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = { protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.util.ReflectiveAccess.getClassFor import akka.util.ReflectiveAccess.getClassFor
@ -159,108 +157,84 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
Some(ActorRecipe(implementationClass)) Some(ActorRecipe(implementationClass))
} }
val remoteNodes = deploymentWithFallback.getStringList("remote.nodes").asScala.toSeq
val clusterPreferredNodes = deploymentWithFallback.getStringList("cluster.preferred-nodes").asScala.toSeq val clusterPreferredNodes = deploymentWithFallback.getStringList("cluster.preferred-nodes").asScala.toSeq
// --------------------------------
// akka.actor.deployment.<path>.remote
// --------------------------------
def parseRemote: Scope = {
def raiseRemoteNodeParsingError() = throw new ConfigurationException(
"Config option [" + deploymentKey +
".remote.nodes] needs to be a list with elements on format \"<hostname>:<port>\", was [" + remoteNodes.mkString(", ") + "]")
val remoteAddresses = remoteNodes map { node
val tokenizer = new java.util.StringTokenizer(node, ":")
val hostname = tokenizer.nextElement.toString
if ((hostname eq null) || (hostname == "")) raiseRemoteNodeParsingError()
val port = try tokenizer.nextElement.toString.toInt catch {
case e: Exception raiseRemoteNodeParsingError()
}
if (port == 0) raiseRemoteNodeParsingError()
RemoteAddress(settings.name, hostname, port)
}
RemoteScope(remoteAddresses)
}
// -------------------------------- // --------------------------------
// akka.actor.deployment.<path>.cluster // akka.actor.deployment.<path>.cluster
// -------------------------------- // --------------------------------
def parseCluster: Scope = { // def parseCluster: Scope = {
def raiseHomeConfigError() = throw new ConfigurationException( // def raiseHomeConfigError() = throw new ConfigurationException(
"Config option [" + deploymentKey + // "Config option [" + deploymentKey +
".cluster.preferred-nodes] needs to be a list with elements on format\n'host:<hostname>', 'ip:<ip address>' or 'node:<node name>', was [" + // ".cluster.preferred-nodes] needs to be a list with elements on format\n'host:<hostname>', 'ip:<ip address>' or 'node:<node name>', was [" +
clusterPreferredNodes + "]") // clusterPreferredNodes + "]")
//
val remoteNodes = clusterPreferredNodes map { home // val remoteNodes = clusterPreferredNodes map { home
if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError() // if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError()
//
val tokenizer = new java.util.StringTokenizer(home, ":") // val tokenizer = new java.util.StringTokenizer(home, ":")
val protocol = tokenizer.nextElement // val protocol = tokenizer.nextElement
val address = tokenizer.nextElement.asInstanceOf[String] // val address = tokenizer.nextElement.asInstanceOf[String]
//
// TODO host and ip protocols? // // TODO host and ip protocols?
protocol match { // protocol match {
case "node" Node(address) // case "node" Node(address)
case _ raiseHomeConfigError() // case _ raiseHomeConfigError()
} // }
} // }
deploymentConfig.ClusterScope(remoteNodes, parseClusterReplication) // deploymentConfig.ClusterScope(remoteNodes, parseClusterReplication)
} // }
// -------------------------------- // --------------------------------
// akka.actor.deployment.<path>.cluster.replication // akka.actor.deployment.<path>.cluster.replication
// -------------------------------- // --------------------------------
def parseClusterReplication: ReplicationScheme = { // def parseClusterReplication: ReplicationScheme = {
deployment.hasPath("cluster.replication") match { // deployment.hasPath("cluster.replication") match {
case false Transient // case false Transient
case true // case true
val replicationConfigWithFallback = deploymentWithFallback.getConfig("cluster.replication") // val replicationConfigWithFallback = deploymentWithFallback.getConfig("cluster.replication")
val storage = replicationConfigWithFallback.getString("storage") match { // val storage = replicationConfigWithFallback.getString("storage") match {
case "transaction-log" TransactionLog // case "transaction-log" TransactionLog
case "data-grid" DataGrid // case "data-grid" DataGrid
case unknown // case unknown
throw new ConfigurationException("Config option [" + deploymentKey + // throw new ConfigurationException("Config option [" + deploymentKey +
".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" + // ".cluster.replication.storage] needs to be either [\"transaction-log\"] or [\"data-grid\"] - was [" +
unknown + "]") // unknown + "]")
} // }
val strategy = replicationConfigWithFallback.getString("strategy") match { // val strategy = replicationConfigWithFallback.getString("strategy") match {
case "write-through" WriteThrough // case "write-through" WriteThrough
case "write-behind" WriteBehind // case "write-behind" WriteBehind
case unknown // case unknown
throw new ConfigurationException("Config option [" + deploymentKey + // throw new ConfigurationException("Config option [" + deploymentKey +
".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + // ".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
unknown + "]") // unknown + "]")
} // }
Replication(storage, strategy) // Replication(storage, strategy)
} // }
} // }
//
// val scope = (remoteNodes, clusterPreferredNodes) match {
// case (Nil, Nil)
// LocalScope
// case (_, Nil)
// // we have a 'remote' config section
// parseRemote
// case (Nil, _)
// // we have a 'cluster' config section
// parseCluster
// case (_, _) throw new ConfigurationException(
// "Configuration for deployment ID [" + path + "] can not have both 'remote' and 'cluster' sections.")
// }
val scope = (remoteNodes, clusterPreferredNodes) match { Deploy(path, recipe, router, nrOfInstances, LocalScope)
case (Nil, Nil)
LocalScope
case (_, Nil)
// we have a 'remote' config section
parseRemote
case (Nil, _)
// we have a 'cluster' config section
parseCluster
case (_, _) throw new ConfigurationException(
"Configuration for deployment ID [" + path + "] can not have both 'remote' and 'cluster' sections.")
}
Deploy(path, recipe, router, nrOfInstances, scope)
} }
private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = { protected def throwDeploymentBoundException(deployment: Deploy): Nothing = {
val e = new DeploymentAlreadyBoundException("Path [" + deployment.path + "] already bound to [" + deployment + "]") val e = new DeploymentAlreadyBoundException("Path [" + deployment.path + "] already bound to [" + deployment + "]")
log.error(e, e.getMessage) log.error(e, e.getMessage)
throw e throw e
} }
private[akka] def thrownNoDeploymentBoundException(path: String): Nothing = { protected def thrownNoDeploymentBoundException(path: String): Nothing = {
val e = new NoDeploymentBoundException("Path [" + path + "] is not bound to a deployment") val e = new NoDeploymentBoundException("Path [" + path + "] is not bound to a deployment")
log.error(e, e.getMessage) log.error(e, e.getMessage)
throw e throw e
@ -275,13 +249,13 @@ class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream,
class LocalDeployer extends ActorDeployer { class LocalDeployer extends ActorDeployer {
private val deployments = new ConcurrentHashMap[String, Deploy] private val deployments = new ConcurrentHashMap[String, Deploy]
private[akka] def init(deployments: Seq[Deploy]): Unit = deployments foreach deploy // deploy def init(deployments: Seq[Deploy]): Unit = deployments foreach deploy // deploy
private[akka] def shutdown(): Unit = deployments.clear() //TODO do something else/more? def shutdown(): Unit = deployments.clear() //TODO do something else/more?
private[akka] def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.path, deployment) def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.path, deployment)
private[akka] def lookupDeploymentFor(path: String): Option[Deploy] = Option(deployments.get(path)) def lookupDeploymentFor(path: String): Option[Deploy] = Option(deployments.get(path))
} }
class DeploymentException private[akka] (message: String) extends AkkaException(message) class DeploymentException private[akka] (message: String) extends AkkaException(message)

View file

@ -6,7 +6,6 @@ package akka.actor
import akka.util.Duration import akka.util.Duration
import akka.routing.RouterType import akka.routing.RouterType
import akka.remote.RemoteAddress
object DeploymentConfig { object DeploymentConfig {
@ -52,7 +51,7 @@ object DeploymentConfig {
// -------------------------------- // --------------------------------
// --- Scope // --- Scope
// -------------------------------- // --------------------------------
sealed trait Scope trait Scope
// For Java API // For Java API
case class LocalScope() extends Scope case class LocalScope() extends Scope
@ -60,8 +59,6 @@ object DeploymentConfig {
// For Scala API // For Scala API
case object LocalScope extends Scope case object LocalScope extends Scope
case class RemoteScope(nodes: Iterable[RemoteAddress]) extends Scope
// -------------------------------- // --------------------------------
// --- Home // --- Home
// -------------------------------- // --------------------------------

View file

@ -9,8 +9,6 @@ import akka.actor._
import scala.annotation.tailrec import scala.annotation.tailrec
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger } import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
import java.net.InetSocketAddress
import akka.remote.RemoteAddress
import collection.JavaConverters import collection.JavaConverters
/** /**
@ -69,16 +67,6 @@ trait ConnectionManager {
* @param ref the dead * @param ref the dead
*/ */
def remove(deadRef: ActorRef) def remove(deadRef: ActorRef)
/**
* Creates a new connection (ActorRef) if it didn't exist. Atomically.
*/
def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ActorRef): ActorRef
/**
* Fails over connections from one address to another.
*/
def failOver(from: RemoteAddress, to: RemoteAddress)
} }
/** /**
@ -125,10 +113,4 @@ class LocalConnectionManager(initialConnections: Iterable[ActorRef]) extends Con
if (!state.compareAndSet(oldState, newState)) remove(ref) if (!state.compareAndSet(oldState, newState)) remove(ref)
} }
} }
def failOver(from: RemoteAddress, to: RemoteAddress) {} // do nothing here
def putIfAbsent(address: RemoteAddress, newConnectionFactory: () ActorRef): ActorRef = {
throw new UnsupportedOperationException("Not supported")
}
} }

View file

@ -161,7 +161,7 @@ class Gossiper(remote: Remote) {
node oldAvailableNodes node oldAvailableNodes
if connectionManager.connectionFor(node).isEmpty if connectionManager.connectionFor(node).isEmpty
} { } {
val connectionFactory = () RemoteActorRef(remote.system.provider, remote.server, gossipingNode, remote.remoteDaemon.path, None) val connectionFactory = () new RemoteActorRef(remote.system.provider, remote.server, RootActorPath(gossipingNode) / remote.remoteDaemon.path.elements, None)
connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node
oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes
} }

View file

@ -62,19 +62,22 @@ class RemoteActorRefProvider(
val remoteAddress = RemoteAddress(system.name, remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port) val remoteAddress = RemoteAddress(system.name, remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port)
new RootActorPath(remoteAddress) new RootActorPath(remoteAddress)
} }
private lazy val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters) private lazy val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath)
private[akka] lazy val remote = new Remote(system, nodename) private[akka] lazy val remote = {
val r = new Remote(system, nodename)
terminationFuture.onComplete(_ r.server.shutdown())
r
}
private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
def init(_system: ActorSystemImpl) { def init(_system: ActorSystemImpl) {
system = _system system = _system
local.init(_system) local.init(_system)
terminationFuture.onComplete(_ remote.server.shutdown())
} }
private[akka] def terminationFuture = local.terminationFuture private[akka] def terminationFuture = local.terminationFuture
private[akka] def deployer: Deployer = local.deployer private[akka] def deployer: Deployer = new RemoteDeployer(settings, eventStream, nodename)
def dispatcher = local.dispatcher def dispatcher = local.dispatcher
def defaultTimeout = settings.ActorTimeout def defaultTimeout = settings.ActorTimeout
@ -89,7 +92,7 @@ class RemoteActorRefProvider(
case null case null
val actor: InternalActorRef = try { val actor: InternalActorRef = try {
deployer.lookupDeploymentFor(path.toString) match { deployer.lookupDeploymentFor(path.toString) match {
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, DeploymentConfig.RemoteScope(remoteAddresses))) case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, RemoteDeploymentConfig.RemoteScope(remoteAddresses)))
def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress } def isReplicaNode: Boolean = remoteAddresses exists { _ == remote.remoteAddress }
@ -142,8 +145,7 @@ class RemoteActorRefProvider(
} }
val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a) val connections = (Map.empty[RemoteAddress, ActorRef] /: remoteAddresses) { (conns, a)
val remoteAddress = RemoteAddress(system.name, a.host, a.port) conns + (a -> new RemoteActorRef(this, remote.server, path, None)) // FIXME RK correct path must be put in here
conns + (remoteAddress -> RemoteActorRef(remote.system.provider, remote.server, remoteAddress, path, None))
} }
val connectionManager = new RemoteConnectionManager(system, remote, connections) val connectionManager = new RemoteConnectionManager(system, remote, connections)
@ -180,8 +182,19 @@ class RemoteActorRefProvider(
new RoutedActorRef(system, props, supervisor, name) new RoutedActorRef(system, props, supervisor, name)
} }
def actorFor(path: ActorPath): InternalActorRef = local.actorFor(path) def actorFor(path: ActorPath): InternalActorRef = path.root match {
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = local.actorFor(ref, path) case `rootPath` actorFor(rootGuardian, path.elements)
case RootActorPath(_: RemoteAddress, _) new RemoteActorRef(this, remote.server, path, None)
case _ local.actorFor(path)
}
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case RemoteActorPath(address, elems)
if (address == rootPath.address) actorFor(rootGuardian, elems)
else new RemoteActorRef(this, remote.server, new RootActorPath(address) / elems, None)
case _ local.actorFor(ref, path)
}
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
// TODO remove me // TODO remove me
@ -257,11 +270,10 @@ class RemoteActorRefProvider(
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
private[akka] case class RemoteActorRef private[akka] ( private[akka] class RemoteActorRef private[akka] (
provider: ActorRefProvider, provider: ActorRefProvider,
remote: RemoteSupport, remote: RemoteSupport,
remoteAddress: RemoteAddress, val path: ActorPath,
path: ActorPath,
loader: Option[ClassLoader]) loader: Option[ClassLoader])
extends InternalActorRef { extends InternalActorRef {
@ -276,7 +288,7 @@ private[akka] case class RemoteActorRef private[akka] (
def sendSystemMessage(message: SystemMessage): Unit = throw new UnsupportedOperationException("Not supported for RemoteActorRef") def sendSystemMessage(message: SystemMessage): Unit = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), remoteAddress, this, loader) override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this, loader)
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout) override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = provider.ask(message, this, timeout)
@ -288,7 +300,7 @@ private[akka] case class RemoteActorRef private[akka] (
synchronized { synchronized {
if (running) { if (running) {
running = false running = false
remote.send(new Terminate(), None, remoteAddress, this, loader) remote.send(new Terminate(), None, this, loader)
} }
} }
} }

View file

@ -149,5 +149,5 @@ class RemoteConnectionManager(
} }
private[remote] def newConnection(remoteAddress: RemoteAddress, actorPath: ActorPath) = private[remote] def newConnection(remoteAddress: RemoteAddress, actorPath: ActorPath) =
RemoteActorRef(remote.system.provider, remote.server, remoteAddress, actorPath, None) new RemoteActorRef(remote.system.provider, remote.server, actorPath, None)
} }

View file

@ -0,0 +1,66 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.actor._
import akka.actor.DeploymentConfig._
import akka.event.EventStream
import com.typesafe.config._
import akka.config.ConfigurationException
object RemoteDeploymentConfig {
case class RemoteScope(nodes: Iterable[RemoteAddress]) extends DeploymentConfig.Scope
}
class RemoteDeployer(_settings: ActorSystem.Settings, _eventStream: EventStream, _nodename: String)
extends Deployer(_settings, _eventStream, _nodename) {
import RemoteDeploymentConfig._
override protected def lookupInConfig(path: String, configuration: Config = settings.config): Deploy = {
import scala.collection.JavaConverters._
import akka.util.ReflectiveAccess._
val defaultDeploymentConfig = configuration.getConfig("akka.actor.deployment.default")
// --------------------------------
// akka.actor.deployment.<path>
// --------------------------------
val deploymentKey = "akka.actor.deployment." + path
val deployment = configuration.getConfig(deploymentKey)
val deploymentWithFallback = deployment.withFallback(defaultDeploymentConfig)
val remoteNodes = deploymentWithFallback.getStringList("remote.nodes").asScala.toSeq
// --------------------------------
// akka.actor.deployment.<path>.remote
// --------------------------------
def parseRemote: Scope = {
def raiseRemoteNodeParsingError() = throw new ConfigurationException(
"Config option [" + deploymentKey +
".remote.nodes] needs to be a list with elements on format \"<hostname>:<port>\", was [" + remoteNodes.mkString(", ") + "]")
val remoteAddresses = remoteNodes map { node
val tokenizer = new java.util.StringTokenizer(node, ":")
val hostname = tokenizer.nextElement.toString
if ((hostname eq null) || (hostname == "")) raiseRemoteNodeParsingError()
val port = try tokenizer.nextElement.toString.toInt catch {
case e: Exception raiseRemoteNodeParsingError()
}
if (port == 0) raiseRemoteNodeParsingError()
RemoteAddress(settings.name, hostname, port)
}
RemoteScope(remoteAddresses)
}
val local = super.lookupInConfig(path, configuration)
if (remoteNodes.isEmpty) local else local.copy(scope = parseRemote)
}
}

View file

@ -175,8 +175,7 @@ abstract class RemoteSupport(val system: ActorSystem) {
protected[akka] def send(message: Any, protected[akka] def send(message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
remoteAddress: RemoteAddress, recipient: RemoteActorRef,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit loader: Option[ClassLoader]): Unit
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = system.eventStream.publish(message) protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = system.eventStream.publish(message)

View file

@ -367,10 +367,11 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot
protected[akka] def send( protected[akka] def send(
message: Any, message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
recipientAddress: RemoteAddress, recipient: RemoteActorRef,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit = { loader: Option[ClassLoader]): Unit = {
val recipientAddress = recipient.path.address.asInstanceOf[RemoteAddress]
clientsLock.readLock.lock clientsLock.readLock.lock
try { try {
val client = remoteClients.get(recipientAddress) match { val client = remoteClients.get(recipientAddress) match {

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.testkit._
import akka.actor._
import com.typesafe.config._
import akka.actor.DeploymentConfig._
import akka.remote.RemoteDeploymentConfig.RemoteScope
object RemoteDeployerSpec {
val deployerConf = ConfigFactory.parseString("""
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.cluster.nodename = Whatever
akka.actor.deployment {
/user/service1 {
}
/user/service2 {
router = round-robin
nr-of-instances = 3
remote {
nodes = ["wallace:2552", "gromit:2552"]
}
}
/user/service3 {
create-as {
class = "akka.actor.DeployerSpec$RecipeActor"
}
}
/user/service-auto {
router = round-robin
nr-of-instances = auto
}
/user/service-direct {
router = direct
}
/user/service-direct2 {
router = direct
# nr-of-instances ignored when router = direct
nr-of-instances = 2
}
/user/service-round-robin {
router = round-robin
}
/user/service-random {
router = random
}
/user/service-scatter-gather {
router = scatter-gather
}
/user/service-least-cpu {
router = least-cpu
}
/user/service-least-ram {
router = least-ram
}
/user/service-least-messages {
router = least-messages
}
/user/service-custom {
router = org.my.Custom
}
/user/service-cluster1 {
cluster {
preferred-nodes = ["node:wallace", "node:gromit"]
}
}
/user/service-cluster2 {
cluster {
preferred-nodes = ["node:wallace", "node:gromit"]
replication {
strategy = write-behind
}
}
}
}
""", ConfigParseOptions.defaults)
class RecipeActor extends Actor {
def receive = { case _ }
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
"A RemoteDeployer" must {
"be able to parse 'akka.actor.deployment._' with specified remote nodes" in {
val service = "/user/service2"
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookupDeployment(service)
deployment must be('defined)
deployment must be(Some(
Deploy(
service,
None,
RoundRobin,
NrOfInstances(3),
RemoteScope(Seq(
RemoteAddress(system.name, "wallace", 2552), RemoteAddress(system.name, "gromit", 2552))))))
}
}
}