Make ConsistentHash immutable, see #944

* Makes it thread safe
* Also changed two scary (mutable Array) fields in MurmurHash to private
* Note in migration guide about changed api for ConsistentHash
This commit is contained in:
Patrik Nordwall 2012-09-09 16:47:43 +02:00
parent 705c118ea2
commit 521d20ba73
2 changed files with 55 additions and 21 deletions

View file

@ -5,6 +5,7 @@
package akka.routing
import scala.collection.immutable.TreeMap
import java.lang.Integer.{ rotateLeft rotl }
/* __ *\
** ________ ___ / / ___ Scala API **
@ -26,8 +27,6 @@ import scala.collection.immutable.TreeMap
* @since 2.9
*/
import java.lang.Integer.{ rotateLeft rotl }
// =============================================================================================
// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license
// =============================================================================================
@ -41,44 +40,46 @@ import java.lang.Integer.{ rotateLeft ⇒ rotl }
* Note that toString of the ring nodes are used for the node
* hash, i.e. make sure it is different for different nodes.
*
* Not thread-safe, to be used from within an Actor or protected some other way.
*/
class ConsistentHash[T](nodes: Iterable[T], replicas: Int) {
private var ring = TreeMap[Int, T]()
class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) {
import ConsistentHash._
if (replicas < 1) throw new IllegalArgumentException("replicas must be >= 1")
nodes.foreach(this += _)
/**
* Adds a node to the ring.
* Note that the instance is immutable and this
* operation returns a new instance.
*/
def +=(node: T): Unit = {
(1 to replicas) foreach { replica
ring += (nodeHashFor(node, replica) -> node)
}
def :+(node: T): ConsistentHash[T] = {
new ConsistentHash(replicas, ring ++ ((1 to replicas) map { r (nodeHashFor(node, r) -> node) }))
}
/**
* Adds a node to the ring.
* Note that the instance is immutable and this
* operation returns a new instance.
* JAVA API
*/
def add(node: T): Unit = this += node
def add(node: T): ConsistentHash[T] = this :+ node
/**
* Removes a node from the ring.
* Note that the instance is immutable and this
* operation returns a new instance.
*/
def -=(node: T): Unit = {
(1 to replicas) foreach { replica
ring -= nodeHashFor(node, replica)
}
def :-(node: T): ConsistentHash[T] = {
new ConsistentHash(replicas, ring -- ((1 to replicas) map { r nodeHashFor(node, r) }))
}
/**
* Removes a node from the ring.
* Note that the instance is immutable and this
* operation returns a new instance.
* JAVA API
*/
def remove(node: T): Unit = this -= node
def remove(node: T): ConsistentHash[T] = this :- node
/**
* Get the node responsible for the data key.
@ -100,7 +101,24 @@ class ConsistentHash[T](nodes: Iterable[T], replicas: Int) {
*/
def isEmpty: Boolean = ring.isEmpty
private def nodeHashFor(node: T, replica: Int): Int = {
}
object ConsistentHash {
def apply[T](nodes: Iterable[T], replicas: Int) = {
new ConsistentHash(replicas, TreeMap.empty[Int, T] ++
(for (node nodes; replica 1 to replicas) yield (nodeHashFor(node, replica) -> node)))
}
/**
* Factory method to create a ConsistentHash
* JAVA API
*/
def create[T](nodes: java.lang.Iterable[T], replicas: Int) = {
import scala.collection.JavaConverters._
apply(nodes.asScala, replicas)
}
private def nodeHashFor(node: Any, replica: Int): Int = {
hashFor((node + ":" + replica).getBytes("UTF-8"))
}
@ -189,11 +207,11 @@ object MurmurHash {
final private val seedArray: Int = 0x3c074a61
/** The first 23 magic integers from the first stream are stored here */
val storedMagicA: Array[Int] =
private val storedMagicA: Array[Int] =
Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray
/** The first 23 magic integers from the second stream are stored here */
val storedMagicB: Array[Int] =
private val storedMagicB: Array[Int] =
Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray
/** Begin a new hash with a seed value. */

View file

@ -286,7 +286,6 @@ Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sur
stashed messages are put into the dead letters when the actor stops, make sure you call
super.postStop if you override it.
Forward of Terminated message
=============================
@ -359,3 +358,20 @@ v2.1::
-requestedCapacity, stopDelay)
ConsistentHash
==============
``akka.routing.ConsistentHash`` has been changed to an immutable data structure.
v2.0::
val consistentHash = new ConsistentHash(Seq(a1, a2, a3), replicas = 10)
consistentHash += a4
val a = consistentHash.nodeFor(data)
v2.1::
var consistentHash = ConsistentHash(Seq(a1, a2, a3), replicas = 10)
consistentHash = consistentHash :+ a4
val a = consistentHash.nodeFor(data)