diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index bc86f5f82d..fca0837662 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -24,10 +24,9 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1") // sorted hash values of the nodes - private val nodeRing: Array[Int] = { - val nodeRing = nodes.keys.toArray - Arrays.sort(nodeRing) - nodeRing + private val (nodeHashRing: Array[Int], nodeRing: Vector[T]) = { + val (nhr: IndexedSeq[Int], nr: IndexedSeq[AnyRef]) = nodes.toArray.sortBy(_._1).unzip + (nhr.toArray, Vector[T]() ++ nr) } /** @@ -62,6 +61,17 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { */ def remove(node: T): ConsistentHash[T] = this :- node + // converts the result of Arrays.binarySearch into a index in the nodeRing array + // see documentation of Arrays.binarySearch for what it returns + private def idx(i: Int): Int = { + if (i >= 0) i // exact match + else { + val j = math.abs(i + 1) + if (j >= nodeHashRing.length) 0 // after last, use first + else j // next node clockwise + } + } + /** * Get the node responsible for the data key. * Can only be used if nodes exists in the node ring, @@ -70,18 +80,18 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { def nodeFor(key: Array[Byte]): T = { if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) - // converts the result of Arrays.binarySearch into a index in the nodeRing array - // see documentation of Arrays.binarySearch for what it returns - def idx(i: Int): Int = { - if (i >= 0) i // exact match - else { - val j = math.abs(i + 1) - if (j >= nodeRing.length) 0 // after last, use first - else j // next node clockwise - } - } - val nodeRingIndex = idx(Arrays.binarySearch(nodeRing, hashFor(key))) - nodes(nodeRing(nodeRingIndex)) + nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key)))) + } + + /** + * Get the node responsible for the data key. + * Can only be used if nodes exists in the node ring, + * otherwise throws `IllegalStateException` + */ + def nodeFor(key: String): T = { + if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) + + nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key)))) } /** @@ -111,5 +121,6 @@ object ConsistentHash { hashFor((node + ":" + vnode).getBytes("UTF-8")) private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes) -} + private def hashFor(string: String): Int = MurmurHash.stringHash(string) +} diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index 4937691d88..3b9802d7fd 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -264,14 +264,13 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ } def target(hashData: Any): ActorRef = try { - val hash = hashData match { - case bytes: Array[Byte] ⇒ bytes - case str: String ⇒ str.getBytes("UTF-8") - case x: AnyRef ⇒ SerializationExtension(routeeProvider.context.system).serialize(x).get - } val currentConsistenHash = updateConsistentHash() if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters - else currentConsistenHash.nodeFor(hash) + else hashData match { + case bytes: Array[Byte] ⇒ currentConsistenHash.nodeFor(bytes) + case str: String ⇒ currentConsistenHash.nodeFor(str) + case x: AnyRef ⇒ currentConsistenHash.nodeFor(SerializationExtension(routeeProvider.context.system).serialize(x).get) + } } catch { case NonFatal(e) ⇒ // serialization failed