Implement ConsistentHashingRouter, see #944
* Added trait RouterEnvelope to handle Broadcast and ConsistentHashableEnvelope in same way, could also be useful for custom routers
This commit is contained in:
parent
521d20ba73
commit
f6dcee423b
5 changed files with 316 additions and 9 deletions
|
|
@ -35,6 +35,9 @@ object DeployerSpec {
|
|||
router = scatter-gather
|
||||
within = 2 seconds
|
||||
}
|
||||
/service-consistent-hashing {
|
||||
router = consistent-hashing
|
||||
}
|
||||
/service-resizer {
|
||||
router = round-robin
|
||||
resizer {
|
||||
|
|
@ -118,6 +121,10 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
|
|||
assertRouting("/service-scatter-gather", ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/service-scatter-gather")
|
||||
}
|
||||
|
||||
"be able to parse 'akka.actor.deployment._' with consistent-hashing router" in {
|
||||
assertRouting("/service-consistent-hashing", ConsistentHashingRouter(1), "/service-consistent-hashing")
|
||||
}
|
||||
|
||||
"be able to parse 'akka.actor.deployment._' with router resizer" in {
|
||||
val resizer = DefaultResizer()
|
||||
assertRouting("/service-resizer", RoundRobinRouter(resizer = Some(resizer)), "/service-resizer")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.routing
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Props
|
||||
import akka.actor.actorRef2Scala
|
||||
import akka.pattern.ask
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashable
|
||||
import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit._
|
||||
|
||||
object ConsistentHashingRouterSpec {
|
||||
|
||||
val config = """
|
||||
akka.actor.deployment {
|
||||
/router1 {
|
||||
router = consistent-hashing
|
||||
nr-of-instances = 3
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
class Echo extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒ sender ! self
|
||||
}
|
||||
}
|
||||
|
||||
case class Msg(key: Any, data: String) extends ConsistentHashable {
|
||||
override def consistentHashKey = key
|
||||
}
|
||||
|
||||
case class MsgKey(name: String)
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.config) with DefaultTimeout with ImplicitSender {
|
||||
import akka.routing.ConsistentHashingRouterSpec._
|
||||
implicit val ec = system.dispatcher
|
||||
|
||||
val router1 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter()), "router1")
|
||||
|
||||
"consistent hashing router" must {
|
||||
"create routees from configuration" in {
|
||||
val currentRoutees = Await.result(router1 ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees]
|
||||
currentRoutees.routees.size must be(3)
|
||||
}
|
||||
|
||||
"select destination based on consistentHashKey of the message" in {
|
||||
router1 ! Msg("a", "A")
|
||||
val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref }
|
||||
router1 ! new ConsistentHashableEnvelope {
|
||||
override def consistentHashKey = "a"
|
||||
override def message = "AA"
|
||||
}
|
||||
expectMsg(destinationA)
|
||||
|
||||
router1 ! Msg(17, "B")
|
||||
val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref }
|
||||
router1 ! new ConsistentHashableEnvelope {
|
||||
override def consistentHashKey = 17
|
||||
override def message = "BB"
|
||||
}
|
||||
expectMsg(destinationB)
|
||||
|
||||
router1 ! Msg(MsgKey("c"), "C")
|
||||
val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref }
|
||||
router1 ! Msg(MsgKey("c"), "CC")
|
||||
expectMsg(destinationC)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -148,12 +148,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
|
|||
val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
|
||||
|
||||
val router: RouterConfig = deployment.getString("router") match {
|
||||
case "from-code" ⇒ NoRouter
|
||||
case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer)
|
||||
case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer)
|
||||
case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer)
|
||||
case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
|
||||
case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer)
|
||||
case "from-code" ⇒ NoRouter
|
||||
case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer)
|
||||
case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer)
|
||||
case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer)
|
||||
case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
|
||||
case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer)
|
||||
case "consistent-hashing" ⇒ ConsistentHashingRouter(nrOfInstances, routees, resizer)
|
||||
case fqn ⇒
|
||||
val args = Seq(classOf[Config] -> deployment)
|
||||
dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({
|
||||
|
|
|
|||
|
|
@ -0,0 +1,212 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.routing
|
||||
|
||||
import scala.collection.JavaConversions.iterableAsScalaIterable
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.SupervisorStrategy
|
||||
import akka.actor.Props
|
||||
import akka.dispatch.Dispatchers
|
||||
import akka.event.Logging
|
||||
import akka.serialization.SerializationExtension
|
||||
|
||||
object ConsistentHashingRouter {
|
||||
/**
|
||||
* Creates a new ConsistentHashingRouter, routing to the specified routees
|
||||
*/
|
||||
def apply(routees: Iterable[ActorRef]): ConsistentHashingRouter =
|
||||
new ConsistentHashingRouter(routees = routees map (_.path.toString))
|
||||
|
||||
/**
|
||||
* Java API to create router with the supplied 'routees' actors.
|
||||
*/
|
||||
def create(routees: java.lang.Iterable[ActorRef]): ConsistentHashingRouter = {
|
||||
import scala.collection.JavaConverters._
|
||||
apply(routees.asScala)
|
||||
}
|
||||
|
||||
/**
|
||||
* Messages need to implement this interface to define what
|
||||
* data to use for the consistent hash key. Note that it's not
|
||||
* the hash, but the data to be hashed. If returning an
|
||||
* `Array[Byte]` or String it will be used as is, otherwise the
|
||||
* configured [[akka.akka.serialization.Serializer]] will
|
||||
* be applied to the returned data.
|
||||
*
|
||||
* If messages can't implement this interface themselves,
|
||||
* it's possible to wrap the messages in
|
||||
* [[akka.routing.ConsistentHashableEnvelope]]
|
||||
*/
|
||||
trait ConsistentHashable {
|
||||
def consistentHashKey: Any
|
||||
}
|
||||
|
||||
/**
|
||||
* If messages can't implement [[akka.routing.ConsistentHashable]]
|
||||
* themselves they can we wrapped by something implementing
|
||||
* this interface instead. The router will only send the
|
||||
* wrapped message to the destination, i.e. the envelope will
|
||||
* be stripped off.
|
||||
*/
|
||||
trait ConsistentHashableEnvelope extends ConsistentHashable with RouterEnvelope {
|
||||
def message: Any
|
||||
}
|
||||
|
||||
/**
|
||||
* Default number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]]
|
||||
*/
|
||||
val DefaultReplicas: Int = 10
|
||||
|
||||
}
|
||||
/**
|
||||
* A Router that uses consistent hashing to select a connection based on the
|
||||
* sent message. The messages must implement [[akka.routing.ConsistentHashable]]
|
||||
* or be wrapped in a [[akka.routing.ConsistentHashableEnvelope]] to define what
|
||||
* data to use for the consistent hash key.
|
||||
*
|
||||
* Please note that providing both 'nrOfInstances' and 'routees' does not make logical
|
||||
* sense as this means that the router should both create new actors and use the 'routees'
|
||||
* actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used.
|
||||
* <br>
|
||||
* <b>The</b> configuration parameter trumps the constructor arguments. This means that
|
||||
* if you provide either 'nrOfInstances' or 'routees' during instantiation they will
|
||||
* be ignored if the router is defined in the configuration file for the actor being used.
|
||||
*
|
||||
* <h1>Supervision Setup</h1>
|
||||
*
|
||||
* The router creates a “head” actor which supervises and/or monitors the
|
||||
* routees. Instances are created as children of this actor, hence the
|
||||
* children are not supervised by the parent of the router. Common choices are
|
||||
* to always escalate (meaning that fault handling is always applied to all
|
||||
* children simultaneously; this is the default) or use the parent’s strategy,
|
||||
* which will result in routed children being treated individually, but it is
|
||||
* possible as well to use Routers to give different supervisor strategies to
|
||||
* different groups of children.
|
||||
*
|
||||
* @param routees string representation of the actor paths of the routees that will be looked up
|
||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||
* @param replicas number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]]
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case class ConsistentHashingRouter(
|
||||
nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
|
||||
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy,
|
||||
val replicas: Int = ConsistentHashingRouter.DefaultReplicas)
|
||||
extends RouterConfig with ConsistentHashingLike {
|
||||
|
||||
/**
|
||||
* Constructor that sets nrOfInstances to be created.
|
||||
* Java API
|
||||
*/
|
||||
def this(nr: Int) = this(nrOfInstances = nr)
|
||||
|
||||
/**
|
||||
* Constructor that sets the routees to be used.
|
||||
* Java API
|
||||
* @param routeePaths string representation of the actor paths of the routees that will be looked up
|
||||
* using `actorFor` in [[akka.actor.ActorRefProvider]]
|
||||
*/
|
||||
def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths))
|
||||
|
||||
/**
|
||||
* Constructor that sets the resizer to be used.
|
||||
* Java API
|
||||
*/
|
||||
def this(resizer: Resizer) = this(resizer = Some(resizer))
|
||||
|
||||
/**
|
||||
* Java API for setting routerDispatcher
|
||||
*/
|
||||
def withDispatcher(dispatcherId: String): ConsistentHashingRouter = copy(routerDispatcher = dispatcherId)
|
||||
|
||||
/**
|
||||
* Java API for setting the supervisor strategy to be used for the “head”
|
||||
* Router actor.
|
||||
*/
|
||||
def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy)
|
||||
|
||||
/**
|
||||
* Java API for setting the number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]]
|
||||
*/
|
||||
def withReplicas(replicas: Int): ConsistentHashingRouter = copy(replicas = replicas)
|
||||
|
||||
/**
|
||||
* Uses the resizer of the given RouterConfig if this RouterConfig
|
||||
* doesn't have one, i.e. the resizer defined in code is used if
|
||||
* resizer was not defined in config.
|
||||
*/
|
||||
override def withFallback(other: RouterConfig): RouterConfig = {
|
||||
if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer)
|
||||
else this
|
||||
}
|
||||
}
|
||||
|
||||
trait ConsistentHashingLike { this: RouterConfig ⇒
|
||||
|
||||
import ConsistentHashingRouter._
|
||||
|
||||
def nrOfInstances: Int
|
||||
|
||||
def routees: Iterable[String]
|
||||
|
||||
def replicas: Int
|
||||
|
||||
override def createRoute(routeeProvider: RouteeProvider): Route = {
|
||||
if (resizer.isEmpty) {
|
||||
if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances)
|
||||
else routeeProvider.registerRouteesFor(routees)
|
||||
}
|
||||
|
||||
val log = Logging(routeeProvider.context.system, routeeProvider.context.self)
|
||||
|
||||
// consistentHashRoutees and consistentHash are updated together, synchronized on the consistentHashLock
|
||||
val consistentHashLock = new Object
|
||||
var consistentHashRoutees: IndexedSeq[ActorRef] = null
|
||||
var consistentHash: ConsistentHash[ActorRef] = null
|
||||
upateConsistentHash()
|
||||
|
||||
// update consistentHash when routees has changed
|
||||
// changes to routees are rare and when no changes this is a quick operation
|
||||
def upateConsistentHash(): ConsistentHash[ActorRef] = consistentHashLock.synchronized {
|
||||
val currentRoutees = routeeProvider.routees
|
||||
if ((currentRoutees ne consistentHashRoutees) && currentRoutees != consistentHashRoutees) {
|
||||
consistentHashRoutees = currentRoutees
|
||||
consistentHash = ConsistentHash(currentRoutees, replicas)
|
||||
}
|
||||
consistentHash
|
||||
}
|
||||
|
||||
def target(hashData: Any): ActorRef = try {
|
||||
val hash = hashData match {
|
||||
case bytes: Array[Byte] ⇒ bytes
|
||||
case str: String ⇒ str.getBytes("UTF-8")
|
||||
case x: AnyRef ⇒ SerializationExtension(routeeProvider.context.system).serialize(x).get
|
||||
}
|
||||
val currentConsistenHash = upateConsistentHash()
|
||||
if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters
|
||||
else currentConsistenHash.nodeFor(hash)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
// serialization failed
|
||||
log.warning("Couldn't route message with consistentHashKey [%s] due to [%s]".format(hashData, e.getMessage))
|
||||
routeeProvider.context.system.deadLetters
|
||||
}
|
||||
|
||||
{
|
||||
case (sender, message) ⇒
|
||||
message match {
|
||||
case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
|
||||
case hashable: ConsistentHashable ⇒ List(Destination(sender, target(hashable.consistentHashKey)))
|
||||
case other ⇒
|
||||
log.warning("Message [{}] must implement [{}] or be wrapped in [{}]",
|
||||
message.getClass.getName, classOf[ConsistentHashable].getName,
|
||||
classOf[ConsistentHashableEnvelope].getName)
|
||||
List(Destination(sender, routeeProvider.context.system.deadLetters))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -115,8 +115,8 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
|
|||
val s = if (sender eq null) system.deadLetters else sender
|
||||
|
||||
val msg = message match {
|
||||
case Broadcast(m) ⇒ m
|
||||
case m ⇒ m
|
||||
case wrapped: RouterEnvelope ⇒ wrapped.message
|
||||
case m ⇒ m
|
||||
}
|
||||
|
||||
applyRoute(s, message) match {
|
||||
|
|
@ -400,7 +400,15 @@ private object Router {
|
|||
* Router implementations may choose to handle this message differently.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case class Broadcast(message: Any)
|
||||
case class Broadcast(message: Any) extends RouterEnvelope
|
||||
|
||||
/**
|
||||
* Only the contained message will be forwarded to the
|
||||
* destination, i.e. the envelope will be stripped off.
|
||||
*/
|
||||
trait RouterEnvelope {
|
||||
def message: Any
|
||||
}
|
||||
|
||||
/**
|
||||
* Sending this message to a router will make it send back its currently used routees.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue