diff --git a/akka-actor-tests/src/test/scala/akka/routing/TailChoppingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/TailChoppingSpec.scala new file mode 100644 index 0000000000..69e8fa24e0 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/routing/TailChoppingSpec.scala @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.routing + +import java.util.concurrent.atomic.AtomicInteger +import akka.actor.Status.Failure +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.actor.{ ActorRef, Props, Actor, ActorSystem } +import akka.pattern.{ AskTimeoutException, ask } +import akka.testkit._ + +object TailChoppingSpec { + def newActor(id: Int, sleepTime: Duration)(implicit system: ActorSystem) = + system.actorOf(Props(new Actor { + var times: Int = _ + + def receive = { + case "stop" ⇒ context.stop(self) + case "times" ⇒ sender() ! times + case x ⇒ + times += 1 + Thread sleep sleepTime.toMillis + sender ! "ack" + } + }), "Actor:" + id) +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class TailChoppingSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { + import TailChoppingSpec._ + + def oneOfShouldEqual(what: Any, default: Any, ref: ActorRef*)(f: ActorRef ⇒ Any) { + val results = ref.map(p ⇒ f(p)) + results.count(_ == what) should equal(1) + results.count(_ == default) should equal(results.size - 1) + } + + def allShouldEqual(what: Any, ref: ActorRef*)(f: ActorRef ⇒ Any) { + val results = ref.map(p ⇒ f(p)) + results.count(_ == what) should equal(results.size) + } + + "Tail-chopping group" must { + + "deliver a broadcast message using the !" in { + val doneLatch = new TestLatch(2) + + val counter1 = new AtomicInteger + val actor1 = system.actorOf(Props(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter1.addAndGet(msg) + } + })) + + val counter2 = new AtomicInteger + val actor2 = system.actorOf(Props(new Actor { + def receive = { + case "end" ⇒ doneLatch.countDown() + case msg: Int ⇒ counter2.addAndGet(msg) + } + })) + + val paths = List(actor1, actor2).map(_.path.toString) + val routedActor = system.actorOf(TailChoppingGroup(paths, within = 1.second, interval = 100.millisecond).props()) + routedActor ! Broadcast(1) + routedActor ! Broadcast("end") + + Await.ready(doneLatch, TestLatch.DefaultTimeout) + + counter1.get should be(1) + counter2.get should be(1) + } + + "return response from second actor after inactivity from first one" in { + val actor1 = newActor(1, 1.millis) + val actor2 = newActor(2, 1.millis) + val probe = TestProbe() + val paths = List(actor1, actor2).map(_.path.toString) + val routedActor = system.actorOf(TailChoppingGroup(paths, within = 1.seconds, interval = 50.millisecond).props()) + + probe.send(routedActor, "") + probe.expectMsg("ack") + + oneOfShouldEqual(1, 1, actor1, actor2)(ref ⇒ Await.result(ref ? "times", timeout.duration)) + + routedActor ! Broadcast("stop") + } + + "throw exception if no result will arrive within given time" in { + val actor1 = newActor(3, 500.millis) + val actor2 = newActor(4, 500.millis) + val probe = TestProbe() + val paths = List(actor1, actor2).map(_.path.toString) + val routedActor = system.actorOf(TailChoppingGroup(paths, within = 300.milliseconds, + interval = 50.milliseconds).props()) + + probe.send(routedActor, "") + probe.expectMsgPF() { + case Failure(timeoutEx: AskTimeoutException) ⇒ + } + + allShouldEqual(1, actor1, actor2)(ref ⇒ Await.result(ref ? "times", timeout.duration)) + + routedActor ! Broadcast("stop") + } + + "reply ASAP" in { + val actor1 = newActor(5, 1.seconds) + val actor2 = newActor(6, 4.seconds) + val probe = TestProbe() + val paths = List(actor1, actor2).map(_.path.toString) + val routedActor = system.actorOf(TailChoppingGroup(paths, within = 5.seconds, interval = 100.milliseconds).props()) + + probe.send(routedActor, "") + probe.expectMsg(max = 2.seconds, "ack") + + routedActor ! Broadcast("stop") + } + } +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index ed76edc77e..0cc63e9fd3 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -109,6 +109,8 @@ akka { broadcast-group = "akka.routing.BroadcastGroup" scatter-gather-pool = "akka.routing.ScatterGatherFirstCompletedPool" scatter-gather-group = "akka.routing.ScatterGatherFirstCompletedGroup" + tail-chopping-pool = "akka.routing.TailChoppingPool" + tail-chopping-group = "akka.routing.TailChoppingGroup" consistent-hashing-pool = "akka.routing.ConsistentHashingPool" consistent-hashing-group = "akka.routing.ConsistentHashingGroup" } @@ -163,6 +165,11 @@ akka { # number of virtual nodes per node for consistent-hashing router virtual-nodes-factor = 10 + tail-chopping-router { + # interval is duration between sending message to next routee + interval = 10 milliseconds + } + routees { # Alternatively to giving nr-of-instances you can specify the full # paths of those actors which should be routed to. This setting takes diff --git a/akka-actor/src/main/scala/akka/routing/TailChopping.scala b/akka-actor/src/main/scala/akka/routing/TailChopping.scala new file mode 100644 index 0000000000..4d51bdf90f --- /dev/null +++ b/akka-actor/src/main/scala/akka/routing/TailChopping.scala @@ -0,0 +1,229 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.routing + +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.immutable +import akka.actor._ +import akka.dispatch.Dispatchers +import com.typesafe.config.Config +import akka.japi.Util.immutableSeq +import scala.concurrent.{ ExecutionContext, Promise } +import akka.pattern.{ AskTimeoutException, after, ask, pipe } +import scala.concurrent.duration._ +import akka.util.Timeout +import akka.util.Helpers.ConfigOps + +import scala.util.Random + +/** + * Sends the message to a first, random picked, routee, + * then wait a specified `interval` and then send to a second, random picked, and so on till one full cycle. + * + * @param scheduler schedules sending messages to routees + * + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + * + * @param interval duration after which next routee will be picked + * + * @param context execution context used by scheduler + */ +@SerialVersionUID(1L) +final case class TailChoppingRoutingLogic(scheduler: Scheduler, within: FiniteDuration, + interval: FiniteDuration, context: ExecutionContext) extends RoutingLogic { + override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = { + if (routees.isEmpty) NoRoutee + else TailChoppingRoutees(scheduler, routees, within, interval)(context) + } +} + +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[akka] final case class TailChoppingRoutees( + scheduler: Scheduler, routees: immutable.IndexedSeq[Routee], + within: FiniteDuration, interval: FiniteDuration)(implicit ec: ExecutionContext) extends Routee { + + override def send(message: Any, sender: ActorRef): Unit = { + implicit val timeout = Timeout(within) + val promise = Promise[Any]() + val shuffled = Random.shuffle(routees) + + val aIdx = new AtomicInteger() + val size = shuffled.length + + val tryWithNext = scheduler.schedule(0.millis, interval) { + val idx = aIdx.getAndIncrement + if (idx < size) { + shuffled(idx) match { + case ActorRefRoutee(ref) ⇒ + promise.tryCompleteWith(ref.ask(message)) + case ActorSelectionRoutee(sel) ⇒ + promise.tryCompleteWith(sel.ask(message)) + case _ ⇒ + } + } + } + + val sendTimeout = scheduler.scheduleOnce(within)(promise.tryFailure( + new AskTimeoutException(s"Ask timed out on [$sender] after [$within.toMillis} ms]"))) + + val f = promise.future + f.onComplete { + case _ ⇒ + tryWithNext.cancel() + sendTimeout.cancel() + } + f.pipeTo(sender) + } +} + +/** + * A router poll thats sends the message to a first, random picked, routee, + * then wait a specified `interval` and then send to a second, random picked, and so on till one full cycle.. + * + * The configuration parameter trumps the constructor arguments. This means that + * if you provide `nrOfInstances` during instantiation they will be ignored if + * the router is defined in the configuration file for the actor being used. + * + *

Supervision Setup

+ * + * Any routees that are created by a router will be created as the router's children. + * The router is therefore also the children's supervisor. + * + * The supervision strategy of the router actor can be configured with + * [[#withSupervisorStrategy]]. If no strategy is provided, routers default to + * a strategy of “always escalate”. This means that errors are passed up to the + * router's supervisor for handling. + * + * The router's supervisor will treat the error as an error with the router itself. + * Therefore a directive to stop or restart will cause the router itself to stop or + * restart. The router, in turn, will cause its children to stop and restart. + * + * @param nrOfInstances initial number of routees in the pool + * + * @param resizer optional resizer that dynamically adjust the pool size + * + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + * + * @param interval duration after which next routee will be picked + * + * @param supervisorStrategy strategy for supervising the routees, see 'Supervision Setup' + * + * @param routerDispatcher dispatcher to use for the router head actor, which handles + * supervision, death watch and router management messages + */ +@SerialVersionUID(1L) +final case class TailChoppingPool( + override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + within: FiniteDuration, + interval: FiniteDuration, + override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, + override val usePoolDispatcher: Boolean = false) + extends Pool with PoolOverrideUnsetConfig[TailChoppingPool] { + + def this(config: Config) = + this( + nrOfInstances = config.getInt("nr-of-instances"), + within = config.getMillisDuration("within"), + interval = config.getMillisDuration("tail-chopping-router.interval"), + resizer = DefaultResizer.fromConfig(config), + usePoolDispatcher = config.hasPath("pool-dispatcher")) + + /** + * Java API + * @param nr initial number of routees in the pool + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + * @param interval duration after which next routee will be picked + */ + def this(nr: Int, within: FiniteDuration, interval: FiniteDuration) = + this(nrOfInstances = nr, within = within, interval = interval) + + override def createRouter(system: ActorSystem): Router = + new Router(TailChoppingRoutingLogic(system.scheduler, within, + interval, system.dispatchers.lookup(routerDispatcher))) + + /** + * Setting the supervisor strategy to be used for the “head” Router actor. + */ + def withSupervisorStrategy(strategy: SupervisorStrategy): TailChoppingPool = copy(supervisorStrategy = strategy) + + /** + * Setting the resizer to be used. + */ + def withResizer(resizer: Resizer): TailChoppingPool = copy(resizer = Some(resizer)) + + /** + * Setting the dispatcher to be used for the router head actor, which handles + * supervision, death watch and router management messages. + */ + def withDispatcher(dispatcherId: String): TailChoppingPool = copy(routerDispatcher = dispatcherId) + + /** + * Uses the resizer and/or the supervisor strategy of the given Routerconfig + * if this RouterConfig doesn't have one, i.e. the resizer defined in code is used if + * resizer was not defined in config. + */ + override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other) + +} + +/** + * A router group that sends the message to a first, random picked, routee, + * then wait a specified `interval` and then send to a second, random picked, and so on till one full cycle.. + * + * The configuration parameter trumps the constructor arguments. This means that + * if you provide `paths` during instantiation they will be ignored if + * the router is defined in the configuration file for the actor being used. + * + * @param paths string representation of the actor paths of the routees, messages are + * sent with [[akka.actor.ActorSelection]] to these paths + * + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + * + * @param interval duration after which next routee will be picked + * + * @param routerDispatcher dispatcher to use for the router head actor, which handles + * router management messages + */ +final case class TailChoppingGroup( + override val paths: immutable.Iterable[String], + within: FiniteDuration, + interval: FiniteDuration, + override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Group { + + def this(config: Config) = + this( + paths = immutableSeq(config.getStringList("routees.paths")), + within = config.getMillisDuration("within"), + interval = config.getMillisDuration("tail-chopping-router.interval")) + + /** + * Java API + * @param routeePaths string representation of the actor paths of the routees, messages are + * sent with [[akka.actor.ActorSelection]] to these paths + * @param within expecting at least one reply within this duration, otherwise + * it will reply with [[akka.pattern.AskTimeoutException]] in a [[akka.actor.Status.Failure]] + * @param interval duration after which next routee will be picked + */ + def this(routeePaths: java.lang.Iterable[String], within: FiniteDuration, interval: FiniteDuration) = + this(paths = immutableSeq(routeePaths), within = within, interval = interval) + + override def createRouter(system: ActorSystem): Router = + new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher))) + + /** + * Setting the dispatcher to be used for the router head actor, which handles + * router management messages + */ + def withDispatcher(dispatcherId: String): TailChoppingGroup = copy(routerDispatcher = dispatcherId) + +} diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala new file mode 100644 index 0000000000..f64fc31357 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.actor + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + +import akka.util.Timeout +import org.openjdk.jmh.annotations._ + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Promise } + +/* +[info] Benchmark (ratio) (to) Mode Samples Score Score error Units +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.1 4 thrpt 40 397174.273 18707.983 ops/s +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.1 16 thrpt 40 89385.115 3198.783 ops/s +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.1 64 thrpt 40 26152.329 2291.895 ops/s +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.35 4 thrpt 40 383100.418 15052.818 ops/s +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.35 16 thrpt 40 83574.143 6612.393 ops/s +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.35 64 thrpt 40 20509.715 2814.356 ops/s +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.9 4 thrpt 40 367227.500 16169.665 ops/s +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.9 16 thrpt 40 72611.445 4086.267 ops/s +[info] a.a.ScheduleBenchmark.multipleScheduleOnce 0.9 64 thrpt 40 7332.554 1087.250 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.1 4 thrpt 40 1040918.731 21830.348 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.1 16 thrpt 40 1036284.894 26962.984 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.1 64 thrpt 40 944350.638 32055.335 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.35 4 thrpt 40 1045371.779 34943.155 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.35 16 thrpt 40 954663.161 18032.730 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.35 64 thrpt 40 739593.387 21132.531 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.9 4 thrpt 40 1046392.800 29542.291 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.9 16 thrpt 40 820986.574 22058.708 ops/s +[info] a.a.ScheduleBenchmark.oneSchedule 0.9 64 thrpt 40 210115.907 14176.402 ops/s + */ +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(2) +@Warmup(iterations = 10, time = 1700, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 20, time = 1700, timeUnit = TimeUnit.MILLISECONDS) +class ScheduleBenchmark { + implicit val system: ActorSystem = ActorSystem() + val scheduler: Scheduler = system.scheduler + val interval: FiniteDuration = 25.millis + val within: FiniteDuration = 2.seconds + implicit val timeout: Timeout = Timeout(within) + + @Param(Array("4", "16", "64")) + var to = 0 + + @Param(Array("0.1", "0.35", "0.9")) + var ratio = 0d + + var winner: Int = _ + var promise: Promise[Any] = _ + + @Setup(Level.Iteration) + def setup() { + winner = (to * ratio + 1).toInt + promise = Promise[Any]() + } + + @TearDown + def shutdown() { + system.shutdown() + system.awaitTermination() + } + + def op(idx: Int) = if (idx == winner) promise.trySuccess(idx) else idx + + @Benchmark + def oneSchedule = { + val aIdx = new AtomicInteger(1) + val tryWithNext = scheduler.schedule(0.millis, interval) { + val idx = aIdx.getAndIncrement + if (idx <= to) op(idx) + } + promise.future.onComplete { + case _ ⇒ + tryWithNext.cancel() + } + Await.result(promise.future, within) + } + + @Benchmark + def multipleScheduleOnce = { + val tryWithNext = (1 to to).foldLeft(0.millis -> List[Cancellable]()) { + case ((interv, c), idx) ⇒ + (interv + interval, scheduler.scheduleOnce(interv) { + op(idx) + } :: c) + }._2 + promise.future.onComplete { + case _ ⇒ + tryWithNext.foreach(_.cancel()) + } + Await.result(promise.future, within) + } +} \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java b/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java index b5f5ffa919..2cc54adcb8 100644 --- a/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java +++ b/akka-docs/rst/java/code/docs/jrouting/RouterDocTest.java @@ -61,6 +61,8 @@ import akka.routing.ScatterGatherFirstCompletedGroup; import akka.routing.ScatterGatherFirstCompletedPool; import akka.routing.BalancingPool; import akka.routing.SmallestMailboxPool; +import akka.routing.TailChoppingGroup; +import akka.routing.TailChoppingPool; //#imports2 @@ -275,39 +277,66 @@ public class RouterDocTest { "router20"); //#scatter-gather-group-2 - //#consistent-hashing-pool-1 + //#tail-chopping-pool-1 ActorRef router21 = + getContext().actorOf(FromConfig.getInstance().props( + Props.create(Worker.class)), "router21"); + //#tail-chopping-pool-1 + + //#tail-chopping-pool-2 + FiniteDuration within3 = FiniteDuration.create(10, TimeUnit.SECONDS); + FiniteDuration interval = FiniteDuration.create(20, TimeUnit.MILLISECONDS); + ActorRef router22 = + getContext().actorOf(new TailChoppingPool(5, within3, interval).props( + Props.create(Worker.class)), "router22"); + //#tail-chopping-pool-2 + + //#tail-chopping-group-1 + ActorRef router23 = + getContext().actorOf(FromConfig.getInstance().props(), "router23"); + //#tail-chopping-group-1 + + //#tail-chopping-group-2 + FiniteDuration within4 = FiniteDuration.create(10, TimeUnit.SECONDS); + FiniteDuration interval2 = FiniteDuration.create(20, TimeUnit.MILLISECONDS); + ActorRef router24 = + getContext().actorOf(new TailChoppingGroup(paths, within4, interval2).props(), + "router24"); + //#tail-chopping-group-2 + + //#consistent-hashing-pool-1 + ActorRef router25 = getContext().actorOf(FromConfig.getInstance().props(Props.create(Worker.class)), - "router21"); + "router25"); //#consistent-hashing-pool-1 //#consistent-hashing-pool-2 - ActorRef router22 = + ActorRef router26 = getContext().actorOf(new ConsistentHashingPool(5).props( - Props.create(Worker.class)), "router22"); + Props.create(Worker.class)), "router26"); //#consistent-hashing-pool-2 //#consistent-hashing-group-1 - ActorRef router23 = - getContext().actorOf(FromConfig.getInstance().props(), "router23"); + ActorRef router27 = + getContext().actorOf(FromConfig.getInstance().props(), "router27"); //#consistent-hashing-group-1 //#consistent-hashing-group-2 - ActorRef router24 = - getContext().actorOf(new ConsistentHashingGroup(paths).props(), "router24"); + ActorRef router28 = + getContext().actorOf(new ConsistentHashingGroup(paths).props(), "router28"); //#consistent-hashing-group-2 //#resize-pool-1 - ActorRef router25 = + ActorRef router29 = getContext().actorOf(FromConfig.getInstance().props( - Props.create(Worker.class)), "router25"); + Props.create(Worker.class)), "router29"); //#resize-pool-1 //#resize-pool-2 DefaultResizer resizer = new DefaultResizer(2, 15); - ActorRef router26 = + ActorRef router30 = getContext().actorOf(new RoundRobinPool(5).withResizer(resizer).props( - Props.create(Worker.class)), "router26"); + Props.create(Worker.class)), "router30"); //#resize-pool-2 public void onReceive(Object msg) {} diff --git a/akka-docs/rst/java/routing.rst b/akka-docs/rst/java/routing.rst index b7d4f34289..ea1d077b82 100644 --- a/akka-docs/rst/java/routing.rst +++ b/akka-docs/rst/java/routing.rst @@ -31,6 +31,7 @@ The routing logic shipped with Akka are: * ``akka.routing.SmallestMailboxRoutingLogic`` * ``akka.routing.BroadcastRoutingLogic`` * ``akka.routing.ScatterGatherFirstCompletedRoutingLogic`` +* ``akka.routing.TailChoppingRoutingLogic`` * ``akka.routing.ConsistentHashingRoutingLogic`` We create the routees as ordinary child actors wrapped in ``ActorRefRoutee``. We watch @@ -370,6 +371,40 @@ ScatterGatherFirstCompletedGroup defined in code: .. includecode:: code/docs/jrouting/RouterDocTest.java :include: paths,scatter-gather-group-2 +TailChoppingPool and TailChoppingGroup +-------------------------------------- + +The TailChoppingRouter will first send the message to one, randomly picked, routee +and then after a small delay to to a second routee (picked randomly from the remaining routees) and so on. +It waits for first reply it gets back and forwards it back to original sender. Other replies are discarded. + +The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that +one of the other actors may still be faster to respond than the initial one. + +This optimisation was described nicely in a blog post by Peter Bailis: +`Doing redundant work to speed up distributed queries `_. + +TailChoppingPool defined in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-tail-chopping-pool + +.. includecode:: code/docs/jrouting/RouterDocTest.java#tail-chopping-pool-1 + +TailChoppingPool defined in code: + +.. includecode:: code/docs/jrouting/RouterDocTest.java#tail-chopping-pool-2 + +TailChoppingGroup defined in configuration: + +.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-tail-chopping-group + +.. includecode:: code/docs/jrouting/RouterDocTest.java#tail-chopping-group-1 + +TailChoppingGroup defined in code: + +.. includecode:: code/docs/jrouting/RouterDocTest.java + :include: paths,tail-chopping-group-2 + ConsistentHashingPool and ConsistentHashingGroup ------------------------------------------------ diff --git a/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala b/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala index 08bba9e7e8..f957253891 100644 --- a/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/routing/RouterDocSpec.scala @@ -21,6 +21,8 @@ import akka.routing.ScatterGatherFirstCompletedGroup import akka.routing.RandomGroup import akka.routing.ScatterGatherFirstCompletedPool import akka.routing.BalancingPool +import akka.routing.TailChoppingGroup +import akka.routing.TailChoppingPool object RouterDocSpec { @@ -128,10 +130,32 @@ akka.actor.deployment { } } #//#config-scatter-gather-group + +#//#config-tail-chopping-pool +akka.actor.deployment { + /parent/router21 { + router = tail-chopping-pool + nr-of-instances = 5 + within = 10 seconds + tail-chopping-router.interval = 20 milliseconds + } +} +#//#config-tail-chopping-pool + +#//#config-tail-chopping-group +akka.actor.deployment { + /parent/router23 { + router = tail-chopping-group + routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"] + within = 10 seconds + tail-chopping-router.interval = 20 milliseconds + } +} +#//#config-tail-chopping-group #//#config-consistent-hashing-pool akka.actor.deployment { - /parent/router21 { + /parent/router25 { router = consistent-hashing-pool nr-of-instances = 5 virtual-nodes-factor = 10 @@ -141,7 +165,7 @@ akka.actor.deployment { #//#config-consistent-hashing-group akka.actor.deployment { - /parent/router23 { + /parent/router27 { router = consistent-hashing-group routees.paths = ["/user/workers/w1", "/user/workers/w2", "/user/workers/w3"] virtual-nodes-factor = 10 @@ -173,7 +197,7 @@ akka.actor.deployment { #//#config-resize-pool akka.actor.deployment { - /parent/router25 { + /parent/router29 { router = round-robin-pool resizer { lower-bound = 2 @@ -354,39 +378,61 @@ router-dispatcher {} val router20: ActorRef = context.actorOf(ScatterGatherFirstCompletedGroup(paths, within = 10.seconds).props(), "router20") - //#scatter-gather-group-2 + //#scatter-gather-group-2 - //#consistent-hashing-pool-1 + //#tail-chopping-pool-1 val router21: ActorRef = context.actorOf(FromConfig.props(Props[Worker]), "router21") + //#tail-chopping-pool-1 + + //#tail-chopping-pool-2 + val router22: ActorRef = + context.actorOf(TailChoppingPool(5, within = 10.seconds, interval = 20.millis). + props(Props[Worker]), "router22") + //#tail-chopping-pool-2 + + //#tail-chopping-group-1 + val router23: ActorRef = + context.actorOf(FromConfig.props(), "router23") + //#tail-chopping-group-1 + + //#tail-chopping-group-2 + val router24: ActorRef = + context.actorOf(TailChoppingGroup(paths, + within = 10.seconds, interval = 20.millis).props(), "router24") + //#tail-chopping-group-2 + + //#consistent-hashing-pool-1 + val router25: ActorRef = + context.actorOf(FromConfig.props(Props[Worker]), "router25") //#consistent-hashing-pool-1 //#consistent-hashing-pool-2 - val router22: ActorRef = + val router26: ActorRef = context.actorOf(ConsistentHashingPool(5).props(Props[Worker]), - "router22") + "router26") //#consistent-hashing-pool-2 //#consistent-hashing-group-1 - val router23: ActorRef = - context.actorOf(FromConfig.props(), "router23") + val router27: ActorRef = + context.actorOf(FromConfig.props(), "router27") //#consistent-hashing-group-1 //#consistent-hashing-group-2 - val router24: ActorRef = - context.actorOf(ConsistentHashingGroup(paths).props(), "router24") + val router28: ActorRef = + context.actorOf(ConsistentHashingGroup(paths).props(), "router28") //#consistent-hashing-group-2 //#resize-pool-1 - val router25: ActorRef = - context.actorOf(FromConfig.props(Props[Worker]), "router25") + val router29: ActorRef = + context.actorOf(FromConfig.props(Props[Worker]), "router29") //#resize-pool-1 //#resize-pool-2 val resizer = DefaultResizer(lowerBound = 2, upperBound = 15) - val router26: ActorRef = + val router30: ActorRef = context.actorOf(RoundRobinPool(5, Some(resizer)).props(Props[Worker]), - "router26") + "router30") //#resize-pool-2 def receive = { diff --git a/akka-docs/rst/scala/routing.rst b/akka-docs/rst/scala/routing.rst index 22abc6b34e..892e25b0c2 100644 --- a/akka-docs/rst/scala/routing.rst +++ b/akka-docs/rst/scala/routing.rst @@ -31,6 +31,7 @@ The routing logic shipped with Akka are: * ``akka.routing.SmallestMailboxRoutingLogic`` * ``akka.routing.BroadcastRoutingLogic`` * ``akka.routing.ScatterGatherFirstCompletedRoutingLogic`` +* ``akka.routing.TailChoppingRoutingLogic`` * ``akka.routing.ConsistentHashingRoutingLogic`` We create the routees as ordinary child actors wrapped in ``ActorRefRoutee``. We watch @@ -369,6 +370,40 @@ ScatterGatherFirstCompletedGroup defined in code: .. includecode:: code/docs/routing/RouterDocSpec.scala :include: paths,scatter-gather-group-2 +TailChoppingPool and TailChoppingGroup +-------------------------------------- + +The TailChoppingRouter will first send the message to one, randomly picked, routee +and then after a small delay to to a second routee (picked randomly from the remaining routees) and so on. +It waits for first reply it gets back and forwards it back to original sender. Other replies are discarded. + +The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that +one of the other actors may still be faster to respond than the initial one. + +This optimisation was described nicely in a blog post by Peter Bailis: +`Doing redundant work to speed up distributed queries `_. + +TailChoppingPool defined in configuration: + +.. includecode:: code/docs/routing/RouterDocSpec.scala#config-tail-chopping-pool + +.. includecode:: code/docs/routing/RouterDocSpec.scala#tail-chopping-pool-1 + +TailChoppingPool defined in code: + +.. includecode:: code/docs/routing/RouterDocSpec.scala#tail-chopping-pool-2 + +TailChoppingGroup defined in configuration: + +.. includecode:: code/docs/routing/RouterDocSpec.scala#config-tail-chopping-group + +.. includecode:: code/docs/routing/RouterDocSpec.scala#tail-chopping-group-1 + +TailChoppingGroup defined in code: + +.. includecode:: code/docs/routing/RouterDocSpec.scala + :include: paths,tail-chopping-group-2 + ConsistentHashingPool and ConsistentHashingGroup ------------------------------------------------