diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index d3bef92e6c..819fea2586 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -120,8 +120,10 @@ object ConsistentHash { apply(nodes.asScala, virtualNodesFactor)(ClassTag(classOf[Any].asInstanceOf[Class[T]])) } - private def nodeHashFor(node: Any, vnode: Int): Int = - hashFor((node + ":" + vnode).getBytes("UTF-8")) + private def nodeHashFor(node: Any, vnode: Int): Int = { + val baseStr = node.toString + ":" + hashFor(baseStr + vnode) + } private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index 71a36be2b7..ca9027c8e8 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -12,6 +12,8 @@ import akka.dispatch.Dispatchers import akka.event.Logging import akka.serialization.SerializationExtension import java.util.concurrent.atomic.AtomicReference +import akka.actor.Address +import akka.actor.ExtendedActorSystem object ConsistentHashingRouter { /** @@ -238,20 +240,22 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ } val log = Logging(routeeProvider.context.system, routeeProvider.context.self) + val selfAddress = routeeProvider.context.system.asInstanceOf[ExtendedActorSystem].provider.rootPath.address val vnodes = if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor else virtualNodesFactor // tuple of routees and the ConsistentHash, updated together in updateConsistentHash - val consistentHashRef = new AtomicReference[(IndexedSeq[ActorRef], ConsistentHash[ActorRef])]((null, null)) + val consistentHashRef = new AtomicReference[(IndexedSeq[ConsistentActorRef], ConsistentHash[ConsistentActorRef])]((null, null)) updateConsistentHash() // update consistentHash when routees has changed // changes to routees are rare and when no changes this is a quick operation - def updateConsistentHash(): ConsistentHash[ActorRef] = { + def updateConsistentHash(): ConsistentHash[ConsistentActorRef] = { val oldConsistentHashTuple = consistentHashRef.get val (oldConsistentHashRoutees, oldConsistentHash) = oldConsistentHashTuple - val currentRoutees = routeeProvider.routees + val currentRoutees = routeeProvider.routees map { ConsistentActorRef(_, selfAddress) } + if (currentRoutees ne oldConsistentHashRoutees) { // when other instance, same content, no need to re-hash, but try to set routees val consistentHash = @@ -267,9 +271,9 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ val currentConsistenHash = updateConsistentHash() if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters else hashData match { - case bytes: Array[Byte] ⇒ currentConsistenHash.nodeFor(bytes) - case str: String ⇒ currentConsistenHash.nodeFor(str) - case x: AnyRef ⇒ currentConsistenHash.nodeFor(SerializationExtension(routeeProvider.context.system).serialize(x).get) + case bytes: Array[Byte] ⇒ currentConsistenHash.nodeFor(bytes).actorRef + case str: String ⇒ currentConsistenHash.nodeFor(str).actorRef + case x: AnyRef ⇒ currentConsistenHash.nodeFor(SerializationExtension(routeeProvider.context.system).serialize(x).get).actorRef } } catch { case NonFatal(e) ⇒ @@ -294,4 +298,21 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ } } +} + +/** + * INTERNAL API + * Important to use ActorRef with full address, with host and port, in the hash ring, + * so that same ring is produced on different nodes. + * The ConsistentHash uses toString of the ring nodes, and the ActorRef itself + * isn't a good representation, because LocalActorRef doesn't include the + * host and port. + */ +private[akka] case class ConsistentActorRef(actorRef: ActorRef, selfAddress: Address) { + override def toString: String = { + actorRef.path.address match { + case Address(_, _, None, None) ⇒ actorRef.path.toStringWithAddress(selfAddress) + case a ⇒ actorRef.path.toString + } + } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 1d9ad9edc2..4967978582 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -32,7 +32,9 @@ class RemoteActorRefProvider( private var _log = local.log def log: LoggingAdapter = _log - override def rootPath: ActorPath = local.rootPath + @volatile + private var _rootPath = local.rootPath + override def rootPath: ActorPath = _rootPath override def deadLetters: InternalActorRef = local.deadLetters // these are only available after init() @@ -61,7 +63,7 @@ class RemoteActorRefProvider( def init(system: ActorSystemImpl): Unit = { local.init(system) - _remoteDaemon = new RemoteSystemDaemon(system, rootPath / "remote", rootGuardian, log, untrustedMode = remoteSettings.UntrustedMode) + _remoteDaemon = new RemoteSystemDaemon(system, local.rootPath / "remote", rootGuardian, log, untrustedMode = remoteSettings.UntrustedMode) local.registerExtraNames(Map(("remote", remoteDaemon))) _serialization = SerializationExtension(system) @@ -82,6 +84,8 @@ class RemoteActorRefProvider( // this enables reception of remote requests _transport.start() + _rootPath = RootActorPath(local.rootPath.address.copy(host = transport.address.host, port = transport.address.port)) + val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor { def receive = { case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address) @@ -154,7 +158,7 @@ class RemoteActorRefProvider( Iterator(props.deploy) ++ deployment.iterator reduce ((a, b) ⇒ b withFallback a) match { case d @ Deploy(_, _, _, RemoteScope(addr)) ⇒ - if (addr == rootPath.address || addr == transport.address) { + if (isSelfAddress(addr)) { local.actorOf(system, props, supervisor, path, false, deployment.headOption, false, async) } else { val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements @@ -167,12 +171,12 @@ class RemoteActorRefProvider( } def actorFor(path: ActorPath): InternalActorRef = - if (path.address == rootPath.address || path.address == transport.address) actorFor(rootGuardian, path.elements) + if (isSelfAddress(path.address)) actorFor(rootGuardian, path.elements) else new RemoteActorRef(this, transport, path, Nobody, props = None, deploy = None) def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case ActorPathExtractor(address, elems) ⇒ - if (address == rootPath.address || address == transport.address) actorFor(rootGuardian, elems) + if (isSelfAddress(address)) actorFor(rootGuardian, elems) else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody, props = None, deploy = None) case _ ⇒ local.actorFor(ref, path) } @@ -190,14 +194,16 @@ class RemoteActorRefProvider( } def getExternalAddressFor(addr: Address): Option[Address] = { - val ta = transport.address - val ra = rootPath.address addr match { - case `ta` | `ra` ⇒ Some(rootPath.address) + case _ if isSelfAddress(addr) ⇒ Some(local.rootPath.address) case Address("akka", _, Some(_), Some(_)) ⇒ Some(transport.address) case _ ⇒ None } } + + private def isSelfAddress(address: Address): Boolean = + address == local.rootPath.address || address == rootPath.address || address == transport.address + } private[akka] trait RemoteRef extends ActorRefScope {