diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala index 80222779b4..7f25e678f0 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala @@ -76,6 +76,9 @@ import com.typesafe.config.Config * The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated * to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's * recommended to use a unique identifier of representing the producer. + * + * If the `DurableProducerQueue` is defined it is created as a child actor of the `ProducerController` actor. + * It will use the same dispatcher as the parent `ProducerController`. */ @ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages object ProducerController { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/WorkPullingProducerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/WorkPullingProducerController.scala index ffdde54b0a..d34ead4cda 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/WorkPullingProducerController.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/WorkPullingProducerController.scala @@ -90,6 +90,10 @@ import com.typesafe.config.Config * The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated * to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's * recommended to use a unique identifier of representing the producer. + * + * If the `DurableProducerQueue` is defined it is created as a child actor of the `WorkPullingProducerController` actor. + * `ProducerController` actors are created for each registered worker. Those child actors use the same dispatcher + * as the parent `WorkPullingProducerController`. */ @ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages object WorkPullingProducerController { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala index 40d26ece1e..b1fa7cabc6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala @@ -13,6 +13,7 @@ import scala.util.Success import akka.actor.DeadLetterSuppression import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.actor.typed.DispatcherSelector import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ConsumerController.SequencedMessage import akka.actor.typed.delivery.DurableProducerQueue @@ -191,7 +192,7 @@ object ProducerControllerImpl { settings: ProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = { durableQueueBehavior.map { b => - val ref = context.spawn(b, "durable") + val ref = context.spawn(b, "durable", DispatcherSelector.sameAsParent()) context.watchWith(ref, DurableQueueTerminated) askLoadState(context, Some(ref), settings, attempt = 1) ref diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala index 70444d90bf..a5290da946 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala @@ -15,6 +15,7 @@ import scala.util.Success import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.actor.typed.DispatcherSelector import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.DurableProducerQueue import akka.actor.typed.delivery.DurableProducerQueue.ConfirmationQualifier @@ -230,7 +231,7 @@ import akka.util.Timeout settings: WorkPullingProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = { durableQueueBehavior.map { b => - val ref = context.spawn(b, "durable") + val ref = context.spawn(b, "durable", DispatcherSelector.sameAsParent()) context.watchWith(ref, DurableQueueTerminated) askLoadState(context, Some(ref), settings, attempt = 1) ref @@ -549,7 +550,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( context.log.debug2("Registered worker [{}], with producerId [{}].", c, outKey) val p = context.spawn( ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings), - uuid) + uuid, + DispatcherSelector.sameAsParent()) p ! ProducerController.Start(workerRequestNextAdapter) p ! ProducerController.RegisterConsumer(c) acc.copy(out = acc.out.updated(outKey, OutState(p, c, 0L, Vector.empty, None))) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/ShardingProducerController.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/ShardingProducerController.scala index ca84d81eb6..e7716e75b4 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/ShardingProducerController.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/ShardingProducerController.scala @@ -84,6 +84,10 @@ import com.typesafe.config.Config * The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated * to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's * recommended to use a unique identifier of representing the producer. + * + * If the `DurableProducerQueue` is defined it is created as a child actor of the `ShardingProducerController` actor. + * `ProducerController` actors are created for each destination entity. Those child actors use the same dispatcher + * as the parent `ShardingProducerController`. */ @ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages object ShardingProducerController { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingConsumerControllerImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingConsumerControllerImpl.scala index cda803f069..a95a7e3627 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingConsumerControllerImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingConsumerControllerImpl.scala @@ -6,6 +6,7 @@ package akka.cluster.sharding.typed.delivery.internal import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.actor.typed.DispatcherSelector import akka.actor.typed.Terminated import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.internal.ConsumerControllerImpl @@ -92,7 +93,8 @@ private class ShardingConsumerControllerImpl[A]( context.log.debug("Starting ConsumerController for producerId [{}].", seqMsg.producerId) val cc = context.spawn( ConsumerController[A](settings.consumerControllerSettings), - s"consumerController-${seqMsg.producerId}") + s"consumerController-${seqMsg.producerId}", + DispatcherSelector.sameAsParent()) context.watch(cc) cc ! ConsumerController.Start(deliverTo) cc ! seqMsg diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala index 878ea438c4..3b1665fba0 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala @@ -13,6 +13,7 @@ import scala.util.Success import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.actor.typed.DispatcherSelector import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.DurableProducerQueue import akka.actor.typed.delivery.DurableProducerQueue.ConfirmationQualifier @@ -236,7 +237,7 @@ import akka.util.Timeout settings: ShardingProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = { durableQueueBehavior.map { b => - val ref = context.spawn(b, "durable") + val ref = context.spawn(b, "durable", DispatcherSelector.sameAsParent()) context.watchWith(ref, DurableQueueTerminated) askLoadState(context, Some(ref), settings, attempt = 1) ref @@ -332,7 +333,8 @@ private class ShardingProducerControllerImpl[A: ClassTag]( } val p = context.spawn( ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings, send), - entityId) + entityId, + DispatcherSelector.sameAsParent()) p ! ProducerController.Start(requestNextAdapter) s.copy( out = s.out.updated(