Routers.pool now takes a factory (#26987)
Previously took a Behavior which made it impossible to safely use an AbstractBehavior with mutable state as routees without using Behaviors.setup
This commit is contained in:
parent
8d1dcea5d2
commit
2051b7ce6b
9 changed files with 40 additions and 33 deletions
|
|
@ -15,6 +15,8 @@ public class RoutersTest {
|
|||
Behavior<String> group = Routers.group(key).withRandomRouting().withRoundRobinRouting();
|
||||
|
||||
Behavior<String> pool =
|
||||
Routers.pool(5, Behaviors.<String>empty()).withRandomRouting().withRoundRobinRouting();
|
||||
Routers.pool(5, () -> Behaviors.<String>empty())
|
||||
.withRandomRouting()
|
||||
.withRoundRobinRouting();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -57,10 +57,12 @@ public class RouterTest {
|
|||
return Behaviors.setup(
|
||||
context -> {
|
||||
// #pool
|
||||
// make sure the workers are restarted if they fail
|
||||
Behavior<Worker.Command> supervisedWorker =
|
||||
Behaviors.supervise(Worker.behavior).onFailure(SupervisorStrategy.restart());
|
||||
PoolRouter<Worker.Command> pool = Routers.pool(4, supervisedWorker);
|
||||
PoolRouter<Worker.Command> pool =
|
||||
Routers.pool(
|
||||
4,
|
||||
() ->
|
||||
// make sure the workers are restarted if they fail
|
||||
Behaviors.supervise(Worker.behavior).onFailure(SupervisorStrategy.restart()));
|
||||
ActorRef<Worker.Command> router = context.spawn(pool, "worker-pool");
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
|
|
|
|||
|
|
@ -27,7 +27,8 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
|
|||
def compileOnlyApiCoverage(): Unit = {
|
||||
Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting()
|
||||
|
||||
Routers.pool(10)(Behavior.empty[Any]).withRandomRouting().withRoundRobinRouting()
|
||||
Routers.pool(10)(() => Behavior.empty[Any]).withRandomRouting()
|
||||
Routers.pool(10)(() => Behavior.empty[Any]).withRoundRobinRouting()
|
||||
}
|
||||
|
||||
"The router pool" must {
|
||||
|
|
@ -36,14 +37,15 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
|
|||
val childCounter = new AtomicInteger(0)
|
||||
case class Ack(msg: String, recipient: Int)
|
||||
val probe = createTestProbe[AnyRef]()
|
||||
val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ =>
|
||||
val id = childCounter.getAndIncrement()
|
||||
probe.ref ! s"started $id"
|
||||
Behaviors.receiveMessage { msg =>
|
||||
probe.ref ! Ack(msg, id)
|
||||
Behaviors.same
|
||||
}
|
||||
}))
|
||||
val pool = spawn(Routers.pool[String](4)(() =>
|
||||
Behaviors.setup { _ =>
|
||||
val id = childCounter.getAndIncrement()
|
||||
probe.ref ! s"started $id"
|
||||
Behaviors.receiveMessage { msg =>
|
||||
probe.ref ! Ack(msg, id)
|
||||
Behaviors.same
|
||||
}
|
||||
}))
|
||||
|
||||
// ordering of these msgs is not guaranteed
|
||||
val expectedStarted = (0 to 3).map { n =>
|
||||
|
|
@ -70,15 +72,14 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
|
|||
|
||||
"keep routing to the rest of the children if some children stops" in {
|
||||
val probe = createTestProbe[String]()
|
||||
val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ =>
|
||||
val pool = spawn(Routers.pool[String](4)(() =>
|
||||
Behaviors.receiveMessage {
|
||||
case "stop" =>
|
||||
Behaviors.stopped
|
||||
case msg =>
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
}
|
||||
}))
|
||||
}))
|
||||
|
||||
EventFilter.debug(start = "Pool child stopped", occurrences = 2).intercept {
|
||||
pool ! "stop"
|
||||
|
|
@ -100,11 +101,10 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
|
|||
|
||||
"stops if all children stops" in {
|
||||
val probe = createTestProbe()
|
||||
val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ =>
|
||||
val pool = spawn(Routers.pool[String](4)(() =>
|
||||
Behaviors.receiveMessage { _ =>
|
||||
Behaviors.stopped
|
||||
}
|
||||
}))
|
||||
}))
|
||||
|
||||
EventFilter.info(start = "Last pool child stopped, stopping pool", occurrences = 1).intercept {
|
||||
(0 to 3).foreach { _ =>
|
||||
|
|
|
|||
|
|
@ -47,9 +47,9 @@ class RouterSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
"show pool routing" in {
|
||||
spawn(Behaviors.setup[Unit] { ctx =>
|
||||
// #pool
|
||||
// make sure the workers are restarted if they fail
|
||||
val supervisedWorker = Behaviors.supervise(Worker.behavior).onFailure[Exception](SupervisorStrategy.restart)
|
||||
val pool = Routers.pool(poolSize = 4)(supervisedWorker)
|
||||
val pool = Routers.pool(poolSize = 4)(() =>
|
||||
// make sure the workers are restarted if they fail
|
||||
Behaviors.supervise(Worker.behavior).onFailure[Exception](SupervisorStrategy.restart))
|
||||
val router = ctx.spawn(pool, "worker-pool")
|
||||
|
||||
(0 to 10).foreach { n =>
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import akka.annotation.InternalApi
|
|||
@InternalApi
|
||||
private[akka] final case class PoolRouterBuilder[T](
|
||||
poolSize: Int,
|
||||
behavior: Behavior[T],
|
||||
behaviorFactory: () => Behavior[T],
|
||||
logicFactory: () => RoutingLogic[T] = () => new RoutingLogics.RoundRobinLogic[T])
|
||||
extends javadsl.PoolRouter[T]
|
||||
with scaladsl.PoolRouter[T] {
|
||||
|
|
@ -23,7 +23,7 @@ private[akka] final case class PoolRouterBuilder[T](
|
|||
|
||||
// deferred creation of the actual router
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T] =
|
||||
new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory())
|
||||
new PoolRouterImpl[T](ctx.asScala, poolSize, behaviorFactory, logicFactory())
|
||||
|
||||
def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]())
|
||||
|
||||
|
|
@ -39,12 +39,12 @@ private[akka] final case class PoolRouterBuilder[T](
|
|||
private final class PoolRouterImpl[T](
|
||||
ctx: ActorContext[T],
|
||||
poolSize: Int,
|
||||
behavior: Behavior[T],
|
||||
behaviorFactory: () => Behavior[T],
|
||||
logic: RoutingLogic[T])
|
||||
extends AbstractBehavior[T] {
|
||||
|
||||
(1 to poolSize).foreach { _ =>
|
||||
val child = ctx.spawnAnonymous(behavior)
|
||||
val child = ctx.spawnAnonymous(behaviorFactory())
|
||||
ctx.watch(child)
|
||||
child
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.actor.typed.internal.routing.GroupRouterBuilder
|
|||
import akka.actor.typed.internal.routing.PoolRouterBuilder
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.japi.function.Creator
|
||||
|
||||
object Routers {
|
||||
|
||||
|
|
@ -36,8 +37,8 @@ object Routers {
|
|||
* Note that if a child stops there is a slight chance that messages still get delivered to it, and get lost,
|
||||
* before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily.
|
||||
*/
|
||||
def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] =
|
||||
new PoolRouterBuilder[T](poolSize, behavior)
|
||||
def pool[T](poolSize: Int)(behaviorFactory: Creator[Behavior[T]]): PoolRouter[T] =
|
||||
new PoolRouterBuilder[T](poolSize, behaviorFactory.create _)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,8 +34,8 @@ object Routers {
|
|||
* Note that if a child stops, there is a slight chance that messages still get delivered to it, and get lost,
|
||||
* before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily.
|
||||
*/
|
||||
def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] =
|
||||
new PoolRouterBuilder[T](poolSize, behavior)
|
||||
def pool[T](poolSize: Int)(behaviorFactory: () => Behavior[T]): PoolRouter[T] =
|
||||
new PoolRouterBuilder[T](poolSize, behaviorFactory)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -222,4 +222,4 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and th
|
|||
* Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed.
|
||||
* New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies.
|
||||
* New method `EventSourcedEntity.withEnforcedReplies` added to Scala API to ease the creation of `EventSourcedBehavior` with enforced replies.
|
||||
|
||||
* `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees.
|
||||
|
|
|
|||
|
|
@ -22,12 +22,14 @@ There are two kinds of routers included in Akka Typed - the pool router and the
|
|||
|
||||
## Pool Router
|
||||
|
||||
The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will
|
||||
The pool router is created with a routee `Behavior` factory and spawns a number of children with that behavior which it will
|
||||
then forward messages to.
|
||||
|
||||
If a child is stopped the pool router removes it from its set of routees. When the last child stops the router itself stops.
|
||||
To make a resilient router that deals with failures the routee `Behavior` must be supervised.
|
||||
|
||||
Note that it is important that the factory returns a new behavior instance for every call to the factory or else
|
||||
routees may end up sharing mutable state and not work as expected.
|
||||
|
||||
Scala
|
||||
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #pool }
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue