diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java index 42de583785..ea3a904134 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java @@ -25,7 +25,7 @@ import akka.actor.typed.receptionist.ServiceKey; public class RouterTest { - static // #pool + static // #routee class Worker { interface Command {} @@ -54,12 +54,14 @@ public class RouterTest { } } - // #pool + // #routee static Behavior showPoolRouting() { - return Behaviors.setup( + return + // #pool + // This would be defined within your actor class + Behaviors.setup( context -> { - // #pool int poolSize = 4; PoolRouter pool = Routers.pool( @@ -87,7 +89,10 @@ public class RouterTest { // #strategy return Behaviors.empty(); + // #pool }); + // #pool + } static Behavior showGroupRouting() { @@ -95,9 +100,11 @@ public class RouterTest { ServiceKey serviceKey = ServiceKey.create(Worker.Command.class, "log-worker"); // #group - return Behaviors.setup( + return + // #group + Behaviors.setup( context -> { - // #group + // this would likely happen elsewhere - if we create it locally we // can just as well use a pool ActorRef worker = context.spawn(Worker.create(), "worker"); @@ -106,15 +113,16 @@ public class RouterTest { GroupRouter group = Routers.group(serviceKey); ActorRef router = context.spawn(group, "worker-group"); - // the group router will stash messages until it sees the first listing of registered + // the group router will stash messages until it sees the first listing of + // registered // services from the receptionist, so it is safe to send messages right away for (int i = 0; i < 10; i++) { router.tell(new Worker.DoLog("msg " + i)); } - // #group return Behaviors.empty(); }); + // #group } public static void main(String[] args) { diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala index 8b48f37312..529cbd3392 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala @@ -10,13 +10,12 @@ import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestK import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.actor.typed.scaladsl.{ Behaviors, Routers } -import org.scalatest.wordspec.AnyWordSpecLike // #pool - +import org.scalatest.wordspec.AnyWordSpecLike object RouterSpec { - // #pool + // #routee object Worker { sealed trait Command case class DoLog(text: String) extends Command @@ -32,11 +31,14 @@ object RouterSpec { } } - // #pool + // #routee + // This code is extra indented for visualization purposes + // format: OFF // #group - val serviceKey = ServiceKey[Worker.Command]("log-worker") + val serviceKey = ServiceKey[Worker.Command]("log-worker") // #group + // format: ON } class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with AnyWordSpecLike with LogCapturing { @@ -54,36 +56,42 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with def DoLog(text: String) = RouterSpec.Worker.DoLog(text) } - spawn(Behaviors.setup[Unit] { ctx => + spawn( // #pool - val pool = Routers.pool(poolSize = 4)( - // make sure the workers are restarted if they fail - Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart)) - val router = ctx.spawn(pool, "worker-pool") + // This would be defined within your actor object + Behaviors.setup[Unit] { ctx => + val pool = Routers.pool(poolSize = 4) { + // make sure the workers are restarted if they fail + Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart) + } + val router = ctx.spawn(pool, "worker-pool") + + (0 to 10).foreach { n => + router ! Worker.DoLog(s"msg $n") + } + // #pool + + // #pool-dispatcher + // make sure workers use the default blocking IO dispatcher + val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking()) + // spawn head router using the same executor as the parent + val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent()) + // #pool-dispatcher + + blockingRouter ! Worker.DoLog("msg") + + // #strategy + val alternativePool = pool.withPoolSize(2).withRoundRobinRouting() + // #strategy + + val alternativeRouter = ctx.spawn(alternativePool, "alternative-pool") + alternativeRouter ! Worker.DoLog("msg") + //#pool + Behaviors.empty - (0 to 10).foreach { n => - router ! Worker.DoLog(s"msg $n") } - // #pool - - // #pool-dispatcher - // make sure workers use the default blocking IO dispatcher - val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking()) - // spawn head router using the same executor as the parent - val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent()) - // #pool-dispatcher - - blockingRouter ! Worker.DoLog("msg") - - // #strategy - val alternativePool = pool.withPoolSize(2).withRoundRobinRouting() - // #strategy - - val alternativeRouter = ctx.spawn(alternativePool, "alternative-pool") - alternativeRouter ! Worker.DoLog("msg") - - Behaviors.empty - }) + //#pool + ) probe.receiveMessages(11) } @@ -98,25 +106,27 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with def DoLog(text: String) = RouterSpec.Worker.DoLog(text) } - spawn(Behaviors.setup[Unit] { ctx => + spawn( // #group - // this would likely happen elsewhere - if we create it locally we - // can just as well use a pool - val worker = ctx.spawn(Worker(), "worker") - ctx.system.receptionist ! Receptionist.Register(serviceKey, worker) + Behaviors.setup[Unit] { ctx => + // this would likely happen elsewhere - if we create it locally we + // can just as well use a pool + val worker = ctx.spawn(Worker(), "worker") + ctx.system.receptionist ! Receptionist.Register(serviceKey, worker) - val group = Routers.group(serviceKey) - val router = ctx.spawn(group, "worker-group") + val group = Routers.group(serviceKey) + val router = ctx.spawn(group, "worker-group") - // the group router will stash messages until it sees the first listing of registered - // services from the receptionist, so it is safe to send messages right away - (0 to 10).foreach { n => - router ! Worker.DoLog(s"msg $n") + // the group router will stash messages until it sees the first listing of registered + // services from the receptionist, so it is safe to send messages right away + (0 to 10).foreach { n => + router ! Worker.DoLog(s"msg $n") + } + + Behaviors.empty } // #group - - Behaviors.empty - }) + ) probe.receiveMessages(10) } diff --git a/akka-docs/src/main/paradox/typed/routers.md b/akka-docs/src/main/paradox/typed/routers.md index a8d43d0047..4aea1048e6 100644 --- a/akka-docs/src/main/paradox/typed/routers.md +++ b/akka-docs/src/main/paradox/typed/routers.md @@ -16,17 +16,17 @@ To use Akka Actor Typed, you must add the following dependency in your project: ## Introduction -In some cases it is useful to distribute messages of the same type over a set of actors, so that messages can be -processed in parallel - a single actor will only process one message at a time. +In some cases it is useful to distribute messages of the same type over a set of actors, so that messages can be +processed in parallel - a single actor will only process one message at a time. The router itself is a behavior that is spawned into a running actor that will then forward any message sent to it to one final recipient out of the set of routees. -There are two kinds of routers included in Akka Typed - the pool router and the group router. +There are two kinds of routers included in Akka Typed - the pool router and the group router. ## Pool Router -The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will +The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will then forward messages to. If a child is stopped the pool router removes it from its set of routees. When the last child stops the router itself stops. @@ -34,6 +34,16 @@ To make a resilient router that deals with failures the routee `Behavior` must b As actor children are always local the routees are never spread across a cluster with a pool router. +Let's first introduce the routee: + +Scala +: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #routee } + +Java +: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #routee } + +After having defined the routee, we can now concentrate on configuring the router itself. Note again the the router is an Actor in itself: + Scala : @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #pool } @@ -61,7 +71,7 @@ Since the receptionist is used this means the group router is cluster-aware out messages to registered actors on any node in the cluster that is reachable. If no reachable actor exists the router will fallback and route messages to actors on nodes marked as unreachable. -That the receptionist is used also means that the set of routees is eventually consistent, and that immediately when +That the receptionist is used also means that the set of routees is eventually consistent, and that immediately when the group router is started the set of routees it knows about is empty, until it has seen a listing from the receptionist it stashes incoming messages and forwards them as soon as it gets a listing from the receptionist. @@ -74,7 +84,6 @@ Scala Java : @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #group } - ## Routing strategies There are three different strategies for selecting which routee a message is forwarded to that can be selected @@ -84,7 +93,7 @@ Scala : @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #strategy } Java -: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #strategy } +: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #strategy } ### Round Robin @@ -98,7 +107,6 @@ This is the default for pool routers as the pool of routees is expected to remai An optional parameter `preferLocalRoutees` can be used for this strategy. Routers will only use routees located in local actor system if `preferLocalRoutees` is true and local routees do exist. The default value for this parameter is false. - ### Random Randomly selects a routee when a message is sent through the router. @@ -108,9 +116,9 @@ This is the default for group routers as the group of routees is expected to cha An optional parameter `preferLocalRoutees` can be used for this strategy. Routers will only use routees located in local actor system if `preferLocalRoutees` is true and local routees do exist. The default value for this parameter is false. ### Consistent Hashing - + Uses [consistent hashing](https://en.wikipedia.org/wiki/Consistent_hashing) to select a routee based -on the sent message. This [article](http://www.tom-e-white.com/2007/11/consistent-hashing.html) +on the sent message. This [article](http://www.tom-e-white.com/2007/11/consistent-hashing.html) gives good insight into how consistent hashing is implemented. Currently you have to define hashMapping of the router to map incoming messages to their consistent @@ -125,7 +133,7 @@ See also @ref[Akka Cluster Sharding](cluster-sharding.md) which provides stable Note that if the routees are sharing a resource, the resource will determine if increasing the number of actors will actually give higher throughput or faster answers. For example if the routees are CPU bound actors -it will not give better performance to create more routees than there are threads to execute the actors. +it will not give better performance to create more routees than there are threads to execute the actors. Since the router itself is an actor and has a mailbox this means that messages are routed sequentially to the routees where it can be processed in parallel (depending on the available threads in the dispatcher).