diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 5fe3466b25..56f8cd45fc 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -43,6 +43,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getBoolean("akka.jvm-exit-on-fatal-error") must be(true) settings.JvmExitOnFatalError must be(true) + + getInt("akka.actor.deployment.default.virtual-nodes-factor") must be(10) + settings.DefaultVirtualNodesFactor must be(10) } { diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala index fd7a49e867..7a28190523 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConsistentHashingRouterSpec.scala @@ -22,6 +22,7 @@ object ConsistentHashingRouterSpec { /router1 { router = consistent-hashing nr-of-instances = 3 + virtual-nodes-factor = 17 } } """ @@ -55,18 +56,12 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c "select destination based on consistentHashKey of the message" in { router1 ! Msg("a", "A") val destinationA = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } - router1 ! new ConsistentHashableEnvelope { - override def consistentHashKey = "a" - override def message = "AA" - } + router1 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a") expectMsg(destinationA) router1 ! Msg(17, "B") val destinationB = expectMsgPF(remaining) { case ref: ActorRef ⇒ ref } - router1 ! new ConsistentHashableEnvelope { - override def consistentHashKey = 17 - override def message = "BB" - } + router1 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17) expectMsg(destinationB) router1 ! Msg(MsgKey("c"), "C") diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index a585fe3406..163d2e83d9 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -14,7 +14,7 @@ akka { # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT) event-handlers = ["akka.event.Logging$DefaultLogger"] - + # Event handlers are created and registered synchronously during ActorSystem # start-up, and since they are actors, this timeout is used to bound the # waiting time @@ -49,7 +49,7 @@ akka { # FQCN of the ActorRefProvider to be used; the below is the built-in default, # another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle. provider = "akka.actor.LocalActorRefProvider" - + # The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator # to obtain its supervisorStrategy. Besides the default there is # akka.actor.StoppingSupervisorStrategy @@ -69,7 +69,7 @@ akka { # Serializes and deserializes creators (in Props) to ensure that they can be sent over the network, # this is only intended for testing. serialize-creators = off - + # Timeout for send operations to top-level actors which are in the process of being started. # This is only relevant if using a bounded mailbox or the CallingThreadDispatcher for a top-level actor. unstarted-push-timeout = 10s @@ -108,6 +108,9 @@ akka { # within is the timeout used for routers containing future calls within = 5 seconds + # number of virtual nodes per node for consistent-hashing router + virtual-nodes-factor = 10 + routees { # Alternatively to giving nr-of-instances you can specify the full # paths of those actors which should be routed to. This setting takes @@ -296,7 +299,7 @@ akka { # enable DEBUG logging of subscription changes on the eventStream event-stream = off - + # enable DEBUG logging of unhandled messages unhandled = off @@ -318,13 +321,13 @@ akka { serialization-bindings { "java.io.Serializable" = java } - + # Configuration items which are used by the akka.actor.ActorDSL._ methods dsl { # Maximum queue size of the actor created by newInbox(); this protects against # faulty programs which use select() and consistently miss messages inbox-size = 1000 - + # Default timeout to assume for operations like Inbox.receive et al default-timeout = 5s } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index bce966b99e..deec7697df 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -166,6 +166,8 @@ object ActorSystem { final val Daemonicity: Boolean = getBoolean("akka.daemonic") final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error") + final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor") + if (ConfigVersion != Version) throw new akka.ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index be9da1a505..3f5e2b1785 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -143,18 +143,20 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce val nrOfInstances = deployment.getInt("nr-of-instances") - val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) - val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None val router: RouterConfig = deployment.getString("router") match { - case "from-code" ⇒ NoRouter - case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) - case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) - case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) - case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) - case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) - case "consistent-hashing" ⇒ ConsistentHashingRouter(nrOfInstances, routees, resizer) + case "from-code" ⇒ NoRouter + case "round-robin" ⇒ RoundRobinRouter(nrOfInstances, routees, resizer) + case "random" ⇒ RandomRouter(nrOfInstances, routees, resizer) + case "smallest-mailbox" ⇒ SmallestMailboxRouter(nrOfInstances, routees, resizer) + case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) + case "scatter-gather" ⇒ + val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS) + ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) + case "consistent-hashing" ⇒ + val vnodes = deployment.getInt("virtual-nodes-factor") + ConsistentHashingRouter(nrOfInstances, routees, resizer, virtualNodesFactor = vnodes) case fqn ⇒ val args = Seq(classOf[Config] -> deployment) dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({ diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala index 5dc23f1383..19e457a762 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHash.scala @@ -20,23 +20,22 @@ import scala.collection.immutable.TreeMap * hash, i.e. make sure it is different for different nodes. * */ -class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) { +class ConsistentHash[T] private (nodeRing: TreeMap[Int, T], virtualNodesFactor: Int) { import ConsistentHash._ - if (replicas < 1) throw new IllegalArgumentException("replicas must be >= 1") + if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1") /** - * Adds a node to the ring. + * Adds a node to the node ring. * Note that the instance is immutable and this * operation returns a new instance. */ - def :+(node: T): ConsistentHash[T] = { - new ConsistentHash(replicas, ring ++ ((1 to replicas) map { r ⇒ (nodeHashFor(node, r) -> node) })) - } + def :+(node: T): ConsistentHash[T] = + new ConsistentHash(nodeRing ++ ((1 to virtualNodesFactor) map { r ⇒ (nodeHashFor(node, r) -> node) }), virtualNodesFactor) /** - * Adds a node to the ring. + * Adds a node to the node ring. * Note that the instance is immutable and this * operation returns a new instance. * JAVA API @@ -44,16 +43,15 @@ class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) { def add(node: T): ConsistentHash[T] = this :+ node /** - * Removes a node from the ring. + * Removes a node from the node ring. * Note that the instance is immutable and this * operation returns a new instance. */ - def :-(node: T): ConsistentHash[T] = { - new ConsistentHash(replicas, ring -- ((1 to replicas) map { r ⇒ nodeHashFor(node, r) })) - } + def :-(node: T): ConsistentHash[T] = + new ConsistentHash(nodeRing -- ((1 to virtualNodesFactor) map { r ⇒ nodeHashFor(node, r) }), virtualNodesFactor) /** - * Removes a node from the ring. + * Removes a node from the node ring. * Note that the instance is immutable and this * operation returns a new instance. * JAVA API @@ -62,49 +60,42 @@ class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) { /** * Get the node responsible for the data key. - * Can only be used if nodes exists in the ring, + * Can only be used if nodes exists in the node ring, * otherwise throws `IllegalStateException` */ def nodeFor(key: Array[Byte]): T = { - if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty ring" format key) + if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key) val hash = hashFor(key) - def nextClockwise: T = { - val (ringKey, node) = ring.rangeImpl(Some(hash), None).headOption.getOrElse(ring.head) - node - } - ring.getOrElse(hash, nextClockwise) + // find the next node clockwise in the nodeRing, pick first if end of tree + nodeRing.rangeImpl(Some(hash), None).headOption.getOrElse(nodeRing.head)._2 } /** - * Is the ring empty, i.e. no nodes added or all removed. + * Is the node ring empty, i.e. no nodes added or all removed. */ - def isEmpty: Boolean = ring.isEmpty + def isEmpty: Boolean = nodeRing.isEmpty } object ConsistentHash { - def apply[T](nodes: Iterable[T], replicas: Int) = { - new ConsistentHash(replicas, TreeMap.empty[Int, T] ++ - (for (node ← nodes; replica ← 1 to replicas) yield (nodeHashFor(node, replica) -> node))) + def apply[T](nodes: Iterable[T], virtualNodesFactor: Int) = { + new ConsistentHash(TreeMap.empty[Int, T] ++ + (for (node ← nodes; vnode ← 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)), + virtualNodesFactor) } /** * Factory method to create a ConsistentHash * JAVA API */ - def create[T](nodes: java.lang.Iterable[T], replicas: Int) = { + def create[T](nodes: java.lang.Iterable[T], virtualNodesFactor: Int) = { import scala.collection.JavaConverters._ - apply(nodes.asScala, replicas) + apply(nodes.asScala, virtualNodesFactor) } - private def nodeHashFor(node: Any, replica: Int): Int = { - hashFor((node + ":" + replica).getBytes("UTF-8")) - } + private def nodeHashFor(node: Any, vnode: Int): Int = + hashFor((node + ":" + vnode).getBytes("UTF-8")) - private def hashFor(bytes: Array[Byte]): Int = { - val hash = MurmurHash.arrayHash(bytes) - if (hash == Int.MinValue) hash + 1 - math.abs(hash) - } + private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes) } diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala index eba812ac60..e177fc413a 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashingRouter.scala @@ -5,13 +5,13 @@ package akka.routing import scala.collection.JavaConversions.iterableAsScalaIterable import scala.util.control.NonFatal - import akka.actor.ActorRef import akka.actor.SupervisorStrategy import akka.actor.Props import akka.dispatch.Dispatchers import akka.event.Logging import akka.serialization.SerializationExtension +import java.util.concurrent.atomic.AtomicReference object ConsistentHashingRouter { /** @@ -46,19 +46,13 @@ object ConsistentHashingRouter { /** * If messages can't implement [[akka.routing.ConsistentHashable]] - * themselves they can we wrapped by something implementing - * this interface instead. The router will only send the - * wrapped message to the destination, i.e. the envelope will - * be stripped off. + * themselves they can we wrapped by this envelope instead. The + * router will only send the wrapped message to the destination, + * i.e. the envelope will be stripped off. */ - trait ConsistentHashableEnvelope extends ConsistentHashable with RouterEnvelope { - def message: Any - } - - /** - * Default number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] - */ - val DefaultReplicas: Int = 10 + @SerialVersionUID(1L) + case class ConsistentHashableEnvelope(message: Any, consistentHashKey: Any) + extends ConsistentHashable with RouterEnvelope } /** @@ -88,14 +82,14 @@ object ConsistentHashingRouter { * * @param routees string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] - * @param replicas number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + * @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] */ @SerialVersionUID(1L) case class ConsistentHashingRouter( nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None, val routerDispatcher: String = Dispatchers.DefaultDispatcherId, val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy, - val replicas: Int = ConsistentHashingRouter.DefaultReplicas) + val virtualNodesFactor: Int = 0) extends RouterConfig with ConsistentHashingLike { /** @@ -130,9 +124,9 @@ case class ConsistentHashingRouter( def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy) /** - * Java API for setting the number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]] + * Java API for setting the number of virtual nodes per node, used in [[akka.routing.ConsistantHash]] */ - def withReplicas(replicas: Int): ConsistentHashingRouter = copy(replicas = replicas) + def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes) /** * Uses the resizer of the given RouterConfig if this RouterConfig @@ -145,6 +139,10 @@ case class ConsistentHashingRouter( } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait ConsistentHashingLike { this: RouterConfig ⇒ import ConsistentHashingRouter._ @@ -153,7 +151,7 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ def routees: Iterable[String] - def replicas: Int + def virtualNodesFactor: Int override def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { @@ -162,20 +160,25 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ } val log = Logging(routeeProvider.context.system, routeeProvider.context.self) + val vnodes = + if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor + else virtualNodesFactor - // consistentHashRoutees and consistentHash are updated together, synchronized on the consistentHashLock - val consistentHashLock = new Object - var consistentHashRoutees: IndexedSeq[ActorRef] = null - var consistentHash: ConsistentHash[ActorRef] = null - upateConsistentHash() + var consistentHashRoutees = new AtomicReference[IndexedSeq[ActorRef]] + @volatile var consistentHash: ConsistentHash[ActorRef] = null + updateConsistentHash() // update consistentHash when routees has changed // changes to routees are rare and when no changes this is a quick operation - def upateConsistentHash(): ConsistentHash[ActorRef] = consistentHashLock.synchronized { + def updateConsistentHash(): ConsistentHash[ActorRef] = { val currentRoutees = routeeProvider.routees - if ((currentRoutees ne consistentHashRoutees) && currentRoutees != consistentHashRoutees) { - consistentHashRoutees = currentRoutees - consistentHash = ConsistentHash(currentRoutees, replicas) + if (currentRoutees ne consistentHashRoutees.get) { + val oldConsistentHashRoutees = consistentHashRoutees.get + val rehash = currentRoutees != oldConsistentHashRoutees + // when other instance, same content, no need to re-hash, but try to set currentRoutees + // ignore, don't update, in case of CAS failure + if (consistentHashRoutees.compareAndSet(oldConsistentHashRoutees, currentRoutees) && rehash) + consistentHash = ConsistentHash(currentRoutees, vnodes) } consistentHash } @@ -186,13 +189,13 @@ trait ConsistentHashingLike { this: RouterConfig ⇒ case str: String ⇒ str.getBytes("UTF-8") case x: AnyRef ⇒ SerializationExtension(routeeProvider.context.system).serialize(x).get } - val currentConsistenHash = upateConsistentHash() + val currentConsistenHash = updateConsistentHash() if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters else currentConsistenHash.nodeFor(hash) } catch { case NonFatal(e) ⇒ // serialization failed - log.warning("Couldn't route message with consistentHashKey [%s] due to [%s]".format(hashData, e.getMessage)) + log.warning("Couldn't route message with consistentHashKey [{}] due to [{}]", hashData, e.getMessage) routeeProvider.context.system.deadLetters } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 9e17bf2b9a..da3c439c4d 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -596,6 +596,10 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] = } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait RoundRobinLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -729,6 +733,10 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil, } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait RandomLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -869,6 +877,10 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait SmallestMailboxLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -1084,6 +1096,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait BroadcastLike { this: RouterConfig ⇒ def nrOfInstances: Int @@ -1213,6 +1229,10 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It } } +/** + * The core pieces of the routing logic is located in this + * trait to be able to extend. + */ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ def nrOfInstances: Int diff --git a/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala b/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala index 265b0f8361..24dc291087 100644 --- a/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala +++ b/akka-docs/scala/code/docs/routing/RouterViaConfigDocSpec.scala @@ -77,6 +77,7 @@ object RouterWithConfigDocSpec { /myrouter7 { router = consistent-hashing nr-of-instances = 5 + virtual-nodes-factor = 10 } } //#config-consistent-hashing