Typed actor routers #25612
This commit is contained in:
parent
ba2447159a
commit
0ebe54ce74
13 changed files with 1091 additions and 1 deletions
|
|
@ -0,0 +1,20 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.javadsl;
|
||||
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.receptionist.ServiceKey;
|
||||
|
||||
public class RoutersTest {
|
||||
|
||||
public void compileOnlyApiTest() {
|
||||
|
||||
final ServiceKey<String> key = ServiceKey.create(String.class, "key");
|
||||
Behavior<String> group = Routers.group(key).withRandomRouting().withRoundRobinRouting();
|
||||
|
||||
Behavior<String> pool =
|
||||
Routers.pool(5, Behaviors.<String>empty()).withRandomRouting().withRoundRobinRouting();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.akka.typed;
|
||||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
import akka.actor.typed.ActorSystem;
|
||||
// #pool
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.SupervisorStrategy;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
import akka.actor.typed.javadsl.GroupRouter;
|
||||
import akka.actor.typed.javadsl.PoolRouter;
|
||||
import akka.actor.typed.javadsl.Routers;
|
||||
import akka.actor.typed.receptionist.Receptionist;
|
||||
import akka.actor.typed.receptionist.ServiceKey;
|
||||
|
||||
// #pool
|
||||
|
||||
public class RouterTest {
|
||||
|
||||
static // #pool
|
||||
class Worker {
|
||||
interface Command {}
|
||||
|
||||
static class DoLog implements Command {
|
||||
public final String text;
|
||||
|
||||
public DoLog(String text) {
|
||||
this.text = text;
|
||||
}
|
||||
}
|
||||
|
||||
static final Behavior<Command> behavior =
|
||||
Behaviors.setup(
|
||||
context -> {
|
||||
context.getLog().info("Starting worker");
|
||||
|
||||
return Behaviors.receive(Command.class)
|
||||
.onMessage(
|
||||
DoLog.class,
|
||||
(notUsed, doLog) -> {
|
||||
context.getLog().info("Got message {}", doLog.text);
|
||||
return Behaviors.same();
|
||||
})
|
||||
.build();
|
||||
});
|
||||
}
|
||||
|
||||
// #pool
|
||||
|
||||
static Behavior<Void> showPoolRouting() {
|
||||
return Behaviors.setup(
|
||||
context -> {
|
||||
// #pool
|
||||
// make sure the workers are restarted if they fail
|
||||
Behavior<Worker.Command> supervisedWorker =
|
||||
Behaviors.supervise(Worker.behavior).onFailure(SupervisorStrategy.restart());
|
||||
PoolRouter<Worker.Command> pool = Routers.pool(4, supervisedWorker);
|
||||
ActorRef<Worker.Command> router = context.spawn(pool, "worker-pool");
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
router.tell(new Worker.DoLog("msg " + i));
|
||||
}
|
||||
// #pool
|
||||
|
||||
// #strategy
|
||||
PoolRouter<Worker.Command> alternativePool = pool.withPoolSize(2).withRoundRobinRouting();
|
||||
// #strategy
|
||||
|
||||
return Behaviors.empty();
|
||||
});
|
||||
}
|
||||
|
||||
static Behavior<Void> showGroupRouting() {
|
||||
ServiceKey<Worker.Command> serviceKey = ServiceKey.create(Worker.Command.class, "log-worker");
|
||||
return Behaviors.setup(
|
||||
context -> {
|
||||
// #group
|
||||
// this would likely happen elsewhere - if we create it locally we
|
||||
// can just as well use a pool
|
||||
ActorRef<Worker.Command> worker = context.spawn(Worker.behavior, "worker");
|
||||
context.getSystem().receptionist().tell(Receptionist.register(serviceKey, worker));
|
||||
|
||||
GroupRouter<Worker.Command> group = Routers.group(serviceKey);
|
||||
ActorRef<Worker.Command> router = context.spawn(group, "worker-group");
|
||||
|
||||
// note that since registration of workers goes through the receptionist there is no
|
||||
// guarantee the router has seen any workers yet if we hit it directly like this and
|
||||
// these messages may end up in dead letters - in a real application you would not use
|
||||
// a group router like this - it is to keep the sample simple
|
||||
for (int i = 0; i < 10; i++) {
|
||||
router.tell(new Worker.DoLog("msg " + i));
|
||||
}
|
||||
// #group
|
||||
|
||||
return Behaviors.empty();
|
||||
});
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
ActorSystem<Void> system =
|
||||
ActorSystem.create(
|
||||
Behaviors.setup(
|
||||
context -> {
|
||||
context.spawn(showPoolRouting(), "pool-router-setup");
|
||||
context.spawn(showGroupRouting(), "group-router-setup");
|
||||
|
||||
return Behaviors.empty();
|
||||
}),
|
||||
"RouterTest");
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.internal.routing
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with Matchers {
|
||||
|
||||
"The round robin routing logic" must {
|
||||
|
||||
"round robin" in {
|
||||
val refA = TestProbe("a").ref
|
||||
val refB = TestProbe("b").ref
|
||||
val refC = TestProbe("c").ref
|
||||
val allRoutees = Set(refA, refB, refC)
|
||||
|
||||
val logic = new RoutingLogics.RoundRobinLogic[Any]
|
||||
|
||||
logic.routeesUpdated(allRoutees)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
logic.selectRoutee() should ===(refB)
|
||||
logic.selectRoutee() should ===(refC)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
}
|
||||
|
||||
"not skip one on removal" in {
|
||||
val refA = TestProbe("a").ref
|
||||
val refB = TestProbe("b").ref
|
||||
val refC = TestProbe("c").ref
|
||||
val allRoutees = Set(refA, refB, refC)
|
||||
|
||||
val logic = new RoutingLogics.RoundRobinLogic[Any]
|
||||
logic.routeesUpdated(allRoutees)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
logic.selectRoutee() should ===(refB)
|
||||
|
||||
val bRemoved = Set(refA, refC)
|
||||
logic.routeesUpdated(bRemoved)
|
||||
logic.selectRoutee() should ===(refC)
|
||||
}
|
||||
|
||||
"handle last one removed" in {
|
||||
val refA = TestProbe("a").ref
|
||||
val refB = TestProbe("b").ref
|
||||
val allRoutees = Set(refA, refB)
|
||||
|
||||
val logic = new RoutingLogics.RoundRobinLogic[Any]
|
||||
logic.routeesUpdated(allRoutees)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
|
||||
val bRemoved = Set(refA)
|
||||
logic.routeesUpdated(bRemoved)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
}
|
||||
|
||||
"move on to next when several removed" in {
|
||||
// this can happen with a group router update
|
||||
val refA = TestProbe("a").ref
|
||||
val refB = TestProbe("b").ref
|
||||
val refC = TestProbe("c").ref
|
||||
val refD = TestProbe("d").ref
|
||||
val allRoutees = Set(refA, refB, refC, refD)
|
||||
|
||||
val logic = new RoutingLogics.RoundRobinLogic[Any]
|
||||
logic.routeesUpdated(allRoutees)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
logic.selectRoutee() should ===(refB)
|
||||
|
||||
val severalRemoved = Set(refA, refC)
|
||||
logic.routeesUpdated(severalRemoved)
|
||||
logic.selectRoutee() should ===(refC)
|
||||
}
|
||||
|
||||
"wrap around when several removed" in {
|
||||
// this can happen with a group router update
|
||||
val refA = TestProbe("a").ref
|
||||
val refB = TestProbe("b").ref
|
||||
val refC = TestProbe("c").ref
|
||||
val refD = TestProbe("d").ref
|
||||
val allRoutees = Set(refA, refB, refC, refD)
|
||||
|
||||
val logic = new RoutingLogics.RoundRobinLogic[Any]
|
||||
logic.routeesUpdated(allRoutees)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
logic.selectRoutee() should ===(refB)
|
||||
logic.selectRoutee() should ===(refC)
|
||||
|
||||
val severalRemoved = Set(refA, refC)
|
||||
logic.routeesUpdated(severalRemoved)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
}
|
||||
|
||||
"pick first in with a completely new set of routees" in {
|
||||
// this can happen with a group router update
|
||||
val refA = TestProbe("a").ref
|
||||
val refB = TestProbe("b").ref
|
||||
val refC = TestProbe("c").ref
|
||||
val refD = TestProbe("d").ref
|
||||
val initialRoutees = Set(refA, refB)
|
||||
|
||||
val logic = new RoutingLogics.RoundRobinLogic[Any]
|
||||
logic.routeesUpdated(initialRoutees)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
logic.selectRoutee() should ===(refB)
|
||||
logic.selectRoutee() should ===(refA)
|
||||
|
||||
val severalRemoved = Set(refC, refD)
|
||||
logic.routeesUpdated(severalRemoved)
|
||||
logic.selectRoutee() should ===(refC)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"The random logic" must {
|
||||
"select randomly" in {
|
||||
val refA = TestProbe("a").ref
|
||||
val refB = TestProbe("b").ref
|
||||
val refC = TestProbe("c").ref
|
||||
val refD = TestProbe("d").ref
|
||||
val routees = Set(refA, refB, refC, refD)
|
||||
|
||||
val logic = new RoutingLogics.RandomLogic[Any]()
|
||||
logic.routeesUpdated(routees)
|
||||
|
||||
(0 to 10).foreach { _ ⇒
|
||||
// not much to verify here, but let's exercise it at least
|
||||
val routee = logic.selectRoutee()
|
||||
routees should contain(routee)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,207 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.scaladsl
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import akka.actor.DeadLetter
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.receptionist.Receptionist
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.testkit.EventFilter
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
class RoutersSpec extends ScalaTestWithActorTestKit("""
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
akka.loglevel=debug
|
||||
""") with WordSpecLike with Matchers {
|
||||
|
||||
// needed for the event filter
|
||||
implicit val untypedSystem = system.toUntyped
|
||||
|
||||
def compileOnlyApiCoverage(): Unit = {
|
||||
Routers.group(ServiceKey[String]("key"))
|
||||
.withRandomRouting()
|
||||
.withRoundRobinRouting()
|
||||
|
||||
Routers.pool(10)(Behavior.empty[Any])
|
||||
.withRandomRouting()
|
||||
.withRoundRobinRouting()
|
||||
}
|
||||
|
||||
"The router pool" must {
|
||||
|
||||
"create n children and route messages to" in {
|
||||
val childCounter = new AtomicInteger(0)
|
||||
val probe = createTestProbe[String]()
|
||||
val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ ⇒
|
||||
val id = childCounter.getAndIncrement()
|
||||
probe.ref ! s"started $id"
|
||||
Behaviors.receiveMessage { msg ⇒
|
||||
probe.ref ! s"$id $msg"
|
||||
Behaviors.same
|
||||
}
|
||||
}))
|
||||
|
||||
// ordering of these msgs is not guaranteed
|
||||
val expectedStarted = (0 to 3).map { n ⇒ s"started $n" }.toSet
|
||||
probe.receiveMessages(4).toSet should ===(expectedStarted)
|
||||
|
||||
(0 to 8).foreach { n ⇒
|
||||
pool ! s"message-$n"
|
||||
val expectedRecipient = n % 4
|
||||
probe.expectMessage(s"$expectedRecipient message-$n")
|
||||
}
|
||||
}
|
||||
|
||||
"keep routing to the rest of the children if some children stops" in {
|
||||
val probe = createTestProbe[String]()
|
||||
val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ ⇒
|
||||
Behaviors.receiveMessage {
|
||||
case "stop" ⇒
|
||||
Behaviors.stopped
|
||||
case msg ⇒
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
}
|
||||
}))
|
||||
|
||||
EventFilter.debug(start = "Pool child stopped", occurrences = 2).intercept {
|
||||
pool ! "stop"
|
||||
pool ! "stop"
|
||||
}
|
||||
|
||||
// there is a race here where the child stopped but the router did not see that message yet, and may
|
||||
// deliver messages to it, which will end up in dead letters.
|
||||
// this test protects against that by waiting for the log entry to show up
|
||||
|
||||
val responses = (0 to 4).map { n ⇒
|
||||
val msg = s"message-$n"
|
||||
pool ! msg
|
||||
probe.expectMessageType[String]
|
||||
}
|
||||
|
||||
responses should contain allOf ("message-0", "message-1", "message-2", "message-3", "message-4")
|
||||
}
|
||||
|
||||
"stops if all children stops" in {
|
||||
val probe = createTestProbe()
|
||||
val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ ⇒
|
||||
Behaviors.receiveMessage { _ ⇒
|
||||
Behaviors.stopped
|
||||
}
|
||||
}))
|
||||
|
||||
EventFilter.info(start = "Last pool child stopped, stopping pool", occurrences = 1).intercept {
|
||||
(0 to 3).foreach { _ ⇒
|
||||
pool ! "stop"
|
||||
}
|
||||
probe.expectTerminated(pool)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"The router group" must {
|
||||
|
||||
val receptionistDelayMs = 250
|
||||
|
||||
"route messages across routees registered to the receptionist" in {
|
||||
val serviceKey = ServiceKey[String]("group-routing-1")
|
||||
val probe = createTestProbe[String]()
|
||||
val routeeBehavior: Behavior[String] = Behaviors.receiveMessage { msg ⇒
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
(0 to 3).foreach { n ⇒
|
||||
val ref = spawn(routeeBehavior, s"group-1-routee-$n")
|
||||
system.receptionist ! Receptionist.register(serviceKey, ref)
|
||||
}
|
||||
|
||||
val group = spawn(Routers.group(serviceKey), "group-router-1")
|
||||
|
||||
// give the group a little time to get a listing from the receptionist
|
||||
Thread.sleep(receptionistDelayMs)
|
||||
|
||||
(0 to 3).foreach { n ⇒
|
||||
val msg = s"message-$n"
|
||||
group ! msg
|
||||
probe.expectMessage(msg)
|
||||
}
|
||||
|
||||
testKit.stop(group)
|
||||
}
|
||||
|
||||
"pass messages to dead letters when there are no routees available" in {
|
||||
val serviceKey = ServiceKey[String]("group-routing-2")
|
||||
val group = spawn(Routers.group(serviceKey), "group-router-2")
|
||||
val probe = TestProbe[DeadLetter]()
|
||||
system.toUntyped.eventStream.subscribe(probe.ref.toUntyped, classOf[DeadLetter])
|
||||
|
||||
(0 to 3).foreach { n ⇒
|
||||
val msg = s"message-$n"
|
||||
/* FIXME cant watch log events until #26432 is fixed
|
||||
EventFilter.info(start = "Message [java.lang.String] ... was not delivered.", occurrences = 1).intercept { */
|
||||
group ! msg
|
||||
probe.expectMessageType[DeadLetter]
|
||||
/* } */
|
||||
}
|
||||
|
||||
testKit.stop(group)
|
||||
}
|
||||
|
||||
"handle a changing set of routees" in {
|
||||
val serviceKey = ServiceKey[String]("group-routing-3")
|
||||
val probe = createTestProbe[String]()
|
||||
val routeeBehavior: Behavior[String] = Behaviors.receiveMessage {
|
||||
case "stop" ⇒
|
||||
Behaviors.stopped
|
||||
case msg ⇒
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
val ref1 = spawn(routeeBehavior, s"group-3-routee-1")
|
||||
system.receptionist ! Receptionist.register(serviceKey, ref1)
|
||||
|
||||
val ref2 = spawn(routeeBehavior, s"group-3-routee-2")
|
||||
system.receptionist ! Receptionist.register(serviceKey, ref2)
|
||||
|
||||
val ref3 = spawn(routeeBehavior, s"group-3-routee-3")
|
||||
system.receptionist ! Receptionist.register(serviceKey, ref3)
|
||||
|
||||
val group = spawn(Routers.group(serviceKey), "group-router-3")
|
||||
|
||||
// give the group a little time to get a listing from the receptionist
|
||||
Thread.sleep(receptionistDelayMs)
|
||||
|
||||
(0 to 3).foreach { n ⇒
|
||||
val msg = s"message-$n"
|
||||
group ! msg
|
||||
probe.expectMessage(msg)
|
||||
}
|
||||
|
||||
ref2 ! "stop"
|
||||
|
||||
// give the group a little time to get an updated listing from the receptionist
|
||||
Thread.sleep(receptionistDelayMs)
|
||||
|
||||
(0 to 3).foreach { n ⇒
|
||||
val msg = s"message-$n"
|
||||
group ! msg
|
||||
probe.expectMessage(msg)
|
||||
}
|
||||
|
||||
testKit.stop(group)
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.typed
|
||||
|
||||
// #pool
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.SupervisorStrategy
|
||||
import akka.actor.typed.receptionist.Receptionist
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.Routers
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
// #pool
|
||||
|
||||
object RouterSpec {
|
||||
|
||||
// #pool
|
||||
object Worker {
|
||||
sealed trait Command
|
||||
case class DoLog(text: String) extends Command
|
||||
|
||||
val behavior: Behavior[Command] = Behaviors.setup { ctx ⇒
|
||||
ctx.log.info("Starting worker")
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case DoLog(text) ⇒
|
||||
ctx.log.info("Got message {}", text)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// #pool
|
||||
|
||||
val serviceKey = ServiceKey[Worker.Command]("log-worker")
|
||||
}
|
||||
|
||||
class RouterSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||
import RouterSpec._
|
||||
|
||||
"The routing sample" must {
|
||||
|
||||
"show pool routing" in {
|
||||
spawn(Behaviors.setup[Unit] { ctx ⇒
|
||||
// #pool
|
||||
// make sure the workers are restarted if they fail
|
||||
val supervisedWorker = Behaviors.supervise(Worker.behavior)
|
||||
.onFailure[Exception](SupervisorStrategy.restart)
|
||||
val pool = Routers.pool(poolSize = 4)(supervisedWorker)
|
||||
val router = ctx.spawn(pool, "worker-pool")
|
||||
|
||||
(0 to 10).foreach { n ⇒
|
||||
router ! Worker.DoLog(s"msg $n")
|
||||
}
|
||||
// #pool
|
||||
|
||||
// #strategy
|
||||
val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()
|
||||
// #strategy
|
||||
|
||||
Behaviors.empty
|
||||
})
|
||||
}
|
||||
|
||||
"show group routing" in {
|
||||
|
||||
spawn(Behaviors.setup[Unit] { ctx ⇒
|
||||
// #group
|
||||
// this would likely happen elsewhere - if we create it locally we
|
||||
// can just as well use a pool
|
||||
val worker = ctx.spawn(Worker.behavior, "worker")
|
||||
ctx.system.receptionist ! Receptionist.Register(serviceKey, worker)
|
||||
|
||||
val group = Routers.group(serviceKey);
|
||||
val router = ctx.spawn(group, "worker-group");
|
||||
|
||||
// note that since registration of workers goes through the receptionist there is no
|
||||
// guarantee the router has seen any workers yet if we hit it directly like this and
|
||||
// these messages may end up in dead letters - in a real application you would not use
|
||||
// a group router like this - it is to keep the sample simple
|
||||
(0 to 10).foreach { n ⇒
|
||||
router ! Worker.DoLog(s"msg $n")
|
||||
}
|
||||
// #group
|
||||
|
||||
Behaviors.empty
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -8,7 +8,7 @@ import akka.annotation.DoNotInherit
|
|||
|
||||
/**
|
||||
* Envelope that is published on the eventStream for every message that is
|
||||
* dropped due to overfull queues.
|
||||
* dropped due to overfull queues or routers with no routees.
|
||||
*/
|
||||
final case class Dropped(msg: Any, recipient: ActorRef[Nothing]) {
|
||||
/** Java API */
|
||||
|
|
|
|||
|
|
@ -0,0 +1,61 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.internal.routing
|
||||
|
||||
import akka.actor.typed._
|
||||
import akka.actor.typed.receptionist.Receptionist
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.AbstractBehavior
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
/**
|
||||
* Provides builder style configuration options for group routers while still being a behavior that can be spawned
|
||||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] final case class GroupRouterBuilder[T] private[akka] (
|
||||
key: ServiceKey[T],
|
||||
logicFactory: () ⇒ RoutingLogic[T] = () ⇒ new RoutingLogics.RandomLogic[T]()
|
||||
) extends javadsl.GroupRouter[T]
|
||||
with scaladsl.GroupRouter[T] {
|
||||
|
||||
// deferred creation of the actual router
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T] = new GroupRouterImpl[T](ctx.asScala, key, logicFactory())
|
||||
|
||||
def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = () ⇒ new RoutingLogics.RandomLogic[T]())
|
||||
|
||||
def withRoundRobinRouting(): GroupRouterBuilder[T] = copy(logicFactory = () ⇒ new RoutingLogics.RoundRobinLogic[T])
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private final class GroupRouterImpl[T](
|
||||
ctx: ActorContext[T],
|
||||
serviceKey: ServiceKey[T],
|
||||
routingLogic: RoutingLogic[T]
|
||||
) extends AbstractBehavior[T] {
|
||||
|
||||
// casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting
|
||||
// messages to a router
|
||||
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing])
|
||||
private var routeesEmpty = true
|
||||
|
||||
def onMessage(msg: T): Behavior[T] = msg match {
|
||||
case serviceKey.Listing(update) ⇒
|
||||
// we don't need to watch, because receptionist already does that
|
||||
routingLogic.routeesUpdated(update)
|
||||
routeesEmpty = update.isEmpty
|
||||
this
|
||||
case msg: T @unchecked ⇒
|
||||
if (!routeesEmpty) routingLogic.selectRoutee() ! msg
|
||||
else ctx.system.deadLetters ! Dropped(msg, ctx.self)
|
||||
this
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.internal.routing
|
||||
|
||||
import akka.actor.typed._
|
||||
import akka.actor.typed.scaladsl.AbstractBehavior
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] final case class PoolRouterBuilder[T](
|
||||
poolSize: Int,
|
||||
behavior: Behavior[T],
|
||||
logicFactory: () ⇒ RoutingLogic[T] = () ⇒ new RoutingLogics.RoundRobinLogic[T]
|
||||
) extends javadsl.PoolRouter[T]
|
||||
with scaladsl.PoolRouter[T] {
|
||||
if (poolSize < 1) throw new IllegalArgumentException(s"pool size must be positive, was $poolSize")
|
||||
|
||||
// deferred creation of the actual router
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T] = new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory())
|
||||
|
||||
def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = () ⇒ new RoutingLogics.RandomLogic[T]())
|
||||
|
||||
def withRoundRobinRouting(): PoolRouterBuilder[T] = copy(logicFactory = () ⇒ new RoutingLogics.RoundRobinLogic[T])
|
||||
|
||||
def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private final class PoolRouterImpl[T](
|
||||
ctx: ActorContext[T],
|
||||
poolSize: Int,
|
||||
behavior: Behavior[T],
|
||||
logic: RoutingLogic[T]
|
||||
) extends AbstractBehavior[T] {
|
||||
|
||||
(1 to poolSize).foreach { _ ⇒
|
||||
val child = ctx.spawnAnonymous(behavior)
|
||||
ctx.watch(child)
|
||||
child
|
||||
}
|
||||
onRouteesChanged()
|
||||
|
||||
private def onRouteesChanged(): Unit = {
|
||||
val children = ctx.children.toSet.asInstanceOf[Set[ActorRef[T]]]
|
||||
logic.routeesUpdated(children)
|
||||
}
|
||||
|
||||
def onMessage(msg: T): Behavior[T] = {
|
||||
logic.selectRoutee() ! msg
|
||||
this
|
||||
}
|
||||
|
||||
override def onSignal: PartialFunction[Signal, Behavior[T]] = {
|
||||
case Terminated(child) ⇒
|
||||
// Note that if several children are stopping concurrently children may already be empty
|
||||
// for the `Terminated` we receive for the first child. This means it is not certain that
|
||||
// there will be a log entry per child in those cases (it does not make sense to keep the
|
||||
// pool alive just to get the logging right when there are no routees available)
|
||||
if (ctx.children.nonEmpty) {
|
||||
ctx.log.debug("Pool child stopped [{}]", child.path)
|
||||
onRouteesChanged()
|
||||
this
|
||||
} else {
|
||||
ctx.log.info("Last pool child stopped, stopping pool [{}]", ctx.self.path)
|
||||
Behavior.stopped
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.internal.routing
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.forkjoin.ThreadLocalRandom
|
||||
|
||||
/**
|
||||
* Kept in the behavior, not shared between instances, meant to be stateful.
|
||||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
sealed private[akka] trait RoutingLogic[T] {
|
||||
/**
|
||||
* @param routees available routees, will contain at least one element. Must not be mutated by select logic.
|
||||
*/
|
||||
def selectRoutee(): ActorRef[T]
|
||||
|
||||
/**
|
||||
* Invoked an initial time before `selectRoutee` is ever called and then every time the set of available
|
||||
* routees changes.
|
||||
*
|
||||
* @param newRoutees The updated set of routees. For a group router this could be empty, in that case
|
||||
* `selectRoutee()` will not be called before `routeesUpdated` is invoked again with at
|
||||
* least one routee. For a pool the pool stops instead of ever calling `routeesUpdated`
|
||||
* with an empty list of routees.
|
||||
*/
|
||||
def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object RoutingLogics {
|
||||
|
||||
final class RoundRobinLogic[T] extends RoutingLogic[T] {
|
||||
|
||||
private var currentRoutees: Array[ActorRef[T]] = _
|
||||
|
||||
private var nextIdx = 0
|
||||
|
||||
def selectRoutee(): ActorRef[T] = {
|
||||
if (nextIdx >= currentRoutees.length) nextIdx = 0
|
||||
val selected = currentRoutees(nextIdx)
|
||||
nextIdx += 1
|
||||
selected
|
||||
}
|
||||
|
||||
override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = {
|
||||
// make sure we keep a somewhat similar order so we can potentially continue roundrobining
|
||||
// from where we were unless the set of routees completely changed
|
||||
// Also, avoid putting all entries from the same node next to each other in case of cluster
|
||||
val sortedNewRoutees = newRoutees.toArray.sortBy(ref ⇒ (ref.path.toStringWithoutAddress, ref.path.address))
|
||||
|
||||
if (currentRoutees ne null) {
|
||||
val firstDiffIndex = {
|
||||
var idx = 0
|
||||
while (idx < currentRoutees.length &&
|
||||
idx < sortedNewRoutees.length &&
|
||||
currentRoutees(idx) == sortedNewRoutees(idx)) {
|
||||
idx += 1
|
||||
}
|
||||
idx
|
||||
}
|
||||
if (nextIdx > firstDiffIndex) nextIdx -= 1
|
||||
}
|
||||
currentRoutees = sortedNewRoutees
|
||||
}
|
||||
}
|
||||
|
||||
final class RandomLogic[T] extends RoutingLogic[T] {
|
||||
|
||||
private var currentRoutees: Array[ActorRef[T]] = _
|
||||
|
||||
override def selectRoutee(): ActorRef[T] = {
|
||||
val selectedIdx = ThreadLocalRandom.current().nextInt(currentRoutees.length)
|
||||
currentRoutees(selectedIdx)
|
||||
}
|
||||
override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = {
|
||||
currentRoutees = newRoutees.toArray
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.javadsl
|
||||
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Behavior.DeferredBehavior
|
||||
import akka.actor.typed.internal.routing.GroupRouterBuilder
|
||||
import akka.actor.typed.internal.routing.PoolRouterBuilder
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.PoolRouter
|
||||
import akka.annotation.DoNotInherit
|
||||
|
||||
object Routers {
|
||||
|
||||
/**
|
||||
* A router that will keep track of the available routees registered to the [[akka.actor.typed.receptionist.Receptionist]]
|
||||
* and route over those by random selection.
|
||||
*
|
||||
* In a clustered app this means the routees could live on any node in the cluster.
|
||||
* The current impl does not try to avoid sending messages to unreachable cluster nodes.
|
||||
*
|
||||
* Note that there is a delay between a routee stopping and this being detected by the receptionist, and another
|
||||
* before the group detects this, therefore it is best to unregister routees from the receptionist and not stop
|
||||
* until the deregistration is complete to minimize the risk of lost messages.
|
||||
*/
|
||||
def group[T](key: ServiceKey[T]): GroupRouter[T] =
|
||||
new GroupRouterBuilder[T](key)
|
||||
|
||||
/**
|
||||
* Spawn `poolSize` children with the given `behavior` and forward messages to them using round robin.
|
||||
* If a child is stopped it is removed from the pool, to have children restart on failure use supervision.
|
||||
* When all children are stopped the pool stops itself. To stop the pool from the outside, use `ActorContext.stop`
|
||||
* from the parent actor.
|
||||
*
|
||||
* Note that if a child stops there is a slight chance that messages still get delivered to it, and get lost,
|
||||
* before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily.
|
||||
*/
|
||||
def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] =
|
||||
new PoolRouterBuilder[T](poolSize, behavior)
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides builder style configuration options for group routers
|
||||
*
|
||||
* Not for user extension. Use [[Routers#group]] to create
|
||||
*/
|
||||
@DoNotInherit
|
||||
abstract class GroupRouter[T] extends DeferredBehavior[T] {
|
||||
|
||||
/**
|
||||
* Route messages by randomly selecting the routee from the available routees.
|
||||
*
|
||||
* This is the default for group routers.
|
||||
*/
|
||||
def withRandomRouting(): GroupRouter[T]
|
||||
|
||||
/**
|
||||
* Route messages by using round robin.
|
||||
*
|
||||
* Round robin gives fair routing where every available routee gets the same amount of messages as long as the set
|
||||
* of routees stays relatively stable, but may be unfair if the set of routees changes a lot.
|
||||
*/
|
||||
def withRoundRobinRouting(): GroupRouter[T]
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides builder style configuration options for pool routers
|
||||
*
|
||||
* Not for user extension. Use [[Routers#pool]] to create
|
||||
*/
|
||||
@DoNotInherit
|
||||
abstract class PoolRouter[T] extends DeferredBehavior[T] {
|
||||
|
||||
/**
|
||||
* Route messages by randomly selecting the routee from the available routees.
|
||||
*
|
||||
* Random routing makes it less likely that every `poolsize` message from a single producer ends up in the same
|
||||
* mailbox of a slow actor.
|
||||
*/
|
||||
def withRandomRouting(): PoolRouter[T]
|
||||
|
||||
/**
|
||||
* Route messages through round robin, providing a fair distribution of messages across the routees.
|
||||
*
|
||||
* Round robin gives fair routing where every available routee gets the same amount of messages
|
||||
*
|
||||
* This is the default for pool routers.
|
||||
*/
|
||||
def withRoundRobinRouting(): PoolRouter[T]
|
||||
|
||||
/**
|
||||
* Set a new pool size from the one set at construction
|
||||
*/
|
||||
def withPoolSize(poolSize: Int): PoolRouter[T]
|
||||
}
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.scaladsl
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.internal.routing.GroupRouterBuilder
|
||||
import akka.actor.typed.internal.routing.PoolRouterBuilder
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.annotation.DoNotInherit
|
||||
|
||||
object Routers {
|
||||
|
||||
/**
|
||||
* A router that will keep track of the available routees registered to the [[akka.actor.typed.receptionist.Receptionist]]
|
||||
* and route over those by random selection.
|
||||
*
|
||||
* In a clustered app this means the routees could live on any node in the cluster.
|
||||
* The current impl does not try to avoid sending messages to unreachable cluster nodes.
|
||||
*
|
||||
* Note that there is a delay between a routee stopping and this being detected by the receptionist and another
|
||||
* before the group detects this. Because of this it is best to unregister routees from the receptionist and not stop
|
||||
* until the deregistration is complete to minimize the risk of lost messages.
|
||||
*/
|
||||
def group[T](key: ServiceKey[T]): GroupRouter[T] =
|
||||
// fixme: potential detection of cluster and selecting a different impl https://github.com/akka/akka/issues/26355
|
||||
new GroupRouterBuilder[T](key)
|
||||
|
||||
/**
|
||||
* Spawn `poolSize` children with the given `behavior` and forward messages to them using round robin.
|
||||
* If a child is stopped it is removed from the pool. To have children restart on failure, use supervision.
|
||||
* When all children are stopped the pool stops itself. To stop the pool from the outside, use `ActorContext.stop`
|
||||
* from the parent actor.
|
||||
*
|
||||
* Note that if a child stops, there is a slight chance that messages still get delivered to it, and get lost,
|
||||
* before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily.
|
||||
*/
|
||||
def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] =
|
||||
new PoolRouterBuilder[T](poolSize, behavior)
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides builder style configuration options for group routers
|
||||
*
|
||||
* Not for user extension. Use [[Routers#group]] to create
|
||||
*/
|
||||
@DoNotInherit
|
||||
trait GroupRouter[T] extends Behavior[T] {
|
||||
|
||||
/**
|
||||
* Route messages by randomly selecting the routee from the available routees. This is the default for group routers.
|
||||
*/
|
||||
def withRandomRouting(): GroupRouter[T]
|
||||
|
||||
/**
|
||||
* Route messages by using round robin.
|
||||
*
|
||||
* Round robin gives fair routing where every available routee gets the same amount of messages as long as the set
|
||||
* of routees stays relatively stable, but may be unfair if the set of routees changes a lot.
|
||||
*/
|
||||
def withRoundRobinRouting(): GroupRouter[T]
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides builder style configuration options for pool routers
|
||||
*
|
||||
* Not for user extension. Use [[Routers#pool]] to create
|
||||
*/
|
||||
@DoNotInherit
|
||||
trait PoolRouter[T] extends Behavior[T] {
|
||||
|
||||
/**
|
||||
* Route messages by randomly selecting the routee from the available routees.
|
||||
*
|
||||
* Random routing makes it less likely that every `poolsize` message from a single producer ends up in the same
|
||||
* mailbox of a slow actor.
|
||||
*/
|
||||
def withRandomRouting(): PoolRouter[T]
|
||||
|
||||
/**
|
||||
* Route messages through round robin, providing a fair distribution of messages across the routees.
|
||||
*
|
||||
* Round robin gives fair routing where every available routee gets the same amount of messages
|
||||
*
|
||||
* This is the default for pool routers.
|
||||
*/
|
||||
def withRoundRobinRouting(): PoolRouter[T]
|
||||
|
||||
/**
|
||||
* Set a new pool size from the one set at construction
|
||||
*/
|
||||
def withPoolSize(poolSize: Int): PoolRouter[T]
|
||||
}
|
||||
|
|
@ -21,6 +21,7 @@ This module is currently marked as @ref:[may change](../common/may-change.md) in
|
|||
* [interaction patterns](interaction-patterns.md)
|
||||
* [fault-tolerance](fault-tolerance.md)
|
||||
* [actor-discovery](actor-discovery.md)
|
||||
* [routers](routers.md)
|
||||
* [stash](stash.md)
|
||||
* [stream](stream.md)
|
||||
* [cluster](cluster.md)
|
||||
|
|
|
|||
89
akka-docs/src/main/paradox/typed/routers.md
Normal file
89
akka-docs/src/main/paradox/typed/routers.md
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
# Routers
|
||||
|
||||
## Dependency
|
||||
|
||||
To use Akka Actor Typed, you must add the following dependency in your project:
|
||||
|
||||
@@dependency[sbt,Maven,Gradle] {
|
||||
group=com.typesafe.akka
|
||||
artifact=akka-actor-typed_$scala.binary_version$
|
||||
version=$akka.version$
|
||||
}
|
||||
|
||||
## Introduction
|
||||
|
||||
In some cases it is useful to distribute messages of the same type over a set of actors, so that messages can be
|
||||
processed in parallel - a single actor will only process one message at a time.
|
||||
|
||||
The router itself is a behavior that is spawned into a running actor that will then forward any message sent to it
|
||||
to one final recipient out of the set of routees.
|
||||
|
||||
There are two kinds of routers included in Akka Typed - the pool router and the group router.
|
||||
|
||||
## Pool Router
|
||||
|
||||
The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will
|
||||
then forward messages to.
|
||||
|
||||
If a child is stopped the pool router removes it from its set of routees. When the last child stops the router itself stops.
|
||||
To make a resilient router that deals with failures the routee `Behavior` must be supervised.
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #pool }
|
||||
|
||||
Java
|
||||
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #pool }
|
||||
|
||||
|
||||
## Group Router
|
||||
|
||||
The group router is created with a `ServiceKey` and uses the receptionist (see @ref:[Receptionist](actor-discovery.md#Receptionist)) to discover
|
||||
available actors for that key and routes messages to one of the currently known registered actors for a key.
|
||||
|
||||
Since the receptionist is used this means the group router is cluster aware out of the box and will pick up routees
|
||||
registered on any node in the cluster (there is currently no logic to avoid routing to unreachable nodes, see [#26355](https://github.com/akka/akka/issues/26355)).
|
||||
|
||||
It also means that the set of routees is eventually consistent, and that immediately when the group router is started
|
||||
the set of routees it knows about is empty. When the set of routees is empty messages sent to the router is forwarded
|
||||
to dead letters.
|
||||
|
||||
Scala
|
||||
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #group }
|
||||
|
||||
Java
|
||||
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #group }
|
||||
|
||||
|
||||
## Routing strategies
|
||||
|
||||
There are two different strategies for selecting what routee a message is forwarded to that can be selected
|
||||
from the router before spawning it:
|
||||
|
||||
Scala
|
||||
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #strategy }
|
||||
|
||||
Java
|
||||
: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #strategy }
|
||||
|
||||
### Round Robin
|
||||
|
||||
Rotates over the set of routees making sure that if there are `n` routees, then for `n` messages
|
||||
sent through the router, each actor is forwarded one message.
|
||||
|
||||
This is the default for pool routers.
|
||||
|
||||
### Random
|
||||
Randomly selects a routee when a message is sent through the router.
|
||||
|
||||
This is the default for group routers.
|
||||
|
||||
## Routers and performance
|
||||
|
||||
Note that if the routees are sharing a resource, the resource will determine if increasing the number of
|
||||
actors will actually give higher throughput or faster answers. For example if the routees are CPU bound actors
|
||||
it will not give better performance to create more routees than there are threads to execute the actors.
|
||||
|
||||
Since the router itself is an actor and has a mailbox this means that messages are routed sequentially to the routees
|
||||
where it can be processed in parallel (depending on the available threads in the dispatcher).
|
||||
In a high throughput use cases the sequential routing could be a bottle neck. Akka Typed does not provide an optimized tool for this.
|
||||
Loading…
Add table
Add a link
Reference in a new issue