#19197 implement SubFusingActorMaterializer

and use it in FlattenMerge
This commit is contained in:
Roland Kuhn 2015-12-17 13:35:37 +01:00
parent a95a5b3af8
commit af99b1eae8
10 changed files with 290 additions and 156 deletions

View file

@ -11,11 +11,16 @@ import akka.stream.Attributes._
import akka.stream.Fusing.FusedGraph import akka.stream.Fusing.FusedGraph
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
import akka.stream.impl.fusing.GraphInterpreter
import akka.event.BusLogging
class FusingSpec extends AkkaSpec with ConversionCheckedTripleEquals { class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals {
final val Debug = false final val Debug = false
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
implicit val patience = PatienceConfig(1.second)
def graph(async: Boolean) = def graph(async: Boolean) =
Source.unfoldInf(1)(x (x, x)).filter(_ % 2 == 1) Source.unfoldInf(1)(x (x, x)).filter(_ % 2 == 1)
@ -87,4 +92,38 @@ class FusingSpec extends AkkaSpec with ConversionCheckedTripleEquals {
} }
"SubFusingActorMaterializer" must {
"work with asynchronous boundaries in the subflows" in {
val async = Flow[Int].map(_ * 2).withAttributes(Attributes.asyncBoundary)
Source(0 to 9)
.map(_ * 10)
.flatMapMerge(5, i Source(i to (i + 9)).via(async))
.grouped(1000)
.runWith(Sink.head)
.futureValue
.sorted should ===(0 to 198 by 2)
}
"use multiple actors when there are asynchronous boundaries in the subflows" in {
def ref = {
val bus = GraphInterpreter.currentInterpreter.log.asInstanceOf[BusLogging]
bus.logSource
}
val async = Flow[Int].map(x { testActor ! ref; x }).withAttributes(Attributes.asyncBoundary)
Source(0 to 9)
.map(x { testActor ! ref; x })
.flatMapMerge(5, i Source.single(i).via(async))
.grouped(1000)
.runWith(Sink.head)
.futureValue
.sorted should ===(0 to 9)
val refs = receiveN(20)
withClue(s"refs=\n${refs.mkString("\n")}") {
refs.toSet.size should ===(11)
}
}
}
} }

View file

@ -56,7 +56,7 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
manualInit(assembly) manualInit(assembly)
interpreter.attachDownstreamBoundary(2, sink) interpreter.attachDownstreamBoundary(2, sink)
interpreter.attachUpstreamBoundary(0, source) interpreter.attachUpstreamBoundary(0, source)
interpreter.init() interpreter.init(null)
lastEvents() should ===(Set.empty) lastEvents() should ===(Set.empty)

View file

@ -84,7 +84,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
_interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2) _interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2)
} }
_interpreter.init() _interpreter.init(null)
} }
} }
@ -226,7 +226,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
manualInit(assembly) manualInit(assembly)
interpreter.attachDownstreamBoundary(0, in) interpreter.attachDownstreamBoundary(0, in)
interpreter.attachUpstreamBoundary(0, out) interpreter.attachUpstreamBoundary(0, out)
interpreter.init() interpreter.init(null)
} }
abstract class FailingStageSetup(initFailOnNextEvent: Boolean = false) extends TestSetup { abstract class FailingStageSetup(initFailOnNextEvent: Boolean = false) extends TestSetup {
@ -351,7 +351,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
interpreter.attachUpstreamBoundary(0, upstream) interpreter.attachUpstreamBoundary(0, upstream)
interpreter.attachDownstreamBoundary(ops.length, downstream) interpreter.attachDownstreamBoundary(ops.length, downstream)
interpreter.init() interpreter.init(null)
} }

View file

@ -22,6 +22,7 @@ import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.impl.fusing.GraphInterpreterShell
object FlowSpec { object FlowSpec {
class Fruit class Fruit
@ -41,22 +42,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val identity: Flow[Any, Any, Unit] Flow[Any, Any, Unit] = in in.map(e e) val identity: Flow[Any, Any, Unit] Flow[Any, Any, Unit] = in in.map(e e)
val identity2: Flow[Any, Any, Unit] Flow[Any, Any, Unit] = in identity(in) val identity2: Flow[Any, Any, Unit] Flow[Any, Any, Unit] = in identity(in)
class BrokenActorInterpreter( class BrokenActorInterpreter(_shell: GraphInterpreterShell, brokenMessage: Any)
_assembly: GraphAssembly, extends ActorGraphInterpreter(_shell) {
_inHandlers: Array[InHandler],
_outHandlers: Array[OutHandler],
_logics: Array[GraphStageLogic],
_shape: Shape,
_settings: ActorMaterializerSettings,
_materializer: Materializer,
brokenMessage: Any)
extends ActorGraphInterpreter(_assembly, _inHandlers, _outHandlers, _logics, _shape, _settings, _materializer) {
import akka.stream.actor.ActorSubscriberMessage._ import akka.stream.actor.ActorSubscriberMessage._
override protected[akka] def aroundReceive(receive: Receive, msg: Any) = { override protected[akka] def aroundReceive(receive: Receive, msg: Any) = {
msg match { msg match {
case ActorGraphInterpreter.OnNext(0, m) if m == brokenMessage case ActorGraphInterpreter.OnNext(_, 0, m) if m == brokenMessage
throw new NullPointerException(s"I'm so broken [$m]") throw new NullPointerException(s"I'm so broken [$m]")
case _ super.aroundReceive(receive, msg) case _ super.aroundReceive(receive, msg)
} }
@ -77,14 +70,17 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
val (inHandlers, outHandlers, logics) = val (inHandlers, outHandlers, logics) =
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ()) assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ())
val props = Props(new BrokenActorInterpreter(assembly, inHandlers, outHandlers, logics, stage.shape, settings, materializer, "a3")) val shell = new GraphInterpreterShell(assembly, inHandlers, outHandlers, logics, stage.shape, settings,
materializer.asInstanceOf[ActorMaterializerImpl])
val props = Props(new BrokenActorInterpreter(shell, "a3"))
.withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local) .withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local)
val impl = system.actorOf(props, "borken-stage-actor") val impl = system.actorOf(props, "borken-stage-actor")
val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, 0) val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, 0)
val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(0) } val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(shell, 0) }
impl ! ActorGraphInterpreter.ExposedPublisher(0, publisher) impl ! ActorGraphInterpreter.ExposedPublisher(shell, 0, publisher)
Flow.fromSinkAndSource(Sink(subscriber), Source(publisher)) Flow.fromSinkAndSource(Sink(subscriber), Source(publisher))
}) })

View file

@ -20,6 +20,7 @@ import scala.concurrent.{ Await, ExecutionContextExecutor }
import akka.stream.impl.fusing.GraphStageModule import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import akka.stream.impl.fusing.Fusing import akka.stream.impl.fusing.Fusing
import akka.stream.impl.fusing.GraphInterpreterShell
/** /**
* INTERNAL API * INTERNAL API
@ -44,7 +45,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
override def isShutdown: Boolean = haveShutDown.get() override def isShutdown: Boolean = haveShutDown.get()
override def withNamePrefix(name: String): Materializer = this.copy(flowNames = flowNames.copy(name)) override def withNamePrefix(name: String): ActorMaterializerImpl = this.copy(flowNames = flowNames.copy(name))
private[this] def createFlowName(): String = flowNames.next() private[this] def createFlowName(): String = flowNames.next()
@ -73,7 +74,11 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
override def scheduleOnce(delay: FiniteDuration, task: Runnable) = override def scheduleOnce(delay: FiniteDuration, task: Runnable) =
system.scheduler.scheduleOnce(delay, task)(executionContext) system.scheduler.scheduleOnce(delay, task)(executionContext)
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat = { override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat =
materialize(_runnableGraph, null)
private[stream] def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef): Mat = {
val runnableGraph = val runnableGraph =
if (settings.autoFusing) Fusing.aggressive(_runnableGraph) if (settings.autoFusing) Fusing.aggressive(_runnableGraph)
else _runnableGraph else _runnableGraph
@ -146,17 +151,24 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
val calculatedSettings = effectiveSettings(effectiveAttributes) val calculatedSettings = effectiveSettings(effectiveAttributes)
val (inHandlers, outHandlers, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc) val (inHandlers, outHandlers, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)
val props = ActorGraphInterpreter.props( val shell = new GraphInterpreterShell(graph.assembly, inHandlers, outHandlers, logics, graph.shape,
graph.assembly, inHandlers, outHandlers, logics, graph.shape, calculatedSettings, ActorMaterializerImpl.this) calculatedSettings, ActorMaterializerImpl.this)
val impl =
if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) {
subflowFuser(shell)
} else {
val props = ActorGraphInterpreter.props(shell)
actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
}
val impl = actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
for ((inlet, i) graph.shape.inlets.iterator.zipWithIndex) { for ((inlet, i) graph.shape.inlets.iterator.zipWithIndex) {
val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, i) val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i)
assignPort(inlet, subscriber) assignPort(inlet, subscriber)
} }
for ((outlet, i) graph.shape.outlets.iterator.zipWithIndex) { for ((outlet, i) graph.shape.outlets.iterator.zipWithIndex) {
val publisher = new ActorPublisher[Any](impl) { override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(i) } val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i)
impl ! ActorGraphInterpreter.ExposedPublisher(i, publisher) impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
assignPort(outlet, publisher) assignPort(outlet, publisher)
} }
} }
@ -207,6 +219,20 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
} }
private[akka] class SubFusingActorMaterializerImpl(val delegate: ActorMaterializerImpl, registerShell: GraphInterpreterShell ActorRef) extends Materializer {
override def executionContext: ExecutionContextExecutor = delegate.executionContext
override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell)
override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable = delegate.scheduleOnce(delay, task)
override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable =
delegate.schedulePeriodically(initialDelay, interval, task)
def withNamePrefix(name: String): SubFusingActorMaterializerImpl =
new SubFusingActorMaterializerImpl(delegate.withNamePrefix(name), registerShell)
}
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -4,7 +4,6 @@
package akka.stream.impl.fusing package akka.stream.impl.fusing
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.actor._ import akka.actor._
import akka.event.Logging import akka.event.Logging
import akka.stream._ import akka.stream._
@ -14,9 +13,11 @@ import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic,
import akka.stream.impl.{ ActorPublisher, ReactiveStreamsCompliance } import akka.stream.impl.{ ActorPublisher, ReactiveStreamsCompliance }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.event.LoggingAdapter
import akka.stream.impl.ActorMaterializerImpl
import akka.stream.impl.SubFusingActorMaterializerImpl
/** /**
* INTERNAL API * INTERNAL API
@ -41,52 +42,53 @@ private[stream] case class GraphModule(assembly: GraphAssembly, shape: Shape, at
* INTERNAL API * INTERNAL API
*/ */
private[stream] object ActorGraphInterpreter { private[stream] object ActorGraphInterpreter {
trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded {
def shell: GraphInterpreterShell
}
final case class OnError(id: Int, cause: Throwable) extends BoundaryEvent final case class OnError(shell: GraphInterpreterShell, id: Int, cause: Throwable) extends BoundaryEvent
final case class OnComplete(id: Int) extends BoundaryEvent final case class OnComplete(shell: GraphInterpreterShell, id: Int) extends BoundaryEvent
final case class OnNext(id: Int, e: Any) extends BoundaryEvent final case class OnNext(shell: GraphInterpreterShell, id: Int, e: Any) extends BoundaryEvent
final case class OnSubscribe(id: Int, subscription: Subscription) extends BoundaryEvent final case class OnSubscribe(shell: GraphInterpreterShell, id: Int, subscription: Subscription) extends BoundaryEvent
final case class RequestMore(id: Int, demand: Long) extends BoundaryEvent final case class RequestMore(shell: GraphInterpreterShell, id: Int, demand: Long) extends BoundaryEvent
final case class Cancel(id: Int) extends BoundaryEvent final case class Cancel(shell: GraphInterpreterShell, id: Int) extends BoundaryEvent
final case class SubscribePending(id: Int) extends BoundaryEvent final case class SubscribePending(shell: GraphInterpreterShell, id: Int) extends BoundaryEvent
final case class ExposedPublisher(id: Int, publisher: ActorPublisher[Any]) extends BoundaryEvent final case class ExposedPublisher(shell: GraphInterpreterShell, id: Int, publisher: ActorPublisher[Any]) extends BoundaryEvent
final case class AsyncInput(logic: GraphStageLogic, evt: Any, handler: (Any) Unit) extends BoundaryEvent final case class AsyncInput(shell: GraphInterpreterShell, logic: GraphStageLogic, evt: Any, handler: (Any) Unit) extends BoundaryEvent
case object Resume extends BoundaryEvent case class Resume(shell: GraphInterpreterShell) extends BoundaryEvent
case class Abort(shell: GraphInterpreterShell) extends BoundaryEvent
final class BoundarySubscription(val parent: ActorRef, val id: Int) extends Subscription { final class BoundaryPublisher(parent: ActorRef, shell: GraphInterpreterShell, id: Int) extends ActorPublisher[Any](parent) {
override def request(elements: Long): Unit = parent ! RequestMore(id, elements) override val wakeUpMsg = ActorGraphInterpreter.SubscribePending(shell, id)
override def cancel(): Unit = parent ! Cancel(id) }
final class BoundarySubscription(parent: ActorRef, shell: GraphInterpreterShell, id: Int) extends Subscription {
override def request(elements: Long): Unit = parent ! RequestMore(shell, id, elements)
override def cancel(): Unit = parent ! Cancel(shell, id)
override def toString = s"BoundarySubscription[$parent, $id]" override def toString = s"BoundarySubscription[$parent, $id]"
} }
final class BoundarySubscriber(val parent: ActorRef, id: Int) extends Subscriber[Any] { final class BoundarySubscriber(parent: ActorRef, shell: GraphInterpreterShell, id: Int) extends Subscriber[Any] {
override def onError(cause: Throwable): Unit = { override def onError(cause: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(cause) ReactiveStreamsCompliance.requireNonNullException(cause)
parent ! OnError(id, cause) parent ! OnError(shell, id, cause)
} }
override def onComplete(): Unit = parent ! OnComplete(id) override def onComplete(): Unit = parent ! OnComplete(shell, id)
override def onNext(element: Any): Unit = { override def onNext(element: Any): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(element) ReactiveStreamsCompliance.requireNonNullElement(element)
parent ! OnNext(id, element) parent ! OnNext(shell, id, element)
} }
override def onSubscribe(subscription: Subscription): Unit = { override def onSubscribe(subscription: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(subscription) ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
parent ! OnSubscribe(id, subscription) parent ! OnSubscribe(shell, id, subscription)
} }
} }
def props(assembly: GraphAssembly, def props(shell: GraphInterpreterShell): Props =
inHandlers: Array[InHandler], Props(new ActorGraphInterpreter(shell)).withDeploy(Deploy.local)
outHandlers: Array[OutHandler],
logics: Array[GraphStageLogic],
shape: Shape,
settings: ActorMaterializerSettings,
mat: Materializer): Props =
Props(new ActorGraphInterpreter(assembly, inHandlers, outHandlers, logics, shape, settings, mat)).withDeploy(Deploy.local)
class BatchingActorInputBoundary(size: Int, id: Int) extends UpstreamBoundaryStageLogic[Any] { class BatchingActorInputBoundary(size: Int, id: Int) extends UpstreamBoundaryStageLogic[Any] {
require(size > 0, "buffer size cannot be zero") require(size > 0, "buffer size cannot be zero")
@ -201,7 +203,7 @@ private[stream] object ActorGraphInterpreter {
override def toString: String = s"BatchingActorInputBoundary(id=$id, fill=$inputBufferElements/$size, completed=$upstreamCompleted, canceled=$downstreamCanceled)" override def toString: String = s"BatchingActorInputBoundary(id=$id, fill=$inputBufferElements/$size, completed=$upstreamCompleted, canceled=$downstreamCanceled)"
} }
private[stream] class ActorOutputBoundary(actor: ActorRef, id: Int) extends DownstreamBoundaryStageLogic[Any] { private[stream] class ActorOutputBoundary(actor: ActorRef, shell: GraphInterpreterShell, id: Int) extends DownstreamBoundaryStageLogic[Any] {
val in: Inlet[Any] = Inlet[Any]("UpstreamBoundary" + id) val in: Inlet[Any] = Inlet[Any]("UpstreamBoundary" + id)
in.id = 0 in.id = 0
@ -258,7 +260,7 @@ private[stream] object ActorGraphInterpreter {
exposedPublisher.takePendingSubscribers() foreach { sub exposedPublisher.takePendingSubscribers() foreach { sub
if (subscriber eq null) { if (subscriber eq null) {
subscriber = sub subscriber = sub
tryOnSubscribe(subscriber, new BoundarySubscription(actor, id)) tryOnSubscribe(subscriber, new BoundarySubscription(actor, shell, id))
if (GraphInterpreter.Debug) println(s"${interpreter.Name} subscribe subscriber=$sub") if (GraphInterpreter.Debug) println(s"${interpreter.Name} subscribe subscriber=$sub")
} else } else
rejectAdditionalSubscriber(subscriber, s"${Logging.simpleName(this)}") rejectAdditionalSubscriber(subscriber, s"${Logging.simpleName(this)}")
@ -301,28 +303,25 @@ private[stream] object ActorGraphInterpreter {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[stream] class ActorGraphInterpreter( private[stream] final class GraphInterpreterShell(
assembly: GraphAssembly, assembly: GraphAssembly,
inHandlers: Array[InHandler], inHandlers: Array[InHandler],
outHandlers: Array[OutHandler], outHandlers: Array[OutHandler],
logics: Array[GraphStageLogic], logics: Array[GraphStageLogic],
shape: Shape, shape: Shape,
settings: ActorMaterializerSettings, settings: ActorMaterializerSettings,
mat: Materializer) extends Actor { mat: ActorMaterializerImpl) {
import ActorGraphInterpreter._ import ActorGraphInterpreter._
val interpreter = new GraphInterpreter( private var self: ActorRef = _
assembly, lazy val log = Logging(mat.system.eventStream, self)
mat,
Logging(this),
inHandlers,
outHandlers,
logics,
(logic, event, handler) self ! AsyncInput(logic, event, handler),
settings.fuzzingMode)
private val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _)) lazy val interpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics,
private val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _)) (logic, event, handler) self ! AsyncInput(this, logic, event, handler), settings.fuzzingMode)
private val inputs = new Array[BatchingActorInputBoundary](shape.inlets.size)
private val outputs = new Array[ActorOutputBoundary](shape.outlets.size)
private var subscribesPending = inputs.length private var subscribesPending = inputs.length
private var publishersPending = outputs.length private var publishersPending = outputs.length
@ -333,97 +332,108 @@ private[stream] class ActorGraphInterpreter(
* to give each input buffer slot a chance to run through the whole pipeline * to give each input buffer slot a chance to run through the whole pipeline
* and back (for the demand). * and back (for the demand).
*/ */
val eventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length) private val eventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length)
// Limits the number of events processed by the interpreter on an abort event. // Limits the number of events processed by the interpreter on an abort event.
// TODO: Better heuristic here // TODO: Better heuristic here
private val abortLimit = eventLimit * 2 private val abortLimit = eventLimit * 2
private var resumeScheduled = false private var resumeScheduled = false
override def preStart(): Unit = { def init(self: ActorRef, registerShell: GraphInterpreterShell ActorRef): Unit = {
this.self = self
var i = 0 var i = 0
while (i < inputs.length) { while (i < inputs.length) {
interpreter.attachUpstreamBoundary(i, inputs(i)) val in = new BatchingActorInputBoundary(settings.maxInputBufferSize, i)
inputs(i) = in
interpreter.attachUpstreamBoundary(i, in)
i += 1 i += 1
} }
val offset = assembly.connectionCount - outputs.length val offset = assembly.connectionCount - outputs.length
i = 0 i = 0
while (i < outputs.length) { while (i < outputs.length) {
interpreter.attachDownstreamBoundary(i + offset, outputs(i)) val out = new ActorOutputBoundary(self, this, i)
outputs(i) = out
interpreter.attachDownstreamBoundary(i + offset, out)
i += 1 i += 1
} }
interpreter.init() interpreter.init(new SubFusingActorMaterializerImpl(mat, registerShell))
runBatch() runBatch()
} }
override def receive: Receive = { def receive(event: BoundaryEvent): Unit =
// Cases that are most likely on the hot path, in decreasing order of frequency if (waitingForShutdown) event match {
case OnNext(id: Int, e: Any) case ExposedPublisher(_, id, publisher)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e id=$id") outputs(id).exposedPublisher(publisher)
inputs(id).onNext(e) publishersPending -= 1
runBatch() if (canShutDown) _isTerminated = true
case RequestMore(id: Int, demand: Long) case OnSubscribe(_, _, sub)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} request $demand id=$id") tryCancel(sub)
outputs(id).requestMore(demand) subscribesPending -= 1
runBatch() if (canShutDown) _isTerminated = true
case Resume case Abort(_)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " +
resumeScheduled = false s"inputs or outputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now."))
if (interpreter.isSuspended) runBatch() case _ // Ignore, there is nothing to do anyway
case AsyncInput(logic, event, handler) }
if (GraphInterpreter.Debug) println(s"${interpreter.Name} ASYNC $event ($handler) [$logic]") else event match {
if (!interpreter.isStageCompleted(logic)) { // Cases that are most likely on the hot path, in decreasing order of frequency
try handler(event) case OnNext(_, id: Int, e: Any)
catch { if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e id=$id")
case NonFatal(e) logic.failStage(e) inputs(id).onNext(e)
runBatch()
case RequestMore(_, id: Int, demand: Long)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} request $demand id=$id")
outputs(id).requestMore(demand)
runBatch()
case Resume(_)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume")
resumeScheduled = false
if (interpreter.isSuspended) runBatch()
case AsyncInput(_, logic, event, handler)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} ASYNC $event ($handler) [$logic]")
if (!interpreter.isStageCompleted(logic)) {
try handler(event)
catch {
case NonFatal(e) logic.failStage(e)
}
interpreter.afterStageHasRun(logic)
} }
interpreter.afterStageHasRun(logic) runBatch()
}
runBatch()
// Initialization and completion messages // Initialization and completion messages
case OnError(id: Int, cause: Throwable) case OnError(_, id: Int, cause: Throwable)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError id=$id")
inputs(id).onError(cause) inputs(id).onError(cause)
runBatch() runBatch()
case OnComplete(id: Int) case OnComplete(_, id: Int)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete id=$id")
inputs(id).onComplete() inputs(id).onComplete()
runBatch() runBatch()
case OnSubscribe(id: Int, subscription: Subscription) case OnSubscribe(_, id: Int, subscription: Subscription)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe id=$id")
subscribesPending -= 1 subscribesPending -= 1
inputs(id).onSubscribe(subscription) inputs(id).onSubscribe(subscription)
runBatch() runBatch()
case Cancel(id: Int) case Cancel(_, id: Int)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} cancel id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} cancel id=$id")
outputs(id).cancel() outputs(id).cancel()
runBatch() runBatch()
case SubscribePending(id: Int) case SubscribePending(_, id: Int)
outputs(id).subscribePending() outputs(id).subscribePending()
case ExposedPublisher(id, publisher) case ExposedPublisher(_, id, publisher)
publishersPending -= 1 publishersPending -= 1
outputs(id).exposedPublisher(publisher) outputs(id).exposedPublisher(publisher)
}
} private var _isTerminated = false
def isTerminated: Boolean = _isTerminated
private def waitShutdown: Receive = {
case ExposedPublisher(id, publisher)
outputs(id).exposedPublisher(publisher)
publishersPending -= 1
if (canShutDown) context.stop(self)
case OnSubscribe(_, sub)
tryCancel(sub)
subscribesPending -= 1
if (canShutDown) context.stop(self)
case ReceiveTimeout
tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " +
s"inputs or outputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now."))
case _ // Ignore, there is nothing to do anyway
}
private def canShutDown: Boolean = subscribesPending + publishersPending == 0 private def canShutDown: Boolean = subscribesPending + publishersPending == 0
private var waitingForShutdown: Boolean = false
private val resume = Resume(this)
private def runBatch(): Unit = { private def runBatch(): Unit = {
try { try {
val effectiveLimit = { val effectiveLimit = {
@ -436,18 +446,19 @@ private[stream] class ActorGraphInterpreter(
interpreter.execute(effectiveLimit) interpreter.execute(effectiveLimit)
if (interpreter.isCompleted) { if (interpreter.isCompleted) {
// Cannot stop right away if not completely subscribed // Cannot stop right away if not completely subscribed
if (canShutDown) context.stop(self) if (canShutDown) _isTerminated = true
else { else {
context.become(waitShutdown) waitingForShutdown = true
context.setReceiveTimeout(settings.subscriptionTimeoutSettings.timeout) mat.scheduleOnce(settings.subscriptionTimeoutSettings.timeout, new Runnable {
override def run(): Unit = self ! Abort(GraphInterpreterShell.this)
})
} }
} else if (interpreter.isSuspended && !resumeScheduled) { } else if (interpreter.isSuspended && !resumeScheduled) {
resumeScheduled = true resumeScheduled = true
self ! Resume self ! resume
} }
} catch { } catch {
case NonFatal(e) case NonFatal(e)
context.stop(self)
tryAbort(e) tryAbort(e)
} }
} }
@ -458,20 +469,57 @@ private[stream] class ActorGraphInterpreter(
* - the event limit is reached * - the event limit is reached
* - a new error is encountered * - a new error is encountered
*/ */
private def tryAbort(ex: Throwable): Unit = { def tryAbort(ex: Throwable): Unit = {
// This should handle termination while interpreter is running. If the upstream have been closed already this // This should handle termination while interpreter is running. If the upstream have been closed already this
// call has no effect and therefore do the right thing: nothing. // call has no effect and therefore do the right thing: nothing.
try { try {
inputs.foreach(_.onInternalError(ex)) inputs.foreach(_.onInternalError(ex))
interpreter.execute(abortLimit) interpreter.execute(abortLimit)
interpreter.finish() interpreter.finish()
} // Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream } catch {
// otherwise this will have no effect case NonFatal(_)
finally { } finally {
_isTerminated = true
// Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream
// otherwise this will have no effect
outputs.foreach(_.fail(ex)) outputs.foreach(_.fail(ex))
inputs.foreach(_.cancel()) inputs.foreach(_.cancel())
} }
} }
override def postStop(): Unit = tryAbort(AbruptTerminationException(self)) override def toString: String = s"GraphInterpreterShell\n ${assembly.toString.replace("\n", "\n ")}"
}
/**
* INTERNAL API
*/
private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor {
import ActorGraphInterpreter._
var activeInterpreters = Set(_initial)
def registerShell(shell: GraphInterpreterShell): ActorRef = {
shell.init(self, registerShell)
if (GraphInterpreter.Debug) println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}")
activeInterpreters += shell
self
}
override def preStart(): Unit = {
activeInterpreters.foreach(_.init(self, registerShell))
}
override def receive: Receive = {
case b: BoundaryEvent
val shell = b.shell
if (GraphInterpreter.Debug)
if (!activeInterpreters.contains(shell)) println(s"RECEIVED EVENT $b FOR UNKNOWN SHELL $shell")
shell.receive(b)
if (shell.isTerminated) {
activeInterpreters -= shell
if (activeInterpreters.isEmpty) context.stop(self)
}
}
override def postStop(): Unit = activeInterpreters.foreach(_.tryAbort(AbruptTerminationException(self)))
} }

View file

@ -372,6 +372,9 @@ private[stream] final class GraphInterpreter(
shape.inlets.size + shape.outlets.size + keepGoing shape.inlets.size + shape.outlets.size + keepGoing
} }
private var _subFusingMaterializer: Materializer = _
def subFusingMaterializer: Materializer = _subFusingMaterializer
// An event queue implemented as a circular buffer // An event queue implemented as a circular buffer
// FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue // FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue
private[this] val eventQueue = Array.ofDim[Int](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1))) private[this] val eventQueue = Array.ofDim[Int](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1)))
@ -445,9 +448,14 @@ private[stream] final class GraphInterpreter(
def isCompleted: Boolean = runningStages == 0 && !isSuspended def isCompleted: Boolean = runningStages == 0 && !isSuspended
/** /**
* Initializes the states of all the stage logics by calling preStart() * Initializes the states of all the stage logics by calling preStart().
* The passed-in materializer is intended to be a SubFusingActorMaterializer
* that avoids creating new Actors when stages materialize sub-flows. If no
* such materializer is available, passing in `null` will reuse the normal
* materializer for the GraphInterpreterfusing is only an optimization.
*/ */
def init(): Unit = { def init(subMat: Materializer): Unit = {
_subFusingMaterializer = if (subMat == null) materializer else subMat
var i = 0 var i = 0
while (i < logics.length) { while (i < logics.length) {
val logic = logics(i) val logic = logics(i)

View file

@ -150,7 +150,7 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S
fuzzingMode = false) fuzzingMode = false)
interpreter.attachUpstreamBoundary(0, upstream) interpreter.attachUpstreamBoundary(0, upstream)
interpreter.attachDownstreamBoundary(ops.length, downstream) interpreter.attachDownstreamBoundary(ops.length, downstream)
interpreter.init() interpreter.init(null)
} }
init() init()

View file

@ -125,7 +125,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[
if (localSource.elem == null) removeSource(localSource) if (localSource.elem == null) removeSource(localSource)
case OnError(ex) case OnError(ex)
failStage(ex) failStage(ex)
}.invoke))(interpreter.materializer) }.invoke))(interpreter.subFusingMaterializer)
localSource.activate(subF) localSource.activate(subF)
} }
@ -140,6 +140,8 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[
sources.foreach(_.cancel()) sources.foreach(_.cancel())
} }
} }
override def toString: String = s"FlattenMerge($breadth)"
} }
/** /**
@ -285,8 +287,8 @@ object PrefixAndTail {
override def completeSubstream(): Unit = onParentFinish.invoke(()) override def completeSubstream(): Unit = onParentFinish.invoke(())
override def failSubstream(ex: Throwable): Unit = onParentFailure.invoke(ex) override def failSubstream(ex: Throwable): Unit = onParentFailure.invoke(ex)
override def onPull(): Unit = pullParent() override def onPull(): Unit = pullParent(())
override def onDownstreamFinish(): Unit = cancelParent() override def onDownstreamFinish(): Unit = cancelParent(())
} }
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TailSourceLogic(shape) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TailSourceLogic(shape)
@ -413,4 +415,6 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable.
} }
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new PrefixAndTailLogic(shape) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new PrefixAndTailLogic(shape)
}
override def toString: String = s"PrefixAndTail($n)"
}

View file

@ -296,6 +296,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
throw new IllegalStateException("not yet initialized: only setHandler is allowed in GraphStageLogic constructor") throw new IllegalStateException("not yet initialized: only setHandler is allowed in GraphStageLogic constructor")
else _interpreter else _interpreter
/**
* The [[akka.stream.Materializer]] that has set this GraphStage in motion.
*/
protected def materializer: Materializer = interpreter.materializer
/**
* An [[akka.stream.Materializer]] that may run fusable parts of the graphs
* that it materializes within the same actor as the current GraphStage (if
* fusing is available). This materializer must not be shared outside of the
* GraphStage.
*/
protected def subFusingMaterializer: Materializer = interpreter.subFusingMaterializer
/** /**
* Input handler that terminates the stage upon receiving completion. * Input handler that terminates the stage upon receiving completion.
* The stage fails upon receiving a failure. * The stage fails upon receiving a failure.