Added consistent hashing abstraction class
This commit is contained in:
parent
d5a912e74b
commit
cfea06c5d5
5 changed files with 381 additions and 137 deletions
255
akka-actor/src/main/scala/akka/routing/ConsistentHash.scala
Normal file
255
akka-actor/src/main/scala/akka/routing/ConsistentHash.scala
Normal file
|
|
@ -0,0 +1,255 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.routing
|
||||
|
||||
import scala.collection.immutable.{TreeSet, Seq}
|
||||
import scala.collection.mutable.{Buffer, Map}
|
||||
|
||||
// =============================================================================================
|
||||
// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license
|
||||
// =============================================================================================
|
||||
|
||||
/**
|
||||
* Consistent Hashing node ring abstraction.
|
||||
*
|
||||
* Not thread-safe, to be used from within an Actor or protected some other way.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ConsistentHash[T](nodes: Seq[T], replicas: Int) {
|
||||
private val cluster = Buffer[T]()
|
||||
private var sortedKeys = TreeSet[Long]()
|
||||
private var ring = Map[Long, T]()
|
||||
|
||||
nodes.foreach(this += _)
|
||||
|
||||
def +=(node: T) {
|
||||
cluster += node
|
||||
(1 to replicas) foreach { replica =>
|
||||
val key = hashFor((node + ":" + replica).getBytes("UTF-8"))
|
||||
ring += (key -> node)
|
||||
sortedKeys = sortedKeys + key
|
||||
}
|
||||
}
|
||||
|
||||
def -=(node: T) {
|
||||
cluster -= node
|
||||
(1 to replicas) foreach { replica =>
|
||||
val key = hashFor((node + ":" + replica).getBytes("UTF-8"))
|
||||
ring -= key
|
||||
sortedKeys = sortedKeys - key
|
||||
}
|
||||
}
|
||||
|
||||
def nodeFor(key: Array[Byte]): T = {
|
||||
val hash = hashFor(key)
|
||||
if (sortedKeys contains hash) ring(hash)
|
||||
else {
|
||||
if (hash < sortedKeys.firstKey) ring(sortedKeys.firstKey)
|
||||
else if (hash > sortedKeys.lastKey) ring(sortedKeys.lastKey)
|
||||
else ring(sortedKeys.rangeImpl(None, Some(hash)).lastKey)
|
||||
}
|
||||
}
|
||||
|
||||
private def hashFor(bytes: Array[Byte]): Long = {
|
||||
val hash = MurmurHash.arrayHash(bytes)
|
||||
if (hash == Int.MinValue) hash + 1
|
||||
math.abs(hash)
|
||||
}
|
||||
}
|
||||
|
||||
/* __ *\
|
||||
** ________ ___ / / ___ Scala API **
|
||||
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
|
||||
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
|
||||
** /____/\___/_/ |_/____/_/ | | **
|
||||
** |/ **
|
||||
\* */
|
||||
|
||||
/** An implementation of Austin Appleby's MurmurHash 3.0 algorithm
|
||||
* (32 bit version); reference: http://code.google.com/p/smhasher
|
||||
*
|
||||
* This is the hash used by collections and case classes (including
|
||||
* tuples).
|
||||
*
|
||||
* @author Rex Kerr
|
||||
* @version 2.9
|
||||
* @since 2.9
|
||||
*/
|
||||
|
||||
import java.lang.Integer.{ rotateLeft => rotl }
|
||||
|
||||
/** A class designed to generate well-distributed non-cryptographic
|
||||
* hashes. It is designed to be passed to a collection's foreach method,
|
||||
* or can take individual hash values with append. Its own hash code is
|
||||
* set equal to the hash code of whatever it is hashing.
|
||||
*/
|
||||
class MurmurHash[@specialized(Int,Long,Float,Double) T](seed: Int) extends (T => Unit) {
|
||||
import MurmurHash._
|
||||
|
||||
private var h = startHash(seed)
|
||||
private var c = hiddenMagicA
|
||||
private var k = hiddenMagicB
|
||||
private var hashed = false
|
||||
private var hashvalue = h
|
||||
|
||||
/** Begin a new hash using the same seed. */
|
||||
def reset() {
|
||||
h = startHash(seed)
|
||||
c = hiddenMagicA
|
||||
k = hiddenMagicB
|
||||
hashed = false
|
||||
}
|
||||
|
||||
/** Incorporate the hash value of one item. */
|
||||
def apply(t: T) {
|
||||
h = extendHash(h,t.##,c,k)
|
||||
c = nextMagicA(c)
|
||||
k = nextMagicB(k)
|
||||
hashed = false
|
||||
}
|
||||
|
||||
/** Incorporate a known hash value. */
|
||||
def append(i: Int) {
|
||||
h = extendHash(h,i,c,k)
|
||||
c = nextMagicA(c)
|
||||
k = nextMagicB(k)
|
||||
hashed = false
|
||||
}
|
||||
|
||||
/** Retrieve the hash value */
|
||||
def hash = {
|
||||
if (!hashed) {
|
||||
hashvalue = finalizeHash(h)
|
||||
hashed = true
|
||||
}
|
||||
hashvalue
|
||||
}
|
||||
override def hashCode = hash
|
||||
}
|
||||
|
||||
/** An object designed to generate well-distributed non-cryptographic
|
||||
* hashes. It is designed to hash a collection of integers; along with
|
||||
* the integers to hash, it generates two magic streams of integers to
|
||||
* increase the distribution of repetitive input sequences. Thus,
|
||||
* three methods need to be called at each step (to start and to
|
||||
* incorporate a new integer) to update the values. Only one method
|
||||
* needs to be called to finalize the hash.
|
||||
*/
|
||||
|
||||
object MurmurHash {
|
||||
// Magic values used for MurmurHash's 32 bit hash.
|
||||
// Don't change these without consulting a hashing expert!
|
||||
final private val visibleMagic = 0x971e137b
|
||||
final private val hiddenMagicA = 0x95543787
|
||||
final private val hiddenMagicB = 0x2ad7eb25
|
||||
final private val visibleMixer = 0x52dce729
|
||||
final private val hiddenMixerA = 0x7b7d159c
|
||||
final private val hiddenMixerB = 0x6bce6396
|
||||
final private val finalMixer1 = 0x85ebca6b
|
||||
final private val finalMixer2 = 0xc2b2ae35
|
||||
|
||||
// Arbitrary values used for hashing certain classes
|
||||
final private val seedString = 0xf7ca7fd2
|
||||
final private val seedArray = 0x3c074a61
|
||||
|
||||
/** The first 23 magic integers from the first stream are stored here */
|
||||
val storedMagicA =
|
||||
Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray
|
||||
|
||||
/** The first 23 magic integers from the second stream are stored here */
|
||||
val storedMagicB =
|
||||
Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray
|
||||
|
||||
/** Begin a new hash with a seed value. */
|
||||
def startHash(seed: Int) = seed ^ visibleMagic
|
||||
|
||||
/** The initial magic integers in the first stream. */
|
||||
def startMagicA = hiddenMagicA
|
||||
|
||||
/** The initial magic integer in the second stream. */
|
||||
def startMagicB = hiddenMagicB
|
||||
|
||||
/** Incorporates a new value into an existing hash.
|
||||
*
|
||||
* @param hash the prior hash value
|
||||
* @param value the new value to incorporate
|
||||
* @param magicA a magic integer from the stream
|
||||
* @param magicB a magic integer from a different stream
|
||||
* @return the updated hash value
|
||||
*/
|
||||
def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int) = {
|
||||
(hash ^ rotl(value*magicA,11)*magicB)*3 + visibleMixer
|
||||
}
|
||||
|
||||
/** Given a magic integer from the first stream, compute the next */
|
||||
def nextMagicA(magicA: Int) = magicA*5 + hiddenMixerA
|
||||
|
||||
/** Given a magic integer from the second stream, compute the next */
|
||||
def nextMagicB(magicB: Int) = magicB*5 + hiddenMixerB
|
||||
|
||||
/** Once all hashes have been incorporated, this performs a final mixing */
|
||||
def finalizeHash(hash: Int) = {
|
||||
var i = (hash ^ (hash>>>16))
|
||||
i *= finalMixer1
|
||||
i ^= (i >>> 13)
|
||||
i *= finalMixer2
|
||||
i ^= (i >>> 16)
|
||||
i
|
||||
}
|
||||
|
||||
/** Compute a high-quality hash of an array */
|
||||
def arrayHash[@specialized T](a: Array[T]) = {
|
||||
var h = startHash(a.length * seedArray)
|
||||
var c = hiddenMagicA
|
||||
var k = hiddenMagicB
|
||||
var j = 0
|
||||
while (j < a.length) {
|
||||
h = extendHash(h, a(j).##, c, k)
|
||||
c = nextMagicA(c)
|
||||
k = nextMagicB(k)
|
||||
j += 1
|
||||
}
|
||||
finalizeHash(h)
|
||||
}
|
||||
|
||||
/** Compute a high-quality hash of a string */
|
||||
def stringHash(s: String) = {
|
||||
var h = startHash(s.length * seedString)
|
||||
var c = hiddenMagicA
|
||||
var k = hiddenMagicB
|
||||
var j = 0
|
||||
while (j+1 < s.length) {
|
||||
val i = (s.charAt(j)<<16) + s.charAt(j+1);
|
||||
h = extendHash(h,i,c,k)
|
||||
c = nextMagicA(c)
|
||||
k = nextMagicB(k)
|
||||
j += 2
|
||||
}
|
||||
if (j < s.length) h = extendHash(h,s.charAt(j),c,k)
|
||||
finalizeHash(h)
|
||||
}
|
||||
|
||||
/** Compute a hash that is symmetric in its arguments--that is,
|
||||
* where the order of appearance of elements does not matter.
|
||||
* This is useful for hashing sets, for example.
|
||||
*/
|
||||
def symmetricHash[T](xs: TraversableOnce[T], seed: Int) = {
|
||||
var a,b,n = 0
|
||||
var c = 1
|
||||
xs.foreach(i => {
|
||||
val h = i.##
|
||||
a += h
|
||||
b ^= h
|
||||
if (h != 0) c *= h
|
||||
n += 1
|
||||
})
|
||||
var h = startHash(seed * n)
|
||||
h = extendHash(h, a, storedMagicA(0), storedMagicB(0))
|
||||
h = extendHash(h, b, storedMagicA(1), storedMagicB(1))
|
||||
h = extendHash(h, c, storedMagicA(2), storedMagicB(2))
|
||||
finalizeHash(h)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,48 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.routing
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.collection.immutable.Seq
|
||||
|
||||
/**
|
||||
* An Iterator that is either always empty or yields an infinite number of Ts.
|
||||
*/
|
||||
trait InfiniteIterator[T] extends Iterator[T] {
|
||||
val items: Seq[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
|
||||
*/
|
||||
case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] {
|
||||
def this(items: java.util.List[T]) = this(items.toList)
|
||||
|
||||
@volatile private[this] var current: Seq[T] = items
|
||||
|
||||
def hasNext = items != Nil
|
||||
|
||||
def next = {
|
||||
val nc = if (current == Nil) items else current
|
||||
current = nc.tail
|
||||
nc.head
|
||||
}
|
||||
|
||||
override def exists(f: T => Boolean): Boolean = items.exists(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* This InfiniteIterator always returns the Actor that has the currently smallest mailbox
|
||||
* useful for work-stealing.
|
||||
*/
|
||||
case class SmallestMailboxFirstIterator(val items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
|
||||
def this(items: java.util.List[ActorRef]) = this(items.toList)
|
||||
def hasNext = items != Nil
|
||||
|
||||
def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
|
||||
|
||||
override def exists(f: ActorRef => Boolean): Boolean = items.exists(f)
|
||||
}
|
||||
|
|
@ -1,87 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.routing
|
||||
|
||||
import akka.actor.{UntypedActor, Actor, ActorRef}
|
||||
|
||||
/**
|
||||
* A Dispatcher is a trait whose purpose is to route incoming messages to actors.
|
||||
*/
|
||||
trait Dispatcher { this: Actor =>
|
||||
|
||||
protected def transform(msg: Any): Any = msg
|
||||
|
||||
protected def routes: PartialFunction[Any, ActorRef]
|
||||
|
||||
protected def broadcast(message: Any) {}
|
||||
|
||||
protected def dispatch: Receive = {
|
||||
case Routing.Broadcast(message) =>
|
||||
broadcast(message)
|
||||
case a if routes.isDefinedAt(a) =>
|
||||
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
|
||||
else routes(a).!(transform(a))(None)
|
||||
}
|
||||
|
||||
def receive = dispatch
|
||||
|
||||
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
|
||||
}
|
||||
|
||||
/**
|
||||
* An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors.
|
||||
*/
|
||||
abstract class UntypedDispatcher extends UntypedActor {
|
||||
protected def transform(msg: Any): Any = msg
|
||||
|
||||
protected def route(msg: Any): ActorRef
|
||||
|
||||
protected def broadcast(message: Any) {}
|
||||
|
||||
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
|
||||
|
||||
@throws(classOf[Exception])
|
||||
def onReceive(msg: Any): Unit = {
|
||||
if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message)
|
||||
else {
|
||||
val r = route(msg)
|
||||
if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
|
||||
if (isSenderDefined) r.forward(transform(msg))(someSelf)
|
||||
else r.!(transform(msg))(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets
|
||||
* to dispatch incoming messages to.
|
||||
*/
|
||||
trait LoadBalancer extends Dispatcher { self: Actor =>
|
||||
protected def seq: InfiniteIterator[ActorRef]
|
||||
|
||||
protected def routes = {
|
||||
case x if seq.hasNext => seq.next
|
||||
}
|
||||
|
||||
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
|
||||
|
||||
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
|
||||
}
|
||||
|
||||
/**
|
||||
* A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets
|
||||
* to dispatch incoming messages to.
|
||||
*/
|
||||
abstract class UntypedLoadBalancer extends UntypedDispatcher {
|
||||
protected def seq: InfiniteIterator[ActorRef]
|
||||
|
||||
protected def route(msg: Any) =
|
||||
if (seq.hasNext) seq.next
|
||||
else null
|
||||
|
||||
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
|
||||
|
||||
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
|
||||
}
|
||||
|
|
@ -4,9 +4,13 @@
|
|||
|
||||
package akka.routing
|
||||
|
||||
import akka.actor.{Actor, ActorRef}
|
||||
import akka.actor.{UntypedActor, Actor, ActorRef}
|
||||
import akka.actor.Actor._
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.collection.immutable.Seq
|
||||
|
||||
object Routing {
|
||||
|
||||
sealed trait RoutingMessage
|
||||
|
|
@ -62,3 +66,123 @@ object Routing {
|
|||
def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef =
|
||||
dispatcherActor({case _ => actorToLog}, logger)
|
||||
}
|
||||
|
||||
/**
|
||||
* An Iterator that is either always empty or yields an infinite number of Ts.
|
||||
*/
|
||||
trait InfiniteIterator[T] extends Iterator[T] {
|
||||
val items: Seq[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List.
|
||||
*/
|
||||
case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] {
|
||||
def this(items: java.util.List[T]) = this(items.toList)
|
||||
|
||||
@volatile private[this] var current: Seq[T] = items
|
||||
|
||||
def hasNext = items != Nil
|
||||
|
||||
def next = {
|
||||
val nc = if (current == Nil) items else current
|
||||
current = nc.tail
|
||||
nc.head
|
||||
}
|
||||
|
||||
override def exists(f: T => Boolean): Boolean = items.exists(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* This InfiniteIterator always returns the Actor that has the currently smallest mailbox
|
||||
* useful for work-stealing.
|
||||
*/
|
||||
case class SmallestMailboxFirstIterator(val items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] {
|
||||
def this(items: java.util.List[ActorRef]) = this(items.toList)
|
||||
def hasNext = items != Nil
|
||||
|
||||
def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
|
||||
|
||||
override def exists(f: ActorRef => Boolean): Boolean = items.exists(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* A Dispatcher is a trait whose purpose is to route incoming messages to actors.
|
||||
*/
|
||||
trait Dispatcher { this: Actor =>
|
||||
|
||||
protected def transform(msg: Any): Any = msg
|
||||
|
||||
protected def routes: PartialFunction[Any, ActorRef]
|
||||
|
||||
protected def broadcast(message: Any) {}
|
||||
|
||||
protected def dispatch: Receive = {
|
||||
case Routing.Broadcast(message) =>
|
||||
broadcast(message)
|
||||
case a if routes.isDefinedAt(a) =>
|
||||
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
|
||||
else routes(a).!(transform(a))(None)
|
||||
}
|
||||
|
||||
def receive = dispatch
|
||||
|
||||
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
|
||||
}
|
||||
|
||||
/**
|
||||
* An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors.
|
||||
*/
|
||||
abstract class UntypedDispatcher extends UntypedActor {
|
||||
protected def transform(msg: Any): Any = msg
|
||||
|
||||
protected def route(msg: Any): ActorRef
|
||||
|
||||
protected def broadcast(message: Any) {}
|
||||
|
||||
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
|
||||
|
||||
@throws(classOf[Exception])
|
||||
def onReceive(msg: Any): Unit = {
|
||||
if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message)
|
||||
else {
|
||||
val r = route(msg)
|
||||
if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
|
||||
if (isSenderDefined) r.forward(transform(msg))(someSelf)
|
||||
else r.!(transform(msg))(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets
|
||||
* to dispatch incoming messages to.
|
||||
*/
|
||||
trait LoadBalancer extends Dispatcher { self: Actor =>
|
||||
protected def seq: InfiniteIterator[ActorRef]
|
||||
|
||||
protected def routes = {
|
||||
case x if seq.hasNext => seq.next
|
||||
}
|
||||
|
||||
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
|
||||
|
||||
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
|
||||
}
|
||||
|
||||
/**
|
||||
* A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets
|
||||
* to dispatch incoming messages to.
|
||||
*/
|
||||
abstract class UntypedLoadBalancer extends UntypedDispatcher {
|
||||
protected def seq: InfiniteIterator[ActorRef]
|
||||
|
||||
protected def route(msg: Any) =
|
||||
if (seq.hasNext) seq.next
|
||||
else null
|
||||
|
||||
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
|
||||
|
||||
override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) )
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
** |/ **
|
||||
\* */
|
||||
|
||||
package akka.cluster
|
||||
package akka.routing
|
||||
|
||||
/** An implementation of Austin Appleby's MurmurHash 3.0 algorithm
|
||||
* (32 bit version); reference: http://code.google.com/p/smhasher
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue