diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 1aef438627..37aa133583 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -35,6 +35,9 @@ object DeployerSpec { router = scatter-gather within = 2 seconds } + /service-consistent-hashing { + router = consistent-hashing + } /service-resizer { router = round-robin resizer { @@ -118,6 +121,10 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { assertRouting("/service-scatter-gather", ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/service-scatter-gather") } + "be able to parse 'akka.actor.deployment._' with consistent-hashing router" in { + assertRouting("/service-consistent-hashing", ConsistentHashingRouter(1), "/service-consistent-hashing") + } + "be able to parse 'akka.actor.deployment._' with router resizer" in { val resizer = DefaultResizer() assertRouting("/service-resizer", RoundRobinRouter(resizer = Some(resizer)), "/service-resizer") diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 5fe3466b25..56f8cd45fc 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -43,6 +43,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getBoolean("akka.jvm-exit-on-fatal-error") must be(true) settings.JvmExitOnFatalError must be(true) + + getInt("akka.actor.deployment.default.virtual-nodes-factor") must be(10) + settings.DefaultVirtualNodesFactor must be(10) } { diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala new file mode 100644 index 0000000000..867da83bd7 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.routing + +import scala.concurrent.Await + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.actorRef2Scala +import akka.pattern.ask +import akka.routing.ConsistentHashingRouter.ConsistentHashable +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +import akka.routing.ConsistentHashingRouter.ConsistentHashMapping +import akka.testkit.AkkaSpec +import akka.testkit._ + +object ConsistentHashingRouterSpec { + + val config = """ + akka.actor.deployment { + /router1 { + router = consistent-hashing + nr-of-instances = 3 + virtual-nodes-factor = 17 + } + /router2 { + router = consistent-hashing + nr-of-instances = 5 + } + } + """ + + class Echo extends Actor { + def receive = { + case _ ⇒ sender ! self + } + } + + case class Msg(key: Any, data: String) extends ConsistentHashable { + override def consistentHashKey = key + } + + case class MsgKey(name: String) + + case class Msg2(key: Any, data: String) +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.config) with DefaultTimeout with ImplicitSender { + import akka.routing.ConsistentHashingRouterSpec._ + implicit val ec = system.dispatcher + + val router1 = system.actorOf(Props[Echo].withRouter(FromConfig()), "router1") + + "consistent hashing router" must { + "create routees from configuration" in { + val currentRoutees = Await.result(router1 ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees] + currentRoutees.routees.size must be(3) + } + + "select destination based on consistentHashKey of the message" in { + router1 ! Msg("a", "A") + val destinationA = expectMsgType[ActorRef] + router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a") + expectMsg(destinationA) + + router1 ! Msg(17, "B") + val destinationB = expectMsgType[ActorRef] + router1 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17) + expectMsg(destinationB) + + router1 ! Msg(MsgKey("c"), "C") + val destinationC = expectMsgType[ActorRef] + router1 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c")) + expectMsg(destinationC) + } + + "select destination with defined consistentHashRoute" in { + def hashMapping: ConsistentHashMapping = { + case Msg2(key, data) ⇒ key + } + val router2 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter( + hashMapping = hashMapping)), "router2") + + router2 ! Msg2("a", "A") + val destinationA = expectMsgType[ActorRef] + router2 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a") + expectMsg(destinationA) + + router2 ! Msg2(17, "B") + val destinationB = expectMsgType[ActorRef] + router2 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17) + expectMsg(destinationB) + + router2 ! Msg2(MsgKey("c"), "C") + val destinationC = expectMsgType[ActorRef] + router2 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c")) + expectMsg(destinationC) + } + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 9e126e75ed..2d4f2245f1 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -171,8 +171,6 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } expectMsgType[ActorKilledException] - //#supervision - val router2 = system.actorOf(Props.empty.withRouter(RoundRobinRouter(1).withSupervisorStrategy(escalator))) router2 ! CurrentRoutees EventFilter[ActorKilledException](occurrences = 2) intercept { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index a585fe3406..163d2e83d9 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -14,7 +14,7 @@ akka { # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT) event-handlers = ["akka.event.Logging$DefaultLogger"] - + # Event handlers are created and registered synchronously during ActorSystem # start-up, and since they are actors, this timeout is used to bound the # waiting time @@ -49,7 +49,7 @@ akka { # FQCN of the ActorRefProvider to be used; the below is the built-in default, # another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle. provider = "akka.actor.LocalActorRefProvider" - + # The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator # to obtain its supervisorStrategy. Besides the default there is # akka.actor.StoppingSupervisorStrategy @@ -69,7 +69,7 @@ akka { # Serializes and deserializes creators (in Props) to ensure that they can be sent over the network, # this is only intended for testing. serialize-creators = off - + # Timeout for send operations to top-level actors which are in the process of being started. # This is only relevant if using a bounded mailbox or the CallingThreadDispatcher for a top-level actor. unstarted-push-timeout = 10s @@ -108,6 +108,9 @@ akka { # within is the timeout used for routers containing future calls within = 5 seconds + # number of virtual nodes per node for consistent-hashing router + virtual-nodes-factor = 10 + routees { # Alternatively to giving nr-of-instances you can specify the full # paths of those actors which should be routed to. This setting takes @@ -296,7 +299,7 @@ akka { # enable DEBUG logging of subscription changes on the eventStream event-stream = off - + # enable DEBUG logging of unhandled messages unhandled = off @@ -318,13 +321,13 @@ akka { serialization-bindings { "java.io.Serializable" = java } - + # Configuration items which are used by the akka.actor.ActorDSL._ methods dsl { # Maximum queue size of the actor created by newInbox(); this protects against # faulty programs which use select() and consistently miss messages inbox-size = 1000 - + # Default timeout to assume for operations like Inbox.receive et al default-timeout = 5s } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 3d3c4b83d4..182239adbf 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -166,6 +166,8 @@ object ActorSystem { final val Daemonicity: Boolean = getBoolean("akka.daemonic") final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error") + final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor") + if (ConfigVersion != Version) throw new akka.ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 9ceebaacb3..3f5e2b1785 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -143,8 +143,6 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce val nrOfInstances = deployment.getInt("nr-of-instances") - val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) - val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None val router: RouterConfig = deployment.getString("router") match { @@ -152,8 +150,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ + val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) + ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "consistent-hashing" ⇒ + val vnodes = deployment.getInt("virtual-nodes-factor") + ConsistentHashingRouter(nrOfInstances, routees, resizer, virtualNodesFactor = vnodes) case fqn ⇒ val args = Seq(classOf[Config] -> deployment) dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index afa321d07d..79c31cda33 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -4,255 +4,126 @@ package akka.routing -import scala.collection.immutable.{ TreeSet, Seq } -import scala.collection.mutable.{ Buffer, Map } - -// ============================================================================================= -// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license -// ============================================================================================= +import scala.collection.immutable.SortedMap +import scala.reflect.ClassTag +import java.util.Arrays /** - * Consistent Hashing node ring abstraction. + * Consistent Hashing node ring implementation. + * + * A good explanation of Consistent Hashing: + * http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html + * + * Note that toString of the ring nodes are used for the node + * hash, i.e. make sure it is different for different nodes. * - * Not thread-safe, to be used from within an Actor or protected some other way. */ -class ConsistentHash[T](nodes: Seq[T], replicas: Int) { - private val cluster = Buffer[T]() - private var sortedKeys = TreeSet[Long]() - private var ring = Map[Long, T]() +class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) { - nodes.foreach(this += _) + import ConsistentHash._ - def +=(node: T): Unit = { - cluster += node - (1 to replicas) foreach { replica ⇒ - val key = hashFor((node + ":" + replica).getBytes("UTF-8")) - ring += (key -> node) - sortedKeys = sortedKeys + key - } + if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1") + + // arrays for fast binary search and access + // nodeHashRing is the sorted hash values of the nodes + // nodeRing is the nodes sorted in the same order as nodeHashRing, i.e. same index + private val (nodeHashRing: Array[Int], nodeRing: Array[T]) = { + val (nhr: Seq[Int], nr: Seq[T]) = nodes.toSeq.unzip + (nhr.toArray, nr.toArray) } - def -=(node: T): Unit = { - cluster -= node - (1 to replicas) foreach { replica ⇒ - val key = hashFor((node + ":" + replica).getBytes("UTF-8")) - ring -= key - sortedKeys = sortedKeys - key - } - } + /** + * Adds a node to the node ring. + * Note that the instance is immutable and this + * operation returns a new instance. + */ + def :+(node: T): ConsistentHash[T] = + new ConsistentHash(nodes ++ ((1 to virtualNodesFactor) map { r ⇒ (nodeHashFor(node, r) -> node) }), virtualNodesFactor) - def nodeFor(key: Array[Byte]): T = { - val hash = hashFor(key) - if (sortedKeys contains hash) ring(hash) + /** + * Adds a node to the node ring. + * Note that the instance is immutable and this + * operation returns a new instance. + * JAVA API + */ + def add(node: T): ConsistentHash[T] = this :+ node + + /** + * Removes a node from the node ring. + * Note that the instance is immutable and this + * operation returns a new instance. + */ + def :-(node: T): ConsistentHash[T] = + new ConsistentHash(nodes -- ((1 to virtualNodesFactor) map { r ⇒ nodeHashFor(node, r) }), virtualNodesFactor) + + /** + * Removes a node from the node ring. + * Note that the instance is immutable and this + * operation returns a new instance. + * JAVA API + */ + def remove(node: T): ConsistentHash[T] = this :- node + + // converts the result of Arrays.binarySearch into a index in the nodeRing array + // see documentation of Arrays.binarySearch for what it returns + private def idx(i: Int): Int = { + if (i >= 0) i // exact match else { - if (hash < sortedKeys.firstKey) ring(sortedKeys.firstKey) - else if (hash > sortedKeys.lastKey) ring(sortedKeys.lastKey) - else ring(sortedKeys.rangeImpl(None, Some(hash)).lastKey) + val j = math.abs(i + 1) + if (j >= nodeHashRing.length) 0 // after last, use first + else j // next node clockwise } } - private def hashFor(bytes: Array[Byte]): Long = { - val hash = MurmurHash.arrayHash(bytes) - if (hash == Int.MinValue) hash + 1 - math.abs(hash) - } -} - -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -/** - * An implementation of Austin Appleby's MurmurHash 3.0 algorithm - * (32 bit version); reference: http://code.google.com/p/smhasher - * - * This is the hash used by collections and case classes (including - * tuples). - * - * @author Rex Kerr - * @version 2.9 - * @since 2.9 - */ - -import java.lang.Integer.{ rotateLeft ⇒ rotl } - -/** - * A class designed to generate well-distributed non-cryptographic - * hashes. It is designed to be passed to a collection's foreach method, - * or can take individual hash values with append. Its own hash code is - * set equal to the hash code of whatever it is hashing. - */ -class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T ⇒ Unit) { - import MurmurHash._ - - private var h = startHash(seed) - private var c = hiddenMagicA - private var k = hiddenMagicB - private var hashed = false - private var hashvalue = h - - /** Begin a new hash using the same seed. */ - def reset(): Unit = { - h = startHash(seed) - c = hiddenMagicA - k = hiddenMagicB - hashed = false - } - - /** Incorporate the hash value of one item. */ - def apply(t: T): Unit = { - h = extendHash(h, t.##, c, k) - c = nextMagicA(c) - k = nextMagicB(k) - hashed = false - } - - /** Incorporate a known hash value. */ - def append(i: Int): Unit = { - h = extendHash(h, i, c, k) - c = nextMagicA(c) - k = nextMagicB(k) - hashed = false - } - - /** Retrieve the hash value */ - def hash: Int = { - if (!hashed) { - hashvalue = finalizeHash(h) - hashed = true - } - hashvalue - } - - override def hashCode: Int = hash -} - -/** - * An object designed to generate well-distributed non-cryptographic - * hashes. It is designed to hash a collection of integers; along with - * the integers to hash, it generates two magic streams of integers to - * increase the distribution of repetitive input sequences. Thus, - * three methods need to be called at each step (to start and to - * incorporate a new integer) to update the values. Only one method - * needs to be called to finalize the hash. - */ - -object MurmurHash { - // Magic values used for MurmurHash's 32 bit hash. - // Don't change these without consulting a hashing expert! - final private val visibleMagic: Int = 0x971e137b - final private val hiddenMagicA: Int = 0x95543787 - final private val hiddenMagicB: Int = 0x2ad7eb25 - final private val visibleMixer: Int = 0x52dce729 - final private val hiddenMixerA: Int = 0x7b7d159c - final private val hiddenMixerB: Int = 0x6bce6396 - final private val finalMixer1: Int = 0x85ebca6b - final private val finalMixer2: Int = 0xc2b2ae35 - - // Arbitrary values used for hashing certain classes - final private val seedString: Int = 0xf7ca7fd2 - final private val seedArray: Int = 0x3c074a61 - - /** The first 23 magic integers from the first stream are stored here */ - val storedMagicA: Array[Int] = - Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray - - /** The first 23 magic integers from the second stream are stored here */ - val storedMagicB: Array[Int] = - Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray - - /** Begin a new hash with a seed value. */ - def startHash(seed: Int): Int = seed ^ visibleMagic - - /** The initial magic integers in the first stream. */ - def startMagicA: Int = hiddenMagicA - - /** The initial magic integer in the second stream. */ - def startMagicB: Int = hiddenMagicB - - /** - * Incorporates a new value into an existing hash. - * - * @param hash the prior hash value - * @param value the new value to incorporate - * @param magicA a magic integer from the stream - * @param magicB a magic integer from a different stream - * @return the updated hash value - */ - def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int = - (hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer - - /** Given a magic integer from the first stream, compute the next */ - def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA - - /** Given a magic integer from the second stream, compute the next */ - def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB - - /** Once all hashes have been incorporated, this performs a final mixing */ - def finalizeHash(hash: Int): Int = { - var i = (hash ^ (hash >>> 16)) - i *= finalMixer1 - i ^= (i >>> 13) - i *= finalMixer2 - i ^= (i >>> 16) - i - } - - /** Compute a high-quality hash of an array */ - def arrayHash[@specialized T](a: Array[T]): Int = { - var h = startHash(a.length * seedArray) - var c = hiddenMagicA - var k = hiddenMagicB - var j = 0 - while (j < a.length) { - h = extendHash(h, a(j).##, c, k) - c = nextMagicA(c) - k = nextMagicB(k) - j += 1 - } - finalizeHash(h) - } - - /** Compute a high-quality hash of a string */ - def stringHash(s: String): Int = { - var h = startHash(s.length * seedString) - var c = hiddenMagicA - var k = hiddenMagicB - var j = 0 - while (j + 1 < s.length) { - val i = (s.charAt(j) << 16) + s.charAt(j + 1); - h = extendHash(h, i, c, k) - c = nextMagicA(c) - k = nextMagicB(k) - j += 2 - } - if (j < s.length) h = extendHash(h, s.charAt(j), c, k) - finalizeHash(h) - } - /** - * Compute a hash that is symmetric in its arguments--that is, - * where the order of appearance of elements does not matter. - * This is useful for hashing sets, for example. + * Get the node responsible for the data key. + * Can only be used if nodes exists in the node ring, + * otherwise throws `IllegalStateException` */ - def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = { - var a, b, n = 0 - var c = 1 - xs.foreach(i ⇒ { - val h = i.## - a += h - b ^= h - if (h != 0) c *= h - n += 1 - }) - var h = startHash(seed * n) - h = extendHash(h, a, storedMagicA(0), storedMagicB(0)) - h = extendHash(h, b, storedMagicA(1), storedMagicB(1)) - h = extendHash(h, c, storedMagicA(2), storedMagicB(2)) - finalizeHash(h) + def nodeFor(key: Array[Byte]): T = { + if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) + + nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key)))) } + + /** + * Get the node responsible for the data key. + * Can only be used if nodes exists in the node ring, + * otherwise throws `IllegalStateException` + */ + def nodeFor(key: String): T = { + if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) + + nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key)))) + } + + /** + * Is the node ring empty, i.e. no nodes added or all removed. + */ + def isEmpty: Boolean = nodes.isEmpty + +} + +object ConsistentHash { + def apply[T: ClassTag](nodes: Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = { + new ConsistentHash(SortedMap.empty[Int, T] ++ + (for (node ← nodes; vnode ← 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)), + virtualNodesFactor) + } + + /** + * Factory method to create a ConsistentHash + * JAVA API + */ + def create[T](nodes: java.lang.Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = { + import scala.collection.JavaConverters._ + apply(nodes.asScala, virtualNodesFactor)(ClassTag(classOf[Any].asInstanceOf[Class[T]])) + } + + private def nodeHashFor(node: Any, vnode: Int): Int = + hashFor((node + ":" + vnode).getBytes("UTF-8")) + + private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes) + + private def hashFor(string: String): Int = MurmurHash.stringHash(string) } diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala new file mode 100644 index 0000000000..3b9802d7fd --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -0,0 +1,297 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.routing + +import scala.collection.JavaConversions.iterableAsScalaIterable +import scala.util.control.NonFatal +import akka.actor.ActorRef +import akka.actor.SupervisorStrategy +import akka.actor.Props +import akka.dispatch.Dispatchers +import akka.event.Logging +import akka.serialization.SerializationExtension +import java.util.concurrent.atomic.AtomicReference + +object ConsistentHashingRouter { + /** + * Creates a new ConsistentHashingRouter, routing to the specified routees + */ + def apply(routees: Iterable[ActorRef]): ConsistentHashingRouter = + new ConsistentHashingRouter(routees = routees map (_.path.toString)) + + /** + * Java API to create router with the supplied 'routees' actors. + */ + def create(routees: java.lang.Iterable[ActorRef]): ConsistentHashingRouter = { + import scala.collection.JavaConverters._ + apply(routees.asScala) + } + + /** + * If you don't define the `hashMapping` when + * constructing the [[akka.routing.ConsistentHashingRouter]] + * the messages need to implement this interface to define what + * data to use for the consistent hash key. Note that it's not + * the hash, but the data to be hashed. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.akka.serialization.Serializer]] + * will be applied to the returned data. + * + * If messages can't implement this interface themselves, + * it's possible to wrap the messages in + * [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]], + * or use [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]] + */ + trait ConsistentHashable { + def consistentHashKey: Any + } + + /** + * If you don't define the `hashMapping` when + * constructing the [[akka.routing.ConsistentHashingRouter]] + * and messages can't implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]] + * themselves they can we wrapped by this envelope instead. The + * router will only send the wrapped message to the destination, + * i.e. the envelope will be stripped off. + */ + @SerialVersionUID(1L) + final case class ConsistentHashableEnvelope(message: Any, hashKey: Any) + extends ConsistentHashable with RouterEnvelope { + override def consistentHashKey: Any = hashKey + } + + /** + * Partial function from message to the data to + * use for the consistent hash key. Note that it's not + * the hash that is to be returned, but the data to be hashed. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.akka.serialization.Serializer]] + * will be applied to the returned data. + */ + type ConsistentHashMapping = PartialFunction[Any, Any] + + @SerialVersionUID(1L) + object emptyConsistentHashMapping extends ConsistentHashMapping { + def isDefinedAt(x: Any) = false + def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashMapping apply()") + } + + /** + * JAVA API + * Mapping from message to the data to use for the consistent hash key. + * Note that it's not the hash that is to be returned, but the data to be + * hashed. + * + * May return `null` to indicate that the message is not handled by + * this mapping. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.akka.serialization.Serializer]] + * will be applied to the returned data. + */ + trait ConsistentHashMapper { + def hashKey(message: Any): Any + } +} +/** + * A Router that uses consistent hashing to select a connection based on the + * sent message. + * + * There is 3 ways to define what data to use for the consistent hash key. + * + * 1. You can define `hashMapping` / `withHashMapper` + * of the router to map incoming messages to their consistent hash key. + * This makes the decision transparent for the sender. + * + * 2. The messages may implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]]. + * The key is part of the message and it's convenient to define it together + * with the message definition. + * + * 3. The messages can be be wrapped in a [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]] + * to define what data to use for the consistent hash key. The sender knows + * the key to use. + * + * These ways to define the consistent hash key can be use together and at + * the same time for one router. The `hashMapping` is tried first. + * + * Please note that providing both 'nrOfInstances' and 'routees' does not make logical + * sense as this means that the router should both create new actors and use the 'routees' + * actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + *

Supervision Setup

+ * + * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + * @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] + * @param hashMapping partial function from message to the data to + * use for the consistent hash key + */ +@SerialVersionUID(1L) +case class ConsistentHashingRouter( + nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy, + val virtualNodesFactor: Int = 0, + val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping) + extends RouterConfig with ConsistentHashingLike { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = this(nrOfInstances = nr) + + /** + * Constructor that sets the routees to be used. + * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ + def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths)) + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(resizer = Some(resizer)) + + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String): ConsistentHashingRouter = copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy) + + /** + * Java API for setting the number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] + */ + def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes) + + /** + * Java API for setting the mapping from message to the data to use for the consistent hash key. + */ + def withHashMapper(mapping: ConsistentHashingRouter.ConsistentHashMapper) = { + copy(hashMapping = { + case message if (mapping.hashKey(message).asInstanceOf[AnyRef] ne null) ⇒ + mapping.hashKey(message) + }) + } + + /** + * Uses the resizer of the given RouterConfig if this RouterConfig + * doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + * Uses the the `hashMapping` defined in code, since + * that can't be defined in configuration. + */ + override def withFallback(other: RouterConfig): RouterConfig = other match { + case _: FromConfig ⇒ this + case otherRouter: ConsistentHashingRouter ⇒ + val useResizer = + if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer + else this.resizer + copy(resizer = useResizer, hashMapping = otherRouter.hashMapping) + case _ ⇒ throw new IllegalArgumentException("Expected ConsistentHashingRouter, got [%s]".format(other)) + } +} + +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ +trait ConsistentHashingLike { this: RouterConfig ⇒ + + import ConsistentHashingRouter._ + + def nrOfInstances: Int + + def routees: Iterable[String] + + def virtualNodesFactor: Int + + def hashMapping: ConsistentHashMapping + + override def createRoute(routeeProvider: RouteeProvider): Route = { + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } + + val log = Logging(routeeProvider.context.system, routeeProvider.context.self) + val vnodes = + if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor + else virtualNodesFactor + + // tuple of routees and the ConsistentHash, updated together in updateConsistentHash + val consistentHashRef = new AtomicReference[(IndexedSeq[ActorRef], ConsistentHash[ActorRef])]((null, null)) + updateConsistentHash() + + // update consistentHash when routees has changed + // changes to routees are rare and when no changes this is a quick operation + def updateConsistentHash(): ConsistentHash[ActorRef] = { + val oldConsistentHashTuple = consistentHashRef.get + val (oldConsistentHashRoutees, oldConsistentHash) = oldConsistentHashTuple + val currentRoutees = routeeProvider.routees + if (currentRoutees ne oldConsistentHashRoutees) { + // when other instance, same content, no need to re-hash, but try to set routees + val consistentHash = + if (currentRoutees == oldConsistentHashRoutees) oldConsistentHash + else ConsistentHash(currentRoutees, vnodes) // re-hash + // ignore, don't update, in case of CAS failure + consistentHashRef.compareAndSet(oldConsistentHashTuple, (currentRoutees, consistentHash)) + consistentHash + } else oldConsistentHash + } + + def target(hashData: Any): ActorRef = try { + val currentConsistenHash = updateConsistentHash() + if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters + else hashData match { + case bytes: Array[Byte] ⇒ currentConsistenHash.nodeFor(bytes) + case str: String ⇒ currentConsistenHash.nodeFor(str) + case x: AnyRef ⇒ currentConsistenHash.nodeFor(SerializationExtension(routeeProvider.context.system).serialize(x).get) + } + } catch { + case NonFatal(e) ⇒ + // serialization failed + log.warning("Couldn't route message with consistent hash key [{}] due to [{}]", hashData, e.getMessage) + routeeProvider.context.system.deadLetters + } + + { + case (sender, message) ⇒ + message match { + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) + case _ if hashMapping.isDefinedAt(message) ⇒ + List(Destination(sender, target(hashMapping(message)))) + case hashable: ConsistentHashable ⇒ List(Destination(sender, target(hashable.consistentHashKey))) + case other ⇒ + log.warning("Message [{}] must be handled by hashMapping, or implement [{}] or be wrapped in [{}]", + message.getClass.getName, classOf[ConsistentHashable].getName, + classOf[ConsistentHashableEnvelope].getName) + List(Destination(sender, routeeProvider.context.system.deadLetters)) + } + + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/MurmurHash.scala b/akka-actor/src/main/scala/akka/routing/MurmurHash.scala new file mode 100644 index 0000000000..fc67613a5d --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/MurmurHash.scala @@ -0,0 +1,149 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +/** + * An implementation of Austin Appleby's MurmurHash 3.0 algorithm + * (32 bit version); reference: http://code.google.com/p/smhasher + * + * This is the hash used by collections and case classes (including + * tuples). + * + * @author Rex Kerr + * @version 2.9 + * @since 2.9 + */ + +package akka.routing + +import java.lang.Integer.{ rotateLeft ⇒ rotl } + +/** + * An object designed to generate well-distributed non-cryptographic + * hashes. It is designed to hash a collection of integers; along with + * the integers to hash, it generates two magic streams of integers to + * increase the distribution of repetitive input sequences. Thus, + * three methods need to be called at each step (to start and to + * incorporate a new integer) to update the values. Only one method + * needs to be called to finalize the hash. + */ + +object MurmurHash { + // Magic values used for MurmurHash's 32 bit hash. + // Don't change these without consulting a hashing expert! + final private val visibleMagic: Int = 0x971e137b + final private val hiddenMagicA: Int = 0x95543787 + final private val hiddenMagicB: Int = 0x2ad7eb25 + final private val visibleMixer: Int = 0x52dce729 + final private val hiddenMixerA: Int = 0x7b7d159c + final private val hiddenMixerB: Int = 0x6bce6396 + final private val finalMixer1: Int = 0x85ebca6b + final private val finalMixer2: Int = 0xc2b2ae35 + + // Arbitrary values used for hashing certain classes + final private val seedString: Int = 0xf7ca7fd2 + final private val seedArray: Int = 0x3c074a61 + + /** The first 23 magic integers from the first stream are stored here */ + private val storedMagicA: Array[Int] = + Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray + + /** The first 23 magic integers from the second stream are stored here */ + private val storedMagicB: Array[Int] = + Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray + + /** Begin a new hash with a seed value. */ + def startHash(seed: Int): Int = seed ^ visibleMagic + + /** The initial magic integers in the first stream. */ + def startMagicA: Int = hiddenMagicA + + /** The initial magic integer in the second stream. */ + def startMagicB: Int = hiddenMagicB + + /** + * Incorporates a new value into an existing hash. + * + * @param hash the prior hash value + * @param value the new value to incorporate + * @param magicA a magic integer from the stream + * @param magicB a magic integer from a different stream + * @return the updated hash value + */ + def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int = + (hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer + + /** Given a magic integer from the first stream, compute the next */ + def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA + + /** Given a magic integer from the second stream, compute the next */ + def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB + + /** Once all hashes have been incorporated, this performs a final mixing */ + def finalizeHash(hash: Int): Int = { + var i = (hash ^ (hash >>> 16)) + i *= finalMixer1 + i ^= (i >>> 13) + i *= finalMixer2 + i ^= (i >>> 16) + i + } + + /** Compute a high-quality hash of an array */ + def arrayHash[@specialized T](a: Array[T]): Int = { + var h = startHash(a.length * seedArray) + var c = hiddenMagicA + var k = hiddenMagicB + var j = 0 + while (j < a.length) { + h = extendHash(h, a(j).##, c, k) + c = nextMagicA(c) + k = nextMagicB(k) + j += 1 + } + finalizeHash(h) + } + + /** Compute a high-quality hash of a string */ + def stringHash(s: String): Int = { + var h = startHash(s.length * seedString) + var c = hiddenMagicA + var k = hiddenMagicB + var j = 0 + while (j + 1 < s.length) { + val i = (s.charAt(j) << 16) + s.charAt(j + 1); + h = extendHash(h, i, c, k) + c = nextMagicA(c) + k = nextMagicB(k) + j += 2 + } + if (j < s.length) h = extendHash(h, s.charAt(j), c, k) + finalizeHash(h) + } + + /** + * Compute a hash that is symmetric in its arguments--that is, + * where the order of appearance of elements does not matter. + * This is useful for hashing sets, for example. + */ + def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = { + var a, b, n = 0 + var c = 1 + xs.foreach(i ⇒ { + val h = i.## + a += h + b ^= h + if (h != 0) c *= h + n += 1 + }) + var h = startHash(seed * n) + h = extendHash(h, a, storedMagicA(0), storedMagicB(0)) + h = extendHash(h, b, storedMagicA(1), storedMagicB(1)) + h = extendHash(h, c, storedMagicA(2), storedMagicB(2)) + finalizeHash(h) + } +} diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index cb6f37eb8d..8fcff84831 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -115,8 +115,8 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo val s = if (sender eq null) system.deadLetters else sender val msg = message match { - case Broadcast(m) ⇒ m - case m ⇒ m + case wrapped: RouterEnvelope ⇒ wrapped.message + case m ⇒ m } applyRoute(s, message) match { @@ -400,7 +400,15 @@ private object Router { * Router implementations may choose to handle this message differently. */ @SerialVersionUID(1L) -case class Broadcast(message: Any) +case class Broadcast(message: Any) extends RouterEnvelope + +/** + * Only the contained message will be forwarded to the + * destination, i.e. the envelope will be stripped off. + */ +trait RouterEnvelope { + def message: Any +} /** * Sending this message to a router will make it send back its currently used routees. @@ -588,6 +596,10 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait RoundRobinLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -721,6 +733,10 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait RandomLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -861,6 +877,10 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait SmallestMailboxLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -1076,6 +1096,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait BroadcastLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -1205,6 +1229,10 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ def nrOfInstances: Int diff --git a/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTest.scala b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTest.scala new file mode 100644 index 0000000000..a043f9cb73 --- /dev/null +++ b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTest.scala @@ -0,0 +1,5 @@ +package docs.jrouting; + +import org.scalatest.junit.JUnitSuite + +class ConsistentHashingRouterDocTest extends ConsistentHashingRouterDocTestBase with JUnitSuite diff --git a/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java new file mode 100644 index 0000000000..378049578f --- /dev/null +++ b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package docs.jrouting; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import akka.testkit.JavaTestKit; +import akka.actor.ActorSystem; + +//#imports1 +import akka.actor.UntypedActor; +import akka.routing.ConsistentHashingRouter.ConsistentHashable; +import java.util.Map; +import java.util.HashMap; +import java.io.Serializable; +//#imports1 + +//#imports2 +import akka.actor.Props; +import akka.actor.ActorRef; +import akka.routing.ConsistentHashingRouter; +import akka.routing.ConsistentHashingRouter.ConsistentHashMapper; +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope; +//#imports2 + +public class ConsistentHashingRouterDocTestBase { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + system.shutdown(); + } + + //#cache-actor + + public static class Cache extends UntypedActor { + Map cache = new HashMap(); + + public void onReceive(Object msg) { + if (msg instanceof Entry) { + Entry entry = (Entry) msg; + cache.put(entry.key, entry.value); + } else if (msg instanceof Get) { + Get get = (Get) msg; + Object value = cache.get(get.key); + getSender().tell(value == null ? NOT_FOUND : value, + getContext().self()); + } else if (msg instanceof Evict) { + Evict evict = (Evict) msg; + cache.remove(evict.key); + } else { + unhandled(msg); + } + } + } + + public static final class Evict implements Serializable { + public final String key; + public Evict(String key) { + this.key = key; + } + } + + public static final class Get implements Serializable, ConsistentHashable { + public final String key; + public Get(String key) { + this.key = key; + } + public Object consistentHashKey() { + return key; + } + } + + public static final class Entry implements Serializable { + public final String key; + public final String value; + public Entry(String key, String value) { + this.key = key; + this.value = value; + } + } + + public static final String NOT_FOUND = "NOT_FOUND"; + //#cache-actor + + + @Test + public void demonstrateUsageOfConsistentHashableRouter() { + + new JavaTestKit(system) {{ + + //#consistent-hashing-router + + final ConsistentHashMapper hashMapper = new ConsistentHashMapper() { + @Override + public Object hashKey(Object message) { + if (message instanceof Evict) { + return ((Evict) message).key; + } else { + return null; + } + } + }; + + ActorRef cache = system.actorOf(new Props(Cache.class).withRouter( + new ConsistentHashingRouter(10).withHashMapper(hashMapper)), + "cache"); + + cache.tell(new ConsistentHashableEnvelope( + new Entry("hello", "HELLO"), "hello"), getRef()); + cache.tell(new ConsistentHashableEnvelope( + new Entry("hi", "HI"), "hi"), getRef()); + + cache.tell(new Get("hello"), getRef()); + expectMsgEquals("HELLO"); + + cache.tell(new Get("hi"), getRef()); + expectMsgEquals("HI"); + + cache.tell(new Evict("hi"), getRef()); + cache.tell(new Get("hi"), getRef()); + expectMsgEquals(NOT_FOUND); + + //#consistent-hashing-router + }}; + } + +} diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 845e2825e3..4653a07692 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -15,13 +15,14 @@ is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` +* ``akka.routing.ConsistentHashingRouter`` Routers In Action ^^^^^^^^^^^^^^^^^ This is an example of how to create a router that is defined in configuration: -.. includecode:: ../scala/code/docs/routing/RouterViaConfigExample.scala#config +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin .. includecode:: code/docs/jrouting/RouterViaConfigExample.java#configurableRouting @@ -177,6 +178,10 @@ is exactly what you would expect from a round-robin router to happen. (The name of an actor is automatically created in the format ``$letter`` unless you specify it - hence the names printed above.) +This is an example of how to define a round-robin router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin + RandomRouter ************ As the name implies this router type selects one of its routees randomly and forwards @@ -204,6 +209,10 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +This is an example of how to define a random router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-random + SmallestMailboxRouter ********************* A Router that tries to send to the non-suspended routee with fewest messages in mailbox. @@ -219,6 +228,10 @@ Code example: .. includecode:: code/docs/jrouting/ParentActor.java#smallestMailboxRouter +This is an example of how to define a smallest-mailbox router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-smallest-mailbox + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. @@ -238,6 +251,10 @@ When run you should see a similar output to this: As you can see here above each of the routees, five in total, received the broadcast message. +This is an example of how to define a broadcast router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-broadcast + ScatterGatherFirstCompletedRouter ********************************* The ScatterGatherFirstCompletedRouter will send the message on to all its routees as a future. @@ -255,6 +272,51 @@ When run you should see this: From the output above you can't really see that all the routees performed the calculation, but they did! The result you see is from the first routee that returned its calculation to the router. +This is an example of how to define a scatter-gather router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-scatter-gather + +ConsistentHashingRouter +*********************** + +The ConsistentHashingRouter uses `consistent hashing `_ +to select a connection based on the sent message. This +`article `_ gives good +insight into how consistent hashing is implemented. + +There is 3 ways to define what data to use for the consistent hash key. + +* You can define ``withHashMapper`` of the router to map incoming + messages to their consistent hash key. This makes the the decision + transparent for the sender. + +* The messages may implement ``akka.routing.ConsistentHashingRouter.ConsistentHashable``. + The key is part of the message and it's convenient to define it together + with the message definition. + +* The messages can be be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope`` + to define what data to use for the consistent hash key. The sender knows + the key to use. + +These ways to define the consistent hash key can be use together and at +the same time for one router. The ``withHashMapper`` is tried first. + +Code example: + +.. includecode:: code/docs/jrouting/ConsistentHashingRouterDocTestBase.java + :include: imports1,cache-actor + +.. includecode:: code/docs/jrouting/ConsistentHashingRouterDocTestBase.java + :include: imports2,consistent-hashing-router + +In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, +while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict`` +message is handled by the ``withHashMapper``. + +This is an example of how to define a consistent-hashing router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-consistent-hashing + Broadcast Messages ^^^^^^^^^^^^^^^^^^ @@ -276,7 +338,7 @@ of routees dynamically. This is an example of how to create a resizable router that is defined in configuration: -.. includecode:: ../scala/code/docs/routing/RouterViaConfigExample.scala#config-resize +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-resize .. includecode:: code/docs/jrouting/RouterViaConfigExample.java#configurableRoutingWithResizer diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index 04284b3f12..ec04dacdda 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -287,7 +287,6 @@ Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sur stashed messages are put into the dead letters when the actor stops, make sure you call super.postStop if you override it. - Forward of Terminated message ============================= @@ -442,3 +441,20 @@ v2.1:: final isSet = timeout.isFinite(); getContext().setReceiveTimeout(Duration.Undefined()); +ConsistentHash +============== + +``akka.routing.ConsistentHash`` has been changed to an immutable data structure. + +v2.0:: + + val consistentHash = new ConsistentHash(Seq(a1, a2, a3), replicas = 10) + consistentHash += a4 + val a = consistentHash.nodeFor(data) + +v2.1:: + + var consistentHash = ConsistentHash(Seq(a1, a2, a3), replicas = 10) + consistentHash = consistentHash :+ a4 + val a = consistentHash.nodeFor(data) + diff --git a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala new file mode 100644 index 0000000000..dcb40b0c67 --- /dev/null +++ b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package docs.routing + +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender + +object ConsistentHashingRouterDocSpec { + + //#cache-actor + import akka.actor.Actor + import akka.routing.ConsistentHashingRouter.ConsistentHashable + + class Cache extends Actor { + var cache = Map.empty[String, String] + + def receive = { + case Entry(key, value) ⇒ cache += (key -> value) + case Get(key) ⇒ sender ! cache.get(key) + case Evict(key) ⇒ cache -= key + } + } + + case class Evict(key: String) + + case class Get(key: String) extends ConsistentHashable { + override def consistentHashKey: Any = key + } + + case class Entry(key: String, value: String) + //#cache-actor + +} + +class ConsistentHashingRouterDocSpec extends AkkaSpec with ImplicitSender { + + import ConsistentHashingRouterDocSpec._ + + "demonstrate usage of ConsistentHashableRouter" in { + + //#consistent-hashing-router + import akka.actor.Props + import akka.routing.ConsistentHashingRouter + import akka.routing.ConsistentHashingRouter.ConsistentHashMapping + import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope + + def hashMapping: ConsistentHashMapping = { + case Evict(key) ⇒ key + } + + val cache = system.actorOf(Props[Cache].withRouter(ConsistentHashingRouter(10, + hashMapping = hashMapping)), name = "cache") + + cache ! ConsistentHashableEnvelope( + message = Entry("hello", "HELLO"), hashKey = "hello") + cache ! ConsistentHashableEnvelope( + message = Entry("hi", "HI"), hashKey = "hi") + + cache ! Get("hello") + expectMsg(Some("HELLO")) + + cache ! Get("hi") + expectMsg(Some("HI")) + + cache ! Evict("hi") + cache ! Get("hi") + expectMsg(None) + + //#consistent-hashing-router + } + +} diff --git a/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala b/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala new file mode 100644 index 0000000000..24dc291087 --- /dev/null +++ b/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala @@ -0,0 +1,158 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package docs.routing + +import akka.actor.{ Actor, Props, ActorSystem, ActorLogging } +import com.typesafe.config.ConfigFactory +import akka.routing.FromConfig +import akka.routing.ConsistentHashingRouter.ConsistentHashable +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender + +object RouterWithConfigDocSpec { + + val config = ConfigFactory.parseString(""" + + //#config-round-robin + akka.actor.deployment { + /myrouter1 { + router = round-robin + nr-of-instances = 5 + } + } + //#config-round-robin + + //#config-resize + akka.actor.deployment { + /myrouter2 { + router = round-robin + resizer { + lower-bound = 2 + upper-bound = 15 + } + } + } + //#config-resize + + //#config-random + akka.actor.deployment { + /myrouter3 { + router = random + nr-of-instances = 5 + } + } + //#config-random + + //#config-smallest-mailbox + akka.actor.deployment { + /myrouter4 { + router = smallest-mailbox + nr-of-instances = 5 + } + } + //#config-smallest-mailbox + + //#config-broadcast + akka.actor.deployment { + /myrouter5 { + router = broadcast + nr-of-instances = 5 + } + } + //#config-broadcast + + //#config-scatter-gather + akka.actor.deployment { + /myrouter6 { + router = scatter-gather + nr-of-instances = 5 + within = 10 seconds + } + } + //#config-scatter-gather + + //#config-consistent-hashing + akka.actor.deployment { + /myrouter7 { + router = consistent-hashing + nr-of-instances = 5 + virtual-nodes-factor = 10 + } + } + //#config-consistent-hashing + + """) + + case class Message(nbr: Int) extends ConsistentHashable { + override def consistentHashKey = nbr + } + + class ExampleActor extends Actor with ActorLogging { + def receive = { + case Message(nbr) ⇒ + log.debug("Received %s in router %s".format(nbr, self.path.name)) + sender ! nbr + } + } + +} + +class RouterWithConfigDocSpec extends AkkaSpec(RouterWithConfigDocSpec.config) with ImplicitSender { + + import RouterWithConfigDocSpec._ + + "demonstrate configured round-robin router" in { + //#configurableRouting + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter1") + //#configurableRouting + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured random router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter3") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured smallest-mailbox router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter4") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured broadcast router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter5") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(5 * 10) + } + + "demonstrate configured scatter-gather router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter6") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured consistent-hashing router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter7") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured round-robin router with resizer" in { + //#configurableRoutingWithResizer + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter2") + //#configurableRoutingWithResizer + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + +} \ No newline at end of file diff --git a/akka-docs/scala/code/docs/routing/RouterViaConfigExample.scala b/akka-docs/scala/code/docs/routing/RouterViaConfigExample.scala deleted file mode 100644 index 5d34e429bb..0000000000 --- a/akka-docs/scala/code/docs/routing/RouterViaConfigExample.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package docs.routing - -import akka.actor.{ Actor, Props, ActorSystem } -import com.typesafe.config.ConfigFactory -import akka.routing.FromConfig - -case class Message(nbr: Int) - -class ExampleActor extends Actor { - def receive = { - case Message(nbr) ⇒ println("Received %s in router %s".format(nbr, self.path.name)) - } -} - -object RouterWithConfigExample extends App { - val config = ConfigFactory.parseString(""" - //#config - akka.actor.deployment { - /router { - router = round-robin - nr-of-instances = 5 - } - } - //#config - //#config-resize - akka.actor.deployment { - /router2 { - router = round-robin - resizer { - lower-bound = 2 - upper-bound = 15 - } - } - } - //#config-resize - """) - val system = ActorSystem("Example", config) - //#configurableRouting - val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), - "router") - //#configurableRouting - 1 to 10 foreach { i ⇒ router ! Message(i) } - - //#configurableRoutingWithResizer - val router2 = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), - "router2") - //#configurableRoutingWithResizer - 1 to 10 foreach { i ⇒ router2 ! Message(i) } -} \ No newline at end of file diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 353e82d277..c1fa21b23f 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -15,15 +15,16 @@ is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` +* ``akka.routing.ConsistentHashingRouter`` Routers In Action ^^^^^^^^^^^^^^^^^ This is an example of how to create a router that is defined in configuration: -.. includecode:: code/docs/routing/RouterViaConfigExample.scala#config +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin -.. includecode:: code/docs/routing/RouterViaConfigExample.scala#configurableRouting +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#configurableRouting This is an example of how to programmatically create a router and set the number of routees it should create: @@ -125,7 +126,7 @@ not have an effect on the number of actors in the pool. Setting the strategy is easily done: -.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala#supervision :include: supervision :exclude: custom-strategy @@ -179,6 +180,10 @@ is exactly what you would expect from a round-robin router to happen. (The name of an actor is automatically created in the format ``$letter`` unless you specify it - hence the names printed above.) +This is an example of how to define a round-robin router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin + RandomRouter ************ As the name implies this router type selects one of its routees randomly and forwards @@ -206,6 +211,10 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +This is an example of how to define a random router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-random + SmallestMailboxRouter ********************* A Router that tries to send to the non-suspended routee with fewest messages in mailbox. @@ -221,6 +230,11 @@ Code example: .. includecode:: code/docs/routing/RouterTypeExample.scala#smallestMailboxRouter + +This is an example of how to define a smallest-mailbox router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-smallest-mailbox + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. @@ -240,6 +254,11 @@ When run you should see a similar output to this: As you can see here above each of the routees, five in total, received the broadcast message. +This is an example of how to define a broadcast router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-broadcast + + ScatterGatherFirstCompletedRouter ********************************* The ScatterGatherFirstCompletedRouter will send the message on to all its routees as a future. @@ -257,6 +276,51 @@ When run you should see this: From the output above you can't really see that all the routees performed the calculation, but they did! The result you see is from the first routee that returned its calculation to the router. +This is an example of how to define a scatter-gather router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-scatter-gather + + +ConsistentHashingRouter +*********************** + +The ConsistentHashingRouter uses `consistent hashing `_ +to select a connection based on the sent message. This +`article `_ gives good +insight into how consistent hashing is implemented. + +There is 3 ways to define what data to use for the consistent hash key. + +* You can define ``hashMapping`` of the router to map incoming + messages to their consistent hash key. This makes the decision + transparent for the sender. + +* The messages may implement ``akka.routing.ConsistentHashingRouter.ConsistentHashable``. + The key is part of the message and it's convenient to define it together + with the message definition. + +* The messages can be be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope`` + to define what data to use for the consistent hash key. The sender knows + the key to use. + +These ways to define the consistent hash key can be use together and at +the same time for one router. The ``hashMapping`` is tried first. + +Code example: + +.. includecode:: code/docs/routing/ConsistentHashingRouterDocSpec.scala#cache-actor + +.. includecode:: code/docs/routing/ConsistentHashingRouterDocSpec.scala#consistent-hashing-router + +In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, +while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict`` +message is handled by the ``hashMapping`` partial function. + +This is an example of how to define a consistent-hashing router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-consistent-hashing + + Broadcast Messages ^^^^^^^^^^^^^^^^^^ @@ -278,9 +342,9 @@ of routees dynamically. This is an example of how to create a resizable router that is defined in configuration: -.. includecode:: code/docs/routing/RouterViaConfigExample.scala#config-resize +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-resize -.. includecode:: code/docs/routing/RouterViaConfigExample.scala#configurableRoutingWithResizer +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#configurableRoutingWithResizer Several more configuration options are available and described in ``akka.actor.deployment.default.resizer`` section of the reference :ref:`configuration`.