From f6dcee423bb3e0bc9e79e88748c83d7d9e195858 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Sep 2012 15:10:16 +0200 Subject: [PATCH] Implement ConsistentHashingRouter, see #944 * Added trait RouterEnvelope to handle Broadcast and ConsistentHashableEnvelope in same way, could also be useful for custom routers --- .../test/scala/akka/actor/DeployerSpec.scala | 7 + .../routing/ConsistentHashingRouterSpec.scala | 79 +++++++ .../src/main/scala/akka/actor/Deployer.scala | 13 +- .../routing/ConsistentHashingRouter.scala | 212 ++++++++++++++++++ .../src/main/scala/akka/routing/Routing.scala | 14 +- 5 files changed, 316 insertions(+), 9 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala create mode 100644 akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index 1aef438627..37aa133583 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -35,6 +35,9 @@ object DeployerSpec { router = scatter-gather within = 2 seconds } + /service-consistent-hashing { + router = consistent-hashing + } /service-resizer { router = round-robin resizer { @@ -118,6 +121,10 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { assertRouting("/service-scatter-gather", ScatterGatherFirstCompletedRouter(nrOfInstances = 1, within = 2 seconds), "/service-scatter-gather") } + "be able to parse 'akka.actor.deployment._' with consistent-hashing router" in { + assertRouting("/service-consistent-hashing", ConsistentHashingRouter(1), "/service-consistent-hashing") + } + "be able to parse 'akka.actor.deployment._' with router resizer" in { val resizer = DefaultResizer() assertRouting("/service-resizer", RoundRobinRouter(resizer = Some(resizer)), "/service-resizer") diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala new file mode 100644 index 0000000000..fd7a49e867 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.routing + +import scala.concurrent.Await + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.actorRef2Scala +import akka.pattern.ask +import akka.routing.ConsistentHashingRouter.ConsistentHashable +import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope +import akka.testkit.AkkaSpec +import akka.testkit._ + +object ConsistentHashingRouterSpec { + + val config = """ + akka.actor.deployment { + /router1 { + router = consistent-hashing + nr-of-instances = 3 + } + } + """ + + class Echo extends Actor { + def receive = { + case _ ⇒ sender ! self + } + } + + case class Msg(key: Any, data: String) extends ConsistentHashable { + override def consistentHashKey = key + } + + case class MsgKey(name: String) +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.config) with DefaultTimeout with ImplicitSender { + import akka.routing.ConsistentHashingRouterSpec._ + implicit val ec = system.dispatcher + + val router1 = system.actorOf(Props[Echo].withRouter(ConsistentHashingRouter()), "router1") + + "consistent hashing router" must { + "create routees from configuration" in { + val currentRoutees = Await.result(router1 ? CurrentRoutees, remaining).asInstanceOf[RouterRoutees] + currentRoutees.routees.size must be(3) + } + + "select destination based on consistentHashKey of the message" in { + router1 ! Msg("a", "A") + val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router1 ! new ConsistentHashableEnvelope { + override def consistentHashKey = "a" + override def message = "AA" + } + expectMsg(destinationA) + + router1 ! Msg(17, "B") + val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router1 ! new ConsistentHashableEnvelope { + override def consistentHashKey = 17 + override def message = "BB" + } + expectMsg(destinationB) + + router1 ! Msg(MsgKey("c"), "C") + val destinationC = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } + router1 ! Msg(MsgKey("c"), "CC") + expectMsg(destinationC) + } + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 9ceebaacb3..be9da1a505 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -148,12 +148,13 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None val router: RouterConfig = deployment.getString("router") match { - case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) - case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) - case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case "from-code" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) + case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) + case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case "consistent-hashing" ⇒ ConsistentHashingRouter(nrOfInstances, routees, resizer) case fqn ⇒ val args = Seq(classOf[Config] -> deployment) dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala new file mode 100644 index 0000000000..eba812ac60 --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -0,0 +1,212 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.routing + +import scala.collection.JavaConversions.iterableAsScalaIterable +import scala.util.control.NonFatal + +import akka.actor.ActorRef +import akka.actor.SupervisorStrategy +import akka.actor.Props +import akka.dispatch.Dispatchers +import akka.event.Logging +import akka.serialization.SerializationExtension + +object ConsistentHashingRouter { + /** + * Creates a new ConsistentHashingRouter, routing to the specified routees + */ + def apply(routees: Iterable[ActorRef]): ConsistentHashingRouter = + new ConsistentHashingRouter(routees = routees map (_.path.toString)) + + /** + * Java API to create router with the supplied 'routees' actors. + */ + def create(routees: java.lang.Iterable[ActorRef]): ConsistentHashingRouter = { + import scala.collection.JavaConverters._ + apply(routees.asScala) + } + + /** + * Messages need to implement this interface to define what + * data to use for the consistent hash key. Note that it's not + * the hash, but the data to be hashed. If returning an + * `Array[Byte]` or String it will be used as is, otherwise the + * configured [[akka.akka.serialization.Serializer]] will + * be applied to the returned data. + * + * If messages can't implement this interface themselves, + * it's possible to wrap the messages in + * [[akka.routing.ConsistentHashableEnvelope]] + */ + trait ConsistentHashable { + def consistentHashKey: Any + } + + /** + * If messages can't implement [[akka.routing.ConsistentHashable]] + * themselves they can we wrapped by something implementing + * this interface instead. The router will only send the + * wrapped message to the destination, i.e. the envelope will + * be stripped off. + */ + trait ConsistentHashableEnvelope extends ConsistentHashable with RouterEnvelope { + def message: Any + } + + /** + * Default number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + */ + val DefaultReplicas: Int = 10 + +} +/** + * A Router that uses consistent hashing to select a connection based on the + * sent message. The messages must implement [[akka.routing.ConsistentHashable]] + * or be wrapped in a [[akka.routing.ConsistentHashableEnvelope]] to define what + * data to use for the consistent hash key. + * + * Please note that providing both 'nrOfInstances' and 'routees' does not make logical + * sense as this means that the router should both create new actors and use the 'routees' + * actor(s). In this case the 'nrOfInstances' will be ignored and the 'routees' will be used. + *
+ * The configuration parameter trumps the constructor arguments. This means that + * if you provide either 'nrOfInstances' or 'routees' during instantiation they will + * be ignored if the router is defined in the configuration file for the actor being used. + * + *

Supervision Setup

+ * + * The router creates a “head” actor which supervises and/or monitors the + * routees. Instances are created as children of this actor, hence the + * children are not supervised by the parent of the router. Common choices are + * to always escalate (meaning that fault handling is always applied to all + * children simultaneously; this is the default) or use the parent’s strategy, + * which will result in routed children being treated individually, but it is + * possible as well to use Routers to give different supervisor strategies to + * different groups of children. + * + * @param routees string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + * @param replicas number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + */ +@SerialVersionUID(1L) +case class ConsistentHashingRouter( + nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, + val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy, + val replicas: Int = ConsistentHashingRouter.DefaultReplicas) + extends RouterConfig with ConsistentHashingLike { + + /** + * Constructor that sets nrOfInstances to be created. + * Java API + */ + def this(nr: Int) = this(nrOfInstances = nr) + + /** + * Constructor that sets the routees to be used. + * Java API + * @param routeePaths string representation of the actor paths of the routees that will be looked up + * using `actorFor` in [[akka.actor.ActorRefProvider]] + */ + def this(routeePaths: java.lang.Iterable[String]) = this(routees = iterableAsScalaIterable(routeePaths)) + + /** + * Constructor that sets the resizer to be used. + * Java API + */ + def this(resizer: Resizer) = this(resizer = Some(resizer)) + + /** + * Java API for setting routerDispatcher + */ + def withDispatcher(dispatcherId: String): ConsistentHashingRouter = copy(routerDispatcher = dispatcherId) + + /** + * Java API for setting the supervisor strategy to be used for the “head” + * Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy) + + /** + * Java API for setting the number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + */ + def withReplicas(replicas: Int): ConsistentHashingRouter = copy(replicas = replicas) + + /** + * Uses the resizer of the given RouterConfig if this RouterConfig + * doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + */ + override def withFallback(other: RouterConfig): RouterConfig = { + if (this.resizer.isEmpty && other.resizer.isDefined) copy(resizer = other.resizer) + else this + } +} + +trait ConsistentHashingLike { this: RouterConfig ⇒ + + import ConsistentHashingRouter._ + + def nrOfInstances: Int + + def routees: Iterable[String] + + def replicas: Int + + override def createRoute(routeeProvider: RouteeProvider): Route = { + if (resizer.isEmpty) { + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) + else routeeProvider.registerRouteesFor(routees) + } + + val log = Logging(routeeProvider.context.system, routeeProvider.context.self) + + // consistentHashRoutees and consistentHash are updated together, synchronized on the consistentHashLock + val consistentHashLock = new Object + var consistentHashRoutees: IndexedSeq[ActorRef] = null + var consistentHash: ConsistentHash[ActorRef] = null + upateConsistentHash() + + // update consistentHash when routees has changed + // changes to routees are rare and when no changes this is a quick operation + def upateConsistentHash(): ConsistentHash[ActorRef] = consistentHashLock.synchronized { + val currentRoutees = routeeProvider.routees + if ((currentRoutees ne consistentHashRoutees) && currentRoutees != consistentHashRoutees) { + consistentHashRoutees = currentRoutees + consistentHash = ConsistentHash(currentRoutees, replicas) + } + consistentHash + } + + def target(hashData: Any): ActorRef = try { + val hash = hashData match { + case bytes: Array[Byte] ⇒ bytes + case str: String ⇒ str.getBytes("UTF-8") + case x: AnyRef ⇒ SerializationExtension(routeeProvider.context.system).serialize(x).get + } + val currentConsistenHash = upateConsistentHash() + if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters + else currentConsistenHash.nodeFor(hash) + } catch { + case NonFatal(e) ⇒ + // serialization failed + log.warning("Couldn't route message with consistentHashKey [%s] due to [%s]".format(hashData, e.getMessage)) + routeeProvider.context.system.deadLetters + } + + { + case (sender, message) ⇒ + message match { + case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees) + case hashable: ConsistentHashable ⇒ List(Destination(sender, target(hashable.consistentHashKey))) + case other ⇒ + log.warning("Message [{}] must implement [{}] or be wrapped in [{}]", + message.getClass.getName, classOf[ConsistentHashable].getName, + classOf[ConsistentHashableEnvelope].getName) + List(Destination(sender, routeeProvider.context.system.deadLetters)) + } + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 1d18e7ed2e..9e17bf2b9a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -115,8 +115,8 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo val s = if (sender eq null) system.deadLetters else sender val msg = message match { - case Broadcast(m) ⇒ m - case m ⇒ m + case wrapped: RouterEnvelope ⇒ wrapped.message + case m ⇒ m } applyRoute(s, message) match { @@ -400,7 +400,15 @@ private object Router { * Router implementations may choose to handle this message differently. */ @SerialVersionUID(1L) -case class Broadcast(message: Any) +case class Broadcast(message: Any) extends RouterEnvelope + +/** + * Only the contained message will be forwarded to the + * destination, i.e. the envelope will be stripped off. + */ +trait RouterEnvelope { + def message: Any +} /** * Sending this message to a router will make it send back its currently used routees.