diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java new file mode 100644 index 0000000000..02b54b6c92 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.javadsl; + +import akka.actor.typed.Behavior; +import akka.actor.typed.receptionist.ServiceKey; + +public class RoutersTest { + + public void compileOnlyApiTest() { + + final ServiceKey key = ServiceKey.create(String.class, "key"); + Behavior group = Routers.group(key).withRandomRouting().withRoundRobinRouting(); + + Behavior pool = + Routers.pool(5, Behaviors.empty()).withRandomRouting().withRoundRobinRouting(); + } +} diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java new file mode 100644 index 0000000000..d91f7327b5 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package jdocs.akka.typed; +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +import akka.actor.typed.ActorSystem; +// #pool +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.SupervisorStrategy; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.GroupRouter; +import akka.actor.typed.javadsl.PoolRouter; +import akka.actor.typed.javadsl.Routers; +import akka.actor.typed.receptionist.Receptionist; +import akka.actor.typed.receptionist.ServiceKey; + +// #pool + +public class RouterTest { + + static // #pool + class Worker { + interface Command {} + + static class DoLog implements Command { + public final String text; + + public DoLog(String text) { + this.text = text; + } + } + + static final Behavior behavior = + Behaviors.setup( + context -> { + context.getLog().info("Starting worker"); + + return Behaviors.receive(Command.class) + .onMessage( + DoLog.class, + (notUsed, doLog) -> { + context.getLog().info("Got message {}", doLog.text); + return Behaviors.same(); + }) + .build(); + }); + } + + // #pool + + static Behavior showPoolRouting() { + return Behaviors.setup( + context -> { + // #pool + // make sure the workers are restarted if they fail + Behavior supervisedWorker = + Behaviors.supervise(Worker.behavior).onFailure(SupervisorStrategy.restart()); + PoolRouter pool = Routers.pool(4, supervisedWorker); + ActorRef router = context.spawn(pool, "worker-pool"); + + for (int i = 0; i < 10; i++) { + router.tell(new Worker.DoLog("msg " + i)); + } + // #pool + + // #strategy + PoolRouter alternativePool = pool.withPoolSize(2).withRoundRobinRouting(); + // #strategy + + return Behaviors.empty(); + }); + } + + static Behavior showGroupRouting() { + ServiceKey serviceKey = ServiceKey.create(Worker.Command.class, "log-worker"); + return Behaviors.setup( + context -> { + // #group + // this would likely happen elsewhere - if we create it locally we + // can just as well use a pool + ActorRef worker = context.spawn(Worker.behavior, "worker"); + context.getSystem().receptionist().tell(Receptionist.register(serviceKey, worker)); + + GroupRouter group = Routers.group(serviceKey); + ActorRef router = context.spawn(group, "worker-group"); + + // note that since registration of workers goes through the receptionist there is no + // guarantee the router has seen any workers yet if we hit it directly like this and + // these messages may end up in dead letters - in a real application you would not use + // a group router like this - it is to keep the sample simple + for (int i = 0; i < 10; i++) { + router.tell(new Worker.DoLog("msg " + i)); + } + // #group + + return Behaviors.empty(); + }); + } + + public static void main(String[] args) { + ActorSystem system = + ActorSystem.create( + Behaviors.setup( + context -> { + context.spawn(showPoolRouting(), "pool-router-setup"); + context.spawn(showGroupRouting(), "group-router-setup"); + + return Behaviors.empty(); + }), + "RouterTest"); + } +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/routing/RoutingLogicSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/routing/RoutingLogicSpec.scala new file mode 100644 index 0000000000..b763d8b626 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/routing/RoutingLogicSpec.scala @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.routing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe +import org.scalatest.Matchers +import org.scalatest.WordSpecLike + +class RoutingLogicSpec extends ScalaTestWithActorTestKit with WordSpecLike with Matchers { + + "The round robin routing logic" must { + + "round robin" in { + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val allRoutees = Set(refA, refB, refC) + + val logic = new RoutingLogics.RoundRobinLogic[Any] + + logic.routeesUpdated(allRoutees) + logic.selectRoutee() should ===(refA) + logic.selectRoutee() should ===(refB) + logic.selectRoutee() should ===(refC) + logic.selectRoutee() should ===(refA) + } + + "not skip one on removal" in { + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val allRoutees = Set(refA, refB, refC) + + val logic = new RoutingLogics.RoundRobinLogic[Any] + logic.routeesUpdated(allRoutees) + logic.selectRoutee() should ===(refA) + logic.selectRoutee() should ===(refB) + + val bRemoved = Set(refA, refC) + logic.routeesUpdated(bRemoved) + logic.selectRoutee() should ===(refC) + } + + "handle last one removed" in { + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val allRoutees = Set(refA, refB) + + val logic = new RoutingLogics.RoundRobinLogic[Any] + logic.routeesUpdated(allRoutees) + logic.selectRoutee() should ===(refA) + + val bRemoved = Set(refA) + logic.routeesUpdated(bRemoved) + logic.selectRoutee() should ===(refA) + } + + "move on to next when several removed" in { + // this can happen with a group router update + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val refD = TestProbe("d").ref + val allRoutees = Set(refA, refB, refC, refD) + + val logic = new RoutingLogics.RoundRobinLogic[Any] + logic.routeesUpdated(allRoutees) + logic.selectRoutee() should ===(refA) + logic.selectRoutee() should ===(refB) + + val severalRemoved = Set(refA, refC) + logic.routeesUpdated(severalRemoved) + logic.selectRoutee() should ===(refC) + } + + "wrap around when several removed" in { + // this can happen with a group router update + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val refD = TestProbe("d").ref + val allRoutees = Set(refA, refB, refC, refD) + + val logic = new RoutingLogics.RoundRobinLogic[Any] + logic.routeesUpdated(allRoutees) + logic.selectRoutee() should ===(refA) + logic.selectRoutee() should ===(refB) + logic.selectRoutee() should ===(refC) + + val severalRemoved = Set(refA, refC) + logic.routeesUpdated(severalRemoved) + logic.selectRoutee() should ===(refA) + } + + "pick first in with a completely new set of routees" in { + // this can happen with a group router update + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val refD = TestProbe("d").ref + val initialRoutees = Set(refA, refB) + + val logic = new RoutingLogics.RoundRobinLogic[Any] + logic.routeesUpdated(initialRoutees) + logic.selectRoutee() should ===(refA) + logic.selectRoutee() should ===(refB) + logic.selectRoutee() should ===(refA) + + val severalRemoved = Set(refC, refD) + logic.routeesUpdated(severalRemoved) + logic.selectRoutee() should ===(refC) + } + + } + + "The random logic" must { + "select randomly" in { + val refA = TestProbe("a").ref + val refB = TestProbe("b").ref + val refC = TestProbe("c").ref + val refD = TestProbe("d").ref + val routees = Set(refA, refB, refC, refD) + + val logic = new RoutingLogics.RandomLogic[Any]() + logic.routeesUpdated(routees) + + (0 to 10).foreach { _ ⇒ + // not much to verify here, but let's exercise it at least + val routee = logic.selectRoutee() + routees should contain(routee) + } + + } + + } +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala new file mode 100644 index 0000000000..2980b02933 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala @@ -0,0 +1,207 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.scaladsl +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.DeadLetter +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.Behavior +import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.adapter._ +import akka.testkit.EventFilter +import org.scalatest.Matchers +import org.scalatest.WordSpecLike + +class RoutersSpec extends ScalaTestWithActorTestKit(""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.loglevel=debug + """) with WordSpecLike with Matchers { + + // needed for the event filter + implicit val untypedSystem = system.toUntyped + + def compileOnlyApiCoverage(): Unit = { + Routers.group(ServiceKey[String]("key")) + .withRandomRouting() + .withRoundRobinRouting() + + Routers.pool(10)(Behavior.empty[Any]) + .withRandomRouting() + .withRoundRobinRouting() + } + + "The router pool" must { + + "create n children and route messages to" in { + val childCounter = new AtomicInteger(0) + val probe = createTestProbe[String]() + val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ ⇒ + val id = childCounter.getAndIncrement() + probe.ref ! s"started $id" + Behaviors.receiveMessage { msg ⇒ + probe.ref ! s"$id $msg" + Behaviors.same + } + })) + + // ordering of these msgs is not guaranteed + val expectedStarted = (0 to 3).map { n ⇒ s"started $n" }.toSet + probe.receiveMessages(4).toSet should ===(expectedStarted) + + (0 to 8).foreach { n ⇒ + pool ! s"message-$n" + val expectedRecipient = n % 4 + probe.expectMessage(s"$expectedRecipient message-$n") + } + } + + "keep routing to the rest of the children if some children stops" in { + val probe = createTestProbe[String]() + val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ ⇒ + Behaviors.receiveMessage { + case "stop" ⇒ + Behaviors.stopped + case msg ⇒ + probe.ref ! msg + Behaviors.same + } + })) + + EventFilter.debug(start = "Pool child stopped", occurrences = 2).intercept { + pool ! "stop" + pool ! "stop" + } + + // there is a race here where the child stopped but the router did not see that message yet, and may + // deliver messages to it, which will end up in dead letters. + // this test protects against that by waiting for the log entry to show up + + val responses = (0 to 4).map { n ⇒ + val msg = s"message-$n" + pool ! msg + probe.expectMessageType[String] + } + + responses should contain allOf ("message-0", "message-1", "message-2", "message-3", "message-4") + } + + "stops if all children stops" in { + val probe = createTestProbe() + val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ ⇒ + Behaviors.receiveMessage { _ ⇒ + Behaviors.stopped + } + })) + + EventFilter.info(start = "Last pool child stopped, stopping pool", occurrences = 1).intercept { + (0 to 3).foreach { _ ⇒ + pool ! "stop" + } + probe.expectTerminated(pool) + } + } + + } + + "The router group" must { + + val receptionistDelayMs = 250 + + "route messages across routees registered to the receptionist" in { + val serviceKey = ServiceKey[String]("group-routing-1") + val probe = createTestProbe[String]() + val routeeBehavior: Behavior[String] = Behaviors.receiveMessage { msg ⇒ + probe.ref ! msg + Behaviors.same + } + + (0 to 3).foreach { n ⇒ + val ref = spawn(routeeBehavior, s"group-1-routee-$n") + system.receptionist ! Receptionist.register(serviceKey, ref) + } + + val group = spawn(Routers.group(serviceKey), "group-router-1") + + // give the group a little time to get a listing from the receptionist + Thread.sleep(receptionistDelayMs) + + (0 to 3).foreach { n ⇒ + val msg = s"message-$n" + group ! msg + probe.expectMessage(msg) + } + + testKit.stop(group) + } + + "pass messages to dead letters when there are no routees available" in { + val serviceKey = ServiceKey[String]("group-routing-2") + val group = spawn(Routers.group(serviceKey), "group-router-2") + val probe = TestProbe[DeadLetter]() + system.toUntyped.eventStream.subscribe(probe.ref.toUntyped, classOf[DeadLetter]) + + (0 to 3).foreach { n ⇒ + val msg = s"message-$n" + /* FIXME cant watch log events until #26432 is fixed + EventFilter.info(start = "Message [java.lang.String] ... was not delivered.", occurrences = 1).intercept { */ + group ! msg + probe.expectMessageType[DeadLetter] + /* } */ + } + + testKit.stop(group) + } + + "handle a changing set of routees" in { + val serviceKey = ServiceKey[String]("group-routing-3") + val probe = createTestProbe[String]() + val routeeBehavior: Behavior[String] = Behaviors.receiveMessage { + case "stop" ⇒ + Behaviors.stopped + case msg ⇒ + probe.ref ! msg + Behaviors.same + } + + val ref1 = spawn(routeeBehavior, s"group-3-routee-1") + system.receptionist ! Receptionist.register(serviceKey, ref1) + + val ref2 = spawn(routeeBehavior, s"group-3-routee-2") + system.receptionist ! Receptionist.register(serviceKey, ref2) + + val ref3 = spawn(routeeBehavior, s"group-3-routee-3") + system.receptionist ! Receptionist.register(serviceKey, ref3) + + val group = spawn(Routers.group(serviceKey), "group-router-3") + + // give the group a little time to get a listing from the receptionist + Thread.sleep(receptionistDelayMs) + + (0 to 3).foreach { n ⇒ + val msg = s"message-$n" + group ! msg + probe.expectMessage(msg) + } + + ref2 ! "stop" + + // give the group a little time to get an updated listing from the receptionist + Thread.sleep(receptionistDelayMs) + + (0 to 3).foreach { n ⇒ + val msg = s"message-$n" + group ! msg + probe.expectMessage(msg) + } + + testKit.stop(group) + + } + + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala new file mode 100644 index 0000000000..b583de1266 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.akka.typed + +// #pool +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.Behavior +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.Routers +import org.scalatest.WordSpecLike + +// #pool + +object RouterSpec { + + // #pool + object Worker { + sealed trait Command + case class DoLog(text: String) extends Command + + val behavior: Behavior[Command] = Behaviors.setup { ctx ⇒ + ctx.log.info("Starting worker") + + Behaviors.receiveMessage { + case DoLog(text) ⇒ + ctx.log.info("Got message {}", text) + Behaviors.same + } + } + } + + // #pool + + val serviceKey = ServiceKey[Worker.Command]("log-worker") +} + +class RouterSpec extends ScalaTestWithActorTestKit with WordSpecLike { + import RouterSpec._ + + "The routing sample" must { + + "show pool routing" in { + spawn(Behaviors.setup[Unit] { ctx ⇒ + // #pool + // make sure the workers are restarted if they fail + val supervisedWorker = Behaviors.supervise(Worker.behavior) + .onFailure[Exception](SupervisorStrategy.restart) + val pool = Routers.pool(poolSize = 4)(supervisedWorker) + val router = ctx.spawn(pool, "worker-pool") + + (0 to 10).foreach { n ⇒ + router ! Worker.DoLog(s"msg $n") + } + // #pool + + // #strategy + val alternativePool = pool.withPoolSize(2).withRoundRobinRouting() + // #strategy + + Behaviors.empty + }) + } + + "show group routing" in { + + spawn(Behaviors.setup[Unit] { ctx ⇒ + // #group + // this would likely happen elsewhere - if we create it locally we + // can just as well use a pool + val worker = ctx.spawn(Worker.behavior, "worker") + ctx.system.receptionist ! Receptionist.Register(serviceKey, worker) + + val group = Routers.group(serviceKey); + val router = ctx.spawn(group, "worker-group"); + + // note that since registration of workers goes through the receptionist there is no + // guarantee the router has seen any workers yet if we hit it directly like this and + // these messages may end up in dead letters - in a real application you would not use + // a group router like this - it is to keep the sample simple + (0 to 10).foreach { n ⇒ + router ! Worker.DoLog(s"msg $n") + } + // #group + + Behaviors.empty + }) + } + } +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala index b87ef3a965..b656d57d93 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala @@ -8,7 +8,7 @@ import akka.annotation.DoNotInherit /** * Envelope that is published on the eventStream for every message that is - * dropped due to overfull queues. + * dropped due to overfull queues or routers with no routees. */ final case class Dropped(msg: Any, recipient: ActorRef[Nothing]) { /** Java API */ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala new file mode 100644 index 0000000000..54a547fb08 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.routing + +import akka.actor.typed._ +import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.AbstractBehavior +import akka.actor.typed.scaladsl.ActorContext +import akka.annotation.InternalApi + +/** + * Provides builder style configuration options for group routers while still being a behavior that can be spawned + * + * INTERNAL API + */ +@InternalApi +private[akka] final case class GroupRouterBuilder[T] private[akka] ( + key: ServiceKey[T], + logicFactory: () ⇒ RoutingLogic[T] = () ⇒ new RoutingLogics.RandomLogic[T]() +) extends javadsl.GroupRouter[T] + with scaladsl.GroupRouter[T] { + + // deferred creation of the actual router + def apply(ctx: TypedActorContext[T]): Behavior[T] = new GroupRouterImpl[T](ctx.asScala, key, logicFactory()) + + def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = () ⇒ new RoutingLogics.RandomLogic[T]()) + + def withRoundRobinRouting(): GroupRouterBuilder[T] = copy(logicFactory = () ⇒ new RoutingLogics.RoundRobinLogic[T]) + +} + +/** + * INTERNAL API + */ +@InternalApi +private final class GroupRouterImpl[T]( + ctx: ActorContext[T], + serviceKey: ServiceKey[T], + routingLogic: RoutingLogic[T] +) extends AbstractBehavior[T] { + + // casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting + // messages to a router + ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing]) + private var routeesEmpty = true + + def onMessage(msg: T): Behavior[T] = msg match { + case serviceKey.Listing(update) ⇒ + // we don't need to watch, because receptionist already does that + routingLogic.routeesUpdated(update) + routeesEmpty = update.isEmpty + this + case msg: T @unchecked ⇒ + if (!routeesEmpty) routingLogic.selectRoutee() ! msg + else ctx.system.deadLetters ! Dropped(msg, ctx.self) + this + } +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala new file mode 100644 index 0000000000..ba5782cc90 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.routing + +import akka.actor.typed._ +import akka.actor.typed.scaladsl.AbstractBehavior +import akka.actor.typed.scaladsl.ActorContext +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class PoolRouterBuilder[T]( + poolSize: Int, + behavior: Behavior[T], + logicFactory: () ⇒ RoutingLogic[T] = () ⇒ new RoutingLogics.RoundRobinLogic[T] +) extends javadsl.PoolRouter[T] + with scaladsl.PoolRouter[T] { + if (poolSize < 1) throw new IllegalArgumentException(s"pool size must be positive, was $poolSize") + + // deferred creation of the actual router + def apply(ctx: TypedActorContext[T]): Behavior[T] = new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory()) + + def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = () ⇒ new RoutingLogics.RandomLogic[T]()) + + def withRoundRobinRouting(): PoolRouterBuilder[T] = copy(logicFactory = () ⇒ new RoutingLogics.RoundRobinLogic[T]) + + def withPoolSize(poolSize: Int): PoolRouterBuilder[T] = copy(poolSize = poolSize) +} + +/** + * INTERNAL API + */ +@InternalApi +private final class PoolRouterImpl[T]( + ctx: ActorContext[T], + poolSize: Int, + behavior: Behavior[T], + logic: RoutingLogic[T] +) extends AbstractBehavior[T] { + + (1 to poolSize).foreach { _ ⇒ + val child = ctx.spawnAnonymous(behavior) + ctx.watch(child) + child + } + onRouteesChanged() + + private def onRouteesChanged(): Unit = { + val children = ctx.children.toSet.asInstanceOf[Set[ActorRef[T]]] + logic.routeesUpdated(children) + } + + def onMessage(msg: T): Behavior[T] = { + logic.selectRoutee() ! msg + this + } + + override def onSignal: PartialFunction[Signal, Behavior[T]] = { + case Terminated(child) ⇒ + // Note that if several children are stopping concurrently children may already be empty + // for the `Terminated` we receive for the first child. This means it is not certain that + // there will be a log entry per child in those cases (it does not make sense to keep the + // pool alive just to get the logging right when there are no routees available) + if (ctx.children.nonEmpty) { + ctx.log.debug("Pool child stopped [{}]", child.path) + onRouteesChanged() + this + } else { + ctx.log.info("Last pool child stopped, stopping pool [{}]", ctx.self.path) + Behavior.stopped + } + } + +} + diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/RoutingLogic.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/RoutingLogic.scala new file mode 100644 index 0000000000..22fcf1489c --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/RoutingLogic.scala @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.routing + +import akka.actor.typed.ActorRef +import akka.annotation.InternalApi +import akka.dispatch.forkjoin.ThreadLocalRandom + +/** + * Kept in the behavior, not shared between instances, meant to be stateful. + * + * INTERNAL API + */ +@InternalApi +sealed private[akka] trait RoutingLogic[T] { + /** + * @param routees available routees, will contain at least one element. Must not be mutated by select logic. + */ + def selectRoutee(): ActorRef[T] + + /** + * Invoked an initial time before `selectRoutee` is ever called and then every time the set of available + * routees changes. + * + * @param newRoutees The updated set of routees. For a group router this could be empty, in that case + * `selectRoutee()` will not be called before `routeesUpdated` is invoked again with at + * least one routee. For a pool the pool stops instead of ever calling `routeesUpdated` + * with an empty list of routees. + */ + def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object RoutingLogics { + + final class RoundRobinLogic[T] extends RoutingLogic[T] { + + private var currentRoutees: Array[ActorRef[T]] = _ + + private var nextIdx = 0 + + def selectRoutee(): ActorRef[T] = { + if (nextIdx >= currentRoutees.length) nextIdx = 0 + val selected = currentRoutees(nextIdx) + nextIdx += 1 + selected + } + + override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = { + // make sure we keep a somewhat similar order so we can potentially continue roundrobining + // from where we were unless the set of routees completely changed + // Also, avoid putting all entries from the same node next to each other in case of cluster + val sortedNewRoutees = newRoutees.toArray.sortBy(ref ⇒ (ref.path.toStringWithoutAddress, ref.path.address)) + + if (currentRoutees ne null) { + val firstDiffIndex = { + var idx = 0 + while (idx < currentRoutees.length && + idx < sortedNewRoutees.length && + currentRoutees(idx) == sortedNewRoutees(idx)) { + idx += 1 + } + idx + } + if (nextIdx > firstDiffIndex) nextIdx -= 1 + } + currentRoutees = sortedNewRoutees + } + } + + final class RandomLogic[T] extends RoutingLogic[T] { + + private var currentRoutees: Array[ActorRef[T]] = _ + + override def selectRoutee(): ActorRef[T] = { + val selectedIdx = ThreadLocalRandom.current().nextInt(currentRoutees.length) + currentRoutees(selectedIdx) + } + override def routeesUpdated(newRoutees: Set[ActorRef[T]]): Unit = { + currentRoutees = newRoutees.toArray + } + + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala new file mode 100644 index 0000000000..9735053a43 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.javadsl + +import akka.actor.typed.Behavior +import akka.actor.typed.Behavior.DeferredBehavior +import akka.actor.typed.internal.routing.GroupRouterBuilder +import akka.actor.typed.internal.routing.PoolRouterBuilder +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.PoolRouter +import akka.annotation.DoNotInherit + +object Routers { + + /** + * A router that will keep track of the available routees registered to the [[akka.actor.typed.receptionist.Receptionist]] + * and route over those by random selection. + * + * In a clustered app this means the routees could live on any node in the cluster. + * The current impl does not try to avoid sending messages to unreachable cluster nodes. + * + * Note that there is a delay between a routee stopping and this being detected by the receptionist, and another + * before the group detects this, therefore it is best to unregister routees from the receptionist and not stop + * until the deregistration is complete to minimize the risk of lost messages. + */ + def group[T](key: ServiceKey[T]): GroupRouter[T] = + new GroupRouterBuilder[T](key) + + /** + * Spawn `poolSize` children with the given `behavior` and forward messages to them using round robin. + * If a child is stopped it is removed from the pool, to have children restart on failure use supervision. + * When all children are stopped the pool stops itself. To stop the pool from the outside, use `ActorContext.stop` + * from the parent actor. + * + * Note that if a child stops there is a slight chance that messages still get delivered to it, and get lost, + * before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily. + */ + def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] = + new PoolRouterBuilder[T](poolSize, behavior) + +} + +/** + * Provides builder style configuration options for group routers + * + * Not for user extension. Use [[Routers#group]] to create + */ +@DoNotInherit +abstract class GroupRouter[T] extends DeferredBehavior[T] { + + /** + * Route messages by randomly selecting the routee from the available routees. + * + * This is the default for group routers. + */ + def withRandomRouting(): GroupRouter[T] + + /** + * Route messages by using round robin. + * + * Round robin gives fair routing where every available routee gets the same amount of messages as long as the set + * of routees stays relatively stable, but may be unfair if the set of routees changes a lot. + */ + def withRoundRobinRouting(): GroupRouter[T] + +} + +/** + * Provides builder style configuration options for pool routers + * + * Not for user extension. Use [[Routers#pool]] to create + */ +@DoNotInherit +abstract class PoolRouter[T] extends DeferredBehavior[T] { + + /** + * Route messages by randomly selecting the routee from the available routees. + * + * Random routing makes it less likely that every `poolsize` message from a single producer ends up in the same + * mailbox of a slow actor. + */ + def withRandomRouting(): PoolRouter[T] + + /** + * Route messages through round robin, providing a fair distribution of messages across the routees. + * + * Round robin gives fair routing where every available routee gets the same amount of messages + * + * This is the default for pool routers. + */ + def withRoundRobinRouting(): PoolRouter[T] + + /** + * Set a new pool size from the one set at construction + */ + def withPoolSize(poolSize: Int): PoolRouter[T] +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala new file mode 100644 index 0000000000..ba12d86e81 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.scaladsl +import akka.actor.typed.Behavior +import akka.actor.typed.internal.routing.GroupRouterBuilder +import akka.actor.typed.internal.routing.PoolRouterBuilder +import akka.actor.typed.receptionist.ServiceKey +import akka.annotation.DoNotInherit + +object Routers { + + /** + * A router that will keep track of the available routees registered to the [[akka.actor.typed.receptionist.Receptionist]] + * and route over those by random selection. + * + * In a clustered app this means the routees could live on any node in the cluster. + * The current impl does not try to avoid sending messages to unreachable cluster nodes. + * + * Note that there is a delay between a routee stopping and this being detected by the receptionist and another + * before the group detects this. Because of this it is best to unregister routees from the receptionist and not stop + * until the deregistration is complete to minimize the risk of lost messages. + */ + def group[T](key: ServiceKey[T]): GroupRouter[T] = + // fixme: potential detection of cluster and selecting a different impl https://github.com/akka/akka/issues/26355 + new GroupRouterBuilder[T](key) + + /** + * Spawn `poolSize` children with the given `behavior` and forward messages to them using round robin. + * If a child is stopped it is removed from the pool. To have children restart on failure, use supervision. + * When all children are stopped the pool stops itself. To stop the pool from the outside, use `ActorContext.stop` + * from the parent actor. + * + * Note that if a child stops, there is a slight chance that messages still get delivered to it, and get lost, + * before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily. + */ + def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] = + new PoolRouterBuilder[T](poolSize, behavior) + +} + +/** + * Provides builder style configuration options for group routers + * + * Not for user extension. Use [[Routers#group]] to create + */ +@DoNotInherit +trait GroupRouter[T] extends Behavior[T] { + + /** + * Route messages by randomly selecting the routee from the available routees. This is the default for group routers. + */ + def withRandomRouting(): GroupRouter[T] + + /** + * Route messages by using round robin. + * + * Round robin gives fair routing where every available routee gets the same amount of messages as long as the set + * of routees stays relatively stable, but may be unfair if the set of routees changes a lot. + */ + def withRoundRobinRouting(): GroupRouter[T] + +} + +/** + * Provides builder style configuration options for pool routers + * + * Not for user extension. Use [[Routers#pool]] to create + */ +@DoNotInherit +trait PoolRouter[T] extends Behavior[T] { + + /** + * Route messages by randomly selecting the routee from the available routees. + * + * Random routing makes it less likely that every `poolsize` message from a single producer ends up in the same + * mailbox of a slow actor. + */ + def withRandomRouting(): PoolRouter[T] + + /** + * Route messages through round robin, providing a fair distribution of messages across the routees. + * + * Round robin gives fair routing where every available routee gets the same amount of messages + * + * This is the default for pool routers. + */ + def withRoundRobinRouting(): PoolRouter[T] + + /** + * Set a new pool size from the one set at construction + */ + def withPoolSize(poolSize: Int): PoolRouter[T] +} diff --git a/akka-docs/src/main/paradox/typed/index.md b/akka-docs/src/main/paradox/typed/index.md index c580082f6a..db8f377c8c 100644 --- a/akka-docs/src/main/paradox/typed/index.md +++ b/akka-docs/src/main/paradox/typed/index.md @@ -21,6 +21,7 @@ This module is currently marked as @ref:[may change](../common/may-change.md) in * [interaction patterns](interaction-patterns.md) * [fault-tolerance](fault-tolerance.md) * [actor-discovery](actor-discovery.md) +* [routers](routers.md) * [stash](stash.md) * [stream](stream.md) * [cluster](cluster.md) diff --git a/akka-docs/src/main/paradox/typed/routers.md b/akka-docs/src/main/paradox/typed/routers.md new file mode 100644 index 0000000000..5c67207411 --- /dev/null +++ b/akka-docs/src/main/paradox/typed/routers.md @@ -0,0 +1,89 @@ +# Routers + +## Dependency + +To use Akka Actor Typed, you must add the following dependency in your project: + +@@dependency[sbt,Maven,Gradle] { + group=com.typesafe.akka + artifact=akka-actor-typed_$scala.binary_version$ + version=$akka.version$ +} + +## Introduction + +In some cases it is useful to distribute messages of the same type over a set of actors, so that messages can be +processed in parallel - a single actor will only process one message at a time. + +The router itself is a behavior that is spawned into a running actor that will then forward any message sent to it +to one final recipient out of the set of routees. + +There are two kinds of routers included in Akka Typed - the pool router and the group router. + +## Pool Router + +The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will +then forward messages to. + +If a child is stopped the pool router removes it from its set of routees. When the last child stops the router itself stops. +To make a resilient router that deals with failures the routee `Behavior` must be supervised. + + +Scala +: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #pool } + +Java +: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #pool } + + +## Group Router + +The group router is created with a `ServiceKey` and uses the receptionist (see @ref:[Receptionist](actor-discovery.md#Receptionist)) to discover +available actors for that key and routes messages to one of the currently known registered actors for a key. + +Since the receptionist is used this means the group router is cluster aware out of the box and will pick up routees +registered on any node in the cluster (there is currently no logic to avoid routing to unreachable nodes, see [#26355](https://github.com/akka/akka/issues/26355)). + +It also means that the set of routees is eventually consistent, and that immediately when the group router is started +the set of routees it knows about is empty. When the set of routees is empty messages sent to the router is forwarded +to dead letters. + +Scala +: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #group } + +Java +: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #group } + + +## Routing strategies + +There are two different strategies for selecting what routee a message is forwarded to that can be selected +from the router before spawning it: + +Scala +: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #strategy } + +Java +: @@snip [RouterTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java) { #strategy } + +### Round Robin + +Rotates over the set of routees making sure that if there are `n` routees, then for `n` messages +sent through the router, each actor is forwarded one message. + +This is the default for pool routers. + +### Random +Randomly selects a routee when a message is sent through the router. + +This is the default for group routers. + +## Routers and performance + +Note that if the routees are sharing a resource, the resource will determine if increasing the number of +actors will actually give higher throughput or faster answers. For example if the routees are CPU bound actors +it will not give better performance to create more routees than there are threads to execute the actors. + +Since the router itself is an actor and has a mailbox this means that messages are routed sequentially to the routees +where it can be processed in parallel (depending on the available threads in the dispatcher). +In a high throughput use cases the sequential routing could be a bottle neck. Akka Typed does not provide an optimized tool for this. \ No newline at end of file