+act #13004 Adding TailChopping router

This commit is contained in:
Krzysztof Janosz 2014-06-05 21:56:05 +02:00
parent b88c964bd4
commit 1658f96862
8 changed files with 631 additions and 27 deletions

View file

@ -0,0 +1,123 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Status.Failure
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{ ActorRef, Props, Actor, ActorSystem }
import akka.pattern.{ AskTimeoutException, ask }
import akka.testkit._
object TailChoppingSpec {
def newActor(id: Int, sleepTime: Duration)(implicit system: ActorSystem) =
system.actorOf(Props(new Actor {
var times: Int = _
def receive = {
case "stop" context.stop(self)
case "times" sender() ! times
case x
times += 1
Thread sleep sleepTime.toMillis
sender ! "ack"
}
}), "Actor:" + id)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TailChoppingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
import TailChoppingSpec._
def oneOfShouldEqual(what: Any, default: Any, ref: ActorRef*)(f: ActorRef Any) {
val results = ref.map(p f(p))
results.count(_ == what) should equal(1)
results.count(_ == default) should equal(results.size - 1)
}
def allShouldEqual(what: Any, ref: ActorRef*)(f: ActorRef Any) {
val results = ref.map(p f(p))
results.count(_ == what) should equal(results.size)
}
"Tail-chopping group" must {
"deliver a broadcast message using the !" in {
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter1.addAndGet(msg)
}
}))
val counter2 = new AtomicInteger
val actor2 = system.actorOf(Props(new Actor {
def receive = {
case "end" doneLatch.countDown()
case msg: Int counter2.addAndGet(msg)
}
}))
val paths = List(actor1, actor2).map(_.path.toString)
val routedActor = system.actorOf(TailChoppingGroup(paths, within = 1.second, interval = 100.millisecond).props())
routedActor ! Broadcast(1)
routedActor ! Broadcast("end")
Await.ready(doneLatch, TestLatch.DefaultTimeout)
counter1.get should be(1)
counter2.get should be(1)
}
"return response from second actor after inactivity from first one" in {
val actor1 = newActor(1, 1.millis)
val actor2 = newActor(2, 1.millis)
val probe = TestProbe()
val paths = List(actor1, actor2).map(_.path.toString)
val routedActor = system.actorOf(TailChoppingGroup(paths, within = 1.seconds, interval = 50.millisecond).props())
probe.send(routedActor, "")
probe.expectMsg("ack")
oneOfShouldEqual(1, 1, actor1, actor2)(ref Await.result(ref ? "times", timeout.duration))
routedActor ! Broadcast("stop")
}
"throw exception if no result will arrive within given time" in {
val actor1 = newActor(3, 500.millis)
val actor2 = newActor(4, 500.millis)
val probe = TestProbe()
val paths = List(actor1, actor2).map(_.path.toString)
val routedActor = system.actorOf(TailChoppingGroup(paths, within = 300.milliseconds,
interval = 50.milliseconds).props())
probe.send(routedActor, "")
probe.expectMsgPF() {
case Failure(timeoutEx: AskTimeoutException)
}
allShouldEqual(1, actor1, actor2)(ref Await.result(ref ? "times", timeout.duration))
routedActor ! Broadcast("stop")
}
"reply ASAP" in {
val actor1 = newActor(5, 1.seconds)
val actor2 = newActor(6, 4.seconds)
val probe = TestProbe()
val paths = List(actor1, actor2).map(_.path.toString)
val routedActor = system.actorOf(TailChoppingGroup(paths, within = 5.seconds, interval = 100.milliseconds).props())
probe.send(routedActor, "")
probe.expectMsg(max = 2.seconds, "ack")
routedActor ! Broadcast("stop")
}
}
}

View file

