From 705c118ea279a6f886c270c917d3c60638927cef Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sat, 8 Sep 2012 20:54:16 +0200 Subject: [PATCH 01/19] Pick correct node at the ring boundary and change to TreeMap, see #944 * The small problem was that it picked node counter clockwise in the ring for all, except for crc < firstKey, where it picked firstKey instead of lastKey * Changed to clockwise selection and correct selection at the boundary, i.e. hash > lastKey => firstKey and hash < firstKey => firstKey * Changed to TreeMap instead of the 2 separate TreeSet and Map collections * Cleanup and documentation --- .../scala/akka/routing/ConsistentHash.scala | 137 +++++++++++------- 1 file changed, 84 insertions(+), 53 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index afa321d07d..ae14612e99 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -4,59 +4,7 @@ package akka.routing -import scala.collection.immutable.{ TreeSet, Seq } -import scala.collection.mutable.{ Buffer, Map } - -// ============================================================================================= -// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license -// ============================================================================================= - -/** - * Consistent Hashing node ring abstraction. - * - * Not thread-safe, to be used from within an Actor or protected some other way. - */ -class ConsistentHash[T](nodes: Seq[T], replicas: Int) { - private val cluster = Buffer[T]() - private var sortedKeys = TreeSet[Long]() - private var ring = Map[Long, T]() - - nodes.foreach(this += _) - - def +=(node: T): Unit = { - cluster += node - (1 to replicas) foreach { replica ⇒ - val key = hashFor((node + ":" + replica).getBytes("UTF-8")) - ring += (key -> node) - sortedKeys = sortedKeys + key - } - } - - def -=(node: T): Unit = { - cluster -= node - (1 to replicas) foreach { replica ⇒ - val key = hashFor((node + ":" + replica).getBytes("UTF-8")) - ring -= key - sortedKeys = sortedKeys - key - } - } - - def nodeFor(key: Array[Byte]): T = { - val hash = hashFor(key) - if (sortedKeys contains hash) ring(hash) - else { - if (hash < sortedKeys.firstKey) ring(sortedKeys.firstKey) - else if (hash > sortedKeys.lastKey) ring(sortedKeys.lastKey) - else ring(sortedKeys.rangeImpl(None, Some(hash)).lastKey) - } - } - - private def hashFor(bytes: Array[Byte]): Long = { - val hash = MurmurHash.arrayHash(bytes) - if (hash == Int.MinValue) hash + 1 - math.abs(hash) - } -} +import scala.collection.immutable.TreeMap /* __ *\ ** ________ ___ / / ___ Scala API ** @@ -80,6 +28,89 @@ class ConsistentHash[T](nodes: Seq[T], replicas: Int) { import java.lang.Integer.{ rotateLeft ⇒ rotl } +// ============================================================================================= +// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license +// ============================================================================================= + +/** + * Consistent Hashing node ring abstraction. + * + * A good explanation of Consistent Hashing: + * http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html + * + * Note that toString of the ring nodes are used for the node + * hash, i.e. make sure it is different for different nodes. + * + * Not thread-safe, to be used from within an Actor or protected some other way. + */ +class ConsistentHash[T](nodes: Iterable[T], replicas: Int) { + private var ring = TreeMap[Int, T]() + + if (replicas < 1) throw new IllegalArgumentException("replicas must be >= 1") + + nodes.foreach(this += _) + + /** + * Adds a node to the ring. + */ + def +=(node: T): Unit = { + (1 to replicas) foreach { replica ⇒ + ring += (nodeHashFor(node, replica) -> node) + } + } + + /** + * Adds a node to the ring. + * JAVA API + */ + def add(node: T): Unit = this += node + + /** + * Removes a node from the ring. + */ + def -=(node: T): Unit = { + (1 to replicas) foreach { replica ⇒ + ring -= nodeHashFor(node, replica) + } + } + + /** + * Removes a node from the ring. + * JAVA API + */ + def remove(node: T): Unit = this -= node + + /** + * Get the node responsible for the data key. + * Can only be used if nodes exists in the ring, + * otherwise throws `IllegalStateException` + */ + def nodeFor(key: Array[Byte]): T = { + if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty ring" format key) + val hash = hashFor(key) + def nextClockwise: T = { + val (ringKey, node) = ring.rangeImpl(Some(hash), None).headOption.getOrElse(ring.head) + node + } + ring.getOrElse(hash, nextClockwise) + } + + /** + * Is the ring empty, i.e. no nodes added or all removed. + */ + def isEmpty: Boolean = ring.isEmpty + + private def nodeHashFor(node: T, replica: Int): Int = { + hashFor((node + ":" + replica).getBytes("UTF-8")) + } + + private def hashFor(bytes: Array[Byte]): Int = { + val hash = MurmurHash.arrayHash(bytes) + if (hash == Int.MinValue) hash + 1 + math.abs(hash) + } +} + /** * A class designed to generate well-distributed non-cryptographic * hashes. It is designed to be passed to a collection's foreach method, From 521d20ba73561c7682bafd31e652a0bce5602d9d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sun, 9 Sep 2012 16:47:43 +0200 Subject: [PATCH 02/19] Make ConsistentHash immutable, see #944 * Makes it thread safe * Also changed two scary (mutable Array) fields in MurmurHash to private * Note in migration guide about changed api for ConsistentHash --- .../scala/akka/routing/ConsistentHash.scala | 58 ++++++++++++------- .../project/migration-guide-2.0.x-2.1.x.rst | 18 +++++- 2 files changed, 55 insertions(+), 21 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index ae14612e99..21f9bffbce 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -5,6 +5,7 @@ package akka.routing import scala.collection.immutable.TreeMap +import java.lang.Integer.{ rotateLeft ⇒ rotl } /* __ *\ ** ________ ___ / / ___ Scala API ** @@ -26,8 +27,6 @@ import scala.collection.immutable.TreeMap * @since 2.9 */ -import java.lang.Integer.{ rotateLeft ⇒ rotl } - // ============================================================================================= // Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license // ============================================================================================= @@ -41,44 +40,46 @@ import java.lang.Integer.{ rotateLeft ⇒ rotl } * Note that toString of the ring nodes are used for the node * hash, i.e. make sure it is different for different nodes. * - * Not thread-safe, to be used from within an Actor or protected some other way. */ -class ConsistentHash[T](nodes: Iterable[T], replicas: Int) { - private var ring = TreeMap[Int, T]() +class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) { + + import ConsistentHash._ if (replicas < 1) throw new IllegalArgumentException("replicas must be >= 1") - nodes.foreach(this += _) - /** * Adds a node to the ring. + * Note that the instance is immutable and this + * operation returns a new instance. */ - def +=(node: T): Unit = { - (1 to replicas) foreach { replica ⇒ - ring += (nodeHashFor(node, replica) -> node) - } + def :+(node: T): ConsistentHash[T] = { + new ConsistentHash(replicas, ring ++ ((1 to replicas) map { r ⇒ (nodeHashFor(node, r) -> node) })) } /** * Adds a node to the ring. + * Note that the instance is immutable and this + * operation returns a new instance. * JAVA API */ - def add(node: T): Unit = this += node + def add(node: T): ConsistentHash[T] = this :+ node /** * Removes a node from the ring. + * Note that the instance is immutable and this + * operation returns a new instance. */ - def -=(node: T): Unit = { - (1 to replicas) foreach { replica ⇒ - ring -= nodeHashFor(node, replica) - } + def :-(node: T): ConsistentHash[T] = { + new ConsistentHash(replicas, ring -- ((1 to replicas) map { r ⇒ nodeHashFor(node, r) })) } /** * Removes a node from the ring. + * Note that the instance is immutable and this + * operation returns a new instance. * JAVA API */ - def remove(node: T): Unit = this -= node + def remove(node: T): ConsistentHash[T] = this :- node /** * Get the node responsible for the data key. @@ -100,7 +101,24 @@ class ConsistentHash[T](nodes: Iterable[T], replicas: Int) { */ def isEmpty: Boolean = ring.isEmpty - private def nodeHashFor(node: T, replica: Int): Int = { +} + +object ConsistentHash { + def apply[T](nodes: Iterable[T], replicas: Int) = { + new ConsistentHash(replicas, TreeMap.empty[Int, T] ++ + (for (node ← nodes; replica ← 1 to replicas) yield (nodeHashFor(node, replica) -> node))) + } + + /** + * Factory method to create a ConsistentHash + * JAVA API + */ + def create[T](nodes: java.lang.Iterable[T], replicas: Int) = { + import scala.collection.JavaConverters._ + apply(nodes.asScala, replicas) + } + + private def nodeHashFor(node: Any, replica: Int): Int = { hashFor((node + ":" + replica).getBytes("UTF-8")) } @@ -189,11 +207,11 @@ object MurmurHash { final private val seedArray: Int = 0x3c074a61 /** The first 23 magic integers from the first stream are stored here */ - val storedMagicA: Array[Int] = + private val storedMagicA: Array[Int] = Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray /** The first 23 magic integers from the second stream are stored here */ - val storedMagicB: Array[Int] = + private val storedMagicB: Array[Int] = Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray /** Begin a new hash with a seed value. */ diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index fd5629ea3b..2a9f925f7f 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -286,7 +286,6 @@ Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sur stashed messages are put into the dead letters when the actor stops, make sure you call super.postStop if you override it. - Forward of Terminated message ============================= @@ -359,3 +358,20 @@ v2.1:: -requestedCapacity, stopDelay) +ConsistentHash +============== + +``akka.routing.ConsistentHash`` has been changed to an immutable data structure. + +v2.0:: + + val consistentHash = new ConsistentHash(Seq(a1, a2, a3), replicas = 10) + consistentHash += a4 + val a = consistentHash.nodeFor(data) + +v2.1:: + + var consistentHash = ConsistentHash(Seq(a1, a2, a3), replicas = 10) + consistentHash = consistentHash :+ a4 + val a = consistentHash.nodeFor(data) + From f6dcee423bb3e0bc9e79e88748c83d7d9e195858 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Sep 2012 15:10:16 +0200 Subject: [PATCH 03/19] Implement ConsistentHashingRouter, see #944 * Added trait RouterEnvelope to handle Broadcast and ConsistentHashableEnvelope in same way, could also be useful for custom routers --- .../test/scala/akka/actor/DeployerSpec.scala | 7 + .../routing/ConsistentHashingRouterSpec.scala | 79 +++++++ .../src/main/scala/akka/actor/Deployer.scala | 13 +- .../routing/ConsistentHashingRouter.scala | 212 ++++++++++++++++++ .../src/main/scala/akka/routing/Routing.scala | 14 +- 5 files changed, 316 insertions(+), 9 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala create mode 100644 akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 1aef438627..37aa133583 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -35,6 +35,9 @@ object DeployerSpec { router = scatter-gather within = 2 seconds } + /service-consistent-hashing { + router = consistent-hashing + } /service-resizer { router = round-robin resizer { @@ -118,6 +121,10 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { assertRouting("/service-scatter-gather", ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/service-scatter-gather") } + "be able to parse 'akka.actor.deployment._' with consistent-hashing router" in { + assertRouting("/service-consistent-hashing", ConsistentHashingRouter(1), "/service-consistent-hashing") + } + "be able to parse 'akka.actor.deployment._' with router resizer" in { val resizer = DefaultResizer() assertRouting("/service-resizer", RoundRobinRouter(resizer = Some(resizer)), "/service-resizer") diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala new file mode 100644 index 0000000000..fd7a49e867 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.routing + +import scala.concurrent.Await + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.actorRef2Scala +import akka.pattern.ask +import akka.routing.ConsistentHashingRouter.ConsistentHashable +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +import akka.testkit.AkkaSpec +import akka.testkit._ + +object ConsistentHashingRouterSpec { + + val config = """ + akka.actor.deployment { + /router1 { + router = consistent-hashing + nr-of-instances = 3 + } + } + """ + + class Echo extends Actor { + def receive = { + case _ ⇒ sender ! self + } + } + + case class Msg(key: Any, data: String) extends ConsistentHashable { + override def consistentHashKey = key + } + + case class MsgKey(name: String) +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.config) with DefaultTimeout with ImplicitSender { + import akka.routing.ConsistentHashingRouterSpec._ + implicit val ec = system.dispatcher + + val router1 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter()), "router1") + + "consistent hashing router" must { + "create routees from configuration" in { + val currentRoutees = Await.result(router1 ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees] + currentRoutees.routees.size must be(3) + } + + "select destination based on consistentHashKey of the message" in { + router1 ! Msg("a", "A") + val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router1 ! new ConsistentHashableEnvelope { + override def consistentHashKey = "a" + override def message = "AA" + } + expectMsg(destinationA) + + router1 ! Msg(17, "B") + val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router1 ! new ConsistentHashableEnvelope { + override def consistentHashKey = 17 + override def message = "BB" + } + expectMsg(destinationB) + + router1 ! Msg(MsgKey("c"), "C") + val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router1 ! Msg(MsgKey("c"), "CC") + expectMsg(destinationC) + } + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 9ceebaacb3..be9da1a505 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -148,12 +148,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None val router: RouterConfig = deployment.getString("router") match { - case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) - case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) - case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case "from-code" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) + case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) + case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case "consistent-hashing" ⇒ ConsistentHashingRouter(nrOfInstances, routees, resizer) case fqn ⇒ val args = Seq(classOf[Config] -> deployment) dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala new file mode 100644 index 0000000000..eba812ac60 --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -0,0 +1,212 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.routing + +import scala.collection.JavaConversions.iterableAsScalaIterable +import scala.util.control.NonFatal + +import akka.actor.ActorRef +import akka.actor.SupervisorStrategy +import akka.actor.Props +import akka.dispatch.Dispatchers +import akka.event.Logging +import akka.serialization.SerializationExtension + +object ConsistentHashingRouter { + /** + * Creates a new ConsistentHashingRouter, routing to the specified routees + */ + def apply(routees: Iterable[ActorRef]): ConsistentHashingRouter = + new ConsistentHashingRouter(routees = routees map (_.path.toString)) + + /** + * Java API to create router with the supplied 'routees' actors. + */ + def create(routees: java.lang.Iterable[ActorRef]): ConsistentHashingRouter = { + import scala.collection.JavaConverters._ + apply(routees.asScala) + } + + /** + * Messages need to implement this interface to define what + * data to use for the consistent hash key. Note that it's not + * the hash, but the data to be hashed. If returning an + * `Array[Byte]` or String it will be used as is, otherwise the + * configured [[akka.akka.serialization.Serializer]] will + * be applied to the returned data. + * + * If messages can't implement this interface themselves, + * it's possible to wrap the messages in + * [[akka.routing.ConsistentHashableEnvelope]] + */ + trait ConsistentHashable { + def consistentHashKey: Any + } + + /** + * If messages can't implement [[akka.routing.ConsistentHashable]] + * themselves they can we wrapped by something implementing + * this interface instead. The router will only send the + * wrapped message to the destination, i.e. the envelope will + * be stripped off. + */ + trait ConsistentHashableEnvelope extends ConsistentHashable with RouterEnvelope { + def message: Any + } + + /** + * Default number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + */ + val DefaultReplicas: Int = 10 + +} +/** + * A Router that uses consistent hashing to select a connection based on the + * sent message. The messages must implement [[akka.routing.ConsistentHashable]] + * or be wrapped in a [[akka.routing.ConsistentHashableEnvelope]] to define what + * data to use for the consistent hash key. + * + * Please note that providing both 'nrOfInstances' and 'routees' does not make logical + * sense as this means that the router should both create new actors and use the 'routees' + * actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + *

Supervision Setup

+ * + * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + * @param replicas number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + */ +@SerialVersionUID(1L) +case class ConsistentHashingRouter( + nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy, + val replicas: Int = ConsistentHashingRouter.DefaultReplicas) + extends RouterConfig with ConsistentHashingLike { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = this(nrOfInstances = nr) + + /** + * Constructor that sets the routees to be used. + * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ + def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths)) + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(resizer = Some(resizer)) + + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String): ConsistentHashingRouter = copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy) + + /** + * Java API for setting the number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + */ + def withReplicas(replicas: Int): ConsistentHashingRouter = copy(replicas = replicas) + + /** + * Uses the resizer of the given RouterConfig if this RouterConfig + * doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + */ + override def withFallback(other: RouterConfig): RouterConfig = { + if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) + else this + } +} + +trait ConsistentHashingLike { this: RouterConfig ⇒ + + import ConsistentHashingRouter._ + + def nrOfInstances: Int + + def routees: Iterable[String] + + def replicas: Int + + override def createRoute(routeeProvider: RouteeProvider): Route = { + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } + + val log = Logging(routeeProvider.context.system, routeeProvider.context.self) + + // consistentHashRoutees and consistentHash are updated together, synchronized on the consistentHashLock + val consistentHashLock = new Object + var consistentHashRoutees: IndexedSeq[ActorRef] = null + var consistentHash: ConsistentHash[ActorRef] = null + upateConsistentHash() + + // update consistentHash when routees has changed + // changes to routees are rare and when no changes this is a quick operation + def upateConsistentHash(): ConsistentHash[ActorRef] = consistentHashLock.synchronized { + val currentRoutees = routeeProvider.routees + if ((currentRoutees ne consistentHashRoutees) && currentRoutees != consistentHashRoutees) { + consistentHashRoutees = currentRoutees + consistentHash = ConsistentHash(currentRoutees, replicas) + } + consistentHash + } + + def target(hashData: Any): ActorRef = try { + val hash = hashData match { + case bytes: Array[Byte] ⇒ bytes + case str: String ⇒ str.getBytes("UTF-8") + case x: AnyRef ⇒ SerializationExtension(routeeProvider.context.system).serialize(x).get + } + val currentConsistenHash = upateConsistentHash() + if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters + else currentConsistenHash.nodeFor(hash) + } catch { + case NonFatal(e) ⇒ + // serialization failed + log.warning("Couldn't route message with consistentHashKey [%s] due to [%s]".format(hashData, e.getMessage)) + routeeProvider.context.system.deadLetters + } + + { + case (sender, message) ⇒ + message match { + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) + case hashable: ConsistentHashable ⇒ List(Destination(sender, target(hashable.consistentHashKey))) + case other ⇒ + log.warning("Message [{}] must implement [{}] or be wrapped in [{}]", + message.getClass.getName, classOf[ConsistentHashable].getName, + classOf[ConsistentHashableEnvelope].getName) + List(Destination(sender, routeeProvider.context.system.deadLetters)) + } + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 1d18e7ed2e..9e17bf2b9a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -115,8 +115,8 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo val s = if (sender eq null) system.deadLetters else sender val msg = message match { - case Broadcast(m) ⇒ m - case m ⇒ m + case wrapped: RouterEnvelope ⇒ wrapped.message + case m ⇒ m } applyRoute(s, message) match { @@ -400,7 +400,15 @@ private object Router { * Router implementations may choose to handle this message differently. */ @SerialVersionUID(1L) -case class Broadcast(message: Any) +case class Broadcast(message: Any) extends RouterEnvelope + +/** + * Only the contained message will be forwarded to the + * destination, i.e. the envelope will be stripped off. + */ +trait RouterEnvelope { + def message: Any +} /** * Sending this message to a router will make it send back its currently used routees. From ea78e6d8052b1fbcd58452d79c1313c3c4195d1b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Sep 2012 15:10:33 +0200 Subject: [PATCH 04/19] Documentation of constent hashing router, see #944 * Document sample to illustrate usage * Rewrote RouterViaConfig to real DocSpec * Added configuration snippets for all router types --- .../test/scala/akka/routing/RoutingSpec.scala | 2 - akka-docs/java/routing.rst | 50 +++++- .../ConsistentHashingRouterDocSpec.scala | 60 +++++++ .../docs/routing/RouterViaConfigDocSpec.scala | 157 ++++++++++++++++++ .../docs/routing/RouterViaConfigExample.scala | 52 ------ akka-docs/scala/routing.rst | 63 ++++++- 6 files changed, 323 insertions(+), 61 deletions(-) create mode 100644 akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala create mode 100644 akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala delete mode 100644 akka-docs/scala/code/docs/routing/RouterViaConfigExample.scala diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 9e126e75ed..2d4f2245f1 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -171,8 +171,6 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } expectMsgType[ActorKilledException] - //#supervision - val router2 = system.actorOf(Props.empty.withRouter(RoundRobinRouter(1).withSupervisorStrategy(escalator))) router2 ! CurrentRoutees EventFilter[ActorKilledException](occurrences = 2) intercept { diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 845e2825e3..561cbd9446 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -21,7 +21,7 @@ Routers In Action This is an example of how to create a router that is defined in configuration: -.. includecode:: ../scala/code/docs/routing/RouterViaConfigExample.scala#config +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin .. includecode:: code/docs/jrouting/RouterViaConfigExample.java#configurableRouting @@ -177,6 +177,10 @@ is exactly what you would expect from a round-robin router to happen. (The name of an actor is automatically created in the format ``$letter`` unless you specify it - hence the names printed above.) +This is an example of how to define a round-robin router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin + RandomRouter ************ As the name implies this router type selects one of its routees randomly and forwards @@ -204,6 +208,10 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +This is an example of how to define a random router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-random + SmallestMailboxRouter ********************* A Router that tries to send to the non-suspended routee with fewest messages in mailbox. @@ -219,6 +227,10 @@ Code example: .. includecode:: code/docs/jrouting/ParentActor.java#smallestMailboxRouter +This is an example of how to define a smallest-mailbox router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-smallest-mailbox + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. @@ -238,6 +250,10 @@ When run you should see a similar output to this: As you can see here above each of the routees, five in total, received the broadcast message. +This is an example of how to define a broadcast router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-broadcast + ScatterGatherFirstCompletedRouter ********************************* The ScatterGatherFirstCompletedRouter will send the message on to all its routees as a future. @@ -255,6 +271,36 @@ When run you should see this: From the output above you can't really see that all the routees performed the calculation, but they did! The result you see is from the first routee that returned its calculation to the router. +This is an example of how to define a scatter-gather router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-scatter-gather + +ConsistentHashingRouter +*********************** + +The ConsistentHashingRouter uses `consistent hashing `_ +to select a connection based on the sent message. This +`article `_ gives good +insight into how consistent hashing is implemented. + +The messages sent to a ConsistentHashingRouter must implement +``akka.routing.ConsistentHashable`` or be wrapped in a ``akka.routing.ConsistentHashableEnvelope`` +to define what data to use for the consistent hash key. If returning a +byte array or String it will be used as is, otherwise the configured +:ref:`serializer ` will be applied to the returned data +to create a byte array that will be hashed. + +Code example: + +FIXME Java example of consistent routing + +In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, +while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. + +This is an example of how to define a consistent-hashing router in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-consistent-hashing + Broadcast Messages ^^^^^^^^^^^^^^^^^^ @@ -276,7 +322,7 @@ of routees dynamically. This is an example of how to create a resizable router that is defined in configuration: -.. includecode:: ../scala/code/docs/routing/RouterViaConfigExample.scala#config-resize +.. includecode:: ../scala/code/docs/routing/RouterViaConfigDocSpec.scala#config-resize .. includecode:: code/docs/jrouting/RouterViaConfigExample.java#configurableRoutingWithResizer diff --git a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala new file mode 100644 index 0000000000..8bb59a7bce --- /dev/null +++ b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package docs.routing + +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender + +object ConsistentHashingRouterDocSpec { + + //#cache-actor + import akka.actor.Actor + import akka.routing.ConsistentHashingRouter.ConsistentHashable + import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope + + class Cache extends Actor { + var cache = Map.empty[String, String] + + def receive = { + case Entry(key, value) ⇒ cache += (key -> value) + case Get(key) ⇒ sender ! cache.get(key) + } + } + + case class Get(key: String) extends ConsistentHashable { + override def consistentHashKey: Any = key + } + + case class Entry(key: String, value: String) + case class EntryEnvelope(entry: Entry) extends ConsistentHashableEnvelope { + override def consistentHashKey: Any = entry.key + override def message: Any = entry + } + //#cache-actor + +} + +class ConsistentHashingRouterDocSpec extends AkkaSpec with ImplicitSender { + + import ConsistentHashingRouterDocSpec._ + + "demonstrate usage of ConsistentHashableRouter" in { + + //#consistent-hashing-router + import akka.actor.Props + import akka.routing.ConsistentHashingRouter + + val cache = system.actorOf(Props[Cache].withRouter(ConsistentHashingRouter(10)), "cache") + + cache ! EntryEnvelope(Entry("hello", "HELLO")) + cache ! EntryEnvelope(Entry("hi", "HI")) + + cache ! Get("hello") + cache ! Get("hi") + expectMsg(Some("HELLO")) + expectMsg(Some("HI")) + //#consistent-hashing-router + } + +} diff --git a/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala b/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala new file mode 100644 index 0000000000..265b0f8361 --- /dev/null +++ b/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala @@ -0,0 +1,157 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package docs.routing + +import akka.actor.{ Actor, Props, ActorSystem, ActorLogging } +import com.typesafe.config.ConfigFactory +import akka.routing.FromConfig +import akka.routing.ConsistentHashingRouter.ConsistentHashable +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender + +object RouterWithConfigDocSpec { + + val config = ConfigFactory.parseString(""" + + //#config-round-robin + akka.actor.deployment { + /myrouter1 { + router = round-robin + nr-of-instances = 5 + } + } + //#config-round-robin + + //#config-resize + akka.actor.deployment { + /myrouter2 { + router = round-robin + resizer { + lower-bound = 2 + upper-bound = 15 + } + } + } + //#config-resize + + //#config-random + akka.actor.deployment { + /myrouter3 { + router = random + nr-of-instances = 5 + } + } + //#config-random + + //#config-smallest-mailbox + akka.actor.deployment { + /myrouter4 { + router = smallest-mailbox + nr-of-instances = 5 + } + } + //#config-smallest-mailbox + + //#config-broadcast + akka.actor.deployment { + /myrouter5 { + router = broadcast + nr-of-instances = 5 + } + } + //#config-broadcast + + //#config-scatter-gather + akka.actor.deployment { + /myrouter6 { + router = scatter-gather + nr-of-instances = 5 + within = 10 seconds + } + } + //#config-scatter-gather + + //#config-consistent-hashing + akka.actor.deployment { + /myrouter7 { + router = consistent-hashing + nr-of-instances = 5 + } + } + //#config-consistent-hashing + + """) + + case class Message(nbr: Int) extends ConsistentHashable { + override def consistentHashKey = nbr + } + + class ExampleActor extends Actor with ActorLogging { + def receive = { + case Message(nbr) ⇒ + log.debug("Received %s in router %s".format(nbr, self.path.name)) + sender ! nbr + } + } + +} + +class RouterWithConfigDocSpec extends AkkaSpec(RouterWithConfigDocSpec.config) with ImplicitSender { + + import RouterWithConfigDocSpec._ + + "demonstrate configured round-robin router" in { + //#configurableRouting + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter1") + //#configurableRouting + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured random router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter3") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured smallest-mailbox router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter4") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured broadcast router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter5") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(5 * 10) + } + + "demonstrate configured scatter-gather router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter6") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured consistent-hashing router" in { + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter7") + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + + "demonstrate configured round-robin router with resizer" in { + //#configurableRoutingWithResizer + val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), + "myrouter2") + //#configurableRoutingWithResizer + 1 to 10 foreach { i ⇒ router ! Message(i) } + receiveN(10) + } + +} \ No newline at end of file diff --git a/akka-docs/scala/code/docs/routing/RouterViaConfigExample.scala b/akka-docs/scala/code/docs/routing/RouterViaConfigExample.scala deleted file mode 100644 index 5d34e429bb..0000000000 --- a/akka-docs/scala/code/docs/routing/RouterViaConfigExample.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package docs.routing - -import akka.actor.{ Actor, Props, ActorSystem } -import com.typesafe.config.ConfigFactory -import akka.routing.FromConfig - -case class Message(nbr: Int) - -class ExampleActor extends Actor { - def receive = { - case Message(nbr) ⇒ println("Received %s in router %s".format(nbr, self.path.name)) - } -} - -object RouterWithConfigExample extends App { - val config = ConfigFactory.parseString(""" - //#config - akka.actor.deployment { - /router { - router = round-robin - nr-of-instances = 5 - } - } - //#config - //#config-resize - akka.actor.deployment { - /router2 { - router = round-robin - resizer { - lower-bound = 2 - upper-bound = 15 - } - } - } - //#config-resize - """) - val system = ActorSystem("Example", config) - //#configurableRouting - val router = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), - "router") - //#configurableRouting - 1 to 10 foreach { i ⇒ router ! Message(i) } - - //#configurableRoutingWithResizer - val router2 = system.actorOf(Props[ExampleActor].withRouter(FromConfig()), - "router2") - //#configurableRoutingWithResizer - 1 to 10 foreach { i ⇒ router2 ! Message(i) } -} \ No newline at end of file diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 353e82d277..3f384895c6 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -15,15 +15,16 @@ is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` +* ``akka.routing.ConsistentHashingRouter`` Routers In Action ^^^^^^^^^^^^^^^^^ This is an example of how to create a router that is defined in configuration: -.. includecode:: code/docs/routing/RouterViaConfigExample.scala#config +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin -.. includecode:: code/docs/routing/RouterViaConfigExample.scala#configurableRouting +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#configurableRouting This is an example of how to programmatically create a router and set the number of routees it should create: @@ -125,7 +126,7 @@ not have an effect on the number of actors in the pool. Setting the strategy is easily done: -.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +.. includecode:: ../../akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala#supervision :include: supervision :exclude: custom-strategy @@ -179,6 +180,10 @@ is exactly what you would expect from a round-robin router to happen. (The name of an actor is automatically created in the format ``$letter`` unless you specify it - hence the names printed above.) +This is an example of how to define a round-robin router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-round-robin + RandomRouter ************ As the name implies this router type selects one of its routees randomly and forwards @@ -206,6 +211,10 @@ When run you should see a similar output to this: The result from running the random router should be different, or at least random, every time you run it. Try to run it a couple of times to verify its behavior if you don't trust us. +This is an example of how to define a random router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-random + SmallestMailboxRouter ********************* A Router that tries to send to the non-suspended routee with fewest messages in mailbox. @@ -221,6 +230,11 @@ Code example: .. includecode:: code/docs/routing/RouterTypeExample.scala#smallestMailboxRouter + +This is an example of how to define a smallest-mailbox router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-smallest-mailbox + BroadcastRouter *************** A broadcast router forwards the message it receives to *all* its routees. @@ -240,6 +254,11 @@ When run you should see a similar output to this: As you can see here above each of the routees, five in total, received the broadcast message. +This is an example of how to define a broadcast router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-broadcast + + ScatterGatherFirstCompletedRouter ********************************* The ScatterGatherFirstCompletedRouter will send the message on to all its routees as a future. @@ -257,6 +276,40 @@ When run you should see this: From the output above you can't really see that all the routees performed the calculation, but they did! The result you see is from the first routee that returned its calculation to the router. +This is an example of how to define a scatter-gather router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-scatter-gather + + +ConsistentHashingRouter +*********************** + +The ConsistentHashingRouter uses `consistent hashing `_ +to select a connection based on the sent message. This +`article `_ gives good +insight into how consistent hashing is implemented. + +The messages sent to a ConsistentHashingRouter must implement +``akka.routing.ConsistentHashable`` or be wrapped in a ``akka.routing.ConsistentHashableEnvelope`` +to define what data to use for the consistent hash key. If returning a +byte array or String it will be used as is, otherwise the configured +:ref:`serializer ` will be applied to the returned data +to create a byte array that will be hashed. + +Code example: + +.. includecode:: code/docs/routing/ConsistentHashingRouterDocSpec.scala#cache-actor + +.. includecode:: code/docs/routing/ConsistentHashingRouterDocSpec.scala#consistent-hashing-router + +In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, +while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. + +This is an example of how to define a consistent-hashing router in configuration: + +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-consistent-hashing + + Broadcast Messages ^^^^^^^^^^^^^^^^^^ @@ -278,9 +331,9 @@ of routees dynamically. This is an example of how to create a resizable router that is defined in configuration: -.. includecode:: code/docs/routing/RouterViaConfigExample.scala#config-resize +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#config-resize -.. includecode:: code/docs/routing/RouterViaConfigExample.scala#configurableRoutingWithResizer +.. includecode:: code/docs/routing/RouterViaConfigDocSpec.scala#configurableRoutingWithResizer Several more configuration options are available and described in ``akka.actor.deployment.default.resizer`` section of the reference :ref:`configuration`. From d74464ba50ebe1f75a5720f9f0e7f76f9df4d53c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 13 Sep 2012 14:25:25 +0200 Subject: [PATCH 05/19] Move MurmurHash to separate file due to license header, see #944 * Also removed unused 'class MurmurHash', we only use 'object MurmurHash' --- .../scala/akka/routing/ConsistentHash.scala | 197 ------------------ .../main/scala/akka/routing/MurmurHash.scala | 149 +++++++++++++ 2 files changed, 149 insertions(+), 197 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/routing/MurmurHash.scala diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 21f9bffbce..5dc23f1383 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -5,27 +5,6 @@ package akka.routing import scala.collection.immutable.TreeMap -import java.lang.Integer.{ rotateLeft ⇒ rotl } - -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -/** - * An implementation of Austin Appleby's MurmurHash 3.0 algorithm - * (32 bit version); reference: http://code.google.com/p/smhasher - * - * This is the hash used by collections and case classes (including - * tuples). - * - * @author Rex Kerr - * @version 2.9 - * @since 2.9 - */ // ============================================================================================= // Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license @@ -129,179 +108,3 @@ object ConsistentHash { } } -/** - * A class designed to generate well-distributed non-cryptographic - * hashes. It is designed to be passed to a collection's foreach method, - * or can take individual hash values with append. Its own hash code is - * set equal to the hash code of whatever it is hashing. - */ -class MurmurHash[@specialized(Int, Long, Float, Double) T](seed: Int) extends (T ⇒ Unit) { - import MurmurHash._ - - private var h = startHash(seed) - private var c = hiddenMagicA - private var k = hiddenMagicB - private var hashed = false - private var hashvalue = h - - /** Begin a new hash using the same seed. */ - def reset(): Unit = { - h = startHash(seed) - c = hiddenMagicA - k = hiddenMagicB - hashed = false - } - - /** Incorporate the hash value of one item. */ - def apply(t: T): Unit = { - h = extendHash(h, t.##, c, k) - c = nextMagicA(c) - k = nextMagicB(k) - hashed = false - } - - /** Incorporate a known hash value. */ - def append(i: Int): Unit = { - h = extendHash(h, i, c, k) - c = nextMagicA(c) - k = nextMagicB(k) - hashed = false - } - - /** Retrieve the hash value */ - def hash: Int = { - if (!hashed) { - hashvalue = finalizeHash(h) - hashed = true - } - hashvalue - } - - override def hashCode: Int = hash -} - -/** - * An object designed to generate well-distributed non-cryptographic - * hashes. It is designed to hash a collection of integers; along with - * the integers to hash, it generates two magic streams of integers to - * increase the distribution of repetitive input sequences. Thus, - * three methods need to be called at each step (to start and to - * incorporate a new integer) to update the values. Only one method - * needs to be called to finalize the hash. - */ - -object MurmurHash { - // Magic values used for MurmurHash's 32 bit hash. - // Don't change these without consulting a hashing expert! - final private val visibleMagic: Int = 0x971e137b - final private val hiddenMagicA: Int = 0x95543787 - final private val hiddenMagicB: Int = 0x2ad7eb25 - final private val visibleMixer: Int = 0x52dce729 - final private val hiddenMixerA: Int = 0x7b7d159c - final private val hiddenMixerB: Int = 0x6bce6396 - final private val finalMixer1: Int = 0x85ebca6b - final private val finalMixer2: Int = 0xc2b2ae35 - - // Arbitrary values used for hashing certain classes - final private val seedString: Int = 0xf7ca7fd2 - final private val seedArray: Int = 0x3c074a61 - - /** The first 23 magic integers from the first stream are stored here */ - private val storedMagicA: Array[Int] = - Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray - - /** The first 23 magic integers from the second stream are stored here */ - private val storedMagicB: Array[Int] = - Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray - - /** Begin a new hash with a seed value. */ - def startHash(seed: Int): Int = seed ^ visibleMagic - - /** The initial magic integers in the first stream. */ - def startMagicA: Int = hiddenMagicA - - /** The initial magic integer in the second stream. */ - def startMagicB: Int = hiddenMagicB - - /** - * Incorporates a new value into an existing hash. - * - * @param hash the prior hash value - * @param value the new value to incorporate - * @param magicA a magic integer from the stream - * @param magicB a magic integer from a different stream - * @return the updated hash value - */ - def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int = - (hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer - - /** Given a magic integer from the first stream, compute the next */ - def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA - - /** Given a magic integer from the second stream, compute the next */ - def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB - - /** Once all hashes have been incorporated, this performs a final mixing */ - def finalizeHash(hash: Int): Int = { - var i = (hash ^ (hash >>> 16)) - i *= finalMixer1 - i ^= (i >>> 13) - i *= finalMixer2 - i ^= (i >>> 16) - i - } - - /** Compute a high-quality hash of an array */ - def arrayHash[@specialized T](a: Array[T]): Int = { - var h = startHash(a.length * seedArray) - var c = hiddenMagicA - var k = hiddenMagicB - var j = 0 - while (j < a.length) { - h = extendHash(h, a(j).##, c, k) - c = nextMagicA(c) - k = nextMagicB(k) - j += 1 - } - finalizeHash(h) - } - - /** Compute a high-quality hash of a string */ - def stringHash(s: String): Int = { - var h = startHash(s.length * seedString) - var c = hiddenMagicA - var k = hiddenMagicB - var j = 0 - while (j + 1 < s.length) { - val i = (s.charAt(j) << 16) + s.charAt(j + 1); - h = extendHash(h, i, c, k) - c = nextMagicA(c) - k = nextMagicB(k) - j += 2 - } - if (j < s.length) h = extendHash(h, s.charAt(j), c, k) - finalizeHash(h) - } - - /** - * Compute a hash that is symmetric in its arguments--that is, - * where the order of appearance of elements does not matter. - * This is useful for hashing sets, for example. - */ - def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = { - var a, b, n = 0 - var c = 1 - xs.foreach(i ⇒ { - val h = i.## - a += h - b ^= h - if (h != 0) c *= h - n += 1 - }) - var h = startHash(seed * n) - h = extendHash(h, a, storedMagicA(0), storedMagicB(0)) - h = extendHash(h, b, storedMagicA(1), storedMagicB(1)) - h = extendHash(h, c, storedMagicA(2), storedMagicB(2)) - finalizeHash(h) - } -} diff --git a/akka-actor/src/main/scala/akka/routing/MurmurHash.scala b/akka-actor/src/main/scala/akka/routing/MurmurHash.scala new file mode 100644 index 0000000000..fc67613a5d --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/MurmurHash.scala @@ -0,0 +1,149 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +/** + * An implementation of Austin Appleby's MurmurHash 3.0 algorithm + * (32 bit version); reference: http://code.google.com/p/smhasher + * + * This is the hash used by collections and case classes (including + * tuples). + * + * @author Rex Kerr + * @version 2.9 + * @since 2.9 + */ + +package akka.routing + +import java.lang.Integer.{ rotateLeft ⇒ rotl } + +/** + * An object designed to generate well-distributed non-cryptographic + * hashes. It is designed to hash a collection of integers; along with + * the integers to hash, it generates two magic streams of integers to + * increase the distribution of repetitive input sequences. Thus, + * three methods need to be called at each step (to start and to + * incorporate a new integer) to update the values. Only one method + * needs to be called to finalize the hash. + */ + +object MurmurHash { + // Magic values used for MurmurHash's 32 bit hash. + // Don't change these without consulting a hashing expert! + final private val visibleMagic: Int = 0x971e137b + final private val hiddenMagicA: Int = 0x95543787 + final private val hiddenMagicB: Int = 0x2ad7eb25 + final private val visibleMixer: Int = 0x52dce729 + final private val hiddenMixerA: Int = 0x7b7d159c + final private val hiddenMixerB: Int = 0x6bce6396 + final private val finalMixer1: Int = 0x85ebca6b + final private val finalMixer2: Int = 0xc2b2ae35 + + // Arbitrary values used for hashing certain classes + final private val seedString: Int = 0xf7ca7fd2 + final private val seedArray: Int = 0x3c074a61 + + /** The first 23 magic integers from the first stream are stored here */ + private val storedMagicA: Array[Int] = + Iterator.iterate(hiddenMagicA)(nextMagicA).take(23).toArray + + /** The first 23 magic integers from the second stream are stored here */ + private val storedMagicB: Array[Int] = + Iterator.iterate(hiddenMagicB)(nextMagicB).take(23).toArray + + /** Begin a new hash with a seed value. */ + def startHash(seed: Int): Int = seed ^ visibleMagic + + /** The initial magic integers in the first stream. */ + def startMagicA: Int = hiddenMagicA + + /** The initial magic integer in the second stream. */ + def startMagicB: Int = hiddenMagicB + + /** + * Incorporates a new value into an existing hash. + * + * @param hash the prior hash value + * @param value the new value to incorporate + * @param magicA a magic integer from the stream + * @param magicB a magic integer from a different stream + * @return the updated hash value + */ + def extendHash(hash: Int, value: Int, magicA: Int, magicB: Int): Int = + (hash ^ rotl(value * magicA, 11) * magicB) * 3 + visibleMixer + + /** Given a magic integer from the first stream, compute the next */ + def nextMagicA(magicA: Int): Int = magicA * 5 + hiddenMixerA + + /** Given a magic integer from the second stream, compute the next */ + def nextMagicB(magicB: Int): Int = magicB * 5 + hiddenMixerB + + /** Once all hashes have been incorporated, this performs a final mixing */ + def finalizeHash(hash: Int): Int = { + var i = (hash ^ (hash >>> 16)) + i *= finalMixer1 + i ^= (i >>> 13) + i *= finalMixer2 + i ^= (i >>> 16) + i + } + + /** Compute a high-quality hash of an array */ + def arrayHash[@specialized T](a: Array[T]): Int = { + var h = startHash(a.length * seedArray) + var c = hiddenMagicA + var k = hiddenMagicB + var j = 0 + while (j < a.length) { + h = extendHash(h, a(j).##, c, k) + c = nextMagicA(c) + k = nextMagicB(k) + j += 1 + } + finalizeHash(h) + } + + /** Compute a high-quality hash of a string */ + def stringHash(s: String): Int = { + var h = startHash(s.length * seedString) + var c = hiddenMagicA + var k = hiddenMagicB + var j = 0 + while (j + 1 < s.length) { + val i = (s.charAt(j) << 16) + s.charAt(j + 1); + h = extendHash(h, i, c, k) + c = nextMagicA(c) + k = nextMagicB(k) + j += 2 + } + if (j < s.length) h = extendHash(h, s.charAt(j), c, k) + finalizeHash(h) + } + + /** + * Compute a hash that is symmetric in its arguments--that is, + * where the order of appearance of elements does not matter. + * This is useful for hashing sets, for example. + */ + def symmetricHash[T](xs: TraversableOnce[T], seed: Int): Int = { + var a, b, n = 0 + var c = 1 + xs.foreach(i ⇒ { + val h = i.## + a += h + b ^= h + if (h != 0) c *= h + n += 1 + }) + var h = startHash(seed * n) + h = extendHash(h, a, storedMagicA(0), storedMagicB(0)) + h = extendHash(h, b, storedMagicA(1), storedMagicB(1)) + h = extendHash(h, c, storedMagicA(2), storedMagicB(2)) + finalizeHash(h) + } +} From 5a90d7198cea48e207ae5618971357a4ddb931ea Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 13 Sep 2012 18:06:35 +0200 Subject: [PATCH 06/19] Improvements based on review feedback, see #944 --- .../test/scala/akka/config/ConfigSpec.scala | 3 + .../routing/ConsistentHashingRouterSpec.scala | 11 +--- akka-actor/src/main/resources/reference.conf | 15 +++-- .../main/scala/akka/actor/ActorSystem.scala | 2 + .../src/main/scala/akka/actor/Deployer.scala | 20 +++--- .../scala/akka/routing/ConsistentHash.scala | 59 ++++++++---------- .../routing/ConsistentHashingRouter.scala | 61 ++++++++++--------- .../src/main/scala/akka/routing/Routing.scala | 20 ++++++ .../docs/routing/RouterViaConfigDocSpec.scala | 1 + 9 files changed, 106 insertions(+), 86 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 5fe3466b25..56f8cd45fc 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -43,6 +43,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getBoolean("akka.jvm-exit-on-fatal-error") must be(true) settings.JvmExitOnFatalError must be(true) + + getInt("akka.actor.deployment.default.virtual-nodes-factor") must be(10) + settings.DefaultVirtualNodesFactor must be(10) } { diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala index fd7a49e867..7a28190523 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -22,6 +22,7 @@ object ConsistentHashingRouterSpec { /router1 { router = consistent-hashing nr-of-instances = 3 + virtual-nodes-factor = 17 } } """ @@ -55,18 +56,12 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c "select destination based on consistentHashKey of the message" in { router1 ! Msg("a", "A") val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } - router1 ! new ConsistentHashableEnvelope { - override def consistentHashKey = "a" - override def message = "AA" - } + router1 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a") expectMsg(destinationA) router1 ! Msg(17, "B") val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } - router1 ! new ConsistentHashableEnvelope { - override def consistentHashKey = 17 - override def message = "BB" - } + router1 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17) expectMsg(destinationB) router1 ! Msg(MsgKey("c"), "C") diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index a585fe3406..163d2e83d9 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -14,7 +14,7 @@ akka { # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT) event-handlers = ["akka.event.Logging$DefaultLogger"] - + # Event handlers are created and registered synchronously during ActorSystem # start-up, and since they are actors, this timeout is used to bound the # waiting time @@ -49,7 +49,7 @@ akka { # FQCN of the ActorRefProvider to be used; the below is the built-in default, # another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle. provider = "akka.actor.LocalActorRefProvider" - + # The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator # to obtain its supervisorStrategy. Besides the default there is # akka.actor.StoppingSupervisorStrategy @@ -69,7 +69,7 @@ akka { # Serializes and deserializes creators (in Props) to ensure that they can be sent over the network, # this is only intended for testing. serialize-creators = off - + # Timeout for send operations to top-level actors which are in the process of being started. # This is only relevant if using a bounded mailbox or the CallingThreadDispatcher for a top-level actor. unstarted-push-timeout = 10s @@ -108,6 +108,9 @@ akka { # within is the timeout used for routers containing future calls within = 5 seconds + # number of virtual nodes per node for consistent-hashing router + virtual-nodes-factor = 10 + routees { # Alternatively to giving nr-of-instances you can specify the full # paths of those actors which should be routed to. This setting takes @@ -296,7 +299,7 @@ akka { # enable DEBUG logging of subscription changes on the eventStream event-stream = off - + # enable DEBUG logging of unhandled messages unhandled = off @@ -318,13 +321,13 @@ akka { serialization-bindings { "java.io.Serializable" = java } - + # Configuration items which are used by the akka.actor.ActorDSL._ methods dsl { # Maximum queue size of the actor created by newInbox(); this protects against # faulty programs which use select() and consistently miss messages inbox-size = 1000 - + # Default timeout to assume for operations like Inbox.receive et al default-timeout = 5s } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index bce966b99e..deec7697df 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -166,6 +166,8 @@ object ActorSystem { final val Daemonicity: Boolean = getBoolean("akka.daemonic") final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error") + final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor") + if (ConfigVersion != Version) throw new akka.ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index be9da1a505..3f5e2b1785 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -143,18 +143,20 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce val nrOfInstances = deployment.getInt("nr-of-instances") - val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) - val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None val router: RouterConfig = deployment.getString("router") match { - case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) - case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) - case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) - case "consistent-hashing" ⇒ ConsistentHashingRouter(nrOfInstances, routees, resizer) + case "from-code" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) + case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) + case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ + val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) + ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "consistent-hashing" ⇒ + val vnodes = deployment.getInt("virtual-nodes-factor") + ConsistentHashingRouter(nrOfInstances, routees, resizer, virtualNodesFactor = vnodes) case fqn ⇒ val args = Seq(classOf[Config] -> deployment) dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 5dc23f1383..19e457a762 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -20,23 +20,22 @@ import scala.collection.immutable.TreeMap * hash, i.e. make sure it is different for different nodes. * */ -class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) { +class ConsistentHash[T] private (nodeRing: TreeMap[Int, T], virtualNodesFactor: Int) { import ConsistentHash._ - if (replicas < 1) throw new IllegalArgumentException("replicas must be >= 1") + if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1") /** - * Adds a node to the ring. + * Adds a node to the node ring. * Note that the instance is immutable and this * operation returns a new instance. */ - def :+(node: T): ConsistentHash[T] = { - new ConsistentHash(replicas, ring ++ ((1 to replicas) map { r ⇒ (nodeHashFor(node, r) -> node) })) - } + def :+(node: T): ConsistentHash[T] = + new ConsistentHash(nodeRing ++ ((1 to virtualNodesFactor) map { r ⇒ (nodeHashFor(node, r) -> node) }), virtualNodesFactor) /** - * Adds a node to the ring. + * Adds a node to the node ring. * Note that the instance is immutable and this * operation returns a new instance. * JAVA API @@ -44,16 +43,15 @@ class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) { def add(node: T): ConsistentHash[T] = this :+ node /** - * Removes a node from the ring. + * Removes a node from the node ring. * Note that the instance is immutable and this * operation returns a new instance. */ - def :-(node: T): ConsistentHash[T] = { - new ConsistentHash(replicas, ring -- ((1 to replicas) map { r ⇒ nodeHashFor(node, r) })) - } + def :-(node: T): ConsistentHash[T] = + new ConsistentHash(nodeRing -- ((1 to virtualNodesFactor) map { r ⇒ nodeHashFor(node, r) }), virtualNodesFactor) /** - * Removes a node from the ring. + * Removes a node from the node ring. * Note that the instance is immutable and this * operation returns a new instance. * JAVA API @@ -62,49 +60,42 @@ class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) { /** * Get the node responsible for the data key. - * Can only be used if nodes exists in the ring, + * Can only be used if nodes exists in the node ring, * otherwise throws `IllegalStateException` */ def nodeFor(key: Array[Byte]): T = { - if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty ring" format key) + if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) val hash = hashFor(key) - def nextClockwise: T = { - val (ringKey, node) = ring.rangeImpl(Some(hash), None).headOption.getOrElse(ring.head) - node - } - ring.getOrElse(hash, nextClockwise) + // find the next node clockwise in the nodeRing, pick first if end of tree + nodeRing.rangeImpl(Some(hash), None).headOption.getOrElse(nodeRing.head)._2 } /** - * Is the ring empty, i.e. no nodes added or all removed. + * Is the node ring empty, i.e. no nodes added or all removed. */ - def isEmpty: Boolean = ring.isEmpty + def isEmpty: Boolean = nodeRing.isEmpty } object ConsistentHash { - def apply[T](nodes: Iterable[T], replicas: Int) = { - new ConsistentHash(replicas, TreeMap.empty[Int, T] ++ - (for (node ← nodes; replica ← 1 to replicas) yield (nodeHashFor(node, replica) -> node))) + def apply[T](nodes: Iterable[T], virtualNodesFactor: Int) = { + new ConsistentHash(TreeMap.empty[Int, T] ++ + (for (node ← nodes; vnode ← 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)), + virtualNodesFactor) } /** * Factory method to create a ConsistentHash * JAVA API */ - def create[T](nodes: java.lang.Iterable[T], replicas: Int) = { + def create[T](nodes: java.lang.Iterable[T], virtualNodesFactor: Int) = { import scala.collection.JavaConverters._ - apply(nodes.asScala, replicas) + apply(nodes.asScala, virtualNodesFactor) } - private def nodeHashFor(node: Any, replica: Int): Int = { - hashFor((node + ":" + replica).getBytes("UTF-8")) - } + private def nodeHashFor(node: Any, vnode: Int): Int = + hashFor((node + ":" + vnode).getBytes("UTF-8")) - private def hashFor(bytes: Array[Byte]): Int = { - val hash = MurmurHash.arrayHash(bytes) - if (hash == Int.MinValue) hash + 1 - math.abs(hash) - } + private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes) } diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index eba812ac60..e177fc413a 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -5,13 +5,13 @@ package akka.routing import scala.collection.JavaConversions.iterableAsScalaIterable import scala.util.control.NonFatal - import akka.actor.ActorRef import akka.actor.SupervisorStrategy import akka.actor.Props import akka.dispatch.Dispatchers import akka.event.Logging import akka.serialization.SerializationExtension +import java.util.concurrent.atomic.AtomicReference object ConsistentHashingRouter { /** @@ -46,19 +46,13 @@ object ConsistentHashingRouter { /** * If messages can't implement [[akka.routing.ConsistentHashable]] - * themselves they can we wrapped by something implementing - * this interface instead. The router will only send the - * wrapped message to the destination, i.e. the envelope will - * be stripped off. + * themselves they can we wrapped by this envelope instead. The + * router will only send the wrapped message to the destination, + * i.e. the envelope will be stripped off. */ - trait ConsistentHashableEnvelope extends ConsistentHashable with RouterEnvelope { - def message: Any - } - - /** - * Default number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] - */ - val DefaultReplicas: Int = 10 + @SerialVersionUID(1L) + case class ConsistentHashableEnvelope(message: Any, consistentHashKey: Any) + extends ConsistentHashable with RouterEnvelope } /** @@ -88,14 +82,14 @@ object ConsistentHashingRouter { * * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] - * @param replicas number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + * @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] */ @SerialVersionUID(1L) case class ConsistentHashingRouter( nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, val routerDispatcher: String = Dispatchers.DefaultDispatcherId, val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy, - val replicas: Int = ConsistentHashingRouter.DefaultReplicas) + val virtualNodesFactor: Int = 0) extends RouterConfig with ConsistentHashingLike { /** @@ -130,9 +124,9 @@ case class ConsistentHashingRouter( def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy) /** - * Java API for setting the number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + * Java API for setting the number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] */ - def withReplicas(replicas: Int): ConsistentHashingRouter = copy(replicas = replicas) + def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes) /** * Uses the resizer of the given RouterConfig if this RouterConfig @@ -145,6 +139,10 @@ case class ConsistentHashingRouter( } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait ConsistentHashingLike { this: RouterConfig ⇒ import ConsistentHashingRouter._ @@ -153,7 +151,7 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ def routees: Iterable[String] - def replicas: Int + def virtualNodesFactor: Int override def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { @@ -162,20 +160,25 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ } val log = Logging(routeeProvider.context.system, routeeProvider.context.self) + val vnodes = + if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor + else virtualNodesFactor - // consistentHashRoutees and consistentHash are updated together, synchronized on the consistentHashLock - val consistentHashLock = new Object - var consistentHashRoutees: IndexedSeq[ActorRef] = null - var consistentHash: ConsistentHash[ActorRef] = null - upateConsistentHash() + var consistentHashRoutees = new AtomicReference[IndexedSeq[ActorRef]] + @volatile var consistentHash: ConsistentHash[ActorRef] = null + updateConsistentHash() // update consistentHash when routees has changed // changes to routees are rare and when no changes this is a quick operation - def upateConsistentHash(): ConsistentHash[ActorRef] = consistentHashLock.synchronized { + def updateConsistentHash(): ConsistentHash[ActorRef] = { val currentRoutees = routeeProvider.routees - if ((currentRoutees ne consistentHashRoutees) && currentRoutees != consistentHashRoutees) { - consistentHashRoutees = currentRoutees - consistentHash = ConsistentHash(currentRoutees, replicas) + if (currentRoutees ne consistentHashRoutees.get) { + val oldConsistentHashRoutees = consistentHashRoutees.get + val rehash = currentRoutees != oldConsistentHashRoutees + // when other instance, same content, no need to re-hash, but try to set currentRoutees + // ignore, don't update, in case of CAS failure + if (consistentHashRoutees.compareAndSet(oldConsistentHashRoutees, currentRoutees) && rehash) + consistentHash = ConsistentHash(currentRoutees, vnodes) } consistentHash } @@ -186,13 +189,13 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ case str: String ⇒ str.getBytes("UTF-8") case x: AnyRef ⇒ SerializationExtension(routeeProvider.context.system).serialize(x).get } - val currentConsistenHash = upateConsistentHash() + val currentConsistenHash = updateConsistentHash() if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters else currentConsistenHash.nodeFor(hash) } catch { case NonFatal(e) ⇒ // serialization failed - log.warning("Couldn't route message with consistentHashKey [%s] due to [%s]".format(hashData, e.getMessage)) + log.warning("Couldn't route message with consistentHashKey [{}] due to [{}]", hashData, e.getMessage) routeeProvider.context.system.deadLetters } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 9e17bf2b9a..da3c439c4d 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -596,6 +596,10 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait RoundRobinLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -729,6 +733,10 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait RandomLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -869,6 +877,10 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait SmallestMailboxLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -1084,6 +1096,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait BroadcastLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -1213,6 +1229,10 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ def nrOfInstances: Int diff --git a/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala b/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala index 265b0f8361..24dc291087 100644 --- a/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala @@ -77,6 +77,7 @@ object RouterWithConfigDocSpec { /myrouter7 { router = consistent-hashing nr-of-instances = 5 + virtual-nodes-factor = 10 } } //#config-consistent-hashing From 5c20c07199bc35a99098e2a4c43729930a3e631a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 13 Sep 2012 20:44:17 +0200 Subject: [PATCH 07/19] Use tuple to update ConsistentHash and routees together, see #944 --- .../routing/ConsistentHashingRouter.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index e177fc413a..1e78025350 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -164,23 +164,25 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor else virtualNodesFactor - var consistentHashRoutees = new AtomicReference[IndexedSeq[ActorRef]] - @volatile var consistentHash: ConsistentHash[ActorRef] = null + // tuple of routees and the ConsistentHash, updated together in updateConsistentHash + val consistentHashRef = new AtomicReference[(IndexedSeq[ActorRef], ConsistentHash[ActorRef])]((null, null)) updateConsistentHash() // update consistentHash when routees has changed // changes to routees are rare and when no changes this is a quick operation def updateConsistentHash(): ConsistentHash[ActorRef] = { val currentRoutees = routeeProvider.routees - if (currentRoutees ne consistentHashRoutees.get) { - val oldConsistentHashRoutees = consistentHashRoutees.get - val rehash = currentRoutees != oldConsistentHashRoutees - // when other instance, same content, no need to re-hash, but try to set currentRoutees + val oldConsistentHashTuple = consistentHashRef.get + val (oldConsistentHashRoutees, oldConsistentHash) = oldConsistentHashTuple + if (currentRoutees ne oldConsistentHashRoutees) { + // when other instance, same content, no need to re-hash, but try to set routees + val consistentHash = + if (currentRoutees == oldConsistentHashRoutees) oldConsistentHash + else ConsistentHash(currentRoutees, vnodes) // re-hash // ignore, don't update, in case of CAS failure - if (consistentHashRoutees.compareAndSet(oldConsistentHashRoutees, currentRoutees) && rehash) - consistentHash = ConsistentHash(currentRoutees, vnodes) - } - consistentHash + consistentHashRef.compareAndSet(oldConsistentHashTuple, (currentRoutees, consistentHash)) + consistentHash + } else oldConsistentHash } def target(hashData: Any): ActorRef = try { From a515c28e931651c75a6648a568958e84845c1cde Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 13 Sep 2012 22:04:22 +0200 Subject: [PATCH 08/19] Correction, get from ref first, see #944 --- .../src/main/scala/akka/routing/ConsistentHashingRouter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index 1e78025350..b0a9ccb113 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -171,9 +171,9 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ // update consistentHash when routees has changed // changes to routees are rare and when no changes this is a quick operation def updateConsistentHash(): ConsistentHash[ActorRef] = { - val currentRoutees = routeeProvider.routees val oldConsistentHashTuple = consistentHashRef.get val (oldConsistentHashRoutees, oldConsistentHash) = oldConsistentHashTuple + val currentRoutees = routeeProvider.routees if (currentRoutees ne oldConsistentHashRoutees) { // when other instance, same content, no need to re-hash, but try to set routees val consistentHash = From e3bd02b82cbf098f809a81f81a575f77e11c2239 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 14 Sep 2012 11:06:55 +0200 Subject: [PATCH 09/19] Optimized ConsistentHash with array and binarySearch, see #944 * Benchmarked with https://gist.github.com/3720666 * Instead of using TreeMap and rangeImpl this uses an sorted array of hash values of the nodes, and uses binarySearch in java.util.Arrays to find next node clockwise * Benchmarked with https://gist.github.com/3720666 * Benchmark results show improvement from 500 kmsg/s to 700 kmsg/s, round-robin handles 1300 kmsg/s --- .../scala/akka/routing/ConsistentHash.scala | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 19e457a762..51f593ad40 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -5,6 +5,7 @@ package akka.routing import scala.collection.immutable.TreeMap +import java.util.Arrays // ============================================================================================= // Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license @@ -20,19 +21,26 @@ import scala.collection.immutable.TreeMap * hash, i.e. make sure it is different for different nodes. * */ -class ConsistentHash[T] private (nodeRing: TreeMap[Int, T], virtualNodesFactor: Int) { +class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { import ConsistentHash._ if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1") + // sorted hash values of the nodes + private val nodeRing: Array[Int] = { + val nodeRing = nodes.keys.toArray + Arrays.sort(nodeRing) + nodeRing + } + /** * Adds a node to the node ring. * Note that the instance is immutable and this * operation returns a new instance. */ def :+(node: T): ConsistentHash[T] = - new ConsistentHash(nodeRing ++ ((1 to virtualNodesFactor) map { r ⇒ (nodeHashFor(node, r) -> node) }), virtualNodesFactor) + new ConsistentHash(nodes ++ ((1 to virtualNodesFactor) map { r ⇒ (nodeHashFor(node, r) -> node) }), virtualNodesFactor) /** * Adds a node to the node ring. @@ -48,7 +56,7 @@ class ConsistentHash[T] private (nodeRing: TreeMap[Int, T], virtualNodesFactor: * operation returns a new instance. */ def :-(node: T): ConsistentHash[T] = - new ConsistentHash(nodeRing -- ((1 to virtualNodesFactor) map { r ⇒ nodeHashFor(node, r) }), virtualNodesFactor) + new ConsistentHash(nodes -- ((1 to virtualNodesFactor) map { r ⇒ nodeHashFor(node, r) }), virtualNodesFactor) /** * Removes a node from the node ring. @@ -65,15 +73,25 @@ class ConsistentHash[T] private (nodeRing: TreeMap[Int, T], virtualNodesFactor: */ def nodeFor(key: Array[Byte]): T = { if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) - val hash = hashFor(key) - // find the next node clockwise in the nodeRing, pick first if end of tree - nodeRing.rangeImpl(Some(hash), None).headOption.getOrElse(nodeRing.head)._2 + + // converts the result of Arrays.binarySearch into a index in the nodeRing array + // see documentation of Arrays.binarySearch for what it returns + def idx(i: Int): Int = { + if (i >= 0) i // exact match + else { + val j = math.abs(i + 1) + if (j >= nodeRing.length) 0 // after last, use first + else j // next node clockwise + } + } + val nodeRingIndex = idx(Arrays.binarySearch(nodeRing, hashFor(key))) + nodes(nodeRing(nodeRingIndex)) } /** * Is the node ring empty, i.e. no nodes added or all removed. */ - def isEmpty: Boolean = nodeRing.isEmpty + def isEmpty: Boolean = nodes.isEmpty } From a4dd6b754754fbdd61e1fcfa76497664380985df Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 14 Sep 2012 13:47:58 +0200 Subject: [PATCH 10/19] Support partial function to map message to hash key, see #944 * Partial function, ConsistentHashRoute, for Scala API * withConsistentHashMapping ConsistentHashMapping for Java API * Updated documentation --- .../routing/ConsistentHashingRouterSpec.scala | 31 ++++- .../src/test/scala/perf/RouterPerf.scala | 79 +++++++++++++ .../routing/ConsistentHashingRouter.scala | 106 +++++++++++++++--- akka-docs/java/routing.rst | 23 +++- .../ConsistentHashingRouterDocSpec.scala | 3 + akka-docs/scala/routing.rst | 22 +++- 6 files changed, 235 insertions(+), 29 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/perf/RouterPerf.scala diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala index 7a28190523..b4ef639067 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -24,6 +24,10 @@ object ConsistentHashingRouterSpec { nr-of-instances = 3 virtual-nodes-factor = 17 } + /router2 { + router = consistent-hashing + nr-of-instances = 5 + } } """ @@ -38,6 +42,8 @@ object ConsistentHashingRouterSpec { } case class MsgKey(name: String) + + case class Msg2(key: Any, data: String) } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -66,7 +72,30 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c router1 ! Msg(MsgKey("c"), "C") val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } - router1 ! Msg(MsgKey("c"), "CC") + router1 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c")) + expectMsg(destinationC) + } + + "select destination with defined consistentHashRoute" in { + def consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = { + case Msg2(key, data) ⇒ key + } + val router2 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter( + consistentHashRoute = consistentHashRoute)), "router2") + + router2 ! Msg2("a", "A") + val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router2 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a") + expectMsg(destinationA) + + router2 ! Msg2(17, "B") + val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router2 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17) + expectMsg(destinationB) + + router2 ! Msg2(MsgKey("c"), "C") + val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router2 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c")) expectMsg(destinationC) } } diff --git a/akka-actor-tests/src/test/scala/perf/RouterPerf.scala b/akka-actor-tests/src/test/scala/perf/RouterPerf.scala new file mode 100644 index 0000000000..20f9554540 --- /dev/null +++ b/akka-actor-tests/src/test/scala/perf/RouterPerf.scala @@ -0,0 +1,79 @@ +package perf + +import akka.actor.ActorSystem +import akka.actor.Actor +import akka.actor.Props +import scala.concurrent.forkjoin.ThreadLocalRandom +import akka.routing.ConsistentHashingRouter +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +import scala.util.Random +import akka.routing.Broadcast +import akka.actor.ActorLogging +import scala.concurrent.util.Duration +import akka.routing.RoundRobinRouter + +object RouterPerf extends App { + val system = ActorSystem("PerfApp") + var perf = new RouterPerf(system) + // Thread.sleep(20000) // hook up profiler here + perf.run() +} + +class RouterPerf(system: ActorSystem) { + + def run(): Unit = { + // nbrMessages = 10000000 + val sender = system.actorOf(Props(new Sender( + nbrMessages = 10000000, nbrRoutees = 10, nbrIterations = 10)), name = "sender") + sender ! "start" + } + +} + +class Sender(nbrMessages: Int, nbrRoutees: Int, nbrIterations: Int) extends Actor with ActorLogging { + val router = context.actorOf(Props[Destination].withRouter(ConsistentHashingRouter(nbrRoutees, + virtualNodesFactor = 10)), "router") + // val router = context.actorOf(Props[Destination].withRouter(RoundRobinRouter(nbrRoutees)), "router") + val rnd = new Random + val messages = Vector.fill(1000)(ConsistentHashableEnvelope("msg", rnd.nextString(10))) + var startTime = 0L + var doneCounter = 0 + var iterationCounter = 0 + + def receive = { + case "start" ⇒ + iterationCounter += 1 + doneCounter = 0 + startTime = System.nanoTime + val messgesSize = messages.size + for (n ← 1 to nbrMessages) { router ! messages(n % messgesSize) } + router ! Broadcast("done") + + case "done" ⇒ + doneCounter += 1 + if (doneCounter == nbrRoutees) { + val duration = Duration.fromNanos(System.nanoTime - startTime) + val mps = (nbrMessages.toDouble * 1000 / duration.toMillis).toInt + // log.info("Processed [{}] messages in [{} millis], i.e. [{}] msg/s", + // nbrMessages, duration.toMillis, mps) + println("Processed [%s] messages in [%s millis], i.e. [%s] msg/s".format( + nbrMessages, duration.toMillis, mps)) + if (iterationCounter < nbrIterations) + self ! "start" + else + context.system.shutdown() + } + } +} + +class Destination extends Actor with ActorLogging { + var count = 0 + def receive = { + case "done" ⇒ + log.info("Handled [{}] messages", count) + count = 0 + sender ! "done" + case msg ⇒ count += 1 + + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index b0a9ccb113..a08bbdd481 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -29,23 +29,29 @@ object ConsistentHashingRouter { } /** - * Messages need to implement this interface to define what + * If you don't define the consistentHashRoute when + * constructing the [[akka.routing.ConsistentHashingRouter]] + * the messages need to implement this interface to define what * data to use for the consistent hash key. Note that it's not - * the hash, but the data to be hashed. If returning an - * `Array[Byte]` or String it will be used as is, otherwise the - * configured [[akka.akka.serialization.Serializer]] will - * be applied to the returned data. + * the hash, but the data to be hashed. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.akka.serialization.Serializer]] + * will be applied to the returned data. * * If messages can't implement this interface themselves, * it's possible to wrap the messages in - * [[akka.routing.ConsistentHashableEnvelope]] + * [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]], + * or use [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]] */ trait ConsistentHashable { def consistentHashKey: Any } /** - * If messages can't implement [[akka.routing.ConsistentHashable]] + * If you don't define the consistentHashRoute when + * constructing the [[akka.routing.ConsistentHashingRouter]] + * and messages can't implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]] * themselves they can we wrapped by this envelope instead. The * router will only send the wrapped message to the destination, * i.e. the envelope will be stripped off. @@ -54,12 +60,57 @@ object ConsistentHashingRouter { case class ConsistentHashableEnvelope(message: Any, consistentHashKey: Any) extends ConsistentHashable with RouterEnvelope + /** + * Partial function from message to the data to + * use for the consistent hash key. Note that it's not + * the hash that is to be returned, but the data to be hashed. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.akka.serialization.Serializer]] + * will be applied to the returned data. + */ + type ConsistentHashRoute = PartialFunction[Any, Any] + + @SerialVersionUID(1L) + object emptyConsistentHashRoute extends ConsistentHashRoute { + def isDefinedAt(x: Any) = false + def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashRoute apply()") + } + + /** + * JAVA API + * Mapping from message to the data to use for the consistent hash key. + * Note that it's not the hash that is to be returned, but the data to be + * hashed. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.akka.serialization.Serializer]] + * will be applied to the returned data. + */ + trait ConsistentHashMapping { + def consistentHashKey(message: Any): Any + } } /** * A Router that uses consistent hashing to select a connection based on the - * sent message. The messages must implement [[akka.routing.ConsistentHashable]] - * or be wrapped in a [[akka.routing.ConsistentHashableEnvelope]] to define what - * data to use for the consistent hash key. + * sent message. + * + * There is 3 ways to define what data to use for the consistent hash key. + * + * 1. You can define `consistentHashRoute` / `withConsistentHashMapping` + * of the router to map incoming messages to their consistent hash key. + * This makes the makes the decision transparent for the sender. + * + * 2. The messages may implement [[akka.routing.ConsistentHashable]]. + * The key is part of the message and it's convenient to define it together + * with the message definition. + * + * 3. The messages can be be wrapped in a [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]] + * to define what data to use for the consistent hash key. The sender knows + * the key to use. + * + * These ways to define the consistent hash key can be use together and at + * the same time for one router. The `consistentHashRoute` is tried first. * * Please note that providing both 'nrOfInstances' and 'routees' does not make logical * sense as this means that the router should both create new actors and use the 'routees' @@ -83,13 +134,16 @@ object ConsistentHashingRouter { * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] * @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] + * @param consistentHashRoute partial function from message to the data to + * use for the consistent hash key */ @SerialVersionUID(1L) case class ConsistentHashingRouter( nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, val routerDispatcher: String = Dispatchers.DefaultDispatcherId, val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy, - val virtualNodesFactor: Int = 0) + val virtualNodesFactor: Int = 0, + val consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = ConsistentHashingRouter.emptyConsistentHashRoute) extends RouterConfig with ConsistentHashingLike { /** @@ -128,14 +182,29 @@ case class ConsistentHashingRouter( */ def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes) + /** + * Java API for setting the mapping from message to the data to use for the consistent hash key + */ + def withConsistentHashMapping(mapping: ConsistentHashingRouter.ConsistentHashMapping) = { + copy(consistentHashRoute = { + case message ⇒ mapping.consistentHashKey(message) + }) + } + /** * Uses the resizer of the given RouterConfig if this RouterConfig * doesn't have one, i.e. the resizer defined in code is used if * resizer was not defined in config. + * Uses the the consistentHashRoute defined in code, since + * that can't be defined in configuration. */ - override def withFallback(other: RouterConfig): RouterConfig = { - if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) - else this + override def withFallback(other: RouterConfig): RouterConfig = other match { + case otherRouter: ConsistentHashingRouter ⇒ + val useResizer = + if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer + else this.resizer + copy(resizer = useResizer, consistentHashRoute = otherRouter.consistentHashRoute) + case _ ⇒ throw new IllegalArgumentException("Expected ConsistentHashingRouter, got [%s]".format(other)) } } @@ -153,6 +222,8 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ def virtualNodesFactor: Int + def consistentHashRoute: ConsistentHashRoute + override def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) @@ -204,14 +275,17 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ { case (sender, message) ⇒ message match { - case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) + case _ if consistentHashRoute.isDefinedAt(message) ⇒ + List(Destination(sender, target(consistentHashRoute(message)))) case hashable: ConsistentHashable ⇒ List(Destination(sender, target(hashable.consistentHashKey))) case other ⇒ - log.warning("Message [{}] must implement [{}] or be wrapped in [{}]", + log.warning("Message [{}] must be handled by consistentHashRoute, or implement [{}] or be wrapped in [{}]", message.getClass.getName, classOf[ConsistentHashable].getName, classOf[ConsistentHashableEnvelope].getName) List(Destination(sender, routeeProvider.context.system.deadLetters)) } + } } } \ No newline at end of file diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 561cbd9446..0096c9e3e8 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -15,6 +15,7 @@ is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` +* ``akka.routing.ConsistentHashingRouter`` Routers In Action ^^^^^^^^^^^^^^^^^ @@ -283,12 +284,22 @@ to select a connection based on the sent message. This `article `_ gives good insight into how consistent hashing is implemented. -The messages sent to a ConsistentHashingRouter must implement -``akka.routing.ConsistentHashable`` or be wrapped in a ``akka.routing.ConsistentHashableEnvelope`` -to define what data to use for the consistent hash key. If returning a -byte array or String it will be used as is, otherwise the configured -:ref:`serializer ` will be applied to the returned data -to create a byte array that will be hashed. +There is 3 ways to define what data to use for the consistent hash key. + +* You can define ``withConsistentHashMapping`` of the router to map incoming + messages to their consistent hash key. This makes the makes the decision + transparent for the sender. + +* The messages may implement ``akka.routing.ConsistentHashable``. + The key is part of the message and it's convenient to define it together + with the message definition. + +* The messages can be be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope`` + to define what data to use for the consistent hash key. The sender knows + the key to use. + +These ways to define the consistent hash key can be use together and at +the same time for one router. The ``withConsistentHashMapping`` is tried first. Code example: diff --git a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala index 8bb59a7bce..42af05c049 100644 --- a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala @@ -19,9 +19,12 @@ object ConsistentHashingRouterDocSpec { def receive = { case Entry(key, value) ⇒ cache += (key -> value) case Get(key) ⇒ sender ! cache.get(key) + case Evict(key) => cache -= key } } + case class Evict(key: String) + case class Get(key: String) extends ConsistentHashable { override def consistentHashKey: Any = key } diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 3f384895c6..0cffa3c1a7 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -289,12 +289,22 @@ to select a connection based on the sent message. This `article `_ gives good insight into how consistent hashing is implemented. -The messages sent to a ConsistentHashingRouter must implement -``akka.routing.ConsistentHashable`` or be wrapped in a ``akka.routing.ConsistentHashableEnvelope`` -to define what data to use for the consistent hash key. If returning a -byte array or String it will be used as is, otherwise the configured -:ref:`serializer ` will be applied to the returned data -to create a byte array that will be hashed. +There is 3 ways to define what data to use for the consistent hash key. + +* You can define ``consistentHashRoute`` of the router to map incoming + messages to their consistent hash key. This makes the makes the decision + transparent for the sender. + +* The messages may implement ``akka.routing.ConsistentHashable``. + The key is part of the message and it's convenient to define it together + with the message definition. + +* The messages can be be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope`` + to define what data to use for the consistent hash key. The sender knows + the key to use. + +These ways to define the consistent hash key can be use together and at +the same time for one router. The ``consistentHashRoute`` is tried first. Code example: From 48d8a09075fd3dd000882fb4692e28e3e4b85839 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 14 Sep 2012 13:53:04 +0200 Subject: [PATCH 11/19] Remove note about redis client, completely different impl, see #944 --- akka-actor/src/main/scala/akka/routing/ConsistentHash.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 51f593ad40..bc86f5f82d 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -7,12 +7,8 @@ package akka.routing import scala.collection.immutable.TreeMap import java.util.Arrays -// ============================================================================================= -// Adapted from HashRing.scala in Debasish Ghosh's Redis Client, licensed under Apache 2 license -// ============================================================================================= - /** - * Consistent Hashing node ring abstraction. + * Consistent Hashing node ring implementation. * * A good explanation of Consistent Hashing: * http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html From 50fc5a03a2adc35ca0bd3d20e5db1b68decd50d1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 14 Sep 2012 14:28:47 +0200 Subject: [PATCH 12/19] Complete unfinished ConsistentHashingRouterDocSpec, see #944 --- akka-docs/java/routing.rst | 3 +- .../ConsistentHashingRouterDocSpec.scala | 29 +++++++++++++------ akka-docs/scala/routing.rst | 3 +- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 0096c9e3e8..dad68ecf30 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -306,7 +306,8 @@ Code example: FIXME Java example of consistent routing In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, -while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. +while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict`` +message is handled by the ``withConsistentHashMapping``. This is an example of how to define a consistent-hashing router in configuration: diff --git a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala index 42af05c049..89a329c743 100644 --- a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala @@ -19,7 +19,7 @@ object ConsistentHashingRouterDocSpec { def receive = { case Entry(key, value) ⇒ cache += (key -> value) case Get(key) ⇒ sender ! cache.get(key) - case Evict(key) => cache -= key + case Evict(key) ⇒ cache -= key } } @@ -30,10 +30,6 @@ object ConsistentHashingRouterDocSpec { } case class Entry(key: String, value: String) - case class EntryEnvelope(entry: Entry) extends ConsistentHashableEnvelope { - override def consistentHashKey: Any = entry.key - override def message: Any = entry - } //#cache-actor } @@ -47,16 +43,31 @@ class ConsistentHashingRouterDocSpec extends AkkaSpec with ImplicitSender { //#consistent-hashing-router import akka.actor.Props import akka.routing.ConsistentHashingRouter + import akka.routing.ConsistentHashingRouter.ConsistentHashRoute + import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope - val cache = system.actorOf(Props[Cache].withRouter(ConsistentHashingRouter(10)), "cache") + def consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = { + case Evict(key) ⇒ key + } - cache ! EntryEnvelope(Entry("hello", "HELLO")) - cache ! EntryEnvelope(Entry("hi", "HI")) + val cache = system.actorOf(Props[Cache].withRouter(ConsistentHashingRouter(10, + consistentHashRoute = consistentHashRoute)), name = "cache") + + cache ! ConsistentHashableEnvelope( + message = Entry("hello", "HELLO"), consistentHashKey = "hello") + cache ! ConsistentHashableEnvelope( + message = Entry("hi", "HI"), consistentHashKey = "hi") cache ! Get("hello") - cache ! Get("hi") expectMsg(Some("HELLO")) + + cache ! Get("hi") expectMsg(Some("HI")) + + cache ! Evict("hi") + cache ! Get("hi") + expectMsg(None) + //#consistent-hashing-router } diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 0cffa3c1a7..e10a353988 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -313,7 +313,8 @@ Code example: .. includecode:: code/docs/routing/ConsistentHashingRouterDocSpec.scala#consistent-hashing-router In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, -while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. +while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict`` +message is handled by the ``consistentHashRoute`` partial function. This is an example of how to define a consistent-hashing router in configuration: From 08dbc4d23500a0c0efff816bcf1ffa04dd1b758f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Sep 2012 09:30:52 +0200 Subject: [PATCH 13/19] Incorporate minor review feedback, see #944 --- .../akka/routing/ConsistentHashingRouterSpec.scala | 12 ++++++------ .../routing/ConsistentHashingRouterDocSpec.scala | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala index b4ef639067..5ac3ea5566 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -61,17 +61,17 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c "select destination based on consistentHashKey of the message" in { router1 ! Msg("a", "A") - val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + val destinationA = expectMsgType[ActorRef] router1 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a") expectMsg(destinationA) router1 ! Msg(17, "B") - val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + val destinationB = expectMsgType[ActorRef] router1 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17) expectMsg(destinationB) router1 ! Msg(MsgKey("c"), "C") - val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + val destinationC = expectMsgType[ActorRef] router1 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c")) expectMsg(destinationC) } @@ -84,17 +84,17 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c consistentHashRoute = consistentHashRoute)), "router2") router2 ! Msg2("a", "A") - val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + val destinationA = expectMsgType[ActorRef] router2 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a") expectMsg(destinationA) router2 ! Msg2(17, "B") - val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + val destinationB = expectMsgType[ActorRef] router2 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17) expectMsg(destinationB) router2 ! Msg2(MsgKey("c"), "C") - val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + val destinationC = expectMsgType[ActorRef] router2 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c")) expectMsg(destinationC) } diff --git a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala index 89a329c743..945a78e1eb 100644 --- a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala @@ -46,7 +46,7 @@ class ConsistentHashingRouterDocSpec extends AkkaSpec with ImplicitSender { import akka.routing.ConsistentHashingRouter.ConsistentHashRoute import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope - def consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = { + def consistentHashRoute: ConsistentHashRoute = { case Evict(key) ⇒ key } From d59612609fec5e8e20dc40ce360b49dd0088d90a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Sep 2012 09:39:35 +0200 Subject: [PATCH 14/19] Handle FromConfig in withFallback also, see #944 --- .../test/scala/akka/routing/ConsistentHashingRouterSpec.scala | 2 +- .../src/main/scala/akka/routing/ConsistentHashingRouter.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala index 5ac3ea5566..0024ceea44 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -51,7 +51,7 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c import akka.routing.ConsistentHashingRouterSpec._ implicit val ec = system.dispatcher - val router1 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter()), "router1") + val router1 = system.actorOf(Props[Echo].withRouter(FromConfig()), "router1") "consistent hashing router" must { "create routees from configuration" in { diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index a08bbdd481..d8d384c5c0 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -199,6 +199,7 @@ case class ConsistentHashingRouter( * that can't be defined in configuration. */ override def withFallback(other: RouterConfig): RouterConfig = other match { + case fromConfig: FromConfig ⇒ this case otherRouter: ConsistentHashingRouter ⇒ val useResizer = if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer From 48a0fda9915802d5516e476c489b01edc840a0a1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Sep 2012 11:40:06 +0200 Subject: [PATCH 15/19] Java doc sample for consistent hashing router, see #944 * Allow return null as undefined ConsistentHashMapping --- .../routing/ConsistentHashingRouter.scala | 14 +- .../ConsistentHashingRouterDocTest.scala | 5 + .../ConsistentHashingRouterDocTestBase.java | 136 ++++++++++++++++++ akka-docs/java/routing.rst | 10 +- .../ConsistentHashingRouterDocSpec.scala | 1 - akka-docs/scala/routing.rst | 4 +- 6 files changed, 159 insertions(+), 11 deletions(-) create mode 100644 akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTest.scala create mode 100644 akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index d8d384c5c0..9650a038e9 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -83,6 +83,9 @@ object ConsistentHashingRouter { * Note that it's not the hash that is to be returned, but the data to be * hashed. * + * May return `null` to indicate that the message is not handled by + * this mapping. + * * If returning an `Array[Byte]` or String it will be used as is, * otherwise the configured [[akka.akka.serialization.Serializer]] * will be applied to the returned data. @@ -99,9 +102,9 @@ object ConsistentHashingRouter { * * 1. You can define `consistentHashRoute` / `withConsistentHashMapping` * of the router to map incoming messages to their consistent hash key. - * This makes the makes the decision transparent for the sender. + * This makes the decision transparent for the sender. * - * 2. The messages may implement [[akka.routing.ConsistentHashable]]. + * 2. The messages may implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]]. * The key is part of the message and it's convenient to define it together * with the message definition. * @@ -183,11 +186,12 @@ case class ConsistentHashingRouter( def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes) /** - * Java API for setting the mapping from message to the data to use for the consistent hash key + * Java API for setting the mapping from message to the data to use for the consistent hash key. */ def withConsistentHashMapping(mapping: ConsistentHashingRouter.ConsistentHashMapping) = { copy(consistentHashRoute = { - case message ⇒ mapping.consistentHashKey(message) + case message if (mapping.consistentHashKey(message).asInstanceOf[AnyRef] ne null) ⇒ + mapping.consistentHashKey(message) }) } @@ -199,7 +203,7 @@ case class ConsistentHashingRouter( * that can't be defined in configuration. */ override def withFallback(other: RouterConfig): RouterConfig = other match { - case fromConfig: FromConfig ⇒ this + case _: FromConfig ⇒ this case otherRouter: ConsistentHashingRouter ⇒ val useResizer = if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer diff --git a/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTest.scala b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTest.scala new file mode 100644 index 0000000000..a043f9cb73 --- /dev/null +++ b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTest.scala @@ -0,0 +1,5 @@ +package docs.jrouting; + +import org.scalatest.junit.JUnitSuite + +class ConsistentHashingRouterDocTest extends ConsistentHashingRouterDocTestBase with JUnitSuite diff --git a/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java new file mode 100644 index 0000000000..6f00ed1509 --- /dev/null +++ b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java @@ -0,0 +1,136 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package docs.jrouting; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import akka.testkit.JavaTestKit; +import akka.actor.ActorSystem; + +//#imports1 +import akka.actor.UntypedActor; +import akka.routing.ConsistentHashingRouter.ConsistentHashable; +import java.util.Map; +import java.util.HashMap; +import java.io.Serializable; +//#imports1 + +//#imports2 +import akka.actor.Props; +import akka.actor.ActorRef; +import akka.routing.ConsistentHashingRouter; +import akka.routing.ConsistentHashingRouter.ConsistentHashMapping; +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope; +//#imports2 + +public class ConsistentHashingRouterDocTestBase { + + static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create(); + } + + @AfterClass + public static void teardown() { + system.shutdown(); + } + + //#cache-actor + + public static class Cache extends UntypedActor { + Map cache = new HashMap(); + + public void onReceive(Object msg) { + if (msg instanceof Entry) { + Entry entry = (Entry) msg; + cache.put(entry.key, entry.value); + } else if (msg instanceof Get) { + Get get = (Get) msg; + Object value = cache.get(get.key); + getSender().tell(value == null ? NOT_FOUND : value, + getContext().self()); + } else if (msg instanceof Evict) { + Evict evict = (Evict) msg; + cache.remove(evict.key); + } else { + unhandled(msg); + } + } + } + + public static class Evict implements Serializable { + public final String key; + public Evict(String key) { + this.key = key; + } + } + + public static class Get implements Serializable, ConsistentHashable { + public final String key; + public Get(String key) { + this.key = key; + } + public Object consistentHashKey() { + return key; + } + } + + public static class Entry implements Serializable { + public final String key; + public final String value; + public Entry(String key, String value) { + this.key = key; + this.value = value; + } + } + + public static final String NOT_FOUND = "NOT_FOUND"; + //#cache-actor + + + @Test + public void demonstrateUsageOfConsistentHashableRouter() { + + new JavaTestKit(system) {{ + + //#consistent-hashing-router + + final ConsistentHashMapping consistentHashMapping = new ConsistentHashMapping() { + @Override + public Object consistentHashKey(Object message) { + if (message instanceof Evict) { + return ((Evict) message).key; + } else { + return null; + } + } + }; + + ActorRef cache = system.actorOf(new Props(Cache.class).withRouter( + new ConsistentHashingRouter(10).withConsistentHashMapping(consistentHashMapping)), + "cache"); + + cache.tell(new ConsistentHashableEnvelope( + new Entry("hello", "HELLO"), "hello"), getRef()); + cache.tell(new ConsistentHashableEnvelope( + new Entry("hi", "HI"), "hi"), getRef()); + + cache.tell(new Get("hello"), getRef()); + expectMsgEquals("HELLO"); + + cache.tell(new Get("hi"), getRef()); + expectMsgEquals("HI"); + + cache.tell(new Evict("hi"), getRef()); + cache.tell(new Get("hi"), getRef()); + expectMsgEquals(NOT_FOUND); + + //#consistent-hashing-router + }}; + } + +} diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index dad68ecf30..149a3a315b 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -287,10 +287,10 @@ insight into how consistent hashing is implemented. There is 3 ways to define what data to use for the consistent hash key. * You can define ``withConsistentHashMapping`` of the router to map incoming - messages to their consistent hash key. This makes the makes the decision + messages to their consistent hash key. This makes the the decision transparent for the sender. -* The messages may implement ``akka.routing.ConsistentHashable``. +* The messages may implement ``akka.routing.ConsistentHashingRouter.ConsistentHashable``. The key is part of the message and it's convenient to define it together with the message definition. @@ -303,7 +303,11 @@ the same time for one router. The ``withConsistentHashMapping`` is tried first. Code example: -FIXME Java example of consistent routing +.. includecode:: code/docs/jrouting/ConsistentHashingRouterDocTestBase.java + :include: imports1,cache-actor + +.. includecode:: code/docs/jrouting/ConsistentHashingRouterDocTestBase.java + :include: imports2,consistent-hashing-router In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict`` diff --git a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala index 945a78e1eb..7041db6049 100644 --- a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala @@ -11,7 +11,6 @@ object ConsistentHashingRouterDocSpec { //#cache-actor import akka.actor.Actor import akka.routing.ConsistentHashingRouter.ConsistentHashable - import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope class Cache extends Actor { var cache = Map.empty[String, String] diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index e10a353988..4766b2e6ca 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -292,10 +292,10 @@ insight into how consistent hashing is implemented. There is 3 ways to define what data to use for the consistent hash key. * You can define ``consistentHashRoute`` of the router to map incoming - messages to their consistent hash key. This makes the makes the decision + messages to their consistent hash key. This makes the decision transparent for the sender. -* The messages may implement ``akka.routing.ConsistentHashable``. +* The messages may implement ``akka.routing.ConsistentHashingRouter.ConsistentHashable``. The key is part of the message and it's convenient to define it together with the message definition. From b4f59483400bfeb9d515b6557b23628ae53bd8a5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Sep 2012 12:12:19 +0200 Subject: [PATCH 16/19] Remove router benchmark, see #944 --- .../src/test/scala/perf/RouterPerf.scala | 79 ------------------- 1 file changed, 79 deletions(-) delete mode 100644 akka-actor-tests/src/test/scala/perf/RouterPerf.scala diff --git a/akka-actor-tests/src/test/scala/perf/RouterPerf.scala b/akka-actor-tests/src/test/scala/perf/RouterPerf.scala deleted file mode 100644 index 20f9554540..0000000000 --- a/akka-actor-tests/src/test/scala/perf/RouterPerf.scala +++ /dev/null @@ -1,79 +0,0 @@ -package perf - -import akka.actor.ActorSystem -import akka.actor.Actor -import akka.actor.Props -import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.routing.ConsistentHashingRouter -import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope -import scala.util.Random -import akka.routing.Broadcast -import akka.actor.ActorLogging -import scala.concurrent.util.Duration -import akka.routing.RoundRobinRouter - -object RouterPerf extends App { - val system = ActorSystem("PerfApp") - var perf = new RouterPerf(system) - // Thread.sleep(20000) // hook up profiler here - perf.run() -} - -class RouterPerf(system: ActorSystem) { - - def run(): Unit = { - // nbrMessages = 10000000 - val sender = system.actorOf(Props(new Sender( - nbrMessages = 10000000, nbrRoutees = 10, nbrIterations = 10)), name = "sender") - sender ! "start" - } - -} - -class Sender(nbrMessages: Int, nbrRoutees: Int, nbrIterations: Int) extends Actor with ActorLogging { - val router = context.actorOf(Props[Destination].withRouter(ConsistentHashingRouter(nbrRoutees, - virtualNodesFactor = 10)), "router") - // val router = context.actorOf(Props[Destination].withRouter(RoundRobinRouter(nbrRoutees)), "router") - val rnd = new Random - val messages = Vector.fill(1000)(ConsistentHashableEnvelope("msg", rnd.nextString(10))) - var startTime = 0L - var doneCounter = 0 - var iterationCounter = 0 - - def receive = { - case "start" ⇒ - iterationCounter += 1 - doneCounter = 0 - startTime = System.nanoTime - val messgesSize = messages.size - for (n ← 1 to nbrMessages) { router ! messages(n % messgesSize) } - router ! Broadcast("done") - - case "done" ⇒ - doneCounter += 1 - if (doneCounter == nbrRoutees) { - val duration = Duration.fromNanos(System.nanoTime - startTime) - val mps = (nbrMessages.toDouble * 1000 / duration.toMillis).toInt - // log.info("Processed [{}] messages in [{} millis], i.e. [{}] msg/s", - // nbrMessages, duration.toMillis, mps) - println("Processed [%s] messages in [%s millis], i.e. [%s] msg/s".format( - nbrMessages, duration.toMillis, mps)) - if (iterationCounter < nbrIterations) - self ! "start" - else - context.system.shutdown() - } - } -} - -class Destination extends Actor with ActorLogging { - var count = 0 - def receive = { - case "done" ⇒ - log.info("Handled [{}] messages", count) - count = 0 - sender ! "done" - case msg ⇒ count += 1 - - } -} \ No newline at end of file From 64a1fb023582f40de222e1fc8ee2f24250f43064 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Sep 2012 13:24:13 +0200 Subject: [PATCH 17/19] Rename a few things, see #944 * hashKey * hashMapping ConsistentHashMapping * withHashMapper ConsistentHashMapper --- .../routing/ConsistentHashingRouterSpec.scala | 17 ++++--- .../routing/ConsistentHashingRouter.scala | 50 ++++++++++--------- .../ConsistentHashingRouterDocTestBase.java | 16 +++--- akka-docs/java/routing.rst | 6 +-- .../ConsistentHashingRouterDocSpec.scala | 10 ++-- akka-docs/scala/routing.rst | 6 +-- 6 files changed, 54 insertions(+), 51 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala index 0024ceea44..867da83bd7 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -12,6 +12,7 @@ import akka.actor.actorRef2Scala import akka.pattern.ask import akka.routing.ConsistentHashingRouter.ConsistentHashable import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +import akka.routing.ConsistentHashingRouter.ConsistentHashMapping import akka.testkit.AkkaSpec import akka.testkit._ @@ -62,40 +63,40 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c "select destination based on consistentHashKey of the message" in { router1 ! Msg("a", "A") val destinationA = expectMsgType[ActorRef] - router1 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a") + router1 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a") expectMsg(destinationA) router1 ! Msg(17, "B") val destinationB = expectMsgType[ActorRef] - router1 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17) + router1 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17) expectMsg(destinationB) router1 ! Msg(MsgKey("c"), "C") val destinationC = expectMsgType[ActorRef] - router1 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c")) + router1 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c")) expectMsg(destinationC) } "select destination with defined consistentHashRoute" in { - def consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = { + def hashMapping: ConsistentHashMapping = { case Msg2(key, data) ⇒ key } val router2 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter( - consistentHashRoute = consistentHashRoute)), "router2") + hashMapping = hashMapping)), "router2") router2 ! Msg2("a", "A") val destinationA = expectMsgType[ActorRef] - router2 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a") + router2 ! ConsistentHashableEnvelope(message = "AA", hashKey = "a") expectMsg(destinationA) router2 ! Msg2(17, "B") val destinationB = expectMsgType[ActorRef] - router2 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17) + router2 ! ConsistentHashableEnvelope(message = "BB", hashKey = 17) expectMsg(destinationB) router2 ! Msg2(MsgKey("c"), "C") val destinationC = expectMsgType[ActorRef] - router2 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c")) + router2 ! ConsistentHashableEnvelope(message = "CC", hashKey = MsgKey("c")) expectMsg(destinationC) } } diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index 9650a038e9..4937691d88 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -29,7 +29,7 @@ object ConsistentHashingRouter { } /** - * If you don't define the consistentHashRoute when + * If you don't define the `hashMapping` when * constructing the [[akka.routing.ConsistentHashingRouter]] * the messages need to implement this interface to define what * data to use for the consistent hash key. Note that it's not @@ -49,7 +49,7 @@ object ConsistentHashingRouter { } /** - * If you don't define the consistentHashRoute when + * If you don't define the `hashMapping` when * constructing the [[akka.routing.ConsistentHashingRouter]] * and messages can't implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]] * themselves they can we wrapped by this envelope instead. The @@ -57,8 +57,10 @@ object ConsistentHashingRouter { * i.e. the envelope will be stripped off. */ @SerialVersionUID(1L) - case class ConsistentHashableEnvelope(message: Any, consistentHashKey: Any) - extends ConsistentHashable with RouterEnvelope + final case class ConsistentHashableEnvelope(message: Any, hashKey: Any) + extends ConsistentHashable with RouterEnvelope { + override def consistentHashKey: Any = hashKey + } /** * Partial function from message to the data to @@ -69,12 +71,12 @@ object ConsistentHashingRouter { * otherwise the configured [[akka.akka.serialization.Serializer]] * will be applied to the returned data. */ - type ConsistentHashRoute = PartialFunction[Any, Any] + type ConsistentHashMapping = PartialFunction[Any, Any] @SerialVersionUID(1L) - object emptyConsistentHashRoute extends ConsistentHashRoute { + object emptyConsistentHashMapping extends ConsistentHashMapping { def isDefinedAt(x: Any) = false - def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashRoute apply()") + def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashMapping apply()") } /** @@ -90,8 +92,8 @@ object ConsistentHashingRouter { * otherwise the configured [[akka.akka.serialization.Serializer]] * will be applied to the returned data. */ - trait ConsistentHashMapping { - def consistentHashKey(message: Any): Any + trait ConsistentHashMapper { + def hashKey(message: Any): Any } } /** @@ -100,7 +102,7 @@ object ConsistentHashingRouter { * * There is 3 ways to define what data to use for the consistent hash key. * - * 1. You can define `consistentHashRoute` / `withConsistentHashMapping` + * 1. You can define `hashMapping` / `withHashMapper` * of the router to map incoming messages to their consistent hash key. * This makes the decision transparent for the sender. * @@ -113,7 +115,7 @@ object ConsistentHashingRouter { * the key to use. * * These ways to define the consistent hash key can be use together and at - * the same time for one router. The `consistentHashRoute` is tried first. + * the same time for one router. The `hashMapping` is tried first. * * Please note that providing both 'nrOfInstances' and 'routees' does not make logical * sense as this means that the router should both create new actors and use the 'routees' @@ -137,7 +139,7 @@ object ConsistentHashingRouter { * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] * @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] - * @param consistentHashRoute partial function from message to the data to + * @param hashMapping partial function from message to the data to * use for the consistent hash key */ @SerialVersionUID(1L) @@ -146,7 +148,7 @@ case class ConsistentHashingRouter( val routerDispatcher: String = Dispatchers.DefaultDispatcherId, val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy, val virtualNodesFactor: Int = 0, - val consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = ConsistentHashingRouter.emptyConsistentHashRoute) + val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping) extends RouterConfig with ConsistentHashingLike { /** @@ -188,10 +190,10 @@ case class ConsistentHashingRouter( /** * Java API for setting the mapping from message to the data to use for the consistent hash key. */ - def withConsistentHashMapping(mapping: ConsistentHashingRouter.ConsistentHashMapping) = { - copy(consistentHashRoute = { - case message if (mapping.consistentHashKey(message).asInstanceOf[AnyRef] ne null) ⇒ - mapping.consistentHashKey(message) + def withHashMapper(mapping: ConsistentHashingRouter.ConsistentHashMapper) = { + copy(hashMapping = { + case message if (mapping.hashKey(message).asInstanceOf[AnyRef] ne null) ⇒ + mapping.hashKey(message) }) } @@ -199,7 +201,7 @@ case class ConsistentHashingRouter( * Uses the resizer of the given RouterConfig if this RouterConfig * doesn't have one, i.e. the resizer defined in code is used if * resizer was not defined in config. - * Uses the the consistentHashRoute defined in code, since + * Uses the the `hashMapping` defined in code, since * that can't be defined in configuration. */ override def withFallback(other: RouterConfig): RouterConfig = other match { @@ -208,7 +210,7 @@ case class ConsistentHashingRouter( val useResizer = if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer else this.resizer - copy(resizer = useResizer, consistentHashRoute = otherRouter.consistentHashRoute) + copy(resizer = useResizer, hashMapping = otherRouter.hashMapping) case _ ⇒ throw new IllegalArgumentException("Expected ConsistentHashingRouter, got [%s]".format(other)) } } @@ -227,7 +229,7 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ def virtualNodesFactor: Int - def consistentHashRoute: ConsistentHashRoute + def hashMapping: ConsistentHashMapping override def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { @@ -273,7 +275,7 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ } catch { case NonFatal(e) ⇒ // serialization failed - log.warning("Couldn't route message with consistentHashKey [{}] due to [{}]", hashData, e.getMessage) + log.warning("Couldn't route message with consistent hash key [{}] due to [{}]", hashData, e.getMessage) routeeProvider.context.system.deadLetters } @@ -281,11 +283,11 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ case (sender, message) ⇒ message match { case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) - case _ if consistentHashRoute.isDefinedAt(message) ⇒ - List(Destination(sender, target(consistentHashRoute(message)))) + case _ if hashMapping.isDefinedAt(message) ⇒ + List(Destination(sender, target(hashMapping(message)))) case hashable: ConsistentHashable ⇒ List(Destination(sender, target(hashable.consistentHashKey))) case other ⇒ - log.warning("Message [{}] must be handled by consistentHashRoute, or implement [{}] or be wrapped in [{}]", + log.warning("Message [{}] must be handled by hashMapping, or implement [{}] or be wrapped in [{}]", message.getClass.getName, classOf[ConsistentHashable].getName, classOf[ConsistentHashableEnvelope].getName) List(Destination(sender, routeeProvider.context.system.deadLetters)) diff --git a/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java index 6f00ed1509..378049578f 100644 --- a/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java +++ b/akka-docs/java/code/docs/jrouting/ConsistentHashingRouterDocTestBase.java @@ -21,7 +21,7 @@ import java.io.Serializable; import akka.actor.Props; import akka.actor.ActorRef; import akka.routing.ConsistentHashingRouter; -import akka.routing.ConsistentHashingRouter.ConsistentHashMapping; +import akka.routing.ConsistentHashingRouter.ConsistentHashMapper; import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope; //#imports2 @@ -40,7 +40,7 @@ public class ConsistentHashingRouterDocTestBase { } //#cache-actor - + public static class Cache extends UntypedActor { Map cache = new HashMap(); @@ -62,14 +62,14 @@ public class ConsistentHashingRouterDocTestBase { } } - public static class Evict implements Serializable { + public static final class Evict implements Serializable { public final String key; public Evict(String key) { this.key = key; } } - public static class Get implements Serializable, ConsistentHashable { + public static final class Get implements Serializable, ConsistentHashable { public final String key; public Get(String key) { this.key = key; @@ -79,7 +79,7 @@ public class ConsistentHashingRouterDocTestBase { } } - public static class Entry implements Serializable { + public static final class Entry implements Serializable { public final String key; public final String value; public Entry(String key, String value) { @@ -99,9 +99,9 @@ public class ConsistentHashingRouterDocTestBase { //#consistent-hashing-router - final ConsistentHashMapping consistentHashMapping = new ConsistentHashMapping() { + final ConsistentHashMapper hashMapper = new ConsistentHashMapper() { @Override - public Object consistentHashKey(Object message) { + public Object hashKey(Object message) { if (message instanceof Evict) { return ((Evict) message).key; } else { @@ -111,7 +111,7 @@ public class ConsistentHashingRouterDocTestBase { }; ActorRef cache = system.actorOf(new Props(Cache.class).withRouter( - new ConsistentHashingRouter(10).withConsistentHashMapping(consistentHashMapping)), + new ConsistentHashingRouter(10).withHashMapper(hashMapper)), "cache"); cache.tell(new ConsistentHashableEnvelope( diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 149a3a315b..4653a07692 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -286,7 +286,7 @@ insight into how consistent hashing is implemented. There is 3 ways to define what data to use for the consistent hash key. -* You can define ``withConsistentHashMapping`` of the router to map incoming +* You can define ``withHashMapper`` of the router to map incoming messages to their consistent hash key. This makes the the decision transparent for the sender. @@ -299,7 +299,7 @@ There is 3 ways to define what data to use for the consistent hash key. the key to use. These ways to define the consistent hash key can be use together and at -the same time for one router. The ``withConsistentHashMapping`` is tried first. +the same time for one router. The ``withHashMapper`` is tried first. Code example: @@ -311,7 +311,7 @@ Code example: In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict`` -message is handled by the ``withConsistentHashMapping``. +message is handled by the ``withHashMapper``. This is an example of how to define a consistent-hashing router in configuration: diff --git a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala index 7041db6049..dcb40b0c67 100644 --- a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala @@ -42,20 +42,20 @@ class ConsistentHashingRouterDocSpec extends AkkaSpec with ImplicitSender { //#consistent-hashing-router import akka.actor.Props import akka.routing.ConsistentHashingRouter - import akka.routing.ConsistentHashingRouter.ConsistentHashRoute + import akka.routing.ConsistentHashingRouter.ConsistentHashMapping import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope - def consistentHashRoute: ConsistentHashRoute = { + def hashMapping: ConsistentHashMapping = { case Evict(key) ⇒ key } val cache = system.actorOf(Props[Cache].withRouter(ConsistentHashingRouter(10, - consistentHashRoute = consistentHashRoute)), name = "cache") + hashMapping = hashMapping)), name = "cache") cache ! ConsistentHashableEnvelope( - message = Entry("hello", "HELLO"), consistentHashKey = "hello") + message = Entry("hello", "HELLO"), hashKey = "hello") cache ! ConsistentHashableEnvelope( - message = Entry("hi", "HI"), consistentHashKey = "hi") + message = Entry("hi", "HI"), hashKey = "hi") cache ! Get("hello") expectMsg(Some("HELLO")) diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 4766b2e6ca..c1fa21b23f 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -291,7 +291,7 @@ insight into how consistent hashing is implemented. There is 3 ways to define what data to use for the consistent hash key. -* You can define ``consistentHashRoute`` of the router to map incoming +* You can define ``hashMapping`` of the router to map incoming messages to their consistent hash key. This makes the decision transparent for the sender. @@ -304,7 +304,7 @@ There is 3 ways to define what data to use for the consistent hash key. the key to use. These ways to define the consistent hash key can be use together and at -the same time for one router. The ``consistentHashRoute`` is tried first. +the same time for one router. The ``hashMapping`` is tried first. Code example: @@ -314,7 +314,7 @@ Code example: In the above example you see that the ``Get`` message implements ``ConsistentHashable`` itself, while the ``Entry`` message is wrapped in a ``ConsistentHashableEnvelope``. The ``Evict`` -message is handled by the ``consistentHashRoute`` partial function. +message is handled by the ``hashMapping`` partial function. This is an example of how to define a consistent-hashing router in configuration: From 44f4fdb003dbf8014bef8bc2bc22c3e974815eaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Mon, 17 Sep 2012 09:51:15 +0200 Subject: [PATCH 18/19] Some optimizations for the ConsistentHashingRouter --- .../scala/akka/routing/ConsistentHash.scala | 45 ++++++++++++------- .../routing/ConsistentHashingRouter.scala | 11 +++-- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index bc86f5f82d..fca0837662 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -24,10 +24,9 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1") // sorted hash values of the nodes - private val nodeRing: Array[Int] = { - val nodeRing = nodes.keys.toArray - Arrays.sort(nodeRing) - nodeRing + private val (nodeHashRing: Array[Int], nodeRing: Vector[T]) = { + val (nhr: IndexedSeq[Int], nr: IndexedSeq[AnyRef]) = nodes.toArray.sortBy(_._1).unzip + (nhr.toArray, Vector[T]() ++ nr) } /** @@ -62,6 +61,17 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { */ def remove(node: T): ConsistentHash[T] = this :- node + // converts the result of Arrays.binarySearch into a index in the nodeRing array + // see documentation of Arrays.binarySearch for what it returns + private def idx(i: Int): Int = { + if (i >= 0) i // exact match + else { + val j = math.abs(i + 1) + if (j >= nodeHashRing.length) 0 // after last, use first + else j // next node clockwise + } + } + /** * Get the node responsible for the data key. * Can only be used if nodes exists in the node ring, @@ -70,18 +80,18 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { def nodeFor(key: Array[Byte]): T = { if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) - // converts the result of Arrays.binarySearch into a index in the nodeRing array - // see documentation of Arrays.binarySearch for what it returns - def idx(i: Int): Int = { - if (i >= 0) i // exact match - else { - val j = math.abs(i + 1) - if (j >= nodeRing.length) 0 // after last, use first - else j // next node clockwise - } - } - val nodeRingIndex = idx(Arrays.binarySearch(nodeRing, hashFor(key))) - nodes(nodeRing(nodeRingIndex)) + nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key)))) + } + + /** + * Get the node responsible for the data key. + * Can only be used if nodes exists in the node ring, + * otherwise throws `IllegalStateException` + */ + def nodeFor(key: String): T = { + if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) + + nodeRing(idx(Arrays.binarySearch(nodeHashRing, hashFor(key)))) } /** @@ -111,5 +121,6 @@ object ConsistentHash { hashFor((node + ":" + vnode).getBytes("UTF-8")) private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes) -} + private def hashFor(string: String): Int = MurmurHash.stringHash(string) +} diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index 4937691d88..3b9802d7fd 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -264,14 +264,13 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ } def target(hashData: Any): ActorRef = try { - val hash = hashData match { - case bytes: Array[Byte] ⇒ bytes - case str: String ⇒ str.getBytes("UTF-8") - case x: AnyRef ⇒ SerializationExtension(routeeProvider.context.system).serialize(x).get - } val currentConsistenHash = updateConsistentHash() if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters - else currentConsistenHash.nodeFor(hash) + else hashData match { + case bytes: Array[Byte] ⇒ currentConsistenHash.nodeFor(bytes) + case str: String ⇒ currentConsistenHash.nodeFor(str) + case x: AnyRef ⇒ currentConsistenHash.nodeFor(SerializationExtension(routeeProvider.context.system).serialize(x).get) + } } catch { case NonFatal(e) ⇒ // serialization failed From 90d2fb46fca24ab908b068c5893bdb9cfabf4e6c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Sep 2012 20:46:53 +0200 Subject: [PATCH 19/19] ClassTag to construct arrays, see #944 * array instead of Vector --- .../scala/akka/routing/ConsistentHash.scala | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index fca0837662..79c31cda33 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -4,7 +4,8 @@ package akka.routing -import scala.collection.immutable.TreeMap +import scala.collection.immutable.SortedMap +import scala.reflect.ClassTag import java.util.Arrays /** @@ -17,16 +18,18 @@ import java.util.Arrays * hash, i.e. make sure it is different for different nodes. * */ -class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { +class ConsistentHash[T: ClassTag] private (nodes: SortedMap[Int, T], virtualNodesFactor: Int) { import ConsistentHash._ if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1") - // sorted hash values of the nodes - private val (nodeHashRing: Array[Int], nodeRing: Vector[T]) = { - val (nhr: IndexedSeq[Int], nr: IndexedSeq[AnyRef]) = nodes.toArray.sortBy(_._1).unzip - (nhr.toArray, Vector[T]() ++ nr) + // arrays for fast binary search and access + // nodeHashRing is the sorted hash values of the nodes + // nodeRing is the nodes sorted in the same order as nodeHashRing, i.e. same index + private val (nodeHashRing: Array[Int], nodeRing: Array[T]) = { + val (nhr: Seq[Int], nr: Seq[T]) = nodes.toSeq.unzip + (nhr.toArray, nr.toArray) } /** @@ -102,8 +105,8 @@ class ConsistentHash[T] private (nodes: Map[Int, T], virtualNodesFactor: Int) { } object ConsistentHash { - def apply[T](nodes: Iterable[T], virtualNodesFactor: Int) = { - new ConsistentHash(TreeMap.empty[Int, T] ++ + def apply[T: ClassTag](nodes: Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = { + new ConsistentHash(SortedMap.empty[Int, T] ++ (for (node ← nodes; vnode ← 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)), virtualNodesFactor) } @@ -112,9 +115,9 @@ object ConsistentHash { * Factory method to create a ConsistentHash * JAVA API */ - def create[T](nodes: java.lang.Iterable[T], virtualNodesFactor: Int) = { + def create[T](nodes: java.lang.Iterable[T], virtualNodesFactor: Int): ConsistentHash[T] = { import scala.collection.JavaConverters._ - apply(nodes.asScala, virtualNodesFactor) + apply(nodes.asScala, virtualNodesFactor)(ClassTag(classOf[Any].asInstanceOf[Class[T]])) } private def nodeHashFor(node: Any, vnode: Int): Int =