diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala index ed7396452d..05217ab1a0 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/RoutersSpec.scala @@ -135,9 +135,7 @@ class RoutersSpec extends ScalaTestWithActorTestKit(""" val group = spawn(Routers.group(serviceKey), "group-router-1") - // give the group a little time to get a listing from the receptionist - Thread.sleep(receptionistDelayMs) - + // ok to do right away (0 to 3).foreach { n => val msg = s"message-$n" group ! msg diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala index f8ee424cd4..b18ad16e83 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/routing/GroupRouterImpl.scala @@ -7,8 +7,7 @@ package akka.actor.typed.internal.routing import akka.actor.typed._ import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.ServiceKey -import akka.actor.typed.scaladsl.AbstractBehavior -import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, StashBuffer } import akka.annotation.InternalApi /** @@ -24,7 +23,7 @@ private[akka] final case class GroupRouterBuilder[T] private[akka] ( with scaladsl.GroupRouter[T] { // deferred creation of the actual router - def apply(ctx: TypedActorContext[T]): Behavior[T] = new GroupRouterImpl[T](ctx.asScala, key, logicFactory()) + def apply(ctx: TypedActorContext[T]): Behavior[T] = new InitialGroupRouterImpl[T](ctx.asScala, key, logicFactory()) def withRandomRouting(): GroupRouterBuilder[T] = copy(logicFactory = () => new RoutingLogics.RandomLogic[T]()) @@ -34,15 +33,50 @@ private[akka] final case class GroupRouterBuilder[T] private[akka] ( /** * INTERNAL API + * + * Starting behavior for a group router before it got a first listing back from the receptionist */ @InternalApi -private final class GroupRouterImpl[T](ctx: ActorContext[T], serviceKey: ServiceKey[T], routingLogic: RoutingLogic[T]) +private final class InitialGroupRouterImpl[T]( + ctx: ActorContext[T], + serviceKey: ServiceKey[T], + routingLogic: RoutingLogic[T]) extends AbstractBehavior[T] { // casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting // messages to a router ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing]) - private var routeesEmpty = true + + private val stash = StashBuffer[T](capacity = 10000) + + def onMessage(msg: T): Behavior[T] = msg match { + case serviceKey.Listing(update) => + // we don't need to watch, because receptionist already does that + routingLogic.routeesUpdated(update) + val activeGroupRouter = new GroupRouterImpl[T](ctx, serviceKey, routingLogic, update.isEmpty) + stash.unstashAll(ctx, activeGroupRouter) + case msg: T @unchecked => + if (!stash.isFull) stash.stash(msg) + else ctx.system.deadLetters ! Dropped(msg, ctx.self) // don't fail on full stash + this + } +} + +/** + * INTERNAL API + */ +@InternalApi +private final class GroupRouterImpl[T]( + ctx: ActorContext[T], + serviceKey: ServiceKey[T], + routingLogic: RoutingLogic[T], + routeesInitiallyEmpty: Boolean) + extends AbstractBehavior[T] { + + // casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting + // messages to a router + ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing]) + private var routeesEmpty = routeesInitiallyEmpty def onMessage(msg: T): Behavior[T] = msg match { case serviceKey.Listing(update) =>