diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java index 02b54b6c92..58a5eab6ba 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/RoutersTest.java @@ -15,6 +15,8 @@ public class RoutersTest { Behavior group = Routers.group(key).withRandomRouting().withRoundRobinRouting(); Behavior pool = - Routers.pool(5, Behaviors.empty()).withRandomRouting().withRoundRobinRouting(); + Routers.pool(5, () -> Behaviors.empty()) + .withRandomRouting() + .withRoundRobinRouting(); } } diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java index d91f7327b5..dac7c08764 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/RouterTest.java @@ -57,10 +57,12 @@ public class RouterTest { return Behaviors.setup( context -> { // #pool - // make sure the workers are restarted if they fail - Behavior supervisedWorker = - Behaviors.supervise(Worker.behavior).onFailure(SupervisorStrategy.restart()); - PoolRouter pool = Routers.pool(4, supervisedWorker); + PoolRouter pool = + Routers.pool( + 4, + () -> + // make sure the workers are restarted if they fail + Behaviors.supervise(Worker.behavior).onFailure(SupervisorStrategy.restart())); ActorRef router = context.spawn(pool, "worker-pool"); for (int i = 0; i < 10; i++) { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala index 05217ab1a0..ee583a26c2 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala @@ -27,7 +27,8 @@ class RoutersSpec extends ScalaTestWithActorTestKit(""" def compileOnlyApiCoverage(): Unit = { Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting() - Routers.pool(10)(Behavior.empty[Any]).withRandomRouting().withRoundRobinRouting() + Routers.pool(10)(() => Behavior.empty[Any]).withRandomRouting() + Routers.pool(10)(() => Behavior.empty[Any]).withRoundRobinRouting() } "The router pool" must { @@ -36,14 +37,15 @@ class RoutersSpec extends ScalaTestWithActorTestKit(""" val childCounter = new AtomicInteger(0) case class Ack(msg: String, recipient: Int) val probe = createTestProbe[AnyRef]() - val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ => - val id = childCounter.getAndIncrement() - probe.ref ! s"started $id" - Behaviors.receiveMessage { msg => - probe.ref ! Ack(msg, id) - Behaviors.same - } - })) + val pool = spawn(Routers.pool[String](4)(() => + Behaviors.setup { _ => + val id = childCounter.getAndIncrement() + probe.ref ! s"started $id" + Behaviors.receiveMessage { msg => + probe.ref ! Ack(msg, id) + Behaviors.same + } + })) // ordering of these msgs is not guaranteed val expectedStarted = (0 to 3).map { n => @@ -70,15 +72,14 @@ class RoutersSpec extends ScalaTestWithActorTestKit(""" "keep routing to the rest of the children if some children stops" in { val probe = createTestProbe[String]() - val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ => + val pool = spawn(Routers.pool[String](4)(() => Behaviors.receiveMessage { case "stop" => Behaviors.stopped case msg => probe.ref ! msg Behaviors.same - } - })) + })) EventFilter.debug(start = "Pool child stopped", occurrences = 2).intercept { pool ! "stop" @@ -100,11 +101,10 @@ class RoutersSpec extends ScalaTestWithActorTestKit(""" "stops if all children stops" in { val probe = createTestProbe() - val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ => + val pool = spawn(Routers.pool[String](4)(() => Behaviors.receiveMessage { _ => Behaviors.stopped - } - })) + })) EventFilter.info(start = "Last pool child stopped, stopping pool", occurrences = 1).intercept { (0 to 3).foreach { _ => diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala index 7daa3536eb..613ce4d985 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala @@ -47,9 +47,9 @@ class RouterSpec extends ScalaTestWithActorTestKit with WordSpecLike { "show pool routing" in { spawn(Behaviors.setup[Unit] { ctx => // #pool - // make sure the workers are restarted if they fail - val supervisedWorker = Behaviors.supervise(Worker.behavior).onFailure[Exception](SupervisorStrategy.restart) - val pool = Routers.pool(poolSize = 4)(supervisedWorker) + val pool = Routers.pool(poolSize = 4)(() => + // make sure the workers are restarted if they fail + Behaviors.supervise(Worker.behavior).onFailure[Exception](SupervisorStrategy.restart)) val router = ctx.spawn(pool, "worker-pool") (0 to 10).foreach { n => diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala index b4e3512553..aa6d923aa4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/PoolRouterImpl.scala @@ -15,7 +15,7 @@ import akka.annotation.InternalApi @InternalApi private[akka] final case class PoolRouterBuilder[T]( poolSize: Int, - behavior: Behavior[T], + behaviorFactory: () => Behavior[T], logicFactory: () => RoutingLogic[T] = () => new RoutingLogics.RoundRobinLogic[T]) extends javadsl.PoolRouter[T] with scaladsl.PoolRouter[T] { @@ -23,7 +23,7 @@ private[akka] final case class PoolRouterBuilder[T]( // deferred creation of the actual router def apply(ctx: TypedActorContext[T]): Behavior[T] = - new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory()) + new PoolRouterImpl[T](ctx.asScala, poolSize, behaviorFactory, logicFactory()) def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]()) @@ -39,12 +39,12 @@ private[akka] final case class PoolRouterBuilder[T]( private final class PoolRouterImpl[T]( ctx: ActorContext[T], poolSize: Int, - behavior: Behavior[T], + behaviorFactory: () => Behavior[T], logic: RoutingLogic[T]) extends AbstractBehavior[T] { (1 to poolSize).foreach { _ => - val child = ctx.spawnAnonymous(behavior) + val child = ctx.spawnAnonymous(behaviorFactory()) ctx.watch(child) child } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala index 777599f557..ea77446a9f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala @@ -10,6 +10,7 @@ import akka.actor.typed.internal.routing.GroupRouterBuilder import akka.actor.typed.internal.routing.PoolRouterBuilder import akka.actor.typed.receptionist.ServiceKey import akka.annotation.DoNotInherit +import akka.japi.function.Creator object Routers { @@ -36,8 +37,8 @@ object Routers { * Note that if a child stops there is a slight chance that messages still get delivered to it, and get lost, * before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily. */ - def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] = - new PoolRouterBuilder[T](poolSize, behavior) + def pool[T](poolSize: Int)(behaviorFactory: Creator[Behavior[T]]): PoolRouter[T] = + new PoolRouterBuilder[T](poolSize, behaviorFactory.create _) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala index 0af4beefd8..d4028c5003 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala @@ -34,8 +34,8 @@ object Routers { * Note that if a child stops, there is a slight chance that messages still get delivered to it, and get lost, * before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily. */ - def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] = - new PoolRouterBuilder[T](poolSize, behavior) + def pool[T](poolSize: Int)(behaviorFactory: () => Behavior[T]): PoolRouter[T] = + new PoolRouterBuilder[T](poolSize, behaviorFactory) } diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 82f0d3ccf5..ae091d04cd 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -222,4 +222,4 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and th * Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed. * New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies. * New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies. - +* `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees. diff --git a/akka-docs/src/main/paradox/typed/routers.md b/akka-docs/src/main/paradox/typed/routers.md index 5c67207411..a491d53fc6 100644 --- a/akka-docs/src/main/paradox/typed/routers.md +++ b/akka-docs/src/main/paradox/typed/routers.md @@ -22,12 +22,14 @@ There are two kinds of routers included in Akka Typed - the pool router and the ## Pool Router -The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will +The pool router is created with a routee `Behavior` factory and spawns a number of children with that behavior which it will then forward messages to. If a child is stopped the pool router removes it from its set of routees. When the last child stops the router itself stops. To make a resilient router that deals with failures the routee `Behavior` must be supervised. +Note that it is important that the factory returns a new behavior instance for every call to the factory or else +routees may end up sharing mutable state and not work as expected. Scala : @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #pool }