@ -109,6 +109,8 @@ akka {
broadcast-group = "akka.routing.BroadcastGroup"
scatter-gather-pool = "akka.routing.ScatterGatherFirstCompletedPool"
scatter-gather-group = "akka.routing.ScatterGatherFirstCompletedGroup"
tail-chopping-pool = "akka.routing.TailChoppingPool"
tail-chopping-group = "akka.routing.TailChoppingGroup"
consistent-hashing-pool = "akka.routing.ConsistentHashingPool"
consistent-hashing-group = "akka.routing.ConsistentHashingGroup"
}
@ -163,6 +165,11 @@ akka {
# number of virtual nodes per node for consistent-hashing router
virtual-nodes-factor = 10
tail-chopping-router {
# interval is duration between sending message to next routee
interval = 10 milliseconds
}
routees {
# Alternatively to giving nr-of-instances you can specify the full
# paths of those actors which should be routed to. This setting takes

View file

@ -0,0 +1,229 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable
import akka.actor._
import akka.dispatch.Dispatchers
import com.typesafe.config.Config
import akka.japi.Util.immutableSeq
import scala.concurrent.{ ExecutionContext, Promise }
import akka.pattern.{ AskTimeoutException, after, ask, pipe }
import scala.concurrent.duration._
import akka.util.Timeout
import akka.util.Helpers.ConfigOps
import scala.util.Random
/**
* Sends the message to a first, random picked, routee,
* then wait a specified `interval` and then send to a second, random picked, and so on till one full cycle.
*
* @param scheduler schedules sending messages to routees
*
* @param within expecting at least one reply within this duration, otherwise
* it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]]
*
* @param interval duration after which next routee will be picked
*
* @param context execution context used by scheduler
*/
@SerialVersionUID(1L)
final case class TailChoppingRoutingLogic(scheduler: Scheduler, within: FiniteDuration,
interval: FiniteDuration, context: ExecutionContext) extends RoutingLogic {
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = {
if (routees.isEmpty) NoRoutee
else TailChoppingRoutees(scheduler, routees, within, interval)(context)
}
}
/**
* INTERNAL API
*/
@SerialVersionUID(1L)
private[akka] final case class TailChoppingRoutees(
scheduler: Scheduler, routees: immutable.IndexedSeq[Routee],
within: FiniteDuration, interval: FiniteDuration)(implicit ec: ExecutionContext) extends Routee {
override def send(message: Any, sender: ActorRef): Unit = {
implicit val timeout = Timeout(within)
val promise = Promise[Any]()
val shuffled = Random.shuffle(routees)
val aIdx = new AtomicInteger()
val size = shuffled.length
val tryWithNext = scheduler.schedule(0.millis, interval) {
val idx = aIdx.getAndIncrement
if (idx < size) {
shuffled(idx) match {
case ActorRefRoutee(ref)
promise.tryCompleteWith(ref.ask(message))
case ActorSelectionRoutee(sel)
promise.tryCompleteWith(sel.ask(message))
case _
}
}
}
val sendTimeout = scheduler.scheduleOnce(within)(promise.tryFailure(
new AskTimeoutException(s"Ask timed out on [$sender] after [$within.toMillis} ms]")))
val f = promise.future
f.onComplete {
case _
tryWithNext.cancel()
sendTimeout.cancel()
}
f.pipeTo(sender)
}
}
/**
* A router poll thats sends the message to a first, random picked, routee,
* then wait a specified `interval` and then send to a second, random picked, and so on till one full cycle..
*
* The configuration parameter trumps the constructor arguments. This means that
* if you provide `nrOfInstances` during instantiation they will be ignored if
* the router is defined in the configuration file for the actor being used.
*
* <h1>Supervision Setup</h1>
*
* Any routees that are created by a router will be created as the router's children.
* The router is therefore also the children's supervisor.
*
* The supervision strategy of the router actor can be configured with
* [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
* a strategy of always escalate. This means that errors are passed up to the
* router's supervisor for handling.
*
* The router's supervisor will treat the error as an error with the router itself.
* Therefore a directive to stop or restart will cause the router itself to stop or
* restart. The router, in turn, will cause its children to stop and restart.
*
* @param nrOfInstances initial number of routees in the pool
*
* @param resizer optional resizer that dynamically adjust the pool size
*
* @param within expecting at least one reply within this duration, otherwise
* it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]]
*
* @param interval duration after which next routee will be picked
*
* @param supervisorStrategy strategy for supervising the routees, see 'Supervision Setup'
*
* @param routerDispatcher dispatcher to use for the router head actor, which handles
* supervision, death watch and router management messages
*/
@SerialVersionUID(1L)
final case class TailChoppingPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
within: FiniteDuration,
interval: FiniteDuration,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool with PoolOverrideUnsetConfig[TailChoppingPool] {
def this(config: Config) =
this(
nrOfInstances = config.getInt("nr-of-instances"),
within = config.getMillisDuration("within"),
interval = config.getMillisDuration("tail-chopping-router.interval"),
resizer = DefaultResizer.fromConfig(config),
usePoolDispatcher = config.hasPath("pool-dispatcher"))
/**
* Java API
* @param nr initial number of routees in the pool
* @param within expecting at least one reply within this duration, otherwise
* it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]]
* @param interval duration after which next routee will be picked
*/
def this(nr: Int, within: FiniteDuration, interval: FiniteDuration) =
this(nrOfInstances = nr, within = within, interval = interval)
override def createRouter(system: ActorSystem): Router =
new Router(TailChoppingRoutingLogic(system.scheduler, within,
interval, system.dispatchers.lookup(routerDispatcher)))
/**
* Setting the supervisor strategy to be used for the head Router actor.
*/
def withSupervisorStrategy(strategy: SupervisorStrategy): TailChoppingPool = copy(supervisorStrategy = strategy)
/**
* Setting the resizer to be used.
*/
def withResizer(resizer: Resizer): TailChoppingPool = copy(resizer = Some(resizer))
/**
* Setting the dispatcher to be used for the router head actor, which handles
* supervision, death watch and router management messages.
*/
def withDispatcher(dispatcherId: String): TailChoppingPool = copy(routerDispatcher = dispatcherId)
/**
* Uses the resizer and/or the supervisor strategy of the given Routerconfig
* if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if
* resizer was not defined in config.
*/
override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other)
}
/**
* A router group that sends the message to a first, random picked, routee,
* then wait a specified `interval` and then send to a second, random picked, and so on till one full cycle..
*
* The configuration parameter trumps the constructor arguments. This means that
* if you provide `paths` during instantiation they will be ignored if
* the router is defined in the configuration file for the actor being used.
*
* @param paths string representation of the actor paths of the routees, messages are
* sent with [[akka.actor.ActorSelection]] to these paths
*
* @param within expecting at least one reply within this duration, otherwise
* it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]]
*
* @param interval duration after which next routee will be picked
*
* @param routerDispatcher dispatcher to use for the router head actor, which handles
* router management messages
*/
final case class TailChoppingGroup(
override val paths: immutable.Iterable[String],
within: FiniteDuration,
interval: FiniteDuration,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Group {
def this(config: Config) =
this(
paths = immutableSeq(config.getStringList("routees.paths")),
within = config.getMillisDuration("within"),
interval = config.getMillisDuration("tail-chopping-router.interval"))
/**
* Java API
* @param routeePaths string representation of the actor paths of the routees, messages are
* sent with [[akka.actor.ActorSelection]] to these paths
* @param within expecting at least one reply within this duration, otherwise
* it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]]
* @param interval duration after which next routee will be picked
*/
def this(routeePaths: java.lang.Iterable[String], within: FiniteDuration, interval: FiniteDuration) =
this(paths = immutableSeq(routeePaths), within = within, interval = interval)
override def createRouter(system: ActorSystem): Router =
new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher)))
/**
* Setting the dispatcher to be used for the router head actor, which handles
* router management messages
*/
def withDispatcher(dispatcherId: String): TailChoppingGroup = copy(routerDispatcher = dispatcherId)
}

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import akka.util.Timeout
import org.openjdk.jmh.annotations._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{ Await, Promise }
/*
[info] Benchmark (ratio) (to) Mode Samples Score Score error Units
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.1 4 thrpt 40 397174.273 18707.983 ops/s
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.1 16 thrpt 40 89385.115 3198.783 ops/s
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.1 64 thrpt 40 26152.329 2291.895 ops/s
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.35 4 thrpt 40 383100.418 15052.818 ops/s
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.35 16 thrpt 40 83574.143 6612.393 ops/s
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.35 64 thrpt 40 20509.715 2814.356 ops/s
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.9 4 thrpt 40 367227.500 16169.665 ops/s
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.9 16 thrpt 40 72611.445 4086.267 ops/s
[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.9 64 thrpt 40 7332.554 1087.250 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.1 4 thrpt 40 1040918.731 21830.348 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.1 16 thrpt 40 1036284.894 26962.984 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.1 64 thrpt 40 944350.638 32055.335 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.35 4 thrpt 40 1045371.779 34943.155 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.35 16 thrpt 40 954663.161 18032.730 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.35 64 thrpt 40 739593.387 21132.531 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.9 4 thrpt 40 1046392.800 29542.291 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.9 16 thrpt 40 820986.574 22058.708 ops/s
[info] a.a.ScheduleBenchmark.oneSchedule 0.9 64 thrpt 40 210115.907 14176.402 ops/s
*/
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
@Warmup(iterations = 10, time = 1700, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 20, time = 1700, timeUnit = TimeUnit.MILLISECONDS)
class ScheduleBenchmark {
implicit val system: ActorSystem = ActorSystem()
val scheduler: Scheduler = system.scheduler
val interval: FiniteDuration = 25.millis
val within: FiniteDuration = 2.seconds
implicit val timeout: Timeout = Timeout(within)
@Param(Array("4", "16", "64"))
var to = 0
@Param(Array("0.1", "0.35", "0.9"))
var ratio = 0d
var winner: Int = _
var promise: Promise[Any] = _
@Setup(Level.Iteration)
def setup() {
winner = (to * ratio + 1).toInt
promise = Promise[Any]()
}
@TearDown
def shutdown() {
system.shutdown()
system.awaitTermination()
}
def op(idx: Int) = if (idx == winner) promise.trySuccess(idx) else idx
@Benchmark
def oneSchedule = {
val aIdx = new AtomicInteger(1)
val tryWithNext = scheduler.schedule(0.millis, interval) {
val idx = aIdx.getAndIncrement
if (idx <= to) op(idx)
}
promise.future.onComplete {
case _
tryWithNext.cancel()
}
Await.result(promise.future, within)
}
@Benchmark
def multipleScheduleOnce = {
val tryWithNext = (1 to to).foldLeft(0.millis -> List[Cancellable]()) {
case ((interv, c), idx)
(interv + interval, scheduler.scheduleOnce(interv) {
op(idx)
} :: c)
}._2
promise.future.onComplete {
case _
tryWithNext.foreach(_.cancel())
}
Await.result(promise.future, within)
}
}

View file

@ -61,6 +61,8 @@ import akka.routing.ScatterGatherFirstCompletedGroup;
import akka.routing.ScatterGatherFirstCompletedPool;
import akka.routing.BalancingPool;
import akka.routing.SmallestMailboxPool;
import akka.routing.TailChoppingGroup;
import akka.routing.TailChoppingPool;
//#imports2
@ -275,39 +277,66 @@ public class RouterDocTest {
"router20");
//#scatter-gather-group-2
//#consistent-hashing-pool-1
//#tail-chopping-pool-1
ActorRef router21 =
getContext().actorOf(FromConfig.getInstance().props(
Props.create(Worker.class)), "router21");
//#tail-chopping-pool-1
//#tail-chopping-pool-2
FiniteDuration within3 = FiniteDuration.create(10, TimeUnit.SECONDS);
FiniteDuration interval = FiniteDuration.create(20, TimeUnit.MILLISECONDS);
ActorRef router22 =
getContext().actorOf(new TailChoppingPool(5, within3, interval).props(
Props.create(Worker.class)), "router22");
//#tail-chopping-pool-2
//#tail-chopping-group-1
ActorRef router23 =
getContext().actorOf(FromConfig.getInstance().props(), "router23");
//#tail-chopping-group-1
//#tail-chopping-group-2
FiniteDuration within4 = FiniteDuration.create(10, TimeUnit.SECONDS);
FiniteDuration interval2 = FiniteDuration.create(20, TimeUnit.MILLISECONDS);
ActorRef router24 =
getContext().actorOf(new TailChoppingGroup(paths, within4, interval2).props(),
"router24");
//#tail-chopping-group-2
//#consistent-hashing-pool-1
ActorRef router25 =
getContext().actorOf(FromConfig.getInstance().props(Props.create(Worker.class)),
"router21");
"router25");
//#consistent-hashing-pool-1
//#consistent-hashing-pool-2
ActorRef router22 =
ActorRef router26 =
getContext().actorOf(new ConsistentHashingPool(5).props(
Props.create(Worker.class)), "router22");
Props.create(Worker.class)), "router26");
//#consistent-hashing-pool-2
//#consistent-hashing-group-1
ActorRef router23 =
getContext().actorOf(FromConfig.getInstance().props(), "router23");
ActorRef router27 =
getContext().actorOf(FromConfig.getInstance().props(), "router27");
//#consistent-hashing-group-1
//#consistent-hashing-group-2
ActorRef router24 =
getContext().actorOf(new ConsistentHashingGroup(paths).props(), "router24");
ActorRef router28 =
getContext().actorOf(new ConsistentHashingGroup(paths).props(), "router28");
//#consistent-hashing-group-2
//#resize-pool-1
ActorRef router25 =
ActorRef router29 =
getContext().actorOf(FromConfig.getInstance().props(
Props.create(Worker.class)), "router25");
Props.create(Worker.class)), "router29");
//#resize-pool-1
//#resize-pool-2
DefaultResizer resizer = new DefaultResizer(2, 15);
ActorRef router26 =
ActorRef router30 =
getContext().actorOf(new RoundRobinPool(5).withResizer(resizer).props(
Props.create(Worker.class)), "router26");
Props.create(Worker.class)), "router30");
//#resize-pool-2
public void onReceive(Object msg) {}

View file

@ -31,6 +31,7 @@ The routing logic shipped with Akka are:
* ``akka.routing.SmallestMailboxRoutingLogic``
* ``akka.routing.BroadcastRoutingLogic``
* ``akka.routing.ScatterGatherFirstCompletedRoutingLogic``
* ``akka.routing.TailChoppingRoutingLogic``
* ``akka.routing.ConsistentHashingRoutingLogic``
We create the routees as ordinary child actors wrapped in ``ActorRefRoutee``. We watch
@ -370,6 +371,40 @@ ScatterGatherFirstCompletedGroup defined in code:
.. includecode:: code/docs/jrouting/RouterDocTest.java
:include: paths,scatter-gather-group-2
TailChoppingPool and TailChoppingGroup
--------------------------------------
The TailChoppingRouter will first send the message to one, randomly picked, routee
and then after a small delay to to a second routee (picked randomly from the remaining routees) and so on.
It waits for first reply it gets back and forwards it back to original sender. Other replies are discarded.
The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that
one of the other actors may still be faster to respond than the initial one.
This optimisation was described nicely in a blog post by Peter Bailis:
`Doing redundant work to speed up distributed queries <http://www.bailis.org/blog/doing-redundant-work-to-speed-up-distributed-queries/>`_.
TailChoppingPool defined in configuration:
.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-tail-chopping-pool
.. includecode:: code/docs/jrouting/RouterDocTest.java#tail-chopping-pool-1
TailChoppingPool defined in code:
.. includecode:: code/docs/jrouting/RouterDocTest.java#tail-chopping-pool-2
TailChoppingGroup defined in configuration:
.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-tail-chopping-group
.. includecode:: code/docs/jrouting/RouterDocTest.java#tail-chopping-group-1
TailChoppingGroup defined in code:
.. includecode:: code/docs/jrouting/RouterDocTest.java
:include: paths,tail-chopping-group-2
ConsistentHashingPool and ConsistentHashingGroup
------------------------------------------------

View file

@ -21,6 +21,8 @@ import akka.routing.ScatterGatherFirstCompletedGroup
import akka.routing.RandomGroup
import akka.routing.ScatterGatherFirstCompletedPool
import akka.routing.BalancingPool
import akka.routing.TailChoppingGroup
import akka.routing.TailChoppingPool
object RouterDocSpec {
@ -129,9 +131,31 @@ akka.actor.deployment {
}
#//#config-scatter-gather-group
#//#config-consistent-hashing-pool
#//#config-tail-chopping-pool
akka.actor.deployment {
/parent/router21 {
router = tail-chopping-pool
nr-of-instances = 5
within = 10 seconds
tail-chopping-router.interval = 20 milliseconds
}
}
#//#config-tail-chopping-pool
#//#config-tail-chopping-group
akka.actor.deployment {
/parent/router23 {
router = tail-chopping-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
within = 10 seconds
tail-chopping-router.interval = 20 milliseconds
}
}
#//#config-tail-chopping-group
#//#config-consistent-hashing-pool
akka.actor.deployment {
/parent/router25 {
router = consistent-hashing-pool
nr-of-instances = 5
virtual-nodes-factor = 10
@ -141,7 +165,7 @@ akka.actor.deployment {
#//#config-consistent-hashing-group
akka.actor.deployment {
/parent/router23 {
/parent/router27 {
router = consistent-hashing-group
routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"]
virtual-nodes-factor = 10
@ -173,7 +197,7 @@ akka.actor.deployment {
#//#config-resize-pool
akka.actor.deployment {
/parent/router25 {
/parent/router29 {
router = round-robin-pool
resizer {
lower-bound = 2
@ -356,37 +380,59 @@ router-dispatcher {}
within = 10.seconds).props(), "router20")
//#scatter-gather-group-2
//#consistent-hashing-pool-1
//#tail-chopping-pool-1
val router21: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router21")
//#tail-chopping-pool-1
//#tail-chopping-pool-2
val router22: ActorRef =
context.actorOf(TailChoppingPool(5, within = 10.seconds, interval = 20.millis).
props(Props[Worker]), "router22")
//#tail-chopping-pool-2
//#tail-chopping-group-1
val router23: ActorRef =
context.actorOf(FromConfig.props(), "router23")
//#tail-chopping-group-1
//#tail-chopping-group-2
val router24: ActorRef =
context.actorOf(TailChoppingGroup(paths,
within = 10.seconds, interval = 20.millis).props(), "router24")
//#tail-chopping-group-2
//#consistent-hashing-pool-1
val router25: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router25")
//#consistent-hashing-pool-1
//#consistent-hashing-pool-2
val router22: ActorRef =
val router26: ActorRef =
context.actorOf(ConsistentHashingPool(5).props(Props[Worker]),
"router22")
"router26")
//#consistent-hashing-pool-2
//#consistent-hashing-group-1
val router23: ActorRef =
context.actorOf(FromConfig.props(), "router23")
val router27: ActorRef =
context.actorOf(FromConfig.props(), "router27")
//#consistent-hashing-group-1
//#consistent-hashing-group-2
val router24: ActorRef =
context.actorOf(ConsistentHashingGroup(paths).props(), "router24")
val router28: ActorRef =
context.actorOf(ConsistentHashingGroup(paths).props(), "router28")
//#consistent-hashing-group-2
//#resize-pool-1
val router25: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router25")
val router29: ActorRef =
context.actorOf(FromConfig.props(Props[Worker]), "router29")
//#resize-pool-1
//#resize-pool-2
val resizer = DefaultResizer(lowerBound = 2, upperBound = 15)
val router26: ActorRef =
val router30: ActorRef =
context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]),
"router26")
"router30")
//#resize-pool-2
def receive = {

View file

@ -31,6 +31,7 @@ The routing logic shipped with Akka are:
* ``akka.routing.SmallestMailboxRoutingLogic``
* ``akka.routing.BroadcastRoutingLogic``
* ``akka.routing.ScatterGatherFirstCompletedRoutingLogic``
* ``akka.routing.TailChoppingRoutingLogic``
* ``akka.routing.ConsistentHashingRoutingLogic``
We create the routees as ordinary child actors wrapped in ``ActorRefRoutee``. We watch
@ -369,6 +370,40 @@ ScatterGatherFirstCompletedGroup defined in code:
.. includecode:: code/docs/routing/RouterDocSpec.scala
:include: paths,scatter-gather-group-2
TailChoppingPool and TailChoppingGroup
--------------------------------------
The TailChoppingRouter will first send the message to one, randomly picked, routee
and then after a small delay to to a second routee (picked randomly from the remaining routees) and so on.
It waits for first reply it gets back and forwards it back to original sender. Other replies are discarded.
The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that
one of the other actors may still be faster to respond than the initial one.
This optimisation was described nicely in a blog post by Peter Bailis:
`Doing redundant work to speed up distributed queries <http://www.bailis.org/blog/doing-redundant-work-to-speed-up-distributed-queries/>`_.
TailChoppingPool defined in configuration:
.. includecode:: code/docs/routing/RouterDocSpec.scala#config-tail-chopping-pool
.. includecode:: code/docs/routing/RouterDocSpec.scala#tail-chopping-pool-1
TailChoppingPool defined in code:
.. includecode:: code/docs/routing/RouterDocSpec.scala#tail-chopping-pool-2
TailChoppingGroup defined in configuration:
.. includecode:: code/docs/routing/RouterDocSpec.scala#config-tail-chopping-group
.. includecode:: code/docs/routing/RouterDocSpec.scala#tail-chopping-group-1
TailChoppingGroup defined in code:
.. includecode:: code/docs/routing/RouterDocSpec.scala
:include: paths,tail-chopping-group-2
ConsistentHashingPool and ConsistentHashingGroup
------------------------------------------------