diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 9e67483077..5441997e5c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -89,6 +89,16 @@ abstract class ActorSelection extends Serializable { builder.toString } + /** + * The [[akka.actor.ActorPath]] of the anchor actor. + */ + def anchorPath: ActorPath = anchor.path + + /** + * String representation of the path elements, starting with "/" and separated with "/". + */ + def pathString: String = path.mkString("/", "/", "") + override def equals(obj: Any): Boolean = obj match { case s: ActorSelection ⇒ this.anchor == s.anchor && this.path == s.path case _ ⇒ false diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index d4cc129b29..30c36096b5 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -37,8 +37,11 @@ class ConsistentHash[T: ClassTag] private (nodes: immutable.SortedMap[Int, T], v * Note that the instance is immutable and this * operation returns a new instance. */ - def :+(node: T): ConsistentHash[T] = - new ConsistentHash(nodes ++ ((1 to virtualNodesFactor) map { r ⇒ (nodeHashFor(node, r) -> node) }), virtualNodesFactor) + def :+(node: T): ConsistentHash[T] = { + val nodeHash = hashFor(node.toString) + new ConsistentHash(nodes ++ ((1 to virtualNodesFactor) map { r ⇒ (concatenateNodeHash(nodeHash, r) -> node) }), + virtualNodesFactor) + } /** * Java API: Adds a node to the node ring. @@ -52,8 +55,11 @@ class ConsistentHash[T: ClassTag] private (nodes: immutable.SortedMap[Int, T], v * Note that the instance is immutable and this * operation returns a new instance. */ - def :-(node: T): ConsistentHash[T] = - new ConsistentHash(nodes -- ((1 to virtualNodesFactor) map { r ⇒ nodeHashFor(node, r) }), virtualNodesFactor) + def :-(node: T): ConsistentHash[T] = { + val nodeHash = hashFor(node.toString) + new ConsistentHash(nodes -- ((1 to virtualNodesFactor) map { r ⇒ concatenateNodeHash(nodeHash, r) }), + virtualNodesFactor) + } /** * Java API: Removes a node from the node ring. @@ -105,7 +111,11 @@ class ConsistentHash[T: ClassTag] private (nodes: immutable.SortedMap[Int, T], v object ConsistentHash { def apply[T: ClassTag](nodes: Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = { new ConsistentHash(immutable.SortedMap.empty[Int, T] ++ - (for (node ← nodes; vnode ← 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)), + (for { + node ← nodes + nodeHash = hashFor(node.toString) + vnode ← 1 to virtualNodesFactor + } yield (concatenateNodeHash(nodeHash, vnode) -> node)), virtualNodesFactor) } @@ -117,9 +127,9 @@ object ConsistentHash { apply(nodes.asScala, virtualNodesFactor)(ClassTag(classOf[Any].asInstanceOf[Class[T]])) } - private def nodeHashFor(node: Any, vnode: Int): Int = { + private def concatenateNodeHash(nodeHash: Int, vnode: Int): Int = { import MurmurHash._ - var h = startHash(node.##) + var h = startHash(nodeHash) h = extendHash(h, vnode, startMagicA, startMagicB) finalizeHash(h) } diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index 298383a79c..76b0480f64 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -18,6 +18,7 @@ import akka.actor.ActorRef import akka.serialization.SerializationExtension import scala.util.control.NonFatal import akka.event.Logging +import akka.actor.ActorPath object ConsistentHashingRouter { @@ -416,15 +417,15 @@ final case class ConsistentHashingGroup( private[akka] case class ConsistentRoutee(routee: Routee, selfAddress: Address) { override def toString: String = routee match { - case ActorRefRoutee(ref) ⇒ toStringWithfullAddress(ref) - case ActorSelectionRoutee(sel) ⇒ toStringWithfullAddress(sel.anchor) + case ActorRefRoutee(ref) ⇒ toStringWithfullAddress(ref.path) + case ActorSelectionRoutee(sel) ⇒ toStringWithfullAddress(sel.anchorPath) + sel.pathString case other ⇒ other.toString } - private def toStringWithfullAddress(ref: ActorRef): String = { - ref.path.address match { - case Address(_, _, None, None) ⇒ ref.path.toStringWithAddress(selfAddress) - case a ⇒ ref.path.toString + private def toStringWithfullAddress(path: ActorPath): String = { + path.address match { + case Address(_, _, None, None) ⇒ path.toStringWithAddress(selfAddress) + case a ⇒ path.toString } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingGroupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingGroupSpec.scala new file mode 100644 index 0000000000..a211231d1d --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterConsistentHashingGroupSpec.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.cluster.routing + +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.Props +import akka.cluster.MultiNodeClusterSpec +import akka.pattern.ask +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.Broadcast +import akka.routing.ConsistentHashingGroup +import akka.routing.ConsistentHashingRouter.ConsistentHashMapping +import akka.routing.GetRoutees +import akka.routing.Routees +import akka.testkit._ + +object ClusterConsistentHashingGroupMultiJvmSpec extends MultiNodeConfig { + + case object Get + case class Collected(messages: Set[Any]) + + class Destination extends Actor { + var receivedMessages = Set.empty[Any] + def receive = { + case Get ⇒ sender() ! Collected(receivedMessages) + case m ⇒ receivedMessages += m + } + } + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class ClusterConsistentHashingGroupMultiJvmNode1 extends ClusterConsistentHashingGroupSpec +class ClusterConsistentHashingGroupMultiJvmNode2 extends ClusterConsistentHashingGroupSpec +class ClusterConsistentHashingGroupMultiJvmNode3 extends ClusterConsistentHashingGroupSpec + +abstract class ClusterConsistentHashingGroupSpec extends MultiNodeSpec(ClusterConsistentHashingGroupMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with DefaultTimeout { + import ClusterConsistentHashingGroupMultiJvmSpec._ + + /** + * Fills in self address for local ActorRef + */ + private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { + case Address(_, _, None, None) ⇒ cluster.selfAddress + case a ⇒ a + } + + def currentRoutees(router: ActorRef) = + Await.result(router ? GetRoutees, remaining).asInstanceOf[Routees].routees + + "A cluster router with a consistent hashing group" must { + "start cluster with 3 nodes" taggedAs LongRunningTest in { + system.actorOf(Props[Destination], "dest") + awaitClusterUp(first, second, third) + enterBarrier("after-1") + } + + "send to same destinations from different nodes" taggedAs LongRunningTest in { + def hashMapping: ConsistentHashMapping = { + case s: String ⇒ s + } + val paths = List("/user/dest") + val router = system.actorOf(ClusterRouterGroup(local = ConsistentHashingGroup(paths, hashMapping = hashMapping), + settings = ClusterRouterGroupSettings(totalInstances = 10, paths, allowLocalRoutees = true, useRole = None)).props(), + "router") + // it may take some time until router receives cluster member events + awaitAssert { currentRoutees(router).size should be(3) } + val keys = List("A", "B", "C", "D", "E", "F", "G") + for (_ ← 1 to 10; k ← keys) { router ! k } + enterBarrier("messages-sent") + router ! Broadcast(Get) + val a = expectMsgType[Collected].messages + val b = expectMsgType[Collected].messages + val c = expectMsgType[Collected].messages + + a.intersect(b) should be(Set.empty) + a.intersect(c) should be(Set.empty) + b.intersect(c) should be(Set.empty) + + (a.size + b.size + c.size) should be(keys.size) + enterBarrier("after-2") + } + + } +} diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala new file mode 100644 index 0000000000..f844403951 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.remote + +import akka.testkit.AkkaSpec +import akka.actor.Address +import akka.routing.ActorSelectionRoutee +import akka.routing.ConsistentRoutee +import akka.routing.ConsistentHash + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class RemoteConsistentHashingRouterSpec extends AkkaSpec(""" + akka.actor.provider = "akka.remote.RemoteActorRefProvider" """) { + + "ConsistentHashingGroup" must { + + "use same hash ring indepenent of self address" in { + // simulating running router on two different nodes (a1, a2) with target routees on 3 other nodes (s1, s2, s3) + val a1 = Address("akka.tcp", "Sys", "client1", 2552) + val a2 = Address("akka.tcp", "Sys", "client2", 2552) + val s1 = ActorSelectionRoutee(system.actorSelection("akka.tcp://Sys@server1:2552/user/a/b")) + val s2 = ActorSelectionRoutee(system.actorSelection("akka.tcp://Sys@server2:2552/user/a/b")) + val s3 = ActorSelectionRoutee(system.actorSelection("akka.tcp://Sys@server3:2552/user/a/b")) + val nodes1 = List(ConsistentRoutee(s1, a1), ConsistentRoutee(s2, a1), ConsistentRoutee(s3, a1)) + val nodes2 = List(ConsistentRoutee(s1, a2), ConsistentRoutee(s2, a2), ConsistentRoutee(s3, a2)) + val consistentHash1 = ConsistentHash(nodes1, 10) + val consistentHash2 = ConsistentHash(nodes2, 10) + val keys = List("A", "B", "C", "D", "E", "F", "G") + val result1 = keys collect { case k => consistentHash1.nodeFor(k).routee } + val result2 = keys collect { case k => consistentHash2.nodeFor(k).routee } + result1 should be(result2) + } + + } + +}