possibility to select dispatcher for delivery actors (#28799)

* possibility to select dispatcher for delivery actors

* by using sameAsParent for spawned children

* mention in Scaladoc
This commit is contained in:
Patrik Nordwall 2020-04-07 08:28:05 +02:00 committed by GitHub
parent e3992656c8
commit 4c57493807
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 24 additions and 6 deletions

View file

@ -76,6 +76,9 @@ import com.typesafe.config.Config
* The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated * The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated
* to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's * to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's
* recommended to use a unique identifier of representing the producer. * recommended to use a unique identifier of representing the producer.
*
* If the `DurableProducerQueue` is defined it is created as a child actor of the `ProducerController` actor.
* It will use the same dispatcher as the parent `ProducerController`.
*/ */
@ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages @ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages
object ProducerController { object ProducerController {

View file

@ -90,6 +90,10 @@ import com.typesafe.config.Config
* The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated * The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated
* to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's * to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's
* recommended to use a unique identifier of representing the producer. * recommended to use a unique identifier of representing the producer.
*
* If the `DurableProducerQueue` is defined it is created as a child actor of the `WorkPullingProducerController` actor.
* `ProducerController` actors are created for each registered worker. Those child actors use the same dispatcher
* as the parent `WorkPullingProducerController`.
*/ */
@ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages @ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages
object WorkPullingProducerController { object WorkPullingProducerController {

View file

@ -13,6 +13,7 @@ import scala.util.Success
import akka.actor.DeadLetterSuppression import akka.actor.DeadLetterSuppression
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.DispatcherSelector
import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.ConsumerController.SequencedMessage import akka.actor.typed.delivery.ConsumerController.SequencedMessage
import akka.actor.typed.delivery.DurableProducerQueue import akka.actor.typed.delivery.DurableProducerQueue
@ -191,7 +192,7 @@ object ProducerControllerImpl {
settings: ProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = { settings: ProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = {
durableQueueBehavior.map { b => durableQueueBehavior.map { b =>
val ref = context.spawn(b, "durable") val ref = context.spawn(b, "durable", DispatcherSelector.sameAsParent())
context.watchWith(ref, DurableQueueTerminated) context.watchWith(ref, DurableQueueTerminated)
askLoadState(context, Some(ref), settings, attempt = 1) askLoadState(context, Some(ref), settings, attempt = 1)
ref ref

View file

@ -15,6 +15,7 @@ import scala.util.Success
import akka.Done import akka.Done
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.DispatcherSelector
import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.DurableProducerQueue import akka.actor.typed.delivery.DurableProducerQueue
import akka.actor.typed.delivery.DurableProducerQueue.ConfirmationQualifier import akka.actor.typed.delivery.DurableProducerQueue.ConfirmationQualifier
@ -230,7 +231,7 @@ import akka.util.Timeout
settings: WorkPullingProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = { settings: WorkPullingProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = {
durableQueueBehavior.map { b => durableQueueBehavior.map { b =>
val ref = context.spawn(b, "durable") val ref = context.spawn(b, "durable", DispatcherSelector.sameAsParent())
context.watchWith(ref, DurableQueueTerminated) context.watchWith(ref, DurableQueueTerminated)
askLoadState(context, Some(ref), settings, attempt = 1) askLoadState(context, Some(ref), settings, attempt = 1)
ref ref
@ -549,7 +550,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
context.log.debug2("Registered worker [{}], with producerId [{}].", c, outKey) context.log.debug2("Registered worker [{}], with producerId [{}].", c, outKey)
val p = context.spawn( val p = context.spawn(
ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings), ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings),
uuid) uuid,
DispatcherSelector.sameAsParent())
p ! ProducerController.Start(workerRequestNextAdapter) p ! ProducerController.Start(workerRequestNextAdapter)
p ! ProducerController.RegisterConsumer(c) p ! ProducerController.RegisterConsumer(c)
acc.copy(out = acc.out.updated(outKey, OutState(p, c, 0L, Vector.empty, None))) acc.copy(out = acc.out.updated(outKey, OutState(p, c, 0L, Vector.empty, None)))

View file

@ -84,6 +84,10 @@ import com.typesafe.config.Config
* The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated * The `producerId` is used in logging and included as MDC entry with key `"producerId"`. It's propagated
* to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's * to the `ConsumerController` and is useful for correlating log messages. It can be any `String` but it's
* recommended to use a unique identifier of representing the producer. * recommended to use a unique identifier of representing the producer.
*
* If the `DurableProducerQueue` is defined it is created as a child actor of the `ShardingProducerController` actor.
* `ProducerController` actors are created for each destination entity. Those child actors use the same dispatcher
* as the parent `ShardingProducerController`.
*/ */
@ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages @ApiMayChange // TODO #28719 when removing ApiMayChange consider removing `case class` for some of the messages
object ShardingProducerController { object ShardingProducerController {

View file

@ -6,6 +6,7 @@ package akka.cluster.sharding.typed.delivery.internal
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.DispatcherSelector
import akka.actor.typed.Terminated import akka.actor.typed.Terminated
import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.internal.ConsumerControllerImpl import akka.actor.typed.delivery.internal.ConsumerControllerImpl
@ -92,7 +93,8 @@ private class ShardingConsumerControllerImpl[A](
context.log.debug("Starting ConsumerController for producerId [{}].", seqMsg.producerId) context.log.debug("Starting ConsumerController for producerId [{}].", seqMsg.producerId)
val cc = context.spawn( val cc = context.spawn(
ConsumerController[A](settings.consumerControllerSettings), ConsumerController[A](settings.consumerControllerSettings),
s"consumerController-${seqMsg.producerId}") s"consumerController-${seqMsg.producerId}",
DispatcherSelector.sameAsParent())
context.watch(cc) context.watch(cc)
cc ! ConsumerController.Start(deliverTo) cc ! ConsumerController.Start(deliverTo)
cc ! seqMsg cc ! seqMsg

View file

@ -13,6 +13,7 @@ import scala.util.Success
import akka.Done import akka.Done
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.DispatcherSelector
import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.DurableProducerQueue import akka.actor.typed.delivery.DurableProducerQueue
import akka.actor.typed.delivery.DurableProducerQueue.ConfirmationQualifier import akka.actor.typed.delivery.DurableProducerQueue.ConfirmationQualifier
@ -236,7 +237,7 @@ import akka.util.Timeout
settings: ShardingProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = { settings: ShardingProducerController.Settings): Option[ActorRef[DurableProducerQueue.Command[A]]] = {
durableQueueBehavior.map { b => durableQueueBehavior.map { b =>
val ref = context.spawn(b, "durable") val ref = context.spawn(b, "durable", DispatcherSelector.sameAsParent())
context.watchWith(ref, DurableQueueTerminated) context.watchWith(ref, DurableQueueTerminated)
askLoadState(context, Some(ref), settings, attempt = 1) askLoadState(context, Some(ref), settings, attempt = 1)
ref ref
@ -332,7 +333,8 @@ private class ShardingProducerControllerImpl[A: ClassTag](
} }
val p = context.spawn( val p = context.spawn(
ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings, send), ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings, send),
entityId) entityId,
DispatcherSelector.sameAsParent())
p ! ProducerController.Start(requestNextAdapter) p ! ProducerController.Start(requestNextAdapter)
s.copy( s.copy(
out = s.out.updated( out = s.out.updated(