diff --git a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala index d8668937eb..84f4abd3d3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala @@ -11,11 +11,16 @@ import akka.stream.Attributes._ import akka.stream.Fusing.FusedGraph import scala.annotation.tailrec import akka.stream.impl.StreamLayout.Module +import org.scalatest.concurrent.ScalaFutures +import scala.concurrent.duration._ +import akka.stream.impl.fusing.GraphInterpreter +import akka.event.BusLogging -class FusingSpec extends AkkaSpec with ConversionCheckedTripleEquals { +class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals { final val Debug = false implicit val materializer = ActorMaterializer() + implicit val patience = PatienceConfig(1.second) def graph(async: Boolean) = Source.unfoldInf(1)(x ⇒ (x, x)).filter(_ % 2 == 1) @@ -87,4 +92,38 @@ class FusingSpec extends AkkaSpec with ConversionCheckedTripleEquals { } + "SubFusingActorMaterializer" must { + + "work with asynchronous boundaries in the subflows" in { + val async = Flow[Int].map(_ * 2).withAttributes(Attributes.asyncBoundary) + Source(0 to 9) + .map(_ * 10) + .flatMapMerge(5, i ⇒ Source(i to (i + 9)).via(async)) + .grouped(1000) + .runWith(Sink.head) + .futureValue + .sorted should ===(0 to 198 by 2) + } + + "use multiple actors when there are asynchronous boundaries in the subflows" in { + def ref = { + val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging] + bus.logSource + } + val async = Flow[Int].map(x ⇒ { testActor ! ref; x }).withAttributes(Attributes.asyncBoundary) + Source(0 to 9) + .map(x ⇒ { testActor ! ref; x }) + .flatMapMerge(5, i ⇒ Source.single(i).via(async)) + .grouped(1000) + .runWith(Sink.head) + .futureValue + .sorted should ===(0 to 9) + val refs = receiveN(20) + withClue(s"refs=\n${refs.mkString("\n")}") { + refs.toSet.size should ===(11) + } + } + + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index 61870f2e95..06a0060251 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -56,7 +56,7 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { manualInit(assembly) interpreter.attachDownstreamBoundary(2, sink) interpreter.attachUpstreamBoundary(0, source) - interpreter.init() + interpreter.init(null) lastEvents() should ===(Set.empty) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index a66dd6f5bd..d408e5dc96 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -84,7 +84,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { _interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2) } - _interpreter.init() + _interpreter.init(null) } } @@ -226,7 +226,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { manualInit(assembly) interpreter.attachDownstreamBoundary(0, in) interpreter.attachUpstreamBoundary(0, out) - interpreter.init() + interpreter.init(null) } abstract class FailingStageSetup(initFailOnNextEvent: Boolean = false) extends TestSetup { @@ -351,7 +351,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { interpreter.attachUpstreamBoundary(0, upstream) interpreter.attachDownstreamBoundary(ops.length, downstream) - interpreter.init() + interpreter.init(null) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 43a049d01e..416bdc2a8d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -22,6 +22,7 @@ import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import akka.stream.impl.fusing.GraphInterpreterShell object FlowSpec { class Fruit @@ -41,22 +42,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val identity: Flow[Any, Any, Unit] ⇒ Flow[Any, Any, Unit] = in ⇒ in.map(e ⇒ e) val identity2: Flow[Any, Any, Unit] ⇒ Flow[Any, Any, Unit] = in ⇒ identity(in) - class BrokenActorInterpreter( - _assembly: GraphAssembly, - _inHandlers: Array[InHandler], - _outHandlers: Array[OutHandler], - _logics: Array[GraphStageLogic], - _shape: Shape, - _settings: ActorMaterializerSettings, - _materializer: Materializer, - brokenMessage: Any) - extends ActorGraphInterpreter(_assembly, _inHandlers, _outHandlers, _logics, _shape, _settings, _materializer) { + class BrokenActorInterpreter(_shell: GraphInterpreterShell, brokenMessage: Any) + extends ActorGraphInterpreter(_shell) { import akka.stream.actor.ActorSubscriberMessage._ override protected[akka] def aroundReceive(receive: Receive, msg: Any) = { msg match { - case ActorGraphInterpreter.OnNext(0, m) if m == brokenMessage ⇒ + case ActorGraphInterpreter.OnNext(_, 0, m) if m == brokenMessage ⇒ throw new NullPointerException(s"I'm so broken [$m]") case _ ⇒ super.aroundReceive(receive, msg) } @@ -77,14 +70,17 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val (inHandlers, outHandlers, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - val props = Props(new BrokenActorInterpreter(assembly, inHandlers, outHandlers, logics, stage.shape, settings, materializer, "a3")) + val shell = new GraphInterpreterShell(assembly, inHandlers, outHandlers, logics, stage.shape, settings, + materializer.asInstanceOf[ActorMaterializerImpl]) + + val props = Props(new BrokenActorInterpreter(shell, "a3")) .withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local) val impl = system.actorOf(props, "borken-stage-actor") - val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, 0) - val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(0) } + val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, 0) + val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(shell, 0) } - impl ! ActorGraphInterpreter.ExposedPublisher(0, publisher) + impl ! ActorGraphInterpreter.ExposedPublisher(shell, 0, publisher) Flow.fromSinkAndSource(Sink(subscriber), Source(publisher)) }) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index dee5691de6..4e946fe6d6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -20,6 +20,7 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } import akka.stream.impl.fusing.GraphStageModule import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly import akka.stream.impl.fusing.Fusing +import akka.stream.impl.fusing.GraphInterpreterShell /** * INTERNAL API @@ -44,7 +45,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, override def isShutdown: Boolean = haveShutDown.get() - override def withNamePrefix(name: String): Materializer = this.copy(flowNames = flowNames.copy(name)) + override def withNamePrefix(name: String): ActorMaterializerImpl = this.copy(flowNames = flowNames.copy(name)) private[this] def createFlowName(): String = flowNames.next() @@ -73,7 +74,11 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, override def scheduleOnce(delay: FiniteDuration, task: Runnable) = system.scheduler.scheduleOnce(delay, task)(executionContext) - override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = { + override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = + materialize(_runnableGraph, null) + + private[stream] def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], + subflowFuser: GraphInterpreterShell ⇒ ActorRef): Mat = { val runnableGraph = if (settings.autoFusing) Fusing.aggressive(_runnableGraph) else _runnableGraph @@ -146,17 +151,24 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, val calculatedSettings = effectiveSettings(effectiveAttributes) val (inHandlers, outHandlers, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc) - val props = ActorGraphInterpreter.props( - graph.assembly, inHandlers, outHandlers, logics, graph.shape, calculatedSettings, ActorMaterializerImpl.this) + val shell = new GraphInterpreterShell(graph.assembly, inHandlers, outHandlers, logics, graph.shape, + calculatedSettings, ActorMaterializerImpl.this) + + val impl = + if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) { + subflowFuser(shell) + } else { + val props = ActorGraphInterpreter.props(shell) + actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher) + } - val impl = actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher) for ((inlet, i) ← graph.shape.inlets.iterator.zipWithIndex) { - val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, i) + val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i) assignPort(inlet, subscriber) } for ((outlet, i) ← graph.shape.outlets.iterator.zipWithIndex) { - val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(i) } - impl ! ActorGraphInterpreter.ExposedPublisher(i, publisher) + val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i) + impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher) assignPort(outlet, publisher) } } @@ -207,6 +219,20 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, } +private[akka] class SubFusingActorMaterializerImpl(val delegate: ActorMaterializerImpl, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer { + override def executionContext: ExecutionContextExecutor = delegate.executionContext + + override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell) + + override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task) + + override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable = + delegate.schedulePeriodically(initialDelay, interval, task) + + def withNamePrefix(name: String): SubFusingActorMaterializerImpl = + new SubFusingActorMaterializerImpl(delegate.withNamePrefix(name), registerShell) +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 7de5f225ee..6267d26e2b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -4,7 +4,6 @@ package akka.stream.impl.fusing import java.util.concurrent.TimeoutException - import akka.actor._ import akka.event.Logging import akka.stream._ @@ -14,9 +13,11 @@ import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, import akka.stream.impl.{ ActorPublisher, ReactiveStreamsCompliance } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import org.reactivestreams.{ Subscriber, Subscription } - import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal +import akka.event.LoggingAdapter +import akka.stream.impl.ActorMaterializerImpl +import akka.stream.impl.SubFusingActorMaterializerImpl /** * INTERNAL API @@ -41,52 +42,53 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at * INTERNAL API */ private[stream] object ActorGraphInterpreter { - trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded + trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded { + def shell: GraphInterpreterShell + } - final case class OnError(id: Int, cause: Throwable) extends BoundaryEvent - final case class OnComplete(id: Int) extends BoundaryEvent - final case class OnNext(id: Int, e: Any) extends BoundaryEvent - final case class OnSubscribe(id: Int, subscription: Subscription) extends BoundaryEvent + final case class OnError(shell: GraphInterpreterShell, id: Int, cause: Throwable) extends BoundaryEvent + final case class OnComplete(shell: GraphInterpreterShell, id: Int) extends BoundaryEvent + final case class OnNext(shell: GraphInterpreterShell, id: Int, e: Any) extends BoundaryEvent + final case class OnSubscribe(shell: GraphInterpreterShell, id: Int, subscription: Subscription) extends BoundaryEvent - final case class RequestMore(id: Int, demand: Long) extends BoundaryEvent - final case class Cancel(id: Int) extends BoundaryEvent - final case class SubscribePending(id: Int) extends BoundaryEvent - final case class ExposedPublisher(id: Int, publisher: ActorPublisher[Any]) extends BoundaryEvent + final case class RequestMore(shell: GraphInterpreterShell, id: Int, demand: Long) extends BoundaryEvent + final case class Cancel(shell: GraphInterpreterShell, id: Int) extends BoundaryEvent + final case class SubscribePending(shell: GraphInterpreterShell, id: Int) extends BoundaryEvent + final case class ExposedPublisher(shell: GraphInterpreterShell, id: Int, publisher: ActorPublisher[Any]) extends BoundaryEvent - final case class AsyncInput(logic: GraphStageLogic, evt: Any, handler: (Any) ⇒ Unit) extends BoundaryEvent + final case class AsyncInput(shell: GraphInterpreterShell, logic: GraphStageLogic, evt: Any, handler: (Any) ⇒ Unit) extends BoundaryEvent - case object Resume extends BoundaryEvent + case class Resume(shell: GraphInterpreterShell) extends BoundaryEvent + case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent - final class BoundarySubscription(val parent: ActorRef, val id: Int) extends Subscription { - override def request(elements: Long): Unit = parent ! RequestMore(id, elements) - override def cancel(): Unit = parent ! Cancel(id) + final class BoundaryPublisher(parent: ActorRef, shell: GraphInterpreterShell, id: Int) extends ActorPublisher[Any](parent) { + override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(shell, id) + } + + final class BoundarySubscription(parent: ActorRef, shell: GraphInterpreterShell, id: Int) extends Subscription { + override def request(elements: Long): Unit = parent ! RequestMore(shell, id, elements) + override def cancel(): Unit = parent ! Cancel(shell, id) override def toString = s"BoundarySubscription[$parent, $id]" } - final class BoundarySubscriber(val parent: ActorRef, id: Int) extends Subscriber[Any] { + final class BoundarySubscriber(parent: ActorRef, shell: GraphInterpreterShell, id: Int) extends Subscriber[Any] { override def onError(cause: Throwable): Unit = { ReactiveStreamsCompliance.requireNonNullException(cause) - parent ! OnError(id, cause) + parent ! OnError(shell, id, cause) } - override def onComplete(): Unit = parent ! OnComplete(id) + override def onComplete(): Unit = parent ! OnComplete(shell, id) override def onNext(element: Any): Unit = { ReactiveStreamsCompliance.requireNonNullElement(element) - parent ! OnNext(id, element) + parent ! OnNext(shell, id, element) } override def onSubscribe(subscription: Subscription): Unit = { ReactiveStreamsCompliance.requireNonNullSubscription(subscription) - parent ! OnSubscribe(id, subscription) + parent ! OnSubscribe(shell, id, subscription) } } - def props(assembly: GraphAssembly, - inHandlers: Array[InHandler], - outHandlers: Array[OutHandler], - logics: Array[GraphStageLogic], - shape: Shape, - settings: ActorMaterializerSettings, - mat: Materializer): Props = - Props(new ActorGraphInterpreter(assembly, inHandlers, outHandlers, logics, shape, settings, mat)).withDeploy(Deploy.local) + def props(shell: GraphInterpreterShell): Props = + Props(new ActorGraphInterpreter(shell)).withDeploy(Deploy.local) class BatchingActorInputBoundary(size: Int, id: Int) extends UpstreamBoundaryStageLogic[Any] { require(size > 0, "buffer size cannot be zero") @@ -201,7 +203,7 @@ private[stream] object ActorGraphInterpreter { override def toString: String = s"BatchingActorInputBoundary(id=$id, fill=$inputBufferElements/$size, completed=$upstreamCompleted, canceled=$downstreamCanceled)" } - private[stream] class ActorOutputBoundary(actor: ActorRef, id: Int) extends DownstreamBoundaryStageLogic[Any] { + private[stream] class ActorOutputBoundary(actor: ActorRef, shell: GraphInterpreterShell, id: Int) extends DownstreamBoundaryStageLogic[Any] { val in: Inlet[Any] = Inlet[Any]("UpstreamBoundary" + id) in.id = 0 @@ -258,7 +260,7 @@ private[stream] object ActorGraphInterpreter { exposedPublisher.takePendingSubscribers() foreach { sub ⇒ if (subscriber eq null) { subscriber = sub - tryOnSubscribe(subscriber, new BoundarySubscription(actor, id)) + tryOnSubscribe(subscriber, new BoundarySubscription(actor, shell, id)) if (GraphInterpreter.Debug) println(s"${interpreter.Name} subscribe subscriber=$sub") } else rejectAdditionalSubscriber(subscriber, s"${Logging.simpleName(this)}") @@ -301,28 +303,25 @@ private[stream] object ActorGraphInterpreter { /** * INTERNAL API */ -private[stream] class ActorGraphInterpreter( +private[stream] final class GraphInterpreterShell( assembly: GraphAssembly, inHandlers: Array[InHandler], outHandlers: Array[OutHandler], logics: Array[GraphStageLogic], shape: Shape, settings: ActorMaterializerSettings, - mat: Materializer) extends Actor { + mat: ActorMaterializerImpl) { + import ActorGraphInterpreter._ - val interpreter = new GraphInterpreter( - assembly, - mat, - Logging(this), - inHandlers, - outHandlers, - logics, - (logic, event, handler) ⇒ self ! AsyncInput(logic, event, handler), - settings.fuzzingMode) + private var self: ActorRef = _ + lazy val log = Logging(mat.system.eventStream, self) - private val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _)) - private val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _)) + lazy val interpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics, + (logic, event, handler) ⇒ self ! AsyncInput(this, logic, event, handler), settings.fuzzingMode) + + private val inputs = new Array[BatchingActorInputBoundary](shape.inlets.size) + private val outputs = new Array[ActorOutputBoundary](shape.outlets.size) private var subscribesPending = inputs.length private var publishersPending = outputs.length @@ -333,97 +332,108 @@ private[stream] class ActorGraphInterpreter( * to give each input buffer slot a chance to run through the whole pipeline * and back (for the demand). */ - val eventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length) + private val eventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length) // Limits the number of events processed by the interpreter on an abort event. // TODO: Better heuristic here private val abortLimit = eventLimit * 2 private var resumeScheduled = false - override def preStart(): Unit = { + def init(self: ActorRef, registerShell: GraphInterpreterShell ⇒ ActorRef): Unit = { + this.self = self var i = 0 while (i < inputs.length) { - interpreter.attachUpstreamBoundary(i, inputs(i)) + val in = new BatchingActorInputBoundary(settings.maxInputBufferSize, i) + inputs(i) = in + interpreter.attachUpstreamBoundary(i, in) i += 1 } val offset = assembly.connectionCount - outputs.length i = 0 while (i < outputs.length) { - interpreter.attachDownstreamBoundary(i + offset, outputs(i)) + val out = new ActorOutputBoundary(self, this, i) + outputs(i) = out + interpreter.attachDownstreamBoundary(i + offset, out) i += 1 } - interpreter.init() + interpreter.init(new SubFusingActorMaterializerImpl(mat, registerShell)) runBatch() } - override def receive: Receive = { - // Cases that are most likely on the hot path, in decreasing order of frequency - case OnNext(id: Int, e: Any) ⇒ - if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e id=$id") - inputs(id).onNext(e) - runBatch() - case RequestMore(id: Int, demand: Long) ⇒ - if (GraphInterpreter.Debug) println(s"${interpreter.Name} request $demand id=$id") - outputs(id).requestMore(demand) - runBatch() - case Resume ⇒ - if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") - resumeScheduled = false - if (interpreter.isSuspended) runBatch() - case AsyncInput(logic, event, handler) ⇒ - if (GraphInterpreter.Debug) println(s"${interpreter.Name} ASYNC $event ($handler) [$logic]") - if (!interpreter.isStageCompleted(logic)) { - try handler(event) - catch { - case NonFatal(e) ⇒ logic.failStage(e) + def receive(event: BoundaryEvent): Unit = + if (waitingForShutdown) event match { + case ExposedPublisher(_, id, publisher) ⇒ + outputs(id).exposedPublisher(publisher) + publishersPending -= 1 + if (canShutDown) _isTerminated = true + case OnSubscribe(_, _, sub) ⇒ + tryCancel(sub) + subscribesPending -= 1 + if (canShutDown) _isTerminated = true + case Abort(_) ⇒ + tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " + + s"inputs or outputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now.")) + case _ ⇒ // Ignore, there is nothing to do anyway + } + else event match { + // Cases that are most likely on the hot path, in decreasing order of frequency + case OnNext(_, id: Int, e: Any) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e id=$id") + inputs(id).onNext(e) + runBatch() + case RequestMore(_, id: Int, demand: Long) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} request $demand id=$id") + outputs(id).requestMore(demand) + runBatch() + case Resume(_) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") + resumeScheduled = false + if (interpreter.isSuspended) runBatch() + case AsyncInput(_, logic, event, handler) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} ASYNC $event ($handler) [$logic]") + if (!interpreter.isStageCompleted(logic)) { + try handler(event) + catch { + case NonFatal(e) ⇒ logic.failStage(e) + } + interpreter.afterStageHasRun(logic) } - interpreter.afterStageHasRun(logic) - } - runBatch() + runBatch() - // Initialization and completion messages - case OnError(id: Int, cause: Throwable) ⇒ - if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError id=$id") - inputs(id).onError(cause) - runBatch() - case OnComplete(id: Int) ⇒ - if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete id=$id") - inputs(id).onComplete() - runBatch() - case OnSubscribe(id: Int, subscription: Subscription) ⇒ - if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe id=$id") - subscribesPending -= 1 - inputs(id).onSubscribe(subscription) - runBatch() - case Cancel(id: Int) ⇒ - if (GraphInterpreter.Debug) println(s"${interpreter.Name} cancel id=$id") - outputs(id).cancel() - runBatch() - case SubscribePending(id: Int) ⇒ - outputs(id).subscribePending() - case ExposedPublisher(id, publisher) ⇒ - publishersPending -= 1 - outputs(id).exposedPublisher(publisher) + // Initialization and completion messages + case OnError(_, id: Int, cause: Throwable) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError id=$id") + inputs(id).onError(cause) + runBatch() + case OnComplete(_, id: Int) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete id=$id") + inputs(id).onComplete() + runBatch() + case OnSubscribe(_, id: Int, subscription: Subscription) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe id=$id") + subscribesPending -= 1 + inputs(id).onSubscribe(subscription) + runBatch() + case Cancel(_, id: Int) ⇒ + if (GraphInterpreter.Debug) println(s"${interpreter.Name} cancel id=$id") + outputs(id).cancel() + runBatch() + case SubscribePending(_, id: Int) ⇒ + outputs(id).subscribePending() + case ExposedPublisher(_, id, publisher) ⇒ + publishersPending -= 1 + outputs(id).exposedPublisher(publisher) + } - } - - private def waitShutdown: Receive = { - case ExposedPublisher(id, publisher) ⇒ - outputs(id).exposedPublisher(publisher) - publishersPending -= 1 - if (canShutDown) context.stop(self) - case OnSubscribe(_, sub) ⇒ - tryCancel(sub) - subscribesPending -= 1 - if (canShutDown) context.stop(self) - case ReceiveTimeout ⇒ - tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " + - s"inputs or outputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now.")) - case _ ⇒ // Ignore, there is nothing to do anyway - } + private var _isTerminated = false + def isTerminated: Boolean = _isTerminated private def canShutDown: Boolean = subscribesPending + publishersPending == 0 + private var waitingForShutdown: Boolean = false + + private val resume = Resume(this) + private def runBatch(): Unit = { try { val effectiveLimit = { @@ -436,18 +446,19 @@ private[stream] class ActorGraphInterpreter( interpreter.execute(effectiveLimit) if (interpreter.isCompleted) { // Cannot stop right away if not completely subscribed - if (canShutDown) context.stop(self) + if (canShutDown) _isTerminated = true else { - context.become(waitShutdown) - context.setReceiveTimeout(settings.subscriptionTimeoutSettings.timeout) + waitingForShutdown = true + mat.scheduleOnce(settings.subscriptionTimeoutSettings.timeout, new Runnable { + override def run(): Unit = self ! Abort(GraphInterpreterShell.this) + }) } } else if (interpreter.isSuspended && !resumeScheduled) { resumeScheduled = true - self ! Resume + self ! resume } } catch { case NonFatal(e) ⇒ - context.stop(self) tryAbort(e) } } @@ -458,20 +469,57 @@ private[stream] class ActorGraphInterpreter( * - the event limit is reached * - a new error is encountered */ - private def tryAbort(ex: Throwable): Unit = { + def tryAbort(ex: Throwable): Unit = { // This should handle termination while interpreter is running. If the upstream have been closed already this // call has no effect and therefore do the right thing: nothing. try { inputs.foreach(_.onInternalError(ex)) interpreter.execute(abortLimit) interpreter.finish() - } // Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream - // otherwise this will have no effect - finally { + } catch { + case NonFatal(_) ⇒ + } finally { + _isTerminated = true + // Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream + // otherwise this will have no effect outputs.foreach(_.fail(ex)) inputs.foreach(_.cancel()) } } - override def postStop(): Unit = tryAbort(AbruptTerminationException(self)) + override def toString: String = s"GraphInterpreterShell\n ${assembly.toString.replace("\n", "\n ")}" +} + +/** + * INTERNAL API + */ +private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor { + import ActorGraphInterpreter._ + + var activeInterpreters = Set(_initial) + + def registerShell(shell: GraphInterpreterShell): ActorRef = { + shell.init(self, registerShell) + if (GraphInterpreter.Debug) println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}") + activeInterpreters += shell + self + } + + override def preStart(): Unit = { + activeInterpreters.foreach(_.init(self, registerShell)) + } + + override def receive: Receive = { + case b: BoundaryEvent ⇒ + val shell = b.shell + if (GraphInterpreter.Debug) + if (!activeInterpreters.contains(shell)) println(s"RECEIVED EVENT $b FOR UNKNOWN SHELL $shell") + shell.receive(b) + if (shell.isTerminated) { + activeInterpreters -= shell + if (activeInterpreters.isEmpty) context.stop(self) + } + } + + override def postStop(): Unit = activeInterpreters.foreach(_.tryAbort(AbruptTerminationException(self))) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 09db604597..3c653a2341 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -372,6 +372,9 @@ private[stream] final class GraphInterpreter( shape.inlets.size + shape.outlets.size + keepGoing } + private var _subFusingMaterializer: Materializer = _ + def subFusingMaterializer: Materializer = _subFusingMaterializer + // An event queue implemented as a circular buffer // FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue private[this] val eventQueue = Array.ofDim[Int](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1))) @@ -445,9 +448,14 @@ private[stream] final class GraphInterpreter( def isCompleted: Boolean = runningStages == 0 && !isSuspended /** - * Initializes the states of all the stage logics by calling preStart() + * Initializes the states of all the stage logics by calling preStart(). + * The passed-in materializer is intended to be a SubFusingActorMaterializer + * that avoids creating new Actors when stages materialize sub-flows. If no + * such materializer is available, passing in `null` will reuse the normal + * materializer for the GraphInterpreter—fusing is only an optimization. */ - def init(): Unit = { + def init(subMat: Materializer): Unit = { + _subFusingMaterializer = if (subMat == null) materializer else subMat var i = 0 while (i < logics.length) { val logic = logics(i) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index fa94d17dd7..0143445314 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -150,7 +150,7 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S fuzzingMode = false) interpreter.attachUpstreamBoundary(0, upstream) interpreter.attachDownstreamBoundary(ops.length, downstream) - interpreter.init() + interpreter.init(null) } init() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index bf03be2c71..e5783b899e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -125,7 +125,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[ if (localSource.elem == null) removeSource(localSource) case OnError(ex) ⇒ failStage(ex) - }.invoke))(interpreter.materializer) + }.invoke))(interpreter.subFusingMaterializer) localSource.activate(subF) } @@ -140,6 +140,8 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[ sources.foreach(_.cancel()) } } + + override def toString: String = s"FlattenMerge($breadth)" } /** @@ -285,8 +287,8 @@ object PrefixAndTail { override def completeSubstream(): Unit = onParentFinish.invoke(()) override def failSubstream(ex: Throwable): Unit = onParentFailure.invoke(ex) - override def onPull(): Unit = pullParent() - override def onDownstreamFinish(): Unit = cancelParent() + override def onPull(): Unit = pullParent(()) + override def onDownstreamFinish(): Unit = cancelParent(()) } override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TailSourceLogic(shape) @@ -413,4 +415,6 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. } override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new PrefixAndTailLogic(shape) -} \ No newline at end of file + + override def toString: String = s"PrefixAndTail($n)" +} diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 2a57396f85..565e3213de 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -296,6 +296,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: throw new IllegalStateException("not yet initialized: only setHandler is allowed in GraphStageLogic constructor") else _interpreter + /** + * The [[akka.stream.Materializer]] that has set this GraphStage in motion. + */ + protected def materializer: Materializer = interpreter.materializer + + /** + * An [[akka.stream.Materializer]] that may run fusable parts of the graphs + * that it materializes within the same actor as the current GraphStage (if + * fusing is available). This materializer must not be shared outside of the + * GraphStage. + */ + protected def subFusingMaterializer: Materializer = interpreter.subFusingMaterializer + /** * Input handler that terminates the stage upon receiving completion. * The stage fails upon receiving a failure.