From a4dd6b754754fbdd61e1fcfa76497664380985df Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 14 Sep 2012 13:47:58 +0200 Subject: [PATCH] Support partial function to map message to hash key, see #944 * Partial function, ConsistentHashRoute, for Scala API * withConsistentHashMapping ConsistentHashMapping for Java API * Updated documentation --- .../routing/ConsistentHashingRouterSpec.scala | 31 ++++- .../src/test/scala/perf/RouterPerf.scala | 79 +++++++++++++ .../routing/ConsistentHashingRouter.scala | 106 +++++++++++++++--- akka-docs/java/routing.rst | 23 +++- .../ConsistentHashingRouterDocSpec.scala | 3 + akka-docs/scala/routing.rst | 22 +++- 6 files changed, 235 insertions(+), 29 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/perf/RouterPerf.scala diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala index 7a28190523..b4ef639067 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -24,6 +24,10 @@ object ConsistentHashingRouterSpec { nr-of-instances = 3 virtual-nodes-factor = 17 } + /router2 { + router = consistent-hashing + nr-of-instances = 5 + } } """ @@ -38,6 +42,8 @@ object ConsistentHashingRouterSpec { } case class MsgKey(name: String) + + case class Msg2(key: Any, data: String) } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -66,7 +72,30 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c router1 ! Msg(MsgKey("c"), "C") val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } - router1 ! Msg(MsgKey("c"), "CC") + router1 ! 
ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c")) + expectMsg(destinationC) + } + + "select destination with defined consistentHashRoute" in { + def consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = { + case Msg2(key, data) ⇒ key + } + val router2 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter( + consistentHashRoute = consistentHashRoute)), "router2") + + router2 ! Msg2("a", "A") + val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router2 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a") + expectMsg(destinationA) + + router2 ! Msg2(17, "B") + val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router2 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17) + expectMsg(destinationB) + + router2 ! Msg2(MsgKey("c"), "C") + val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router2 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c")) expectMsg(destinationC) } } diff --git a/akka-actor-tests/src/test/scala/perf/RouterPerf.scala b/akka-actor-tests/src/test/scala/perf/RouterPerf.scala new file mode 100644 index 0000000000..20f9554540 --- /dev/null +++ b/akka-actor-tests/src/test/scala/perf/RouterPerf.scala @@ -0,0 +1,79 @@ +package perf + +import akka.actor.ActorSystem +import akka.actor.Actor +import akka.actor.Props +import scala.concurrent.forkjoin.ThreadLocalRandom +import akka.routing.ConsistentHashingRouter +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +import scala.util.Random +import akka.routing.Broadcast +import akka.actor.ActorLogging +import scala.concurrent.util.Duration +import akka.routing.RoundRobinRouter + +object RouterPerf extends App { + val system = ActorSystem("PerfApp") + var perf = new RouterPerf(system) + // Thread.sleep(20000) // hook up profiler here + perf.run() +} + +class RouterPerf(system: ActorSystem) { + + def run(): Unit = { + // 
nbrMessages = 10000000 + val sender = system.actorOf(Props(new Sender( + nbrMessages = 10000000, nbrRoutees = 10, nbrIterations = 10)), name = "sender") + sender ! "start" + } + +} + +class Sender(nbrMessages: Int, nbrRoutees: Int, nbrIterations: Int) extends Actor with ActorLogging { + val router = context.actorOf(Props[Destination].withRouter(ConsistentHashingRouter(nbrRoutees, + virtualNodesFactor = 10)), "router") + // val router = context.actorOf(Props[Destination].withRouter(RoundRobinRouter(nbrRoutees)), "router") + val rnd = new Random + val messages = Vector.fill(1000)(ConsistentHashableEnvelope("msg", rnd.nextString(10))) + var startTime = 0L + var doneCounter = 0 + var iterationCounter = 0 + + def receive = { + case "start" ⇒ + iterationCounter += 1 + doneCounter = 0 + startTime = System.nanoTime + val messgesSize = messages.size + for (n ← 1 to nbrMessages) { router ! messages(n % messgesSize) } + router ! Broadcast("done") + + case "done" ⇒ + doneCounter += 1 + if (doneCounter == nbrRoutees) { + val duration = Duration.fromNanos(System.nanoTime - startTime) + val mps = (nbrMessages.toDouble * 1000 / duration.toMillis).toInt + // log.info("Processed [{}] messages in [{} millis], i.e. [{}] msg/s", + // nbrMessages, duration.toMillis, mps) + println("Processed [%s] messages in [%s millis], i.e. [%s] msg/s".format( + nbrMessages, duration.toMillis, mps)) + if (iterationCounter < nbrIterations) + self ! "start" + else + context.system.shutdown() + } + } +} + +class Destination extends Actor with ActorLogging { + var count = 0 + def receive = { + case "done" ⇒ + log.info("Handled [{}] messages", count) + count = 0 + sender ! 
"done" + case msg ⇒ count += 1 + + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index b0a9ccb113..a08bbdd481 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -29,23 +29,29 @@ object ConsistentHashingRouter { } /** - * Messages need to implement this interface to define what + * If you don't define the consistentHashRoute when + * constructing the [[akka.routing.ConsistentHashingRouter]] + * the messages need to implement this interface to define what * data to use for the consistent hash key. Note that it's not - * the hash, but the data to be hashed. If returning an - * `Array[Byte]` or String it will be used as is, otherwise the - * configured [[akka.akka.serialization.Serializer]] will - * be applied to the returned data. + * the hash, but the data to be hashed. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.akka.serialization.Serializer]] + * will be applied to the returned data. * * If messages can't implement this interface themselves, * it's possible to wrap the messages in - * [[akka.routing.ConsistentHashableEnvelope]] + * [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]], + * or use [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]] */ trait ConsistentHashable { def consistentHashKey: Any } /** - * If messages can't implement [[akka.routing.ConsistentHashable]] + * If you don't define the consistentHashRoute when + * constructing the [[akka.routing.ConsistentHashingRouter]] + * and messages can't implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]] * themselves they can we wrapped by this envelope instead. The * router will only send the wrapped message to the destination, * i.e. 
the envelope will be stripped off. @@ -54,12 +60,57 @@ object ConsistentHashingRouter { case class ConsistentHashableEnvelope(message: Any, consistentHashKey: Any) extends ConsistentHashable with RouterEnvelope + /** + * Partial function from message to the data to + * use for the consistent hash key. Note that it's not + * the hash that is to be returned, but the data to be hashed. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.serialization.Serializer]] + * will be applied to the returned data. + */ + type ConsistentHashRoute = PartialFunction[Any, Any] + + @SerialVersionUID(1L) + object emptyConsistentHashRoute extends ConsistentHashRoute { + def isDefinedAt(x: Any) = false + def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashRoute apply()") + } + + /** + * JAVA API + * Mapping from message to the data to use for the consistent hash key. + * Note that it's not the hash that is to be returned, but the data to be + * hashed. + * + * If returning an `Array[Byte]` or String it will be used as is, + * otherwise the configured [[akka.serialization.Serializer]] + * will be applied to the returned data. + */ + trait ConsistentHashMapping { + def consistentHashKey(message: Any): Any + } } /** * A Router that uses consistent hashing to select a connection based on the - * sent message. The messages must implement [[akka.routing.ConsistentHashable]] - * or be wrapped in a [[akka.routing.ConsistentHashableEnvelope]] to define what - * data to use for the consistent hash key. + * sent message. + * + * There are 3 ways to define what data to use for the consistent hash key. + * + * 1. You can define `consistentHashRoute` / `withConsistentHashMapping` + * of the router to map incoming messages to their consistent hash key. + * This makes the decision transparent for the sender. + * + * 2. The messages may implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]].
+ * The key is part of the message and it's convenient to define it together + * with the message definition. + * + * 3. The messages can be wrapped in a [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]] + * to define what data to use for the consistent hash key. The sender knows + * the key to use. + * + * These ways to define the consistent hash key can be used together and at + * the same time for one router. The `consistentHashRoute` is tried first. * * Please note that providing both 'nrOfInstances' and 'routees' does not make logical * sense as this means that the router should both create new actors and use the 'routees' @@ -83,13 +134,16 @@ object ConsistentHashingRouter { * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] * @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] + * @param consistentHashRoute partial function from message to the data to + * use for the consistent hash key */ @SerialVersionUID(1L) case class ConsistentHashingRouter( nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, val routerDispatcher: String = Dispatchers.DefaultDispatcherId, val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy, - val virtualNodesFactor: Int = 0) + val virtualNodesFactor: Int = 0, + val consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = ConsistentHashingRouter.emptyConsistentHashRoute) extends RouterConfig with ConsistentHashingLike { /** @@ -128,14 +182,29 @@ case class ConsistentHashingRouter( */ def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes) + /** + * Java API for setting the mapping from message to the data to use for the consistent hash key + */ + def withConsistentHashMapping(mapping: ConsistentHashingRouter.ConsistentHashMapping) = { +
copy(consistentHashRoute = { + case message ⇒ mapping.consistentHashKey(message) + }) + } + /** * Uses the resizer of the given RouterConfig if this RouterConfig * doesn't have one, i.e. the resizer defined in code is used if * resizer was not defined in config. + * Uses the consistentHashRoute defined in code, since + * that can't be defined in configuration. */ - override def withFallback(other: RouterConfig): RouterConfig = { - if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) - else this + override def withFallback(other: RouterConfig): RouterConfig = other match { + case otherRouter: ConsistentHashingRouter ⇒ + val useResizer = + if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer + else this.resizer + copy(resizer = useResizer, consistentHashRoute = otherRouter.consistentHashRoute) + case _ ⇒ throw new IllegalArgumentException("Expected ConsistentHashingRouter, got [%s]".format(other)) } } @@ -153,6 +222,8 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ def virtualNodesFactor: Int + def consistentHashRoute: ConsistentHashRoute + override def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) @@ -204,14 +275,17 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ { case (sender, message) ⇒ message match { - case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) + case _ if consistentHashRoute.isDefinedAt(message) ⇒ + List(Destination(sender, target(consistentHashRoute(message)))) case hashable: ConsistentHashable ⇒ List(Destination(sender, target(hashable.consistentHashKey))) case other ⇒ - log.warning("Message [{}] must implement [{}] or be wrapped in [{}]", + log.warning("Message [{}] must be handled by consistentHashRoute, or implement [{}] or be wrapped in [{}]", message.getClass.getName, classOf[ConsistentHashable].getName, 
classOf[ConsistentHashableEnvelope].getName) List(Destination(sender, routeeProvider.context.system.deadLetters)) } + } } } \ No newline at end of file diff --git a/akka-docs/java/routing.rst b/akka-docs/java/routing.rst index 561cbd9446..0096c9e3e8 100644 --- a/akka-docs/java/routing.rst +++ b/akka-docs/java/routing.rst @@ -15,6 +15,7 @@ is really easy to create your own. The routers shipped with Akka are: * ``akka.routing.SmallestMailboxRouter`` * ``akka.routing.BroadcastRouter`` * ``akka.routing.ScatterGatherFirstCompletedRouter`` +* ``akka.routing.ConsistentHashingRouter`` Routers In Action ^^^^^^^^^^^^^^^^^ @@ -283,12 +284,22 @@ to select a connection based on the sent message. This `article `_ gives good insight into how consistent hashing is implemented. -The messages sent to a ConsistentHashingRouter must implement -``akka.routing.ConsistentHashable`` or be wrapped in a ``akka.routing.ConsistentHashableEnvelope`` -to define what data to use for the consistent hash key. If returning a -byte array or String it will be used as is, otherwise the configured -:ref:`serializer ` will be applied to the returned data -to create a byte array that will be hashed. +There are 3 ways to define what data to use for the consistent hash key. + +* You can define ``withConsistentHashMapping`` of the router to map incoming + messages to their consistent hash key. This makes the decision + transparent for the sender. + +* The messages may implement ``akka.routing.ConsistentHashingRouter.ConsistentHashable``. + The key is part of the message and it's convenient to define it together + with the message definition. + +* The messages can be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope`` + to define what data to use for the consistent hash key. The sender knows + the key to use. + +These ways to define the consistent hash key can be used together and at +the same time for one router. The ``withConsistentHashMapping`` is tried first. 
Code example: diff --git a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala index 8bb59a7bce..42af05c049 100644 --- a/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/ConsistentHashingRouterDocSpec.scala @@ -19,9 +19,12 @@ object ConsistentHashingRouterDocSpec { def receive = { case Entry(key, value) ⇒ cache += (key -> value) case Get(key) ⇒ sender ! cache.get(key) + case Evict(key) => cache -= key } } + case class Evict(key: String) + case class Get(key: String) extends ConsistentHashable { override def consistentHashKey: Any = key } diff --git a/akka-docs/scala/routing.rst b/akka-docs/scala/routing.rst index 3f384895c6..0cffa3c1a7 100644 --- a/akka-docs/scala/routing.rst +++ b/akka-docs/scala/routing.rst @@ -289,12 +289,22 @@ to select a connection based on the sent message. This `article `_ gives good insight into how consistent hashing is implemented. -The messages sent to a ConsistentHashingRouter must implement -``akka.routing.ConsistentHashable`` or be wrapped in a ``akka.routing.ConsistentHashableEnvelope`` -to define what data to use for the consistent hash key. If returning a -byte array or String it will be used as is, otherwise the configured -:ref:`serializer ` will be applied to the returned data -to create a byte array that will be hashed. +There are 3 ways to define what data to use for the consistent hash key. + +* You can define ``consistentHashRoute`` of the router to map incoming + messages to their consistent hash key. This makes the decision + transparent for the sender. + +* The messages may implement ``akka.routing.ConsistentHashingRouter.ConsistentHashable``. + The key is part of the message and it's convenient to define it together + with the message definition. 
+ +* The messages can be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope`` + to define what data to use for the consistent hash key. The sender knows + the key to use. + +These ways to define the consistent hash key can be used together and at +the same time for one router. The ``consistentHashRoute`` is tried first. Code example: