* Refactor example to make it clear #29578 Restructure example * Fix formatting * Right formatting to disable format checker
This commit is contained in:
parent
c931b9bb93
commit
2c659a046e
3 changed files with 91 additions and 65 deletions
|
|
@ -25,7 +25,7 @@ import akka.actor.typed.receptionist.ServiceKey;
|
|||
|
||||
public class RouterTest {
|
||||
|
||||
static // #pool
|
||||
static // #routee
|
||||
class Worker {
|
||||
interface Command {}
|
||||
|
||||
|
|
@ -54,12 +54,14 @@ public class RouterTest {
|
|||
}
|
||||
}
|
||||
|
||||
// #pool
|
||||
// #routee
|
||||
|
||||
static Behavior<Void> showPoolRouting() {
|
||||
return Behaviors.setup(
|
||||
return
|
||||
// #pool
|
||||
// This would be defined within your actor class
|
||||
Behaviors.setup(
|
||||
context -> {
|
||||
// #pool
|
||||
int poolSize = 4;
|
||||
PoolRouter<Worker.Command> pool =
|
||||
Routers.pool(
|
||||
|
|
@ -87,7 +89,10 @@ public class RouterTest {
|
|||
// #strategy
|
||||
|
||||
return Behaviors.empty();
|
||||
// #pool
|
||||
});
|
||||
// #pool
|
||||
|
||||
}
|
||||
|
||||
static Behavior<Void> showGroupRouting() {
|
||||
|
|
@ -95,9 +100,11 @@ public class RouterTest {
|
|||
ServiceKey<Worker.Command> serviceKey = ServiceKey.create(Worker.Command.class, "log-worker");
|
||||
|
||||
// #group
|
||||
return Behaviors.setup(
|
||||
return
|
||||
// #group
|
||||
Behaviors.setup(
|
||||
context -> {
|
||||
// #group
|
||||
|
||||
// this would likely happen elsewhere - if we create it locally we
|
||||
// can just as well use a pool
|
||||
ActorRef<Worker.Command> worker = context.spawn(Worker.create(), "worker");
|
||||
|
|
@ -106,15 +113,16 @@ public class RouterTest {
|
|||
GroupRouter<Worker.Command> group = Routers.group(serviceKey);
|
||||
ActorRef<Worker.Command> router = context.spawn(group, "worker-group");
|
||||
|
||||
// the group router will stash messages until it sees the first listing of registered
|
||||
// the group router will stash messages until it sees the first listing of
|
||||
// registered
|
||||
// services from the receptionist, so it is safe to send messages right away
|
||||
for (int i = 0; i < 10; i++) {
|
||||
router.tell(new Worker.DoLog("msg " + i));
|
||||
}
|
||||
// #group
|
||||
|
||||
return Behaviors.empty();
|
||||
});
|
||||
// #group
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
|
|
|||
|
|
@ -10,13 +10,12 @@ import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestK
|
|||
import akka.actor.typed.{ Behavior, SupervisorStrategy }
|
||||
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
|
||||
import akka.actor.typed.scaladsl.{ Behaviors, Routers }
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
// #pool
|
||||
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
object RouterSpec {
|
||||
|
||||
// #pool
|
||||
// #routee
|
||||
object Worker {
|
||||
sealed trait Command
|
||||
case class DoLog(text: String) extends Command
|
||||
|
|
@ -32,11 +31,14 @@ object RouterSpec {
|
|||
}
|
||||
}
|
||||
|
||||
// #pool
|
||||
// #routee
|
||||
// This code is extra indented for visualization purposes
|
||||
// format: OFF
|
||||
// #group
|
||||
val serviceKey = ServiceKey[Worker.Command]("log-worker")
|
||||
val serviceKey = ServiceKey[Worker.Command]("log-worker")
|
||||
|
||||
// #group
|
||||
// format: ON
|
||||
}
|
||||
|
||||
class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with AnyWordSpecLike with LogCapturing {
|
||||
|
|
@ -54,36 +56,42 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with
|
|||
def DoLog(text: String) = RouterSpec.Worker.DoLog(text)
|
||||
}
|
||||
|
||||
spawn(Behaviors.setup[Unit] { ctx =>
|
||||
spawn(
|
||||
// #pool
|
||||
val pool = Routers.pool(poolSize = 4)(
|
||||
// make sure the workers are restarted if they fail
|
||||
Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart))
|
||||
val router = ctx.spawn(pool, "worker-pool")
|
||||
// This would be defined within your actor object
|
||||
Behaviors.setup[Unit] { ctx =>
|
||||
val pool = Routers.pool(poolSize = 4) {
|
||||
// make sure the workers are restarted if they fail
|
||||
Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart)
|
||||
}
|
||||
val router = ctx.spawn(pool, "worker-pool")
|
||||
|
||||
(0 to 10).foreach { n =>
|
||||
router ! Worker.DoLog(s"msg $n")
|
||||
}
|
||||
// #pool
|
||||
|
||||
// #pool-dispatcher
|
||||
// make sure workers use the default blocking IO dispatcher
|
||||
val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking())
|
||||
// spawn head router using the same executor as the parent
|
||||
val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent())
|
||||
// #pool-dispatcher
|
||||
|
||||
blockingRouter ! Worker.DoLog("msg")
|
||||
|
||||
// #strategy
|
||||
val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()
|
||||
// #strategy
|
||||
|
||||
val alternativeRouter = ctx.spawn(alternativePool, "alternative-pool")
|
||||
alternativeRouter ! Worker.DoLog("msg")
|
||||
//#pool
|
||||
Behaviors.empty
|
||||
|
||||
(0 to 10).foreach { n =>
|
||||
router ! Worker.DoLog(s"msg $n")
|
||||
}
|
||||
// #pool
|
||||
|
||||
// #pool-dispatcher
|
||||
// make sure workers use the default blocking IO dispatcher
|
||||
val blockingPool = pool.withRouteeProps(routeeProps = DispatcherSelector.blocking())
|
||||
// spawn head router using the same executor as the parent
|
||||
val blockingRouter = ctx.spawn(blockingPool, "blocking-pool", DispatcherSelector.sameAsParent())
|
||||
// #pool-dispatcher
|
||||
|
||||
blockingRouter ! Worker.DoLog("msg")
|
||||
|
||||
// #strategy
|
||||
val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()
|
||||
// #strategy
|
||||
|
||||
val alternativeRouter = ctx.spawn(alternativePool, "alternative-pool")
|
||||
alternativeRouter ! Worker.DoLog("msg")
|
||||
|
||||
Behaviors.empty
|
||||
})
|
||||
//#pool
|
||||
)
|
||||
|
||||
probe.receiveMessages(11)
|
||||
}
|
||||
|
|
@ -98,25 +106,27 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with
|
|||
def DoLog(text: String) = RouterSpec.Worker.DoLog(text)
|
||||
}
|
||||
|
||||
spawn(Behaviors.setup[Unit] { ctx =>
|
||||
spawn(
|
||||
// #group
|
||||
// this would likely happen elsewhere - if we create it locally we
|
||||
// can just as well use a pool
|
||||
val worker = ctx.spawn(Worker(), "worker")
|
||||
ctx.system.receptionist ! Receptionist.Register(serviceKey, worker)
|
||||
Behaviors.setup[Unit] { ctx =>
|
||||
// this would likely happen elsewhere - if we create it locally we
|
||||
// can just as well use a pool
|
||||
val worker = ctx.spawn(Worker(), "worker")
|
||||
ctx.system.receptionist ! Receptionist.Register(serviceKey, worker)
|
||||
|
||||
val group = Routers.group(serviceKey)
|
||||
val router = ctx.spawn(group, "worker-group")
|
||||
val group = Routers.group(serviceKey)
|
||||
val router = ctx.spawn(group, "worker-group")
|
||||
|
||||
// the group router will stash messages until it sees the first listing of registered
|
||||
// services from the receptionist, so it is safe to send messages right away
|
||||
(0 to 10).foreach { n =>
|
||||
router ! Worker.DoLog(s"msg $n")
|
||||
// the group router will stash messages until it sees the first listing of registered
|
||||
// services from the receptionist, so it is safe to send messages right away
|
||||
(0 to 10).foreach { n =>
|
||||
router ! Worker.DoLog(s"msg $n")
|
||||
}
|
||||
|
||||
Behaviors.empty
|
||||
}
|
||||
// #group
|
||||
|
||||
Behaviors.empty
|
||||
})
|
||||
)
|
||||
|
||||
probe.receiveMessages(10)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,17 +16,17 @@ To use Akka Actor Typed, you must add the following dependency in your project:
|
|||
|
||||
## Introduction
|
||||
|
||||
In some cases it is useful to distribute messages of the same type over a set of actors, so that messages can be
|
||||
processed in parallel - a single actor will only process one message at a time.
|
||||
In some cases it is useful to distribute messages of the same type over a set of actors, so that messages can be
|
||||
processed in parallel - a single actor will only process one message at a time.
|
||||
|
||||
The router itself is a behavior that is spawned into a running actor that will then forward any message sent to it
|
||||
to one final recipient out of the set of routees.
|
||||
|
||||
There are two kinds of routers included in Akka Typed - the pool router and the group router.
|
||||
There are two kinds of routers included in Akka Typed - the pool router and the group router.
|
||||
|
||||
## Pool Router
|
||||
|
||||
The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will
|
||||
The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will
|
||||
then forward messages to.
|
||||
|
||||
If a child is stopped the pool router removes it from its set of routees. When the last child stops the router itself stops.
|
||||
|
|
@ -34,6 +34,16 @@ To make a resilient router that deals with failures the routee `Behavior` must b
|
|||
|
||||
As actor children are always local the routees are never spread across a cluster with a pool router.
|
||||
|
||||
Let's first introduce the routee:
|
||||
|
||||
Scala
|
||||
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #routee }
|
||||
|
||||
Java
|
||||
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #routee }
|
||||
|
||||
After having defined the routee, we can now concentrate on configuring the router itself. Note again the the router is an Actor in itself:
|
||||
|
||||
Scala
|
||||
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #pool }
|
||||
|
||||
|
|
@ -61,7 +71,7 @@ Since the receptionist is used this means the group router is cluster-aware out
|
|||
messages to registered actors on any node in the cluster that is reachable. If no reachable actor exists the router
|
||||
will fallback and route messages to actors on nodes marked as unreachable.
|
||||
|
||||
That the receptionist is used also means that the set of routees is eventually consistent, and that immediately when
|
||||
That the receptionist is used also means that the set of routees is eventually consistent, and that immediately when
|
||||
the group router is started the set of routees it knows about is empty, until it has seen a listing from the receptionist
|
||||
it stashes incoming messages and forwards them as soon as it gets a listing from the receptionist.
|
||||
|
||||
|
|
@ -74,7 +84,6 @@ Scala
|
|||
Java
|
||||
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #group }
|
||||
|
||||
|
||||
## Routing strategies
|
||||
|
||||
There are three different strategies for selecting which routee a message is forwarded to that can be selected
|
||||
|
|
@ -84,7 +93,7 @@ Scala
|
|||
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #strategy }
|
||||
|
||||
Java
|
||||
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #strategy }
|
||||
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #strategy }
|
||||
|
||||
### Round Robin
|
||||
|
||||
|
|
@ -98,7 +107,6 @@ This is the default for pool routers as the pool of routees is expected to remai
|
|||
|
||||
An optional parameter `preferLocalRoutees` can be used for this strategy. Routers will only use routees located in local actor system if `preferLocalRoutees` is true and local routees do exist. The default value for this parameter is false.
|
||||
|
||||
|
||||
### Random
|
||||
|
||||
Randomly selects a routee when a message is sent through the router.
|
||||
|
|
@ -108,9 +116,9 @@ This is the default for group routers as the group of routees is expected to cha
|
|||
An optional parameter `preferLocalRoutees` can be used for this strategy. Routers will only use routees located in local actor system if `preferLocalRoutees` is true and local routees do exist. The default value for this parameter is false.
|
||||
|
||||
### Consistent Hashing
|
||||
|
||||
|
||||
Uses [consistent hashing](https://en.wikipedia.org/wiki/Consistent_hashing) to select a routee based
|
||||
on the sent message. This [article](http://www.tom-e-white.com/2007/11/consistent-hashing.html)
|
||||
on the sent message. This [article](http://www.tom-e-white.com/2007/11/consistent-hashing.html)
|
||||
gives good insight into how consistent hashing is implemented.
|
||||
|
||||
Currently you have to define hashMapping of the router to map incoming messages to their consistent
|
||||
|
|
@ -125,7 +133,7 @@ See also @ref[Akka Cluster Sharding](cluster-sharding.md) which provides stable
|
|||
|
||||
Note that if the routees are sharing a resource, the resource will determine if increasing the number of
|
||||
actors will actually give higher throughput or faster answers. For example if the routees are CPU bound actors
|
||||
it will not give better performance to create more routees than there are threads to execute the actors.
|
||||
it will not give better performance to create more routees than there are threads to execute the actors.
|
||||
|
||||
Since the router itself is an actor and has a mailbox this means that messages are routed sequentially to the routees
|
||||
where it can be processed in parallel (depending on the available threads in the dispatcher).
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue