From 4f9a4b5403e2ca587f1627e4d5d143a79efe54c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 26 Sep 2019 22:49:33 +0200 Subject: [PATCH] System materializer guardian (#27723) * Eager creation of system materializer on system startup #26850 * System level materializers always spawned under a specific system actor #26850 * No need for RepointableActorRef logic anymore * MiMa filter * Make the creation timeout higher --- .../akka/remote/artery/ArteryTransport.scala | 9 +- .../akka/stream/SystemMaterializerSpec.scala | 11 +++ .../stream/impl/TraversalBuilderSpec.scala | 97 +------------------ .../mima-filters/2.5.x.backwards.excludes | 2 + akka-stream/src/main/resources/reference.conf | 5 + .../scala/akka/stream/ActorMaterializer.scala | 48 ++------- .../main/scala/akka/stream/Materializer.scala | 4 +- .../akka/stream/SystemMaterializer.scala | 59 ++++++++++- .../stream/impl/ActorMaterializerImpl.scala | 22 ----- .../stream/impl/MaterializerGuardian.scala | 69 +++++++++++++ .../impl/PhasedFusingActorMaterializer.scala | 45 ++++----- .../scala/akka/stream/stage/GraphStage.scala | 29 ++---- 12 files changed, 185 insertions(+), 215 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/impl/MaterializerGuardian.scala diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index f2b30f9331..462b64a962 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -402,9 +402,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr startTransport() topLevelFlightRecorder.loFreq(Transport_Started, NoMetaData) - materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system) - controlMaterializer = - ActorMaterializer.systemMaterializer(settings.Advanced.ControlStreamMaterializerSettings, "remoteControl", system) + val systemMaterializer = SystemMaterializer(system) + materializer = + systemMaterializer.createAdditionalLegacySystemMaterializer("remote", settings.Advanced.MaterializerSettings) + controlMaterializer = systemMaterializer.createAdditionalLegacySystemMaterializer( + "remoteControl", + settings.Advanced.ControlStreamMaterializerSettings) messageDispatcher = new MessageDispatcher(system, provider) topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData) diff --git a/akka-stream-tests/src/test/scala/akka/stream/SystemMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/SystemMaterializerSpec.scala index a5774521d3..2aa6123797 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/SystemMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/SystemMaterializerSpec.scala @@ -32,3 +32,14 @@ class SystemMaterializerSpec extends StreamSpec with ScalaFutures { } } + +class SystemMaterializerEagerStartupSpec extends StreamSpec { + + "The SystemMaterializer" must { + + "be eagerly started on system startup" in { + system.hasExtension(SystemMaterializer.lookup) should ===(true) + } + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala index d2b30d515d..ec9a9b8f63 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TraversalBuilderSpec.scala @@ -7,12 +7,9 @@ package akka.stream.impl import akka.NotUsed import akka.stream._ import akka.stream.impl.TraversalTestUtils._ -import akka.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, Source } -import akka.stream.testkit.{ TestPublisher, TestSubscriber } +import akka.stream.scaladsl.Keep import akka.testkit.AkkaSpec -import scala.concurrent.Await - class TraversalBuilderSpec extends AkkaSpec { "CompositeTraversalBuilder" must { @@ -438,98 +435,6 @@ class TraversalBuilderSpec extends AkkaSpec { (flow1, Attributes.name("test") and Attributes.name("flow"), TestIsland1), (sink, Attributes.none, TestDefaultIsland))) } - - //TODO: Dummy test cases just for smoke-testing. Should be removed. - - "foo" in { - implicit val mat = PhasedFusingActorMaterializer() - import scala.concurrent.duration._ - - val graph = Source.repeat(1).take(10).toMat(Sink.fold(0)(_ + _))(Keep.right) - - Await.result(graph.run(), 3.seconds) should ===(10) - } - - "islands 1" in { - implicit val mat = PhasedFusingActorMaterializer() - val sub = TestSubscriber.probe[Int]() - val graph = Source.repeat(1).take(10).toMat(Sink.asPublisher(false))(Keep.right) - - graph.run().subscribe(sub) - - sub.request(10) - sub.expectNextN(List(1, 1, 1, 1, 1, 1, 1, 1, 1, 1)) - sub.expectComplete() - } - - "islands 2" in { - implicit val mat = PhasedFusingActorMaterializer() - val pub = TestPublisher.probe[Int]() - import scala.concurrent.duration._ - - val graph = Source.asSubscriber[Int].toMat(Sink.fold(0)(_ + _))(Keep.both) - - val (sub, future) = graph.run() - pub.subscribe(sub) - - pub.sendNext(0) - pub.sendNext(1) - pub.sendNext(2) - pub.sendNext(3) - pub.sendComplete() - - Await.result(future, 3.seconds) should ===(6) - } - - "islands 3" in { - implicit val mat = PhasedFusingActorMaterializer() - val sub = TestSubscriber.probe[Int]() - Source.repeat(1).take(10).runWith(Sink.fromSubscriber(sub)) - - sub.request(10) - sub.expectNextN(List(1, 1, 1, 1, 1, 1, 1, 1, 1, 1)) - sub.expectComplete() - } - - "islands 4" in { - implicit val mat = PhasedFusingActorMaterializer() - val pub = TestPublisher.probe[Int]() - import scala.concurrent.duration._ - - val future = Source.fromPublisher(pub).runWith(Sink.fold(0)(_ + _)) - pub.sendNext(0) - pub.sendNext(1) - pub.sendNext(2) - pub.sendNext(3) - pub.sendComplete() - - Await.result(future, 3.seconds) should ===(6) - } - - "bidiflow1" in { - implicit val mat = PhasedFusingActorMaterializer() - val flow1 = Flow.fromGraph(fusing.Map((x: Int) => x + 1)) - val flow2 = Flow.fromGraph(fusing.Map((x: Int) => x + 1)) - - val bidi = BidiFlow.fromFlowsMat(flow1, flow2)(Keep.none) - - val flow = bidi.join(Flow[Int]) - - Source.single(1).via(flow).runWith(Sink.ignore) - } - - "bidiflow reverse" in { - implicit val mat = PhasedFusingActorMaterializer() - val flow1 = Flow.fromGraph(new fusing.Map((x: Int) => x + 1)) - val flow2 = Flow.fromGraph(new fusing.Map((x: Int) => x + 1)) - - val bidi = BidiFlow.fromFlowsMat(flow1, flow2)(Keep.none) - - val flow = Flow[Int].join(bidi.reversed) - - Source.single(1).via(flow).runWith(Sink.ignore) - } - } } diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index 44b064a142..22adda73e3 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -240,3 +240,5 @@ ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.scaladsl.Flow. # #26187 Remove ActorPublisher, ActorSubscriber ProblemFilters.exclude[Problem]("akka.stream.actor.*") +# system materializer guardian #26850 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializer.systemMaterializer") diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 76ca451e7c..152a75993e 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -2,6 +2,8 @@ # Akka Stream Reference Config File # ##################################### +# eager creation of the system wide materializer +akka.library-extensions += "akka.stream.SystemMaterializer" akka { stream { @@ -87,6 +89,9 @@ akka { write-buffer-size = 16 KiB } + # Time to wait for async materializer creation before throwing an exception + creation-timeout = 20 seconds + //#stream-ref # configure defaults for SourceRef and SinkRef stream-ref { diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 7c027cde5d..ae23ecb98a 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -5,7 +5,6 @@ package akka.stream import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean import akka.actor.ActorContext import akka.actor.ActorRef @@ -48,7 +47,7 @@ object ActorMaterializer { implicit context: ActorRefFactory): ActorMaterializer = { val system = actorSystemOf(context) - val settings = materializerSettings.getOrElse(ActorMaterializerSettings(system)) + val settings = materializerSettings.getOrElse(SystemMaterializer(system).materializerSettings) apply(settings, namePrefix.getOrElse("flow"))(context) } @@ -69,25 +68,17 @@ object ActorMaterializer { "2.6.0") def apply(materializerSettings: ActorMaterializerSettings, namePrefix: String)( implicit context: ActorRefFactory): ActorMaterializer = { - val haveShutDown = new AtomicBoolean(false) - val system = actorSystemOf(context) - val defaultAttributes = materializerSettings.toAttributes - new PhasedFusingActorMaterializer( - system, - materializerSettings, - defaultAttributes, - system.dispatchers, - actorOfStreamSupervisor(defaultAttributes, context, haveShutDown), - haveShutDown, - FlowNames(system).name.copy(namePrefix)) - } - - private def actorOfStreamSupervisor(attributes: Attributes, context: ActorRefFactory, haveShutDown: AtomicBoolean) = { - val props = StreamSupervisor.props(attributes, haveShutDown) context match { - case s: ExtendedActorSystem => s.systemActorOf(props, StreamSupervisor.nextName()) - case a: ActorContext => a.actorOf(props, StreamSupervisor.nextName()) + case system: ActorSystem => + // system level materializer, defer to the system materializer extension + SystemMaterializer(system) + .createAdditionalLegacySystemMaterializer(namePrefix, materializerSettings) + .asInstanceOf[ActorMaterializer] + + case context: ActorContext => + // actor context level materializer, will live as a child of this actor + PhasedFusingActorMaterializer(context, namePrefix, materializerSettings, materializerSettings.toAttributes) } } @@ -109,25 +100,6 @@ object ActorMaterializer { def apply(materializerSettings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = apply(Some(materializerSettings), None) - /** - * INTERNAL API: Creates the `StreamSupervisor` as a system actor. - */ - private[akka] def systemMaterializer( - materializerSettings: ActorMaterializerSettings, - namePrefix: String, - system: ExtendedActorSystem): ActorMaterializer = { - val haveShutDown = new AtomicBoolean(false) - val attributes = materializerSettings.toAttributes - new PhasedFusingActorMaterializer( - system, - materializerSettings, - attributes, - system.dispatchers, - system.systemActorOf(StreamSupervisor.props(attributes, haveShutDown), StreamSupervisor.nextName()), - haveShutDown, - FlowNames(system).name.copy(namePrefix)) - } - /** * Java API: Creates an ActorMaterializer that can materialize stream blueprints as running streams. * diff --git a/akka-stream/src/main/scala/akka/stream/Materializer.scala b/akka-stream/src/main/scala/akka/stream/Materializer.scala index 3c74ffe44f..43d1129e7e 100644 --- a/akka-stream/src/main/scala/akka/stream/Materializer.scala +++ b/akka-stream/src/main/scala/akka/stream/Materializer.scala @@ -222,9 +222,8 @@ object Materializer { * needs or want to test abrupt termination of a custom graph stage. If you want to tie the lifecycle * of the materializer to an actor, use the factory that takes an [[ActorContext]] instead. */ - @silent("deprecated") def apply(systemProvider: ClassicActorSystemProvider): Materializer = - ActorMaterializer(None, None)(systemProvider.classicSystem) + SystemMaterializer(systemProvider.classicSystem).createAdditionalSystemMaterializer() /** * Scala API: Create a new materializer that will stay alive as long as the system does or until it is explicitly stopped. @@ -234,7 +233,6 @@ object Materializer { * needs or want to test abrupt termination of a custom graph stage. If you want to tie the * lifecycle of the materializer to an actor, use the factory that takes an [[ActorContext]] instead. */ - @silent("deprecated") def createMaterializer(systemProvider: ClassicActorSystemProvider): Materializer = apply(systemProvider) diff --git a/akka-stream/src/main/scala/akka/stream/SystemMaterializer.scala b/akka-stream/src/main/scala/akka/stream/SystemMaterializer.scala index e4f9412554..fee8ae3ab9 100644 --- a/akka-stream/src/main/scala/akka/stream/SystemMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/SystemMaterializer.scala @@ -5,10 +5,19 @@ package akka.stream import akka.actor.ActorSystem +import akka.actor.Deploy import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider +import akka.annotation.InternalApi +import akka.stream.impl.MaterializerGuardian + +import scala.concurrent.Await +import scala.concurrent.Promise +import akka.util.JavaDurationConverters._ +import akka.pattern.ask +import akka.util.Timeout import com.github.ghik.silencer.silent /** @@ -27,9 +36,51 @@ object SystemMaterializer extends ExtensionId[SystemMaterializer] with Extension } final class SystemMaterializer(system: ExtendedActorSystem) extends Extension { - @silent("deprecated") - val materializer = { - val settings = ActorMaterializerSettings(system) - ActorMaterializer.systemMaterializer(settings, "default", system) + private val systemMaterializerPromise = Promise[Materializer]() + + // load these here so we can share the same instance across materializer guardian and other uses + /** + * INTERNAL API + */ + @InternalApi @silent("deprecated") + private[akka] val materializerSettings = ActorMaterializerSettings(system) + + private implicit val materializerTimeout: Timeout = + system.settings.config.getDuration("akka.stream.materializer.creation-timeout").asScala + + @InternalApi @silent("deprecated") + private val materializerGuardian = system.systemActorOf( + MaterializerGuardian + .props(systemMaterializerPromise, materializerSettings) + .withDispatcher(materializerSettings.dispatcher) + .withDeploy(Deploy.local), + "Materializers") + + /** + * INTERNAL API + */ + @InternalApi + private[akka] def createAdditionalSystemMaterializer(): Materializer = { + val started = + (materializerGuardian ? MaterializerGuardian.StartMaterializer).mapTo[MaterializerGuardian.MaterializerStarted] + Await.result(started, materializerTimeout.duration).materializer } + + /** + * INTERNAL API + */ + @InternalApi + @silent("deprecated") + private[akka] def createAdditionalLegacySystemMaterializer( + namePrefix: String, + settings: ActorMaterializerSettings): Materializer = { + val started = + (materializerGuardian ? MaterializerGuardian.LegacyStartMaterializer(namePrefix, settings)) + .mapTo[MaterializerGuardian.MaterializerStarted] + Await.result(started, materializerTimeout.duration).materializer + } + + // block on async creation to make it effectively final + val materializer = Await.result(systemMaterializerPromise.future, materializerTimeout.duration) + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 3e0ea42bbf..96fb8e9907 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -24,7 +24,6 @@ import com.github.ghik.silencer.silent import scala.collection.immutable import scala.concurrent.duration._ -import scala.concurrent.Await import scala.concurrent.ExecutionContextExecutor import scala.concurrent.Future @@ -72,14 +71,6 @@ import scala.concurrent.Future supervisor match { case ref: LocalActorRef => ref.underlying.attachChild(props, name, systemService = false) - case ref: RepointableActorRef => - if (ref.isStarted) - ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false) - else { - implicit val timeout = ref.system.settings.CreationTimeout - val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef] - Await.result(f, timeout.duration) - } case unknown => throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") } @@ -210,14 +201,6 @@ private[akka] class SubFusingActorMaterializerImpl( extends DeadLetterSuppression with NoSerializationVerificationNeeded - final case class AddFunctionRef(f: (ActorRef, Any) => Unit, name: String) - extends DeadLetterSuppression - with NoSerializationVerificationNeeded - - final case class RemoveFunctionRef(ref: FunctionRef) - extends DeadLetterSuppression - with NoSerializationVerificationNeeded - case object GetChildrenSnapshots final case class ChildrenSnapshots(seq: immutable.Seq[StreamSnapshot]) extends DeadLetterSuppression @@ -248,11 +231,6 @@ private[akka] class SubFusingActorMaterializerImpl( case Materialize(props, name) => val impl = context.actorOf(props, name) sender() ! impl - case AddFunctionRef(f, name) => - val ref = context.asInstanceOf[ActorCell].addFunctionRef(f, name) - sender() ! ref - case RemoveFunctionRef(ref) => - context.asInstanceOf[ActorCell].removeFunctionRef(ref) case GetChildren => sender() ! Children(context.children.toSet) case GetChildrenSnapshots => diff --git a/akka-stream/src/main/scala/akka/stream/impl/MaterializerGuardian.scala b/akka-stream/src/main/scala/akka/stream/impl/MaterializerGuardian.scala new file mode 100644 index 0000000000..bf188d531b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/MaterializerGuardian.scala @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.stream.impl + +import akka.actor.Actor +import akka.actor.Props +import akka.annotation.InternalApi +import akka.stream.ActorMaterializerSettings +import akka.stream.Materializer +import com.github.ghik.silencer.silent + +import scala.concurrent.Promise + +/** + * INTERNAL API + * + * The materializer guardian is parent to all materializers created on the `system` level including the default + * system wide materializer. Eagerly started by the SystemMaterializer extension on system startup. + */ +@InternalApi +private[akka] object MaterializerGuardian { + + final case object StartMaterializer + final case class MaterializerStarted(materializer: Materializer) + + // this is available to keep backwards compatibility with ActorMaterializer and should + // be removed together with ActorMaterialixer in Akka 2.7 + final case class LegacyStartMaterializer(namePrefix: String, settings: ActorMaterializerSettings) + + def props(systemMaterializer: Promise[Materializer], materializerSettings: ActorMaterializerSettings) = + Props(new MaterializerGuardian(systemMaterializer, materializerSettings)) +} + +/** + * INTERNAL API + */ +@silent("deprecated") +@InternalApi +private[akka] final class MaterializerGuardian( + systemMaterializerPromise: Promise[Materializer], + materializerSettings: ActorMaterializerSettings) + extends Actor { + import MaterializerGuardian._ + + private val defaultAttributes = materializerSettings.toAttributes + private val defaultNamePrefix = "flow" + + private val systemMaterializer = startMaterializer(defaultNamePrefix, None) + systemMaterializerPromise.success(systemMaterializer) + + override def receive: Receive = { + case StartMaterializer => + sender() ! MaterializerStarted(startMaterializer(defaultNamePrefix, None)) + case LegacyStartMaterializer(namePrefix, settings) => + sender() ! MaterializerStarted(startMaterializer(namePrefix, Some(settings))) + } + + private def startMaterializer(namePrefix: String, settings: Option[ActorMaterializerSettings]) = { + val attributes = settings match { + case None => defaultAttributes + case Some(`materializerSettings`) => defaultAttributes + case Some(settings) => settings.toAttributes + } + + PhasedFusingActorMaterializer(context, namePrefix, settings.getOrElse(materializerSettings), attributes) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index e56048aef7..58efa98328 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -10,10 +10,9 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.NotUsed import akka.actor.ActorContext import akka.actor.ActorRef -import akka.actor.ActorRefFactory import akka.actor.ActorSystem import akka.actor.Cancellable -import akka.actor.ExtendedActorSystem +import akka.actor.Deploy import akka.actor.PoisonPill import akka.actor.Props import akka.annotation.DoNotInherit @@ -98,38 +97,28 @@ import com.github.ghik.silencer.silent }, GraphStageTag -> DefaultPhase) - @silent("deprecated") - @InternalApi private[akka] def apply()(implicit context: ActorRefFactory): Materializer = { + def apply( + context: ActorContext, + namePrefix: String, + settings: ActorMaterializerSettings, + attributes: Attributes): PhasedFusingActorMaterializer = { val haveShutDown = new AtomicBoolean(false) - val system = actorSystemOf(context) - val materializerSettings = ActorMaterializerSettings(system) - val defaultAttributes = materializerSettings.toAttributes - val streamSupervisor = - context.actorOf(StreamSupervisor.props(defaultAttributes, haveShutDown), StreamSupervisor.nextName()) + val supervisorProps = + StreamSupervisor.props(attributes, haveShutDown).withDispatcher(context.props.dispatcher).withDeploy(Deploy.local) - PhasedFusingActorMaterializer( - system, - materializerSettings, - defaultAttributes, - system.dispatchers, + // FIXME why do we need a global unique name for the child? + val streamSupervisor = context.actorOf(supervisorProps, StreamSupervisor.nextName()) + + new PhasedFusingActorMaterializer( + context.system, + settings, + attributes, + context.system.dispatchers, streamSupervisor, haveShutDown, - FlowNames(system).name.copy("flow")) + FlowNames(context.system).name.copy(namePrefix)) } - - private def actorSystemOf(context: ActorRefFactory): ActorSystem = { - val system = context match { - case s: ExtendedActorSystem => s - case c: ActorContext => c.system - case null => throw new IllegalArgumentException("ActorRefFactory context must be defined") - case _ => - throw new IllegalArgumentException( - s"ActorRefFactory context must be an ActorSystem or ActorContext, got [${context.getClass.getName}]") - } - system - } - } private final case class SegmentInfo( diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index fe07d47e5f..fe86f8dc75 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -6,11 +6,9 @@ package akka.stream.stage import java.util.concurrent.atomic.AtomicReference -import scala.deprecated import akka.actor._ import akka.annotation.InternalApi import akka.japi.function.{ Effect, Procedure } -import akka.pattern.ask import akka.stream._ import akka.stream.impl.ActorSubscriberMessage import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSink, SubSource } @@ -23,8 +21,7 @@ import akka.{ Done, NotUsed } import scala.annotation.tailrec import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Await, Future, Promise } -import akka.stream.impl.StreamSupervisor +import scala.concurrent.{ Future, Promise } import com.github.ghik.silencer.silent /** @@ -204,6 +201,11 @@ object GraphStageLogic { private val callback = getAsyncCallback(internalReceive) + private def cell = materializer.supervisor match { + case ref: LocalActorRef => ref.underlying + case unknown => + throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") + } private val functionRef: FunctionRef = { val f: (ActorRef, Any) => Unit = { case (r, PoisonPill) if poisonPillFallback => @@ -217,22 +219,7 @@ object GraphStageLogic { case pair => callback.invoke(pair) } - materializer.supervisor match { - case ref: LocalActorRef => - ref.underlying.addFunctionRef(f, name) - case ref: RepointableActorRef => - if (ref.isStarted) - ref.underlying.asInstanceOf[ActorCell].addFunctionRef(f, name) - else { - // this may happen if materialized immediately before Materializer has been fully initialized, - // should be rare - implicit val timeout = ref.system.settings.CreationTimeout - val reply = (materializer.supervisor ? StreamSupervisor.AddFunctionRef(f, name)).mapTo[FunctionRef] - Await.result(reply, timeout.duration) - } - case unknown => - throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") - } + cell.addFunctionRef(f, name) } /** @@ -266,7 +253,7 @@ object GraphStageLogic { } def stop(): Unit = { - materializer.supervisor ! StreamSupervisor.RemoveFunctionRef(functionRef) + cell.removeFunctionRef(functionRef) } def watch(actorRef: ActorRef): Unit = functionRef.watch(actorRef)