feat: Make SingleConsumerMultiProducer the default mailbox for stream.

This commit is contained in:
He-Pin 2024-01-08 19:24:33 +08:00 committed by kerr
parent ccce5c0426
commit 3f8be563b9
3 changed files with 25 additions and 3 deletions

View file

@ -21,6 +21,14 @@ pekko {
# or full dispatcher configuration to be used by ActorMaterializer when creating Actors.
dispatcher = "pekko.actor.default-dispatcher"
# FQCN of the MailboxType. The Class of the FQCN must have a public
# constructor with
# (org.apache.pekko.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
# defaults to the single consumer mailbox for better performance.
mailbox {
mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
# Fully qualified config path which holds the dispatcher configuration
# or full dispatcher configuration to be used by stream operators that
# perform blocking operations

View file

@ -63,6 +63,7 @@ import pekko.util.OptionVal
case Dispatchers.DefaultDispatcherId =>
// the caller said to use the default dispatcher, but that can been trumped by the dispatcher attribute
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
case _ => props
}
@ -195,10 +196,13 @@ private[pekko] class SubFusingActorMaterializerImpl(
* INTERNAL API
*/
@InternalApi private[pekko] object StreamSupervisor {
def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props =
def props(attributes: Attributes, haveShutDown: AtomicBoolean): Props = {
Props(new StreamSupervisor(haveShutDown))
.withDeploy(Deploy.local)
.withDispatcher(attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
}
private[stream] val baseName = "StreamSupervisor"
private val actorName = SeqActorName(baseName)
def nextName(): String = actorName.next()

View file

@ -62,6 +62,8 @@ import pekko.util.OptionVal
val Debug = false
val MailboxConfigName: String = "pekko.stream.materializer.mailbox"
val DefaultPhase: Phase[Any] = new Phase[Any] {
override def apply(
settings: ActorMaterializerSettings,
@ -116,7 +118,10 @@ import pekko.util.OptionVal
val dispatcher = attributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher
val supervisorProps =
StreamSupervisor.props(attributes, haveShutDown).withDispatcher(dispatcher).withDeploy(Deploy.local)
StreamSupervisor.props(attributes, haveShutDown)
.withDispatcher(dispatcher)
.withMailbox(MailboxConfigName)
.withDeploy(Deploy.local)
// FIXME why do we need a global unique name for the child?
val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName())
@ -625,6 +630,7 @@ private final case class SavedIslandData(
val effectiveProps = props.dispatcher match {
case Dispatchers.DefaultDispatcherId =>
props.withDispatcher(context.effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withMailbox(MailboxConfigName)
case _ => props
}
@ -819,6 +825,7 @@ private final case class SavedIslandData(
val props = ActorGraphInterpreter
.props(shell)
.withDispatcher(effectiveAttributes.mandatoryAttribute[ActorAttributes.Dispatcher].dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
val actorName = fullIslandName match {
case OptionVal.Some(n) => n
@ -974,7 +981,10 @@ private final case class SavedIslandData(
val maxInputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer].max
val props =
TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing).withDispatcher(dispatcher)
TLSActor.props(maxInputBuffer, tls.createSSLEngine, tls.verifySession, tls.closing)
.withDispatcher(dispatcher)
.withMailbox(PhasedFusingActorMaterializer.MailboxConfigName)
tlsActor = materializer.actorOf(props, "TLS-for-" + islandName)
def factory(id: Int) = new ActorPublisher[Any](tlsActor) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)