/**
 * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
 */
|
|
|
|
|
|
|
|
|
|
package akka.routing
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
import scala.collection.immutable.{ TreeSet, Seq }
|
|
|
|
|
import scala.collection.mutable.{ Buffer, Map }
|
2011-05-17 21:15:27 +02:00
|
|
|
|
|
|
|
|
// =============================================================================================
// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license
// =============================================================================================
|
|
|
|
|
|
|
|
|
|
/**
 * Consistent Hashing node ring abstraction.
 *
 * Every node is placed on the ring `replicas` times, at the hash of
 * `node + ":" + replica` for each replica in `1 to replicas` (so a value
 * below 1 places no positions at all). A key is mapped to the node owning the
 * greatest ring position that is less than or equal to the key's hash; hashes
 * below the first ring position map to the first position.
 *
 * Not thread-safe, to be used from within an Actor or protected some other way.
 *
 * @param nodes    the initial members of the ring
 * @param replicas number of virtual positions each node occupies on the ring
 */
class ConsistentHash[T](nodes: Seq[T], replicas: Int) {
  // Nodes added and not yet removed. Maintained by += / -= only; lookups go through `ring`.
  private val cluster = Buffer[T]()
  // Ordered ring positions, searched by nodeFor.
  private var sortedKeys = TreeSet[Long]()
  // Ring position -> owning node (mutable map; kept in sync with sortedKeys).
  private val ring = Map[Long, T]()

  nodes.foreach(this += _)

  /**
   * Adds `node` to the ring at its `replicas` positions.
   * If a position is already occupied (hash collision), `node` takes it over.
   */
  def +=(node: T): Unit = {
    cluster += node
    (1 to replicas) foreach { replica =>
      val key = replicaKey(node, replica)
      ring += (key -> node)
      sortedKeys += key
    }
  }

  /**
   * Removes `node`'s positions from the ring.
   * A position that has since been taken over by another node is left in place,
   * so removing one node never drops a replica belonging to a different node.
   */
  def -=(node: T): Unit = {
    cluster -= node
    (1 to replicas) foreach { replica =>
      val key = replicaKey(node, replica)
      if (ring.get(key) == Some(node)) {
        ring -= key
        sortedKeys -= key
      }
    }
  }

  /**
   * Returns the node responsible for `key`: the owner of the greatest ring
   * position <= hash(key), or of the first position if the hash is below all of them.
   *
   * @throws NoSuchElementException if the ring contains no nodes
   */
  def nodeFor(key: Array[Byte]): T = {
    if (sortedKeys.isEmpty)
      throw new NoSuchElementException("ConsistentHash ring is empty, cannot map key to a node")
    val hash = hashFor(key)
    // rangeImpl's upper bound is exclusive, so `hash + 1` selects positions <= hash.
    // hashFor never exceeds Int.MaxValue, so the Long addition cannot overflow.
    val atOrBelow = sortedKeys.rangeImpl(None, Some(hash + 1))
    ring(if (atOrBelow.isEmpty) sortedKeys.head else atOrBelow.last)
  }

  // Ring position of the `replica`-th virtual copy of `node`.
  private def replicaKey(node: T, replica: Int): Long =
    hashFor((node + ":" + replica).getBytes("UTF-8"))

  // Non-negative 32-bit MurmurHash of `bytes`, widened to Long.
  private def hashFor(bytes: Array[Byte]): Long = {
    val hash = MurmurHash.arrayHash(bytes)
    // math.abs(Int.MinValue) is Int.MinValue (still negative), so map it to Int.MaxValue explicitly.
    (if (hash == Int.MinValue) Int.MaxValue else math.abs(hash)).toLong
  }
}
|
|
|
|
|
|
|
|
|
|
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */
|
|
|
|
|
|
/**
 * An implementation of Austin Appleby's MurmurHash 3.0 algorithm
 * (32 bit version); reference: http://code.google.com/p/smhasher
 *
 * This is the hash used by collections and case classes (including
 * tuples).
 *
 * @author  Rex Kerr
 * @version 2.9
 * @since   2.9
 */
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
import java.lang.Integer.{ rotateLeft ⇒ rotl }
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Stateful MurmurHash 3 (32-bit) accumulator producing well-distributed,
 * non-cryptographic hashes.
 *
 * Values are absorbed one at a time. Applying the instance to an element
 * absorbs that element's `##`, which lets the instance be handed directly to a
 * collection's `foreach`. `append` absorbs a hash value the caller already has.
 * `hash` yields the finalized result, and `hashCode` is defined to be that same
 * value.
 */
class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T => Unit) {
  import MurmurHash._

  // Running hash before the final mixing step.
  private var state = startHash(seed)
  // Current positions in the two magic-integer streams.
  private var magicA = hiddenMagicA
  private var magicB = hiddenMagicB
  // Memoized finalizeHash(state); meaningful only while `upToDate` is true.
  private var upToDate = false
  private var memo = state

  /** Begin a new hash using the same seed, discarding everything absorbed so far. */
  def reset(): Unit = {
    upToDate = false
    magicB = hiddenMagicB
    magicA = hiddenMagicA
    state = startHash(seed)
  }

  /** Incorporate the hash value (`##`) of one item. */
  def apply(t: T): Unit = append(t.##)

  /** Incorporate an already computed hash value and advance both magic streams. */
  def append(i: Int): Unit = {
    state = extendHash(state, i, magicA, magicB)
    magicA = nextMagicA(magicA)
    magicB = nextMagicB(magicB)
    upToDate = false
  }

  /** Retrieve the finalized hash value; it is recomputed only after new input was absorbed. */
  def hash: Int =
    if (upToDate) memo
    else {
      memo = finalizeHash(state)
      upToDate = true
      memo
    }

  override def hashCode: Int = hash
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * Building blocks for MurmurHash 3 (32-bit), plus ready-made hashes for
 * arrays, strings and order-insensitive collections.
 *
 * Hashing a sequence of integers threads three pieces of state: the running
 * hash and two streams of "magic" integers. The magic streams are advanced at
 * every step to spread out repetitive input.
 *  - `startHash`, `startMagicA` and `startMagicB` give the initial state.
 *  - `extendHash`, `nextMagicA` and `nextMagicB` advance it for each integer
 *    incorporated.
 *  - `finalizeHash` performs the single final mixing step.
 */
object MurmurHash {
  // Magic values used for MurmurHash's 32 bit hash.
  // Don't change these without consulting a hashing expert!
  final private val visibleMagic = 0x971e137b
  final private val hiddenMagicA = 0x95543787
  final private val hiddenMagicB = 0x2ad7eb25
  final private val visibleMixer = 0x52dce729
  final private val hiddenMixerA = 0x7b7d159c
  final private val hiddenMixerB = 0x6bce6396
  final private val finalMixer1 = 0x85ebca6b
  final private val finalMixer2 = 0xc2b2ae35

  // Arbitrary values used for hashing certain classes
  final private val seedString = 0xf7ca7fd2
  final private val seedArray = 0x3c074a61

  /** The first 23 integers of the first magic stream, precomputed. */
  val storedMagicA = Array.iterate(hiddenMagicA, 23)(nextMagicA)

  /** The first 23 integers of the second magic stream, precomputed. */
  val storedMagicB = Array.iterate(hiddenMagicB, 23)(nextMagicB)

  /** Begin a new hash with a seed value. */
  def startHash(seed: Int): Int = seed ^ visibleMagic

  /** The initial integer of the first magic stream. */
  def startMagicA: Int = hiddenMagicA

  /** The initial integer of the second magic stream. */
  def startMagicB: Int = hiddenMagicB

  /**
   * Incorporates a new value into an existing hash.
   *
   * @param hash the prior hash value
   * @param value the new value to incorporate
   * @param magicA a magic integer from the stream
   * @param magicB a magic integer from a different stream
   * @return the updated hash value
   */
  def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int = {
    val scrambled = rotl(value * magicA, 11) * magicB
    (hash ^ scrambled) * 3 + visibleMixer
  }

  /** Given a magic integer from the first stream, compute the next one. */
  def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA

  /** Given a magic integer from the second stream, compute the next one. */
  def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB

  /** Final avalanche mixing, applied once after all values have been incorporated. */
  def finalizeHash(hash: Int): Int = {
    val first = (hash ^ (hash >>> 16)) * finalMixer1
    val second = (first ^ (first >>> 13)) * finalMixer2
    second ^ (second >>> 16)
  }

  /** Compute a high-quality hash of an array from the `##` of each element. */
  def arrayHash[@specialized T](a: Array[T]): Int = {
    var acc = startHash(a.length * seedArray)
    var magicA = startMagicA
    var magicB = startMagicB
    var idx = 0
    while (idx < a.length) {
      acc = extendHash(acc, a(idx).##, magicA, magicB)
      magicA = nextMagicA(magicA)
      magicB = nextMagicB(magicB)
      idx += 1
    }
    finalizeHash(acc)
  }

  /** Compute a high-quality hash of a string. */
  def stringHash(s: String): Int = {
    val len = s.length
    var acc = startHash(len * seedString)
    var magicA = startMagicA
    var magicB = startMagicB
    var pos = 0
    // Consume two chars per step, packing the pair into one Int.
    while (pos + 1 < len) {
      val packed = (s.charAt(pos) << 16) + s.charAt(pos + 1)
      acc = extendHash(acc, packed, magicA, magicB)
      magicA = nextMagicA(magicA)
      magicB = nextMagicB(magicB)
      pos += 2
    }
    // An odd-length string leaves a single trailing char.
    if (pos < len) acc = extendHash(acc, s.charAt(pos), magicA, magicB)
    finalizeHash(acc)
  }

  /**
   * Compute a hash that is symmetric in its arguments, that is, one where the
   * order in which elements appear does not matter. This is useful for
   * hashing sets, for example.
   */
  def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = {
    var sum, xor, count = 0
    var product = 1
    xs foreach { x =>
      val hx = x.##
      sum += hx
      xor ^= hx
      // Zero hashes would wipe out the product, so they are skipped.
      if (hx != 0) product *= hx
      count += 1
    }
    val h0 = startHash(seed * count)
    val h1 = extendHash(h0, sum, storedMagicA(0), storedMagicB(0))
    val h2 = extendHash(h1, xor, storedMagicA(1), storedMagicB(1))
    val h3 = extendHash(h2, product, storedMagicA(2), storedMagicB(2))
    finalizeHash(h3)
  }
}
|