Use full address when hashing routees, see #2657

* Problem: ConsistentHashingRouter used on different nodes with remote
  and local routees doesn't route to same targets.
* Reason: Full address, with host and port was not used in the
  representation of the routees, which produced different  hash rings
  on different nodes.
* Solution: Fill in full address in the toString representation for
  LocalActorRef
* IMPORTANT: Discovered that rootPath of the provider didn't include the
  full address. It is documented that it should and I needed that to
  be able to grab the address without depending on remoting. This caused
  changes in RemoteActorRefProvider. Initialization order is a bit scary
  there.
This commit is contained in:
Patrik Nordwall 2012-10-26 11:42:18 +02:00
parent 3ee7dbcc45
commit 49500ab248
3 changed files with 45 additions and 16 deletions

View file

@ -32,7 +32,9 @@ class RemoteActorRefProvider(
private var _log = local.log
def log: LoggingAdapter = _log
override def rootPath: ActorPath = local.rootPath
@volatile
private var _rootPath = local.rootPath
override def rootPath: ActorPath = _rootPath
override def deadLetters: InternalActorRef = local.deadLetters
// these are only available after init()
@ -61,7 +63,7 @@ class RemoteActorRefProvider(
def init(system: ActorSystemImpl): Unit = {
local.init(system)
_remoteDaemon = new RemoteSystemDaemon(system, rootPath / "remote", rootGuardian, log, untrustedMode = remoteSettings.UntrustedMode)
_remoteDaemon = new RemoteSystemDaemon(system, local.rootPath / "remote", rootGuardian, log, untrustedMode = remoteSettings.UntrustedMode)
local.registerExtraNames(Map(("remote", remoteDaemon)))
_serialization = SerializationExtension(system)
@ -82,6 +84,8 @@ class RemoteActorRefProvider(
// this enables reception of remote requests
_transport.start()
_rootPath = RootActorPath(local.rootPath.address.copy(host = transport.address.host, port = transport.address.port))
val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor {
def receive = {
case RemoteClientError(cause, remote, address) remote.shutdownClientConnection(address)
@ -154,7 +158,7 @@ class RemoteActorRefProvider(
Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) b withFallback a) match {
case d @ Deploy(_, _, _, RemoteScope(addr))
if (addr == rootPath.address || addr == transport.address) {
if (isSelfAddress(addr)) {
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async)
} else {
val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements
@ -167,12 +171,12 @@ class RemoteActorRefProvider(
}
def actorFor(path: ActorPath): InternalActorRef =
if (path.address == rootPath.address || path.address == transport.address) actorFor(rootGuardian, path.elements)
if (isSelfAddress(path.address)) actorFor(rootGuardian, path.elements)
else new RemoteActorRef(this, transport, path, Nobody, props = None, deploy = None)
def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match {
case ActorPathExtractor(address, elems)
if (address == rootPath.address || address == transport.address) actorFor(rootGuardian, elems)
if (isSelfAddress(address)) actorFor(rootGuardian, elems)
else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
case _ local.actorFor(ref, path)
}
@ -190,14 +194,16 @@ class RemoteActorRefProvider(
}
def getExternalAddressFor(addr: Address): Option[Address] = {
val ta = transport.address
val ra = rootPath.address
addr match {
case `ta` | `ra` Some(rootPath.address)
case _ if isSelfAddress(addr) Some(local.rootPath.address)
case Address("akka", _, Some(_), Some(_)) Some(transport.address)
case _ None
}
}
private def isSelfAddress(address: Address): Boolean =
address == local.rootPath.address || address == rootPath.address || address == transport.address
}
private[akka] trait RemoteRef extends ActorRefScope {