Support partial function to map message to hash key, see #944

* Partial function, ConsistentHashRoute, for Scala API
* withConsistentHashMapping ConsistentHashMapping for
  Java API
* Updated documentation
This commit is contained in:
Patrik Nordwall 2012-09-14 13:47:58 +02:00
parent e3bd02b82c
commit a4dd6b7547
6 changed files with 235 additions and 29 deletions

View file

@ -24,6 +24,10 @@ object ConsistentHashingRouterSpec {
nr-of-instances = 3
virtual-nodes-factor = 17
}
/router2 {
router = consistent-hashing
nr-of-instances = 5
}
}
"""
@ -38,6 +42,8 @@ object ConsistentHashingRouterSpec {
}
case class MsgKey(name: String)
case class Msg2(key: Any, data: String)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -66,7 +72,30 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c
router1 ! Msg(MsgKey("c"), "C")
val destinationC = expectMsgPF(remaining) { case ref: ActorRef ref }
router1 ! Msg(MsgKey("c"), "CC")
router1 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c"))
expectMsg(destinationC)
}
"select destination with defined consistentHashRoute" in {
def consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = {
case Msg2(key, data) key
}
val router2 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter(
consistentHashRoute = consistentHashRoute)), "router2")
router2 ! Msg2("a", "A")
val destinationA = expectMsgPF(remaining) { case ref: ActorRef ref }
router2 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a")
expectMsg(destinationA)
router2 ! Msg2(17, "B")
val destinationB = expectMsgPF(remaining) { case ref: ActorRef ref }
router2 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17)
expectMsg(destinationB)
router2 ! Msg2(MsgKey("c"), "C")
val destinationC = expectMsgPF(remaining) { case ref: ActorRef ref }
router2 ! ConsistentHashableEnvelope(message = "CC", consistentHashKey = MsgKey("c"))
expectMsg(destinationC)
}
}

View file

@ -0,0 +1,79 @@
package perf
import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.routing.ConsistentHashingRouter
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
import scala.util.Random
import akka.routing.Broadcast
import akka.actor.ActorLogging
import scala.concurrent.util.Duration
import akka.routing.RoundRobinRouter
object RouterPerf extends App {
val system = ActorSystem("PerfApp")
var perf = new RouterPerf(system)
// Thread.sleep(20000) // hook up profiler here
perf.run()
}
class RouterPerf(system: ActorSystem) {
def run(): Unit = {
// nbrMessages = 10000000
val sender = system.actorOf(Props(new Sender(
nbrMessages = 10000000, nbrRoutees = 10, nbrIterations = 10)), name = "sender")
sender ! "start"
}
}
class Sender(nbrMessages: Int, nbrRoutees: Int, nbrIterations: Int) extends Actor with ActorLogging {
val router = context.actorOf(Props[Destination].withRouter(ConsistentHashingRouter(nbrRoutees,
virtualNodesFactor = 10)), "router")
// val router = context.actorOf(Props[Destination].withRouter(RoundRobinRouter(nbrRoutees)), "router")
val rnd = new Random
val messages = Vector.fill(1000)(ConsistentHashableEnvelope("msg", rnd.nextString(10)))
var startTime = 0L
var doneCounter = 0
var iterationCounter = 0
def receive = {
case "start"
iterationCounter += 1
doneCounter = 0
startTime = System.nanoTime
val messgesSize = messages.size
for (n 1 to nbrMessages) { router ! messages(n % messgesSize) }
router ! Broadcast("done")
case "done"
doneCounter += 1
if (doneCounter == nbrRoutees) {
val duration = Duration.fromNanos(System.nanoTime - startTime)
val mps = (nbrMessages.toDouble * 1000 / duration.toMillis).toInt
// log.info("Processed [{}] messages in [{} millis], i.e. [{}] msg/s",
// nbrMessages, duration.toMillis, mps)
println("Processed [%s] messages in [%s millis], i.e. [%s] msg/s".format(
nbrMessages, duration.toMillis, mps))
if (iterationCounter < nbrIterations)
self ! "start"
else
context.system.shutdown()
}
}
}
class Destination extends Actor with ActorLogging {
var count = 0
def receive = {
case "done"
log.info("Handled [{}] messages", count)
count = 0
sender ! "done"
case msg count += 1
}
}

View file

@ -29,23 +29,29 @@ object ConsistentHashingRouter {
}
/**
* Messages need to implement this interface to define what
* If you don't define the consistentHashRoute when
* constructing the [[akka.routing.ConsistentHashingRouter]]
* the messages need to implement this interface to define what
* data to use for the consistent hash key. Note that it's not
* the hash, but the data to be hashed. If returning an
* `Array[Byte]` or String it will be used as is, otherwise the
* configured [[akka.akka.serialization.Serializer]] will
* be applied to the returned data.
* the hash, but the data to be hashed.
*
* If returning an `Array[Byte]` or String it will be used as is,
* otherwise the configured [[akka.akka.serialization.Serializer]]
* will be applied to the returned data.
*
* If messages can't implement this interface themselves,
* it's possible to wrap the messages in
* [[akka.routing.ConsistentHashableEnvelope]]
* [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]],
* or use [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]]
*/
trait ConsistentHashable {
def consistentHashKey: Any
}
/**
* If messages can't implement [[akka.routing.ConsistentHashable]]
* If you don't define the consistentHashRoute when
* constructing the [[akka.routing.ConsistentHashingRouter]]
* and messages can't implement [[akka.routing.ConsistentHashingRouter.ConsistentHashable]]
* themselves they can we wrapped by this envelope instead. The
* router will only send the wrapped message to the destination,
* i.e. the envelope will be stripped off.
@ -54,12 +60,57 @@ object ConsistentHashingRouter {
case class ConsistentHashableEnvelope(message: Any, consistentHashKey: Any)
extends ConsistentHashable with RouterEnvelope
/**
* Partial function from message to the data to
* use for the consistent hash key. Note that it's not
* the hash that is to be returned, but the data to be hashed.
*
* If returning an `Array[Byte]` or String it will be used as is,
* otherwise the configured [[akka.akka.serialization.Serializer]]
* will be applied to the returned data.
*/
type ConsistentHashRoute = PartialFunction[Any, Any]
@SerialVersionUID(1L)
object emptyConsistentHashRoute extends ConsistentHashRoute {
def isDefinedAt(x: Any) = false
def apply(x: Any) = throw new UnsupportedOperationException("Empty ConsistentHashRoute apply()")
}
/**
* JAVA API
* Mapping from message to the data to use for the consistent hash key.
* Note that it's not the hash that is to be returned, but the data to be
* hashed.
*
* If returning an `Array[Byte]` or String it will be used as is,
* otherwise the configured [[akka.akka.serialization.Serializer]]
* will be applied to the returned data.
*/
trait ConsistentHashMapping {
def consistentHashKey(message: Any): Any
}
}
/**
* A Router that uses consistent hashing to select a connection based on the
* sent message. The messages must implement [[akka.routing.ConsistentHashable]]
* or be wrapped in a [[akka.routing.ConsistentHashableEnvelope]] to define what
* data to use for the consistent hash key.
* sent message.
*
* There is 3 ways to define what data to use for the consistent hash key.
*
* 1. You can define `consistentHashRoute` / `withConsistentHashMapping`
* of the router to map incoming messages to their consistent hash key.
* This makes the makes the decision transparent for the sender.
*
* 2. The messages may implement [[akka.routing.ConsistentHashable]].
* The key is part of the message and it's convenient to define it together
* with the message definition.
*
* 3. The messages can be be wrapped in a [[akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope]]
* to define what data to use for the consistent hash key. The sender knows
* the key to use.
*
* These ways to define the consistent hash key can be use together and at
* the same time for one router. The `consistentHashRoute` is tried first.
*
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
* sense as this means that the router should both create new actors and use the 'routees'
@ -83,13 +134,16 @@ object ConsistentHashingRouter {
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
* @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]]
* @param consistentHashRoute partial function from message to the data to
* use for the consistent hash key
*/
@SerialVersionUID(1L)
case class ConsistentHashingRouter(
nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy,
val virtualNodesFactor: Int = 0)
val virtualNodesFactor: Int = 0,
val consistentHashRoute: ConsistentHashingRouter.ConsistentHashRoute = ConsistentHashingRouter.emptyConsistentHashRoute)
extends RouterConfig with ConsistentHashingLike {
/**
@ -128,14 +182,29 @@ case class ConsistentHashingRouter(
*/
def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes)
/**
* Java API for setting the mapping from message to the data to use for the consistent hash key
*/
def withConsistentHashMapping(mapping: ConsistentHashingRouter.ConsistentHashMapping) = {
copy(consistentHashRoute = {
case message mapping.consistentHashKey(message)
})
}
/**
* Uses the resizer of the given RouterConfig if this RouterConfig
* doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
* Uses the the consistentHashRoute defined in code, since
* that can't be defined in configuration.
*/
override def withFallback(other: RouterConfig): RouterConfig = {
if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer)
else this
override def withFallback(other: RouterConfig): RouterConfig = other match {
case otherRouter: ConsistentHashingRouter
val useResizer =
if (this.resizer.isEmpty && otherRouter.resizer.isDefined) otherRouter.resizer
else this.resizer
copy(resizer = useResizer, consistentHashRoute = otherRouter.consistentHashRoute)
case _ throw new IllegalArgumentException("Expected ConsistentHashingRouter, got [%s]".format(other))
}
}
@ -153,6 +222,8 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
def virtualNodesFactor: Int
def consistentHashRoute: ConsistentHashRoute
override def createRoute(routeeProvider: RouteeProvider): Route = {
if (resizer.isEmpty) {
if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
@ -204,14 +275,17 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
{
case (sender, message)
message match {
case Broadcast(msg) toAll(sender, routeeProvider.routees)
case Broadcast(msg) toAll(sender, routeeProvider.routees)
case _ if consistentHashRoute.isDefinedAt(message)
List(Destination(sender, target(consistentHashRoute(message))))
case hashable: ConsistentHashable List(Destination(sender, target(hashable.consistentHashKey)))
case other
log.warning("Message [{}] must implement [{}] or be wrapped in [{}]",
log.warning("Message [{}] must be handled by consistentHashRoute, or implement [{}] or be wrapped in [{}]",
message.getClass.getName, classOf[ConsistentHashable].getName,
classOf[ConsistentHashableEnvelope].getName)
List(Destination(sender, routeeProvider.context.system.deadLetters))
}
}
}
}

View file

@ -15,6 +15,7 @@ is really easy to create your own. The routers shipped with Akka are:
* ``akka.routing.SmallestMailboxRouter``
* ``akka.routing.BroadcastRouter``
* ``akka.routing.ScatterGatherFirstCompletedRouter``
* ``akka.routing.ConsistentHashingRouter``
Routers In Action
^^^^^^^^^^^^^^^^^
@ -283,12 +284,22 @@ to select a connection based on the sent message. This
`article <http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html>`_ gives good
insight into how consistent hashing is implemented.
The messages sent to a ConsistentHashingRouter must implement
``akka.routing.ConsistentHashable`` or be wrapped in a ``akka.routing.ConsistentHashableEnvelope``
to define what data to use for the consistent hash key. If returning a
byte array or String it will be used as is, otherwise the configured
:ref:`serializer <serialization-scala>` will be applied to the returned data
to create a byte array that will be hashed.
There is 3 ways to define what data to use for the consistent hash key.
* You can define ``withConsistentHashMapping`` of the router to map incoming
messages to their consistent hash key. This makes the makes the decision
transparent for the sender.
* The messages may implement ``akka.routing.ConsistentHashable``.
The key is part of the message and it's convenient to define it together
with the message definition.
* The messages can be be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope``
to define what data to use for the consistent hash key. The sender knows
the key to use.
These ways to define the consistent hash key can be use together and at
the same time for one router. The ``withConsistentHashMapping`` is tried first.
Code example:

View file

@ -19,9 +19,12 @@ object ConsistentHashingRouterDocSpec {
def receive = {
case Entry(key, value) cache += (key -> value)
case Get(key) sender ! cache.get(key)
case Evict(key) => cache -= key
}
}
case class Evict(key: String)
case class Get(key: String) extends ConsistentHashable {
override def consistentHashKey: Any = key
}

View file

@ -289,12 +289,22 @@ to select a connection based on the sent message. This
`article <http://weblogs.java.net/blog/tomwhite/archive/2007/11/consistent_hash.html>`_ gives good
insight into how consistent hashing is implemented.
The messages sent to a ConsistentHashingRouter must implement
``akka.routing.ConsistentHashable`` or be wrapped in a ``akka.routing.ConsistentHashableEnvelope``
to define what data to use for the consistent hash key. If returning a
byte array or String it will be used as is, otherwise the configured
:ref:`serializer <serialization-scala>` will be applied to the returned data
to create a byte array that will be hashed.
There is 3 ways to define what data to use for the consistent hash key.
* You can define ``consistentHashRoute`` of the router to map incoming
messages to their consistent hash key. This makes the makes the decision
transparent for the sender.
* The messages may implement ``akka.routing.ConsistentHashable``.
The key is part of the message and it's convenient to define it together
with the message definition.
* The messages can be be wrapped in a ``akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope``
to define what data to use for the consistent hash key. The sender knows
the key to use.
These ways to define the consistent hash key can be use together and at
the same time for one router. The ``consistentHashRoute`` is tried first.
Code example: