Mini cleanup to stream.ActorMaterializer (#25866)

Do it once, instead of in all 4 call sites.
This commit is contained in:
Dale Wijnand 2018-11-05 13:10:54 +00:00 committed by Johan Andrén
parent 079aa46733
commit 91101d996c
3 changed files with 12 additions and 10 deletions

View file

@ -67,13 +67,12 @@ object ActorMaterializer {
FlowNames(system).name.copy(namePrefix)) FlowNames(system).name.copy(namePrefix))
} }
private def actorOfStreamSupervisor(materializerSettings: ActorMaterializerSettings, context: ActorRefFactory, haveShutDown: AtomicBoolean) = private def actorOfStreamSupervisor(materializerSettings: ActorMaterializerSettings, context: ActorRefFactory, haveShutDown: AtomicBoolean) = {
val props = StreamSupervisor.props(materializerSettings, haveShutDown)
context match { context match {
case s: ExtendedActorSystem case s: ExtendedActorSystem s.systemActorOf(props, StreamSupervisor.nextName())
s.systemActorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()) case a: ActorContext a.actorOf(props, StreamSupervisor.nextName())
}
case a: ActorContext
a.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName())
} }
/** /**
@ -101,8 +100,9 @@ object ActorMaterializer {
system, system,
materializerSettings, materializerSettings,
system.dispatchers, system.dispatchers,
system.systemActorOf(StreamSupervisor.props(materializerSettings, haveShutDown) system.systemActorOf(
.withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()), StreamSupervisor.props(materializerSettings, haveShutDown),
StreamSupervisor.nextName()),
haveShutDown, haveShutDown,
FlowNames(system).name.copy(namePrefix)) FlowNames(system).name.copy(namePrefix))
} }

View file

@ -152,6 +152,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
@InternalApi private[akka] object StreamSupervisor { @InternalApi private[akka] object StreamSupervisor {
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
.withDispatcher(settings.dispatcher)
private[stream] val baseName = "StreamSupervisor" private[stream] val baseName = "StreamSupervisor"
private val actorName = SeqActorName(baseName) private val actorName = SeqActorName(baseName)
def nextName(): String = actorName.next() def nextName(): String = actorName.next()

View file

@ -71,8 +71,9 @@ import akka.util.OptionVal
val system = actorSystemOf(context) val system = actorSystemOf(context)
val materializerSettings = ActorMaterializerSettings(system) val materializerSettings = ActorMaterializerSettings(system)
val streamSupervisor = context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown) val streamSupervisor = context.actorOf(
.withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()) StreamSupervisor.props(materializerSettings, haveShutDown),
StreamSupervisor.nextName())
PhasedFusingActorMaterializer( PhasedFusingActorMaterializer(
system, system,