diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala new file mode 100644 index 0000000000..2b89b127ee --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -0,0 +1,255 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.routing + +import scala.collection.immutable.{TreeSet, Seq} +import scala.collection.mutable.{Buffer, Map} + +// ============================================================================================= +// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license +// ============================================================================================= + +/** + * Consistent Hashing node ring abstraction. + * + * Not thread-safe, to be used from within an Actor or protected some other way. + * + * @author Jonas Bonér + */ +class ConsistentHash[T](nodes: Seq[T], replicas: Int) { + private val cluster = Buffer[T]() + private var sortedKeys = TreeSet[Long]() + private var ring = Map[Long, T]() + + nodes.foreach(this += _) + + def +=(node: T) { + cluster += node + (1 to replicas) foreach { replica => + val key = hashFor((node + ":" + replica).getBytes("UTF-8")) + ring += (key -> node) + sortedKeys = sortedKeys + key + } + } + + def -=(node: T) { + cluster -= node + (1 to replicas) foreach { replica => + val key = hashFor((node + ":" + replica).getBytes("UTF-8")) + ring -= key + sortedKeys = sortedKeys - key + } + } + + def nodeFor(key: Array[Byte]): T = { + val hash = hashFor(key) + if (sortedKeys contains hash) ring(hash) + else { + if (hash < sortedKeys.firstKey) ring(sortedKeys.firstKey) + else if (hash > sortedKeys.lastKey) ring(sortedKeys.lastKey) + else ring(sortedKeys.rangeImpl(None, Some(hash)).lastKey) + } + } + + private def hashFor(bytes: Array[Byte]): Long = { + val hash = MurmurHash.arrayHash(bytes) + if (hash == Int.MinValue) hash + 1 + math.abs(hash) + } +} + +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +/** An implementation of Austin Appleby's MurmurHash 3.0 algorithm + * (32 bit version); reference: http://code.google.com/p/smhasher + * + * This is the hash used by collections and case classes (including + * tuples). + * + * @author Rex Kerr + * @version 2.9 + * @since 2.9 + */ + +import java.lang.Integer.{ rotateLeft => rotl } + +/** A class designed to generate well-distributed non-cryptographic + * hashes. It is designed to be passed to a collection's foreach method, + * or can take individual hash values with append. Its own hash code is + * set equal to the hash code of whatever it is hashing. + */ +class MurmurHash[@specialized(Int,Long,Float,Double) T](seed: Int) extends (T => Unit) { + import MurmurHash._ + + private var h = startHash(seed) + private var c = hiddenMagicA + private var k = hiddenMagicB + private var hashed = false + private var hashvalue = h + + /** Begin a new hash using the same seed. */ + def reset() { + h = startHash(seed) + c = hiddenMagicA + k = hiddenMagicB + hashed = false + } + + /** Incorporate the hash value of one item. */ + def apply(t: T) { + h = extendHash(h,t.##,c,k) + c = nextMagicA(c) + k = nextMagicB(k) + hashed = false + } + + /** Incorporate a known hash value. */ + def append(i: Int) { + h = extendHash(h,i,c,k) + c = nextMagicA(c) + k = nextMagicB(k) + hashed = false + } + + /** Retrieve the hash value */ + def hash = { + if (!hashed) { + hashvalue = finalizeHash(h) + hashed = true + } + hashvalue + } + override def hashCode = hash +} + +/** An object designed to generate well-distributed non-cryptographic + * hashes. It is designed to hash a collection of integers; along with + * the integers to hash, it generates two magic streams of integers to + * increase the distribution of repetitive input sequences. Thus, + * three methods need to be called at each step (to start and to + * incorporate a new integer) to update the values. Only one method + * needs to be called to finalize the hash. + */ + +object MurmurHash { + // Magic values used for MurmurHash's 32 bit hash. + // Don't change these without consulting a hashing expert! + final private val visibleMagic = 0x971e137b + final private val hiddenMagicA = 0x95543787 + final private val hiddenMagicB = 0x2ad7eb25 + final private val visibleMixer = 0x52dce729 + final private val hiddenMixerA = 0x7b7d159c + final private val hiddenMixerB = 0x6bce6396 + final private val finalMixer1 = 0x85ebca6b + final private val finalMixer2 = 0xc2b2ae35 + + // Arbitrary values used for hashing certain classes + final private val seedString = 0xf7ca7fd2 + final private val seedArray = 0x3c074a61 + + /** The first 23 magic integers from the first stream are stored here */ + val storedMagicA = + Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray + + /** The first 23 magic integers from the second stream are stored here */ + val storedMagicB = + Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray + + /** Begin a new hash with a seed value. */ + def startHash(seed: Int) = seed ^ visibleMagic + + /** The initial magic integers in the first stream. */ + def startMagicA = hiddenMagicA + + /** The initial magic integer in the second stream. */ + def startMagicB = hiddenMagicB + + /** Incorporates a new value into an existing hash. + * + * @param hash the prior hash value + * @param value the new value to incorporate + * @param magicA a magic integer from the stream + * @param magicB a magic integer from a different stream + * @return the updated hash value + */ + def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int) = { + (hash ^ rotl(value*magicA,11)*magicB)*3 + visibleMixer + } + + /** Given a magic integer from the first stream, compute the next */ + def nextMagicA(magicA: Int) = magicA*5 + hiddenMixerA + + /** Given a magic integer from the second stream, compute the next */ + def nextMagicB(magicB: Int) = magicB*5 + hiddenMixerB + + /** Once all hashes have been incorporated, this performs a final mixing */ + def finalizeHash(hash: Int) = { + var i = (hash ^ (hash>>>16)) + i *= finalMixer1 + i ^= (i >>> 13) + i *= finalMixer2 + i ^= (i >>> 16) + i + } + + /** Compute a high-quality hash of an array */ + def arrayHash[@specialized T](a: Array[T]) = { + var h = startHash(a.length * seedArray) + var c = hiddenMagicA + var k = hiddenMagicB + var j = 0 + while (j < a.length) { + h = extendHash(h, a(j).##, c, k) + c = nextMagicA(c) + k = nextMagicB(k) + j += 1 + } + finalizeHash(h) + } + + /** Compute a high-quality hash of a string */ + def stringHash(s: String) = { + var h = startHash(s.length * seedString) + var c = hiddenMagicA + var k = hiddenMagicB + var j = 0 + while (j+1 < s.length) { + val i = (s.charAt(j)<<16) + s.charAt(j+1); + h = extendHash(h,i,c,k) + c = nextMagicA(c) + k = nextMagicB(k) + j += 2 + } + if (j < s.length) h = extendHash(h,s.charAt(j),c,k) + finalizeHash(h) + } + + /** Compute a hash that is symmetric in its arguments--that is, + * where the order of appearance of elements does not matter. + * This is useful for hashing sets, for example. + */ + def symmetricHash[T](xs: TraversableOnce[T], seed: Int) = { + var a,b,n = 0 + var c = 1 + xs.foreach(i => { + val h = i.## + a += h + b ^= h + if (h != 0) c *= h + n += 1 + }) + var h = startHash(seed * n) + h = extendHash(h, a, storedMagicA(0), storedMagicB(0)) + h = extendHash(h, b, storedMagicA(1), storedMagicB(1)) + h = extendHash(h, c, storedMagicA(2), storedMagicB(2)) + finalizeHash(h) + } +} diff --git a/akka-actor/src/main/scala/akka/routing/Iterators.scala b/akka-actor/src/main/scala/akka/routing/Iterators.scala deleted file mode 100644 index 6172cf7ea6..0000000000 --- a/akka-actor/src/main/scala/akka/routing/Iterators.scala +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.routing - -import akka.actor.ActorRef -import scala.collection.JavaConversions._ -import scala.collection.immutable.Seq - -/** - * An Iterator that is either always empty or yields an infinite number of Ts. - */ -trait InfiniteIterator[T] extends Iterator[T] { - val items: Seq[T] -} - -/** - * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List. - */ -case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] { - def this(items: java.util.List[T]) = this(items.toList) - - @volatile private[this] var current: Seq[T] = items - - def hasNext = items != Nil - - def next = { - val nc = if (current == Nil) items else current - current = nc.tail - nc.head - } - - override def exists(f: T => Boolean): Boolean = items.exists(f) -} - -/** - * This InfiniteIterator always returns the Actor that has the currently smallest mailbox - * useful for work-stealing. - */ -case class SmallestMailboxFirstIterator(val items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] { - def this(items: java.util.List[ActorRef]) = this(items.toList) - def hasNext = items != Nil - - def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) - - override def exists(f: ActorRef => Boolean): Boolean = items.exists(f) -} diff --git a/akka-actor/src/main/scala/akka/routing/Routers.scala b/akka-actor/src/main/scala/akka/routing/Routers.scala deleted file mode 100644 index 57511076e8..0000000000 --- a/akka-actor/src/main/scala/akka/routing/Routers.scala +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.routing - -import akka.actor.{UntypedActor, Actor, ActorRef} - -/** - * A Dispatcher is a trait whose purpose is to route incoming messages to actors. - */ -trait Dispatcher { this: Actor => - - protected def transform(msg: Any): Any = msg - - protected def routes: PartialFunction[Any, ActorRef] - - protected def broadcast(message: Any) {} - - protected def dispatch: Receive = { - case Routing.Broadcast(message) => - broadcast(message) - case a if routes.isDefinedAt(a) => - if (isSenderDefined) routes(a).forward(transform(a))(someSelf) - else routes(a).!(transform(a))(None) - } - - def receive = dispatch - - private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined -} - -/** - * An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors. - */ -abstract class UntypedDispatcher extends UntypedActor { - protected def transform(msg: Any): Any = msg - - protected def route(msg: Any): ActorRef - - protected def broadcast(message: Any) {} - - private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined - - @throws(classOf[Exception]) - def onReceive(msg: Any): Unit = { - if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message) - else { - val r = route(msg) - if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") - if (isSenderDefined) r.forward(transform(msg))(someSelf) - else r.!(transform(msg))(None) - } - } -} - -/** - * A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to. - */ -trait LoadBalancer extends Dispatcher { self: Actor => - protected def seq: InfiniteIterator[ActorRef] - - protected def routes = { - case x if seq.hasNext => seq.next - } - - override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) ) -} - -/** - * A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to. - */ -abstract class UntypedLoadBalancer extends UntypedDispatcher { - protected def seq: InfiniteIterator[ActorRef] - - protected def route(msg: Any) = - if (seq.hasNext) seq.next - else null - - override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) ) -} diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index d31653a2fb..ef8f63e545 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -4,9 +4,13 @@ package akka.routing -import akka.actor.{Actor, ActorRef} +import akka.actor.{UntypedActor, Actor, ActorRef} import akka.actor.Actor._ +import akka.actor.ActorRef +import scala.collection.JavaConversions._ +import scala.collection.immutable.Seq + object Routing { sealed trait RoutingMessage @@ -62,3 +66,123 @@ object Routing { def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef = dispatcherActor({case _ => actorToLog}, logger) } + +/** + * An Iterator that is either always empty or yields an infinite number of Ts. + */ +trait InfiniteIterator[T] extends Iterator[T] { + val items: Seq[T] +} + +/** + * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List. + */ +case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] { + def this(items: java.util.List[T]) = this(items.toList) + + @volatile private[this] var current: Seq[T] = items + + def hasNext = items != Nil + + def next = { + val nc = if (current == Nil) items else current + current = nc.tail + nc.head + } + + override def exists(f: T => Boolean): Boolean = items.exists(f) +} + +/** + * This InfiniteIterator always returns the Actor that has the currently smallest mailbox + * useful for work-stealing. + */ +case class SmallestMailboxFirstIterator(val items : Seq[ActorRef]) extends InfiniteIterator[ActorRef] { + def this(items: java.util.List[ActorRef]) = this(items.toList) + def hasNext = items != Nil + + def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) + + override def exists(f: ActorRef => Boolean): Boolean = items.exists(f) +} + +/** + * A Dispatcher is a trait whose purpose is to route incoming messages to actors. + */ +trait Dispatcher { this: Actor => + + protected def transform(msg: Any): Any = msg + + protected def routes: PartialFunction[Any, ActorRef] + + protected def broadcast(message: Any) {} + + protected def dispatch: Receive = { + case Routing.Broadcast(message) => + broadcast(message) + case a if routes.isDefinedAt(a) => + if (isSenderDefined) routes(a).forward(transform(a))(someSelf) + else routes(a).!(transform(a))(None) + } + + def receive = dispatch + + private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined +} + +/** + * An UntypedDispatcher is an abstract class whose purpose is to route incoming messages to actors. + */ +abstract class UntypedDispatcher extends UntypedActor { + protected def transform(msg: Any): Any = msg + + protected def route(msg: Any): ActorRef + + protected def broadcast(message: Any) {} + + private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined + + @throws(classOf[Exception]) + def onReceive(msg: Any): Unit = { + if (msg.isInstanceOf[Routing.Broadcast]) broadcast(msg.asInstanceOf[Routing.Broadcast].message) + else { + val r = route(msg) + if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") + if (isSenderDefined) r.forward(transform(msg))(someSelf) + else r.!(transform(msg))(None) + } + } +} + +/** + * A LoadBalancer is a specialized kind of Dispatcher, that is supplied an InfiniteIterator of targets + * to dispatch incoming messages to. + */ +trait LoadBalancer extends Dispatcher { self: Actor => + protected def seq: InfiniteIterator[ActorRef] + + protected def routes = { + case x if seq.hasNext => seq.next + } + + override def broadcast(message: Any) = seq.items.foreach(_ ! message) + + override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) ) +} + +/** + * A UntypedLoadBalancer is a specialized kind of UntypedDispatcher, that is supplied an InfiniteIterator of targets + * to dispatch incoming messages to. + */ +abstract class UntypedLoadBalancer extends UntypedDispatcher { + protected def seq: InfiniteIterator[ActorRef] + + protected def route(msg: Any) = + if (seq.hasNext) seq.next + else null + + override def broadcast(message: Any) = seq.items.foreach(_ ! message) + + override def isDefinedAt(msg: Any) = seq.exists( _.isDefinedAt(msg) ) +} + diff --git a/akka-cluster/src/main/scala/akka/cluster/MurmurHash.scala b/akka-cluster/src/main/scala/akka/cluster/MurmurHash.scala index 6441a53f76..d6cbfdc2c3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/MurmurHash.scala +++ b/akka-cluster/src/main/scala/akka/cluster/MurmurHash.scala @@ -6,7 +6,7 @@ ** |/ ** \* */ -package akka.cluster +package akka.routing /** An implementation of Austin Appleby's MurmurHash 3.0 algorithm * (32 bit version); reference: http://code.google.com/p/smhasher