System materializer guardian (#27723)

* Eager creation of system materializer on system startup #26850

* System level materializers always spawned under a specific system actor #26850

* No need for RepointableActorRef logic anymore

* MiMa filter

* Make the creation timeout higher
This commit is contained in:
Johan Andrén 2019-09-26 22:49:33 +02:00 committed by Patrik Nordwall
parent 1a90e715f5
commit 4f9a4b5403
12 changed files with 185 additions and 215 deletions

View file

@ -402,9 +402,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
startTransport() startTransport()
topLevelFlightRecorder.loFreq(Transport_Started, NoMetaData) topLevelFlightRecorder.loFreq(Transport_Started, NoMetaData)
materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system) val systemMaterializer = SystemMaterializer(system)
controlMaterializer = materializer =
ActorMaterializer.systemMaterializer(settings.Advanced.ControlStreamMaterializerSettings, "remoteControl", system) systemMaterializer.createAdditionalLegacySystemMaterializer("remote", settings.Advanced.MaterializerSettings)
controlMaterializer = systemMaterializer.createAdditionalLegacySystemMaterializer(
"remoteControl",
settings.Advanced.ControlStreamMaterializerSettings)
messageDispatcher = new MessageDispatcher(system, provider) messageDispatcher = new MessageDispatcher(system, provider)
topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData) topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData)

View file

@ -32,3 +32,14 @@ class SystemMaterializerSpec extends StreamSpec with ScalaFutures {
} }
} }
class SystemMaterializerEagerStartupSpec extends StreamSpec {
"The SystemMaterializer" must {
"be eagerly started on system startup" in {
system.hasExtension(SystemMaterializer.lookup) should ===(true)
}
}
}

View file

@ -7,12 +7,9 @@ package akka.stream.impl
import akka.NotUsed import akka.NotUsed
import akka.stream._ import akka.stream._
import akka.stream.impl.TraversalTestUtils._ import akka.stream.impl.TraversalTestUtils._
import akka.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, Source } import akka.stream.scaladsl.Keep
import akka.stream.testkit.{ TestPublisher, TestSubscriber }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import scala.concurrent.Await
class TraversalBuilderSpec extends AkkaSpec { class TraversalBuilderSpec extends AkkaSpec {
"CompositeTraversalBuilder" must { "CompositeTraversalBuilder" must {
@ -438,98 +435,6 @@ class TraversalBuilderSpec extends AkkaSpec {
(flow1, Attributes.name("test") and Attributes.name("flow"), TestIsland1), (flow1, Attributes.name("test") and Attributes.name("flow"), TestIsland1),
(sink, Attributes.none, TestDefaultIsland))) (sink, Attributes.none, TestDefaultIsland)))
} }
//TODO: Dummy test cases just for smoke-testing. Should be removed.
"foo" in {
implicit val mat = PhasedFusingActorMaterializer()
import scala.concurrent.duration._
val graph = Source.repeat(1).take(10).toMat(Sink.fold(0)(_ + _))(Keep.right)
Await.result(graph.run(), 3.seconds) should ===(10)
}
"islands 1" in {
implicit val mat = PhasedFusingActorMaterializer()
val sub = TestSubscriber.probe[Int]()
val graph = Source.repeat(1).take(10).toMat(Sink.asPublisher(false))(Keep.right)
graph.run().subscribe(sub)
sub.request(10)
sub.expectNextN(List(1, 1, 1, 1, 1, 1, 1, 1, 1, 1))
sub.expectComplete()
}
"islands 2" in {
implicit val mat = PhasedFusingActorMaterializer()
val pub = TestPublisher.probe[Int]()
import scala.concurrent.duration._
val graph = Source.asSubscriber[Int].toMat(Sink.fold(0)(_ + _))(Keep.both)
val (sub, future) = graph.run()
pub.subscribe(sub)
pub.sendNext(0)
pub.sendNext(1)
pub.sendNext(2)
pub.sendNext(3)
pub.sendComplete()
Await.result(future, 3.seconds) should ===(6)
}
"islands 3" in {
implicit val mat = PhasedFusingActorMaterializer()
val sub = TestSubscriber.probe[Int]()
Source.repeat(1).take(10).runWith(Sink.fromSubscriber(sub))
sub.request(10)
sub.expectNextN(List(1, 1, 1, 1, 1, 1, 1, 1, 1, 1))
sub.expectComplete()
}
"islands 4" in {
implicit val mat = PhasedFusingActorMaterializer()
val pub = TestPublisher.probe[Int]()
import scala.concurrent.duration._
val future = Source.fromPublisher(pub).runWith(Sink.fold(0)(_ + _))
pub.sendNext(0)
pub.sendNext(1)
pub.sendNext(2)
pub.sendNext(3)
pub.sendComplete()
Await.result(future, 3.seconds) should ===(6)
}
"bidiflow1" in {
implicit val mat = PhasedFusingActorMaterializer()
val flow1 = Flow.fromGraph(fusing.Map((x: Int) => x + 1))
val flow2 = Flow.fromGraph(fusing.Map((x: Int) => x + 1))
val bidi = BidiFlow.fromFlowsMat(flow1, flow2)(Keep.none)
val flow = bidi.join(Flow[Int])
Source.single(1).via(flow).runWith(Sink.ignore)
}
"bidiflow reverse" in {
implicit val mat = PhasedFusingActorMaterializer()
val flow1 = Flow.fromGraph(new fusing.Map((x: Int) => x + 1))
val flow2 = Flow.fromGraph(new fusing.Map((x: Int) => x + 1))
val bidi = BidiFlow.fromFlowsMat(flow1, flow2)(Keep.none)
val flow = Flow[Int].join(bidi.reversed)
Source.single(1).via(flow).runWith(Sink.ignore)
}
} }
} }

View file

@ -240,3 +240,5 @@ ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.Flow.
# #26187 Remove ActorPublisher, ActorSubscriber # #26187 Remove ActorPublisher, ActorSubscriber
ProblemFilters.exclude[Problem]("akka.stream.actor.*") ProblemFilters.exclude[Problem]("akka.stream.actor.*")
# system materializer guardian #26850
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializer.systemMaterializer")

View file

@ -2,6 +2,8 @@
# Akka Stream Reference Config File # # Akka Stream Reference Config File #
##################################### #####################################
# eager creation of the system wide materializer
akka.library-extensions += "akka.stream.SystemMaterializer"
akka { akka {
stream { stream {
@ -87,6 +89,9 @@ akka {
write-buffer-size = 16 KiB write-buffer-size = 16 KiB
} }
# Time to wait for async materializer creation before throwing an exception
creation-timeout = 20 seconds
//#stream-ref //#stream-ref
# configure defaults for SourceRef and SinkRef # configure defaults for SourceRef and SinkRef
stream-ref { stream-ref {

View file

@ -5,7 +5,6 @@
package akka.stream package akka.stream
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.ActorContext import akka.actor.ActorContext
import akka.actor.ActorRef import akka.actor.ActorRef
@ -48,7 +47,7 @@ object ActorMaterializer {
implicit context: ActorRefFactory): ActorMaterializer = { implicit context: ActorRefFactory): ActorMaterializer = {
val system = actorSystemOf(context) val system = actorSystemOf(context)
val settings = materializerSettings.getOrElse(ActorMaterializerSettings(system)) val settings = materializerSettings.getOrElse(SystemMaterializer(system).materializerSettings)
apply(settings, namePrefix.getOrElse("flow"))(context) apply(settings, namePrefix.getOrElse("flow"))(context)
} }
@ -69,25 +68,17 @@ object ActorMaterializer {
"2.6.0") "2.6.0")
def apply(materializerSettings: ActorMaterializerSettings, namePrefix: String)( def apply(materializerSettings: ActorMaterializerSettings, namePrefix: String)(
implicit context: ActorRefFactory): ActorMaterializer = { implicit context: ActorRefFactory): ActorMaterializer = {
val haveShutDown = new AtomicBoolean(false)
val system = actorSystemOf(context)
val defaultAttributes = materializerSettings.toAttributes
new PhasedFusingActorMaterializer(
system,
materializerSettings,
defaultAttributes,
system.dispatchers,
actorOfStreamSupervisor(defaultAttributes, context, haveShutDown),
haveShutDown,
FlowNames(system).name.copy(namePrefix))
}
private def actorOfStreamSupervisor(attributes: Attributes, context: ActorRefFactory, haveShutDown: AtomicBoolean) = {
val props = StreamSupervisor.props(attributes, haveShutDown)
context match { context match {
case s: ExtendedActorSystem => s.systemActorOf(props, StreamSupervisor.nextName()) case system: ActorSystem =>
case a: ActorContext => a.actorOf(props, StreamSupervisor.nextName()) // system level materializer, defer to the system materializer extension
SystemMaterializer(system)
.createAdditionalLegacySystemMaterializer(namePrefix, materializerSettings)
.asInstanceOf[ActorMaterializer]
case context: ActorContext =>
// actor context level materializer, will live as a child of this actor
PhasedFusingActorMaterializer(context, namePrefix, materializerSettings, materializerSettings.toAttributes)
} }
} }
@ -109,25 +100,6 @@ object ActorMaterializer {
def apply(materializerSettings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = def apply(materializerSettings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer =
apply(Some(materializerSettings), None) apply(Some(materializerSettings), None)
/**
* INTERNAL API: Creates the `StreamSupervisor` as a system actor.
*/
private[akka] def systemMaterializer(
materializerSettings: ActorMaterializerSettings,
namePrefix: String,
system: ExtendedActorSystem): ActorMaterializer = {
val haveShutDown = new AtomicBoolean(false)
val attributes = materializerSettings.toAttributes
new PhasedFusingActorMaterializer(
system,
materializerSettings,
attributes,
system.dispatchers,
system.systemActorOf(StreamSupervisor.props(attributes, haveShutDown), StreamSupervisor.nextName()),
haveShutDown,
FlowNames(system).name.copy(namePrefix))
}
/** /**
* Java API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. * Java API: Creates an ActorMaterializer that can materialize stream blueprints as running streams.
* *

View file

@ -222,9 +222,8 @@ object Materializer {
* needs or want to test abrupt termination of a custom graph stage. If you want to tie the lifecycle * needs or want to test abrupt termination of a custom graph stage. If you want to tie the lifecycle
* of the materializer to an actor, use the factory that takes an [[ActorContext]] instead. * of the materializer to an actor, use the factory that takes an [[ActorContext]] instead.
*/ */
@silent("deprecated")
def apply(systemProvider: ClassicActorSystemProvider): Materializer = def apply(systemProvider: ClassicActorSystemProvider): Materializer =
ActorMaterializer(None, None)(systemProvider.classicSystem) SystemMaterializer(systemProvider.classicSystem).createAdditionalSystemMaterializer()
/** /**
* Scala API: Create a new materializer that will stay alive as long as the system does or until it is explicitly stopped. * Scala API: Create a new materializer that will stay alive as long as the system does or until it is explicitly stopped.
@ -234,7 +233,6 @@ object Materializer {
* needs or want to test abrupt termination of a custom graph stage. If you want to tie the * needs or want to test abrupt termination of a custom graph stage. If you want to tie the
* lifecycle of the materializer to an actor, use the factory that takes an [[ActorContext]] instead. * lifecycle of the materializer to an actor, use the factory that takes an [[ActorContext]] instead.
*/ */
@silent("deprecated")
def createMaterializer(systemProvider: ClassicActorSystemProvider): Materializer = def createMaterializer(systemProvider: ClassicActorSystemProvider): Materializer =
apply(systemProvider) apply(systemProvider)

View file

@ -5,10 +5,19 @@
package akka.stream package akka.stream
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.Deploy
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.Extension import akka.actor.Extension
import akka.actor.ExtensionId import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider import akka.actor.ExtensionIdProvider
import akka.annotation.InternalApi
import akka.stream.impl.MaterializerGuardian
import scala.concurrent.Await
import scala.concurrent.Promise
import akka.util.JavaDurationConverters._
import akka.pattern.ask
import akka.util.Timeout
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
/** /**
@ -27,9 +36,51 @@ object SystemMaterializer extends ExtensionId[SystemMaterializer] with Extension
} }
final class SystemMaterializer(system: ExtendedActorSystem) extends Extension { final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
@silent("deprecated") private val systemMaterializerPromise = Promise[Materializer]()
val materializer = {
val settings = ActorMaterializerSettings(system) // load these here so we can share the same instance across materializer guardian and other uses
ActorMaterializer.systemMaterializer(settings, "default", system) /**
* INTERNAL API
*/
@InternalApi @silent("deprecated")
private[akka] val materializerSettings = ActorMaterializerSettings(system)
private implicit val materializerTimeout: Timeout =
system.settings.config.getDuration("akka.stream.materializer.creation-timeout").asScala
@InternalApi @silent("deprecated")
private val materializerGuardian = system.systemActorOf(
MaterializerGuardian
.props(systemMaterializerPromise, materializerSettings)
.withDispatcher(materializerSettings.dispatcher)
.withDeploy(Deploy.local),
"Materializers")
/**
* INTERNAL API
*/
@InternalApi
private[akka] def createAdditionalSystemMaterializer(): Materializer = {
val started =
(materializerGuardian ? MaterializerGuardian.StartMaterializer).mapTo[MaterializerGuardian.MaterializerStarted]
Await.result(started, materializerTimeout.duration).materializer
} }
/**
* INTERNAL API
*/
@InternalApi
@silent("deprecated")
private[akka] def createAdditionalLegacySystemMaterializer(
namePrefix: String,
settings: ActorMaterializerSettings): Materializer = {
val started =
(materializerGuardian ? MaterializerGuardian.LegacyStartMaterializer(namePrefix, settings))
.mapTo[MaterializerGuardian.MaterializerStarted]
Await.result(started, materializerTimeout.duration).materializer
}
// block on async creation to make it effectively final
val materializer = Await.result(systemMaterializerPromise.future, materializerTimeout.duration)
} }

View file

@ -24,7 +24,6 @@ import com.github.ghik.silencer.silent
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.Future import scala.concurrent.Future
@ -72,14 +71,6 @@ import scala.concurrent.Future
supervisor match { supervisor match {
case ref: LocalActorRef => case ref: LocalActorRef =>
ref.underlying.attachChild(props, name, systemService = false) ref.underlying.attachChild(props, name, systemService = false)
case ref: RepointableActorRef =>
if (ref.isStarted)
ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false)
else {
implicit val timeout = ref.system.settings.CreationTimeout
val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef]
Await.result(f, timeout.duration)
}
case unknown => case unknown =>
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
} }
@ -210,14 +201,6 @@ private[akka] class SubFusingActorMaterializerImpl(
extends DeadLetterSuppression extends DeadLetterSuppression
with NoSerializationVerificationNeeded with NoSerializationVerificationNeeded
final case class AddFunctionRef(f: (ActorRef, Any) => Unit, name: String)
extends DeadLetterSuppression
with NoSerializationVerificationNeeded
final case class RemoveFunctionRef(ref: FunctionRef)
extends DeadLetterSuppression
with NoSerializationVerificationNeeded
case object GetChildrenSnapshots case object GetChildrenSnapshots
final case class ChildrenSnapshots(seq: immutable.Seq[StreamSnapshot]) final case class ChildrenSnapshots(seq: immutable.Seq[StreamSnapshot])
extends DeadLetterSuppression extends DeadLetterSuppression
@ -248,11 +231,6 @@ private[akka] class SubFusingActorMaterializerImpl(
case Materialize(props, name) => case Materialize(props, name) =>
val impl = context.actorOf(props, name) val impl = context.actorOf(props, name)
sender() ! impl sender() ! impl
case AddFunctionRef(f, name) =>
val ref = context.asInstanceOf[ActorCell].addFunctionRef(f, name)
sender() ! ref
case RemoveFunctionRef(ref) =>
context.asInstanceOf[ActorCell].removeFunctionRef(ref)
case GetChildren => case GetChildren =>
sender() ! Children(context.children.toSet) sender() ! Children(context.children.toSet)
case GetChildrenSnapshots => case GetChildrenSnapshots =>

View file

@ -0,0 +1,69 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.actor.Actor
import akka.actor.Props
import akka.annotation.InternalApi
import akka.stream.ActorMaterializerSettings
import akka.stream.Materializer
import com.github.ghik.silencer.silent
import scala.concurrent.Promise
/**
* INTERNAL API
*
* The materializer guardian is parent to all materializers created on the `system` level including the default
* system wide materializer. Eagerly started by the SystemMaterializer extension on system startup.
*/
@InternalApi
private[akka] object MaterializerGuardian {
final case object StartMaterializer
final case class MaterializerStarted(materializer: Materializer)
// this is available to keep backwards compatibility with ActorMaterializer and should
// be removed together with ActorMaterialixer in Akka 2.7
final case class LegacyStartMaterializer(namePrefix: String, settings: ActorMaterializerSettings)
def props(systemMaterializer: Promise[Materializer], materializerSettings: ActorMaterializerSettings) =
Props(new MaterializerGuardian(systemMaterializer, materializerSettings))
}
/**
* INTERNAL API
*/
@silent("deprecated")
@InternalApi
private[akka] final class MaterializerGuardian(
systemMaterializerPromise: Promise[Materializer],
materializerSettings: ActorMaterializerSettings)
extends Actor {
import MaterializerGuardian._
private val defaultAttributes = materializerSettings.toAttributes
private val defaultNamePrefix = "flow"
private val systemMaterializer = startMaterializer(defaultNamePrefix, None)
systemMaterializerPromise.success(systemMaterializer)
override def receive: Receive = {
case StartMaterializer =>
sender() ! MaterializerStarted(startMaterializer(defaultNamePrefix, None))
case LegacyStartMaterializer(namePrefix, settings) =>
sender() ! MaterializerStarted(startMaterializer(namePrefix, Some(settings)))
}
private def startMaterializer(namePrefix: String, settings: Option[ActorMaterializerSettings]) = {
val attributes = settings match {
case None => defaultAttributes
case Some(`materializerSettings`) => defaultAttributes
case Some(settings) => settings.toAttributes
}
PhasedFusingActorMaterializer(context, namePrefix, settings.getOrElse(materializerSettings), attributes)
}
}

View file

@ -10,10 +10,9 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.NotUsed import akka.NotUsed
import akka.actor.ActorContext import akka.actor.ActorContext
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.ActorRefFactory
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.actor.ExtendedActorSystem import akka.actor.Deploy
import akka.actor.PoisonPill import akka.actor.PoisonPill
import akka.actor.Props import akka.actor.Props
import akka.annotation.DoNotInherit import akka.annotation.DoNotInherit
@ -98,38 +97,28 @@ import com.github.ghik.silencer.silent
}, },
GraphStageTag -> DefaultPhase) GraphStageTag -> DefaultPhase)
@silent("deprecated") def apply(
@InternalApi private[akka] def apply()(implicit context: ActorRefFactory): Materializer = { context: ActorContext,
namePrefix: String,
settings: ActorMaterializerSettings,
attributes: Attributes): PhasedFusingActorMaterializer = {
val haveShutDown = new AtomicBoolean(false) val haveShutDown = new AtomicBoolean(false)
val system = actorSystemOf(context)
val materializerSettings = ActorMaterializerSettings(system)
val defaultAttributes = materializerSettings.toAttributes
val streamSupervisor = val supervisorProps =
context.actorOf(StreamSupervisor.props(defaultAttributes, haveShutDown), StreamSupervisor.nextName()) StreamSupervisor.props(attributes, haveShutDown).withDispatcher(context.props.dispatcher).withDeploy(Deploy.local)
PhasedFusingActorMaterializer( // FIXME why do we need a global unique name for the child?
system, val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName())
materializerSettings,
defaultAttributes, new PhasedFusingActorMaterializer(
system.dispatchers, context.system,
settings,
attributes,
context.system.dispatchers,
streamSupervisor, streamSupervisor,
haveShutDown, haveShutDown,
FlowNames(system).name.copy("flow")) FlowNames(context.system).name.copy(namePrefix))
} }
private def actorSystemOf(context: ActorRefFactory): ActorSystem = {
val system = context match {
case s: ExtendedActorSystem => s
case c: ActorContext => c.system
case null => throw new IllegalArgumentException("ActorRefFactory context must be defined")
case _ =>
throw new IllegalArgumentException(
s"ActorRefFactory context must be an ActorSystem or ActorContext, got [${context.getClass.getName}]")
}
system
}
} }
private final case class SegmentInfo( private final case class SegmentInfo(

View file

@ -6,11 +6,9 @@ package akka.stream.stage
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import scala.deprecated
import akka.actor._ import akka.actor._
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.japi.function.{ Effect, Procedure } import akka.japi.function.{ Effect, Procedure }
import akka.pattern.ask
import akka.stream._ import akka.stream._
import akka.stream.impl.ActorSubscriberMessage import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource } import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource }
@ -23,8 +21,7 @@ import akka.{ Done, NotUsed }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.{ immutable, mutable } import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.{ Future, Promise }
import akka.stream.impl.StreamSupervisor
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
/** /**
@ -204,6 +201,11 @@ object GraphStageLogic {
private val callback = getAsyncCallback(internalReceive) private val callback = getAsyncCallback(internalReceive)
private def cell = materializer.supervisor match {
case ref: LocalActorRef => ref.underlying
case unknown =>
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
}
private val functionRef: FunctionRef = { private val functionRef: FunctionRef = {
val f: (ActorRef, Any) => Unit = { val f: (ActorRef, Any) => Unit = {
case (r, PoisonPill) if poisonPillFallback => case (r, PoisonPill) if poisonPillFallback =>
@ -217,22 +219,7 @@ object GraphStageLogic {
case pair => callback.invoke(pair) case pair => callback.invoke(pair)
} }
materializer.supervisor match { cell.addFunctionRef(f, name)
case ref: LocalActorRef =>
ref.underlying.addFunctionRef(f, name)
case ref: RepointableActorRef =>
if (ref.isStarted)
ref.underlying.asInstanceOf[ActorCell].addFunctionRef(f, name)
else {
// this may happen if materialized immediately before Materializer has been fully initialized,
// should be rare
implicit val timeout = ref.system.settings.CreationTimeout
val reply = (materializer.supervisor ? StreamSupervisor.AddFunctionRef(f, name)).mapTo[FunctionRef]
Await.result(reply, timeout.duration)
}
case unknown =>
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
}
} }
/** /**
@ -266,7 +253,7 @@ object GraphStageLogic {
} }
def stop(): Unit = { def stop(): Unit = {
materializer.supervisor ! StreamSupervisor.RemoveFunctionRef(functionRef) cell.removeFunctionRef(functionRef)
} }
def watch(actorRef: ActorRef): Unit = functionRef.watch(actorRef) def watch(actorRef: ActorRef): Unit = functionRef.watch(actorRef)