Go back to taking a Behavior to typed pool routers, #27807 (#27942)

This commit is contained in:
Christopher Batey 2019-10-09 16:07:24 +01:00 committed by Patrik Nordwall
parent 9fa452daea
commit 9a2fa8d2a9
8 changed files with 36 additions and 46 deletions

View file

@ -15,8 +15,6 @@ public class RoutersTest {
Behavior<String> group = Routers.group(key).withRandomRouting().withRoundRobinRouting();
Behavior<String> pool =
Routers.pool(5, () -> Behaviors.<String>empty())
.withRandomRouting()
.withRoundRobinRouting();
Routers.pool(5, Behaviors.<String>empty()).withRandomRouting().withRoundRobinRouting();
}
}

View file

@ -63,9 +63,8 @@ public class RouterTest {
PoolRouter<Worker.Command> pool =
Routers.pool(
poolSize,
() ->
// make sure the workers are restarted if they fail
Behaviors.supervise(Worker.create()).onFailure(SupervisorStrategy.restart()));
// make sure the workers are restarted if they fail
Behaviors.supervise(Worker.create()).onFailure(SupervisorStrategy.restart()));
ActorRef<Worker.Command> router = context.spawn(pool, "worker-pool");
for (int i = 0; i < 10; i++) {

View file

@ -31,8 +31,8 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
def compileOnlyApiCoverage(): Unit = {
Routers.group(ServiceKey[String]("key")).withRandomRouting().withRoundRobinRouting()
Routers.pool(10)(() => Behaviors.empty[Any]).withRandomRouting()
Routers.pool(10)(() => Behaviors.empty[Any]).withRoundRobinRouting()
Routers.pool(10)(Behaviors.empty[Any]).withRandomRouting()
Routers.pool(10)(Behaviors.empty[Any]).withRoundRobinRouting()
}
"The router pool" must {
@ -41,15 +41,14 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
val childCounter = new AtomicInteger(0)
case class Ack(msg: String, recipient: Int)
val probe = createTestProbe[AnyRef]()
val pool = spawn(Routers.pool[String](4)(() =>
Behaviors.setup { _ =>
val id = childCounter.getAndIncrement()
probe.ref ! s"started $id"
Behaviors.receiveMessage { msg =>
probe.ref ! Ack(msg, id)
Behaviors.same
}
}))
val pool = spawn(Routers.pool[String](4)(Behaviors.setup { _ =>
val id = childCounter.getAndIncrement()
probe.ref ! s"started $id"
Behaviors.receiveMessage { msg =>
probe.ref ! Ack(msg, id)
Behaviors.same
}
}))
// ordering of these msgs is not guaranteed
val expectedStarted = (0 to 3).map { n =>
@ -76,14 +75,13 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
"keep routing to the rest of the children if some children stops" in {
val probe = createTestProbe[String]()
val pool = spawn(Routers.pool[String](4)(() =>
Behaviors.receiveMessage {
case "stop" =>
Behaviors.stopped
case msg =>
probe.ref ! msg
Behaviors.same
}))
val pool = spawn(Routers.pool[String](4)(Behaviors.receiveMessage {
case "stop" =>
Behaviors.stopped
case msg =>
probe.ref ! msg
Behaviors.same
}))
LoggingEventFilter.debug("Pool child stopped").withOccurrences(2).intercept {
pool ! "stop"
@ -105,10 +103,9 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
"stops if all children stops" in {
val probe = createTestProbe()
val pool = spawn(Routers.pool[String](4)(() =>
Behaviors.receiveMessage { _ =>
Behaviors.stopped
}))
val pool = spawn(Routers.pool[String](4)(Behaviors.receiveMessage { _ =>
Behaviors.stopped
}))
LoggingEventFilter.info("Last pool child stopped, stopping pool").intercept {
(0 to 3).foreach { _ =>

View file

@ -57,7 +57,7 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with
spawn(Behaviors.setup[Unit] { ctx =>
// #pool
val pool = Routers.pool(poolSize = 4)(() =>
val pool = Routers.pool(poolSize = 4)(
// make sure the workers are restarted if they fail
Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart))
val router = ctx.spawn(pool, "worker-pool")
@ -97,8 +97,8 @@ class RouterSpec extends ScalaTestWithActorTestKit("akka.loglevel=warning") with
val worker = ctx.spawn(Worker(), "worker")
ctx.system.receptionist ! Receptionist.Register(serviceKey, worker)
val group = Routers.group(serviceKey);
val router = ctx.spawn(group, "worker-group");
val group = Routers.group(serviceKey)
val router = ctx.spawn(group, "worker-group")
// the group router will stash messages until it sees the first listing of registered
// services from the receptionist, so it is safe to send messages right away

View file

@ -16,7 +16,7 @@ import akka.annotation.InternalApi
@InternalApi
private[akka] final case class PoolRouterBuilder[T](
poolSize: Int,
behaviorFactory: () => Behavior[T],
behavior: Behavior[T],
logicFactory: () => RoutingLogic[T] = () => new RoutingLogics.RoundRobinLogic[T])
extends javadsl.PoolRouter[T]
with scaladsl.PoolRouter[T] {
@ -24,7 +24,7 @@ private[akka] final case class PoolRouterBuilder[T](
// deferred creation of the actual router
def apply(ctx: TypedActorContext[T]): Behavior[T] =
new PoolRouterImpl[T](ctx.asScala, poolSize, behaviorFactory, logicFactory())
new PoolRouterImpl[T](ctx.asScala, poolSize, behavior, logicFactory())
def withRandomRouting(): PoolRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]())
@ -40,12 +40,12 @@ private[akka] final case class PoolRouterBuilder[T](
private final class PoolRouterImpl[T](
ctx: ActorContext[T],
poolSize: Int,
behaviorFactory: () => Behavior[T],
behavior: Behavior[T],
logic: RoutingLogic[T])
extends AbstractBehavior[T](ctx) {
(1 to poolSize).foreach { _ =>
val child = context.spawnAnonymous(behaviorFactory())
val child = context.spawnAnonymous(behavior)
context.watch(child)
child
}

View file

@ -10,7 +10,6 @@ import akka.actor.typed.internal.routing.GroupRouterBuilder
import akka.actor.typed.internal.routing.PoolRouterBuilder
import akka.actor.typed.receptionist.ServiceKey
import akka.annotation.DoNotInherit
import akka.japi.function.Creator
object Routers {
@ -37,8 +36,8 @@ object Routers {
* Note that if a child stops there is a slight chance that messages still get delivered to it, and get lost,
* before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily.
*/
def pool[T](poolSize: Int)(behaviorFactory: Creator[Behavior[T]]): PoolRouter[T] =
new PoolRouterBuilder[T](poolSize, behaviorFactory.create _)
def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] =
new PoolRouterBuilder[T](poolSize, behavior)
}

View file

@ -34,8 +34,8 @@ object Routers {
* Note that if a child stops, there is a slight chance that messages still get delivered to it, and get lost,
* before the pool sees that the child stopped. Therefore it is best to _not_ stop children arbitrarily.
*/
def pool[T](poolSize: Int)(behaviorFactory: () => Behavior[T]): PoolRouter[T] =
new PoolRouterBuilder[T](poolSize, behaviorFactory)
def pool[T](poolSize: Int)(behavior: Behavior[T]): PoolRouter[T] =
new PoolRouterBuilder[T](poolSize, behavior)
}
@ -73,7 +73,7 @@ trait PoolRouter[T] extends Behavior[T] {
/**
* Route messages by randomly selecting the routee from the available routees.
*
* Random routing makes it less likely that every `poolsize` message from a single producer ends up in the same
* Random routing makes it less likely that every `poolSize` message from a single producer ends up in the same
* mailbox of a slow actor.
*/
def withRandomRouting(): PoolRouter[T]

View file

@ -26,15 +26,12 @@ There are two kinds of routers included in Akka Typed - the pool router and the
## Pool Router
The pool router is created with a routee `Behavior` factory and spawns a number of children with that behavior which it will
The pool router is created with a routee `Behavior` and spawns a number of children with that behavior which it will
then forward messages to.
If a child is stopped the pool router removes it from its set of routees. When the last child stops the router itself stops.
To make a resilient router that deals with failures the routee `Behavior` must be supervised.
Note that it is important that the `Routers.pool` factory returns a new behavior instance for every call to the factory or else
routees may end up sharing mutable state and not work as expected.
Scala
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #pool }