diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index ae14612e99..21f9bffbce 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -5,6 +5,7 @@ package akka.routing import scala.collection.immutable.TreeMap +import java.lang.Integer.{ rotateLeft ⇒ rotl } /* __ *\ ** ________ ___ / / ___ Scala API ** @@ -26,8 +27,6 @@ import scala.collection.immutable.TreeMap * @since 2.9 */ -import java.lang.Integer.{ rotateLeft ⇒ rotl } - // ============================================================================================= // Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license // ============================================================================================= @@ -41,44 +40,46 @@ import java.lang.Integer.{ rotateLeft ⇒ rotl } * Note that toString of the ring nodes are used for the node * hash, i.e. make sure it is different for different nodes. * - * Not thread-safe, to be used from within an Actor or protected some other way. */ -class ConsistentHash[T](nodes: Iterable[T], replicas: Int) { - private var ring = TreeMap[Int, T]() +class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) { + + import ConsistentHash._ if (replicas < 1) throw new IllegalArgumentException("replicas must be >= 1") - nodes.foreach(this += _) - /** * Adds a node to the ring. + * Note that the instance is immutable and this + * operation returns a new instance. */ - def +=(node: T): Unit = { - (1 to replicas) foreach { replica ⇒ - ring += (nodeHashFor(node, replica) -> node) - } + def :+(node: T): ConsistentHash[T] = { + new ConsistentHash(replicas, ring ++ ((1 to replicas) map { r ⇒ (nodeHashFor(node, r) -> node) })) } /** * Adds a node to the ring. + * Note that the instance is immutable and this + * operation returns a new instance. * JAVA API */ - def add(node: T): Unit = this += node + def add(node: T): ConsistentHash[T] = this :+ node /** * Removes a node from the ring. + * Note that the instance is immutable and this + * operation returns a new instance. */ - def -=(node: T): Unit = { - (1 to replicas) foreach { replica ⇒ - ring -= nodeHashFor(node, replica) - } + def :-(node: T): ConsistentHash[T] = { + new ConsistentHash(replicas, ring -- ((1 to replicas) map { r ⇒ nodeHashFor(node, r) })) } /** * Removes a node from the ring. + * Note that the instance is immutable and this + * operation returns a new instance. * JAVA API */ - def remove(node: T): Unit = this -= node + def remove(node: T): ConsistentHash[T] = this :- node /** * Get the node responsible for the data key. @@ -100,7 +101,24 @@ class ConsistentHash[T](nodes: Iterable[T], replicas: Int) { */ def isEmpty: Boolean = ring.isEmpty - private def nodeHashFor(node: T, replica: Int): Int = { +} + +object ConsistentHash { + def apply[T](nodes: Iterable[T], replicas: Int) = { + new ConsistentHash(replicas, TreeMap.empty[Int, T] ++ + (for (node ← nodes; replica ← 1 to replicas) yield (nodeHashFor(node, replica) -> node))) + } + + /** + * Factory method to create a ConsistentHash + * JAVA API + */ + def create[T](nodes: java.lang.Iterable[T], replicas: Int) = { + import scala.collection.JavaConverters._ + apply(nodes.asScala, replicas) + } + + private def nodeHashFor(node: Any, replica: Int): Int = { hashFor((node + ":" + replica).getBytes("UTF-8")) } @@ -189,11 +207,11 @@ object MurmurHash { final private val seedArray: Int = 0x3c074a61 /** The first 23 magic integers from the first stream are stored here */ - val storedMagicA: Array[Int] = + private val storedMagicA: Array[Int] = Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray /** The first 23 magic integers from the second stream are stored here */ - val storedMagicB: Array[Int] = + private val storedMagicB: Array[Int] = Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray /** Begin a new hash with a seed value. */ diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index fd5629ea3b..2a9f925f7f 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -286,7 +286,6 @@ Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sur stashed messages are put into the dead letters when the actor stops, make sure you call super.postStop if you override it. - Forward of Terminated message ============================= @@ -359,3 +358,20 @@ v2.1:: -requestedCapacity, stopDelay) +ConsistentHash +============== + +``akka.routing.ConsistentHash`` has been changed to an immutable data structure. + +v2.0:: + + val consistentHash = new ConsistentHash(Seq(a1, a2, a3), replicas = 10) + consistentHash += a4 + val a = consistentHash.nodeFor(data) + +v2.1:: + + var consistentHash = ConsistentHash(Seq(a1, a2, a3), replicas = 10) + consistentHash = consistentHash :+ a4 + val a = consistentHash.nodeFor(data) +