Stash before getting first listing in group router #26951

This commit is contained in:
Johan Andrén 2019-05-23 14:58:19 +02:00 committed by GitHub
parent a6f717c9b0
commit 55ae1ad5c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 8 deletions

View file

@ -135,9 +135,7 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
val group = spawn(Routers.group(serviceKey), "group-router-1")
// give the group a little time to get a listing from the receptionist
Thread.sleep(receptionistDelayMs)
// ok to do right away
(0 to 3).foreach { n =>
val msg = s"message-$n"
group ! msg

View file

@ -7,8 +7,7 @@ package akka.actor.typed.internal.routing
import akka.actor.typed._
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, StashBuffer }
import akka.annotation.InternalApi
/**
@ -24,7 +23,7 @@ private[akka] final case class GroupRouterBuilder[T] private[akka] (
with scaladsl.GroupRouter[T] {
// deferred creation of the actual router
def apply(ctx: TypedActorContext[T]): Behavior[T] = new GroupRouterImpl[T](ctx.asScala, key, logicFactory())
def apply(ctx: TypedActorContext[T]): Behavior[T] = new InitialGroupRouterImpl[T](ctx.asScala, key, logicFactory())
def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]())
@ -34,15 +33,50 @@ private[akka] final case class GroupRouterBuilder[T] private[akka] (
/**
* INTERNAL API
*
* Starting behavior for a group router before it got a first listing back from the receptionist
*/
@InternalApi
private final class GroupRouterImpl[T](ctx: ActorContext[T], serviceKey: ServiceKey[T], routingLogic: RoutingLogic[T])
private final class InitialGroupRouterImpl[T](
ctx: ActorContext[T],
serviceKey: ServiceKey[T],
routingLogic: RoutingLogic[T])
extends AbstractBehavior[T] {
// casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting
// messages to a router
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing])
private var routeesEmpty = true
private val stash = StashBuffer[T](capacity = 10000)
def onMessage(msg: T): Behavior[T] = msg match {
case serviceKey.Listing(update) =>
// we don't need to watch, because receptionist already does that
routingLogic.routeesUpdated(update)
val activeGroupRouter = new GroupRouterImpl[T](ctx, serviceKey, routingLogic, update.isEmpty)
stash.unstashAll(ctx, activeGroupRouter)
case msg: T @unchecked =>
if (!stash.isFull) stash.stash(msg)
else ctx.system.deadLetters ! Dropped(msg, ctx.self) // don't fail on full stash
this
}
}
/**
* INTERNAL API
*/
@InternalApi
private final class GroupRouterImpl[T](
ctx: ActorContext[T],
serviceKey: ServiceKey[T],
routingLogic: RoutingLogic[T],
routeesInitiallyEmpty: Boolean)
extends AbstractBehavior[T] {
// casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting
// messages to a router
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing])
private var routeesEmpty = routeesInitiallyEmpty
def onMessage(msg: T): Behavior[T] = msg match {
case serviceKey.Listing(update) =>