Improvements based on review feedback, see #944

This commit is contained in:
Patrik Nordwall 2012-09-13 18:06:35 +02:00
parent d74464ba50
commit 5a90d7198c
9 changed files with 106 additions and 86 deletions

View file

@ -43,6 +43,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
getBoolean("akka.jvm-exit-on-fatal-error") must be(true)
settings.JvmExitOnFatalError must be(true)
getInt("akka.actor.deployment.default.virtual-nodes-factor") must be(10)
settings.DefaultVirtualNodesFactor must be(10)
}
{

View file

@ -22,6 +22,7 @@ object ConsistentHashingRouterSpec {
/router1 {
router = consistent-hashing
nr-of-instances = 3
virtual-nodes-factor = 17
}
}
"""
@ -55,18 +56,12 @@ class ConsistentHashingRouterSpec extends AkkaSpec(ConsistentHashingRouterSpec.c
"select destination based on consistentHashKey of the message" in {
router1 ! Msg("a", "A")
val destinationA = expectMsgPF(remaining) { case ref: ActorRef ref }
router1 ! new ConsistentHashableEnvelope {
override def consistentHashKey = "a"
override def message = "AA"
}
router1 ! ConsistentHashableEnvelope(message = "AA", consistentHashKey = "a")
expectMsg(destinationA)
router1 ! Msg(17, "B")
val destinationB = expectMsgPF(remaining) { case ref: ActorRef ref }
router1 ! new ConsistentHashableEnvelope {
override def consistentHashKey = 17
override def message = "BB"
}
router1 ! ConsistentHashableEnvelope(message = "BB", consistentHashKey = 17)
expectMsg(destinationB)
router1 ! Msg(MsgKey("c"), "C")

View file

@ -14,7 +14,7 @@ akka {
# Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT)
event-handlers = ["akka.event.Logging$DefaultLogger"]
# Event handlers are created and registered synchronously during ActorSystem
# start-up, and since they are actors, this timeout is used to bound the
# waiting time
@ -49,7 +49,7 @@ akka {
# FQCN of the ActorRefProvider to be used; the below is the built-in default,
# another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle.
provider = "akka.actor.LocalActorRefProvider"
# The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator
# to obtain its supervisorStrategy. Besides the default there is
# akka.actor.StoppingSupervisorStrategy
@ -69,7 +69,7 @@ akka {
# Serializes and deserializes creators (in Props) to ensure that they can be sent over the network,
# this is only intended for testing.
serialize-creators = off
# Timeout for send operations to top-level actors which are in the process of being started.
# This is only relevant if using a bounded mailbox or the CallingThreadDispatcher for a top-level actor.
unstarted-push-timeout = 10s
@ -108,6 +108,9 @@ akka {
# within is the timeout used for routers containing future calls
within = 5 seconds
# number of virtual nodes per node for consistent-hashing router
virtual-nodes-factor = 10
routees {
# Alternatively to giving nr-of-instances you can specify the full
# paths of those actors which should be routed to. This setting takes
@ -296,7 +299,7 @@ akka {
# enable DEBUG logging of subscription changes on the eventStream
event-stream = off
# enable DEBUG logging of unhandled messages
unhandled = off
@ -318,13 +321,13 @@ akka {
serialization-bindings {
"java.io.Serializable" = java
}
# Configuration items which are used by the akka.actor.ActorDSL._ methods
dsl {
# Maximum queue size of the actor created by newInbox(); this protects against
# faulty programs which use select() and consistently miss messages
inbox-size = 1000
# Default timeout to assume for operations like Inbox.receive et al
default-timeout = 5s
}

View file

@ -166,6 +166,8 @@ object ActorSystem {
final val Daemonicity: Boolean = getBoolean("akka.daemonic")
final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error")
final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor")
if (ConfigVersion != Version)
throw new akka.ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")

View file

@ -143,18 +143,20 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
val nrOfInstances = deployment.getInt("nr-of-instances")
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
val resizer: Option[Resizer] = if (config.hasPath("resizer")) Some(DefaultResizer(deployment.getConfig("resizer"))) else None
val router: RouterConfig = deployment.getString("router") match {
case "from-code" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer)
case "random" RandomRouter(nrOfInstances, routees, resizer)
case "smallest-mailbox" SmallestMailboxRouter(nrOfInstances, routees, resizer)
case "scatter-gather" ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer)
case "consistent-hashing" ConsistentHashingRouter(nrOfInstances, routees, resizer)
case "from-code" NoRouter
case "round-robin" RoundRobinRouter(nrOfInstances, routees, resizer)
case "random" RandomRouter(nrOfInstances, routees, resizer)
case "smallest-mailbox" SmallestMailboxRouter(nrOfInstances, routees, resizer)
case "broadcast" BroadcastRouter(nrOfInstances, routees, resizer)
case "scatter-gather"
val within = Duration(deployment.getMilliseconds("within"), TimeUnit.MILLISECONDS)
ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
case "consistent-hashing"
val vnodes = deployment.getInt("virtual-nodes-factor")
ConsistentHashingRouter(nrOfInstances, routees, resizer, virtualNodesFactor = vnodes)
case fqn
val args = Seq(classOf[Config] -> deployment)
dynamicAccess.createInstanceFor[RouterConfig](fqn, args).recover({

View file

@ -20,23 +20,22 @@ import scala.collection.immutable.TreeMap
* hash, i.e. make sure it is different for different nodes.
*
*/
class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) {
class ConsistentHash[T] private (nodeRing: TreeMap[Int, T], virtualNodesFactor: Int) {
import ConsistentHash._
if (replicas < 1) throw new IllegalArgumentException("replicas must be >= 1")
if (virtualNodesFactor < 1) throw new IllegalArgumentException("virtualNodesFactor must be >= 1")
/**
* Adds a node to the ring.
* Adds a node to the node ring.
* Note that the instance is immutable and this
* operation returns a new instance.
*/
def :+(node: T): ConsistentHash[T] = {
new ConsistentHash(replicas, ring ++ ((1 to replicas) map { r (nodeHashFor(node, r) -> node) }))
}
def :+(node: T): ConsistentHash[T] =
new ConsistentHash(nodeRing ++ ((1 to virtualNodesFactor) map { r (nodeHashFor(node, r) -> node) }), virtualNodesFactor)
/**
* Adds a node to the ring.
* Adds a node to the node ring.
* Note that the instance is immutable and this
* operation returns a new instance.
* JAVA API
@ -44,16 +43,15 @@ class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) {
def add(node: T): ConsistentHash[T] = this :+ node
/**
* Removes a node from the ring.
* Removes a node from the node ring.
* Note that the instance is immutable and this
* operation returns a new instance.
*/
def :-(node: T): ConsistentHash[T] = {
new ConsistentHash(replicas, ring -- ((1 to replicas) map { r nodeHashFor(node, r) }))
}
def :-(node: T): ConsistentHash[T] =
new ConsistentHash(nodeRing -- ((1 to virtualNodesFactor) map { r nodeHashFor(node, r) }), virtualNodesFactor)
/**
* Removes a node from the ring.
* Removes a node from the node ring.
* Note that the instance is immutable and this
* operation returns a new instance.
* JAVA API
@ -62,49 +60,42 @@ class ConsistentHash[T] private (replicas: Int, ring: TreeMap[Int, T]) {
/**
* Get the node responsible for the data key.
* Can only be used if nodes exists in the ring,
* Can only be used if nodes exists in the node ring,
* otherwise throws `IllegalStateException`
*/
def nodeFor(key: Array[Byte]): T = {
if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty ring" format key)
if (isEmpty) throw new IllegalStateException("Can't get node for [%s] from an empty node ring" format key)
val hash = hashFor(key)
def nextClockwise: T = {
val (ringKey, node) = ring.rangeImpl(Some(hash), None).headOption.getOrElse(ring.head)
node
}
ring.getOrElse(hash, nextClockwise)
// find the next node clockwise in the nodeRing, pick first if end of tree
nodeRing.rangeImpl(Some(hash), None).headOption.getOrElse(nodeRing.head)._2
}
/**
* Is the ring empty, i.e. no nodes added or all removed.
* Is the node ring empty, i.e. no nodes added or all removed.
*/
def isEmpty: Boolean = ring.isEmpty
def isEmpty: Boolean = nodeRing.isEmpty
}
object ConsistentHash {
def apply[T](nodes: Iterable[T], replicas: Int) = {
new ConsistentHash(replicas, TreeMap.empty[Int, T] ++
(for (node nodes; replica 1 to replicas) yield (nodeHashFor(node, replica) -> node)))
def apply[T](nodes: Iterable[T], virtualNodesFactor: Int) = {
new ConsistentHash(TreeMap.empty[Int, T] ++
(for (node nodes; vnode 1 to virtualNodesFactor) yield (nodeHashFor(node, vnode) -> node)),
virtualNodesFactor)
}
/**
* Factory method to create a ConsistentHash
* JAVA API
*/
def create[T](nodes: java.lang.Iterable[T], replicas: Int) = {
def create[T](nodes: java.lang.Iterable[T], virtualNodesFactor: Int) = {
import scala.collection.JavaConverters._
apply(nodes.asScala, replicas)
apply(nodes.asScala, virtualNodesFactor)
}
private def nodeHashFor(node: Any, replica: Int): Int = {
hashFor((node + ":" + replica).getBytes("UTF-8"))
}
private def nodeHashFor(node: Any, vnode: Int): Int =
hashFor((node + ":" + vnode).getBytes("UTF-8"))
private def hashFor(bytes: Array[Byte]): Int = {
val hash = MurmurHash.arrayHash(bytes)
if (hash == Int.MinValue) hash + 1
math.abs(hash)
}
private def hashFor(bytes: Array[Byte]): Int = MurmurHash.arrayHash(bytes)
}

View file

@ -5,13 +5,13 @@ package akka.routing
import scala.collection.JavaConversions.iterableAsScalaIterable
import scala.util.control.NonFatal
import akka.actor.ActorRef
import akka.actor.SupervisorStrategy
import akka.actor.Props
import akka.dispatch.Dispatchers
import akka.event.Logging
import akka.serialization.SerializationExtension
import java.util.concurrent.atomic.AtomicReference
object ConsistentHashingRouter {
/**
@ -46,19 +46,13 @@ object ConsistentHashingRouter {
/**
* If messages can't implement [[akka.routing.ConsistentHashable]]
* themselves they can we wrapped by something implementing
* this interface instead. The router will only send the
* wrapped message to the destination, i.e. the envelope will
* be stripped off.
* themselves they can we wrapped by this envelope instead. The
* router will only send the wrapped message to the destination,
* i.e. the envelope will be stripped off.
*/
trait ConsistentHashableEnvelope extends ConsistentHashable with RouterEnvelope {
def message: Any
}
/**
* Default number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]]
*/
val DefaultReplicas: Int = 10
@SerialVersionUID(1L)
case class ConsistentHashableEnvelope(message: Any, consistentHashKey: Any)
extends ConsistentHashable with RouterEnvelope
}
/**
@ -88,14 +82,14 @@ object ConsistentHashingRouter {
*
* @param routees string representation of the actor paths of the routees that will be looked up
* using `actorFor` in [[akka.actor.ActorRefProvider]]
* @param replicas number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]]
* @param virtualNodesFactor number of virtual nodes per node, used in [[akka.routing.ConsistantHash]]
*/
@SerialVersionUID(1L)
case class ConsistentHashingRouter(
nrOfInstances: Int = 0, routees: Iterable[String] = Nil, override val resizer: Option[Resizer] = None,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
val supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy,
val replicas: Int = ConsistentHashingRouter.DefaultReplicas)
val virtualNodesFactor: Int = 0)
extends RouterConfig with ConsistentHashingLike {
/**
@ -130,9 +124,9 @@ case class ConsistentHashingRouter(
def withSupervisorStrategy(strategy: SupervisorStrategy): ConsistentHashingRouter = copy(supervisorStrategy = strategy)
/**
* Java API for setting the number of replicas (virtual nodes) used in [[akka.routing.ConsistantHash]]
* Java API for setting the number of virtual nodes per node, used in [[akka.routing.ConsistantHash]]
*/
def withReplicas(replicas: Int): ConsistentHashingRouter = copy(replicas = replicas)
def withVirtualNodesFactor(vnodes: Int): ConsistentHashingRouter = copy(virtualNodesFactor = vnodes)
/**
* Uses the resizer of the given RouterConfig if this RouterConfig
@ -145,6 +139,10 @@ case class ConsistentHashingRouter(
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait ConsistentHashingLike { this: RouterConfig
import ConsistentHashingRouter._
@ -153,7 +151,7 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
def routees: Iterable[String]
def replicas: Int
def virtualNodesFactor: Int
override def createRoute(routeeProvider: RouteeProvider): Route = {
if (resizer.isEmpty) {
@ -162,20 +160,25 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
}
val log = Logging(routeeProvider.context.system, routeeProvider.context.self)
val vnodes =
if (virtualNodesFactor == 0) routeeProvider.context.system.settings.DefaultVirtualNodesFactor
else virtualNodesFactor
// consistentHashRoutees and consistentHash are updated together, synchronized on the consistentHashLock
val consistentHashLock = new Object
var consistentHashRoutees: IndexedSeq[ActorRef] = null
var consistentHash: ConsistentHash[ActorRef] = null
upateConsistentHash()
var consistentHashRoutees = new AtomicReference[IndexedSeq[ActorRef]]
@volatile var consistentHash: ConsistentHash[ActorRef] = null
updateConsistentHash()
// update consistentHash when routees has changed
// changes to routees are rare and when no changes this is a quick operation
def upateConsistentHash(): ConsistentHash[ActorRef] = consistentHashLock.synchronized {
def updateConsistentHash(): ConsistentHash[ActorRef] = {
val currentRoutees = routeeProvider.routees
if ((currentRoutees ne consistentHashRoutees) && currentRoutees != consistentHashRoutees) {
consistentHashRoutees = currentRoutees
consistentHash = ConsistentHash(currentRoutees, replicas)
if (currentRoutees ne consistentHashRoutees.get) {
val oldConsistentHashRoutees = consistentHashRoutees.get
val rehash = currentRoutees != oldConsistentHashRoutees
// when other instance, same content, no need to re-hash, but try to set currentRoutees
// ignore, don't update, in case of CAS failure
if (consistentHashRoutees.compareAndSet(oldConsistentHashRoutees, currentRoutees) && rehash)
consistentHash = ConsistentHash(currentRoutees, vnodes)
}
consistentHash
}
@ -186,13 +189,13 @@ trait ConsistentHashingLike { this: RouterConfig ⇒
case str: String str.getBytes("UTF-8")
case x: AnyRef SerializationExtension(routeeProvider.context.system).serialize(x).get
}
val currentConsistenHash = upateConsistentHash()
val currentConsistenHash = updateConsistentHash()
if (currentConsistenHash.isEmpty) routeeProvider.context.system.deadLetters
else currentConsistenHash.nodeFor(hash)
} catch {
case NonFatal(e)
// serialization failed
log.warning("Couldn't route message with consistentHashKey [%s] due to [%s]".format(hashData, e.getMessage))
log.warning("Couldn't route message with consistentHashKey [{}] due to [{}]", hashData, e.getMessage)
routeeProvider.context.system.deadLetters
}

View file

@ -596,6 +596,10 @@ case class RoundRobinRouter(nrOfInstances: Int = 0, routees: Iterable[String] =
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait RoundRobinLike { this: RouterConfig
def nrOfInstances: Int
@ -729,6 +733,10 @@ case class RandomRouter(nrOfInstances: Int = 0, routees: Iterable[String] = Nil,
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait RandomLike { this: RouterConfig
def nrOfInstances: Int
@ -869,6 +877,10 @@ case class SmallestMailboxRouter(nrOfInstances: Int = 0, routees: Iterable[Strin
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait SmallestMailboxLike { this: RouterConfig
def nrOfInstances: Int
@ -1084,6 +1096,10 @@ case class BroadcastRouter(nrOfInstances: Int = 0, routees: Iterable[String] = N
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait BroadcastLike { this: RouterConfig
def nrOfInstances: Int
@ -1213,6 +1229,10 @@ case class ScatterGatherFirstCompletedRouter(nrOfInstances: Int = 0, routees: It
}
}
/**
* The core pieces of the routing logic is located in this
* trait to be able to extend.
*/
trait ScatterGatherFirstCompletedLike { this: RouterConfig
def nrOfInstances: Int

View file

@ -77,6 +77,7 @@ object RouterWithConfigDocSpec {
/myrouter7 {
router = consistent-hashing
nr-of-instances = 5
virtual-nodes-factor = 10
}
}
//#config-consistent-hashing