diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala
new file mode 100644
index 0000000000..418fd7de74
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2015 Typesafe Inc.
+ */
+package akka.stream.impl
+
+import akka.stream.Supervision._
+import akka.stream.testkit.AkkaSpec
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.stream.testkit.StreamTestKit._
+import akka.stream.impl.fusing.ActorInterpreter
+import akka.stream.stage.Stage
+import akka.stream.stage.PushPullStage
+import akka.stream.stage.Context
+import akka.testkit.TestLatch
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class ActorInterpreterSpec extends AkkaSpec {
+  import FlowGraph.Implicits._
+
+  implicit val mat = ActorFlowMaterializer()
+
+  class Setup(ops: List[Stage[_, _]] = List(fusing.Map({ x: Any ⇒ x }, stoppingDecider))) {
+    val up = PublisherProbe[Int]
+    val down = SubscriberProbe[Int]
+    private val props = ActorInterpreter.props(mat.settings, ops).withDispatcher("akka.test.stream-dispatcher")
+    val actor = system.actorOf(props)
+    val processor = ActorProcessorFactory[Int, Int](actor)
+  }
+
+  "An ActorInterpreter" must {
+
+    "pass along early cancellation" in new Setup {
+      processor.subscribe(down)
+      val sub = down.expectSubscription()
+      sub.cancel()
+      up.subscribe(processor)
+      val upsub = up.expectSubscription()
+      upsub.expectCancellation()
+    }
+
+    "heed cancellation signal while large demand is outstanding" in {
+      val latch = TestLatch()
+      val infinite = new PushPullStage[Int, Int] {
+        override def onPush(elem: Int, ctx: Context[Int]) = ???
+        override def onPull(ctx: Context[Int]) = {
+          Await.ready(latch, 5.seconds)
+          ctx.push(42)
+        }
+      }
+      val N = system.settings.config.getInt("akka.stream.materializer.output-burst-limit")
+
+      new Setup(infinite :: Nil) {
+        processor.subscribe(down)
+        val sub = down.expectSubscription()
+        up.subscribe(processor)
+        val upsub = up.expectSubscription()
+        sub.request(100000000)
+        sub.cancel()
+        watch(actor)
+        latch.countDown()
+        for (i ← 1 to N) withClue(s"iteration $i: ") {
+          try down.expectNext(42) catch { case e: Throwable ⇒ fail(e) }
+        }
+        // now cancellation request is processed
+        down.expectNoMsg(500.millis)
+        upsub.expectCancellation()
+        expectTerminated(actor)
+      }
+    }
+
+    "heed upstream failure while large demand is outstanding" in {
+      val latch = TestLatch()
+      val infinite = new PushPullStage[Int, Int] {
+        override def onPush(elem: Int, ctx: Context[Int]) = ???
+        override def onPull(ctx: Context[Int]) = {
+          Await.ready(latch, 5.seconds)
+          ctx.push(42)
+        }
+      }
+      val N = system.settings.config.getInt("akka.stream.materializer.output-burst-limit")
+
+      new Setup(infinite :: Nil) {
+        processor.subscribe(down)
+        val sub = down.expectSubscription()
+        up.subscribe(processor)
+        val upsub = up.expectSubscription()
+        sub.request(100000000)
+        val ex = new Exception("FAIL!")
+        upsub.sendError(ex)
+        latch.countDown()
+        for (i ← 1 to N) withClue(s"iteration $i: ") {
+          try down.expectNext(42) catch { case e: Throwable ⇒ fail(e) }
+        }
+        down.expectError(ex)
+      }
+    }
+
+    "hold back upstream completion while large demand is outstanding" in {
+      val latch = TestLatch()
+      val N = 3 * system.settings.config.getInt("akka.stream.materializer.output-burst-limit")
+      val infinite = new PushPullStage[Int, Int] {
+        private var remaining = N
+        override def onPush(elem: Int, ctx: Context[Int]) = ???
+        override def onPull(ctx: Context[Int]) = {
+          Await.ready(latch, 5.seconds)
+          remaining -= 1
+          if (remaining >= 0) ctx.push(42)
+          else ctx.finish()
+        }
+        override def onUpstreamFinish(ctx: Context[Int]) = {
+          if (remaining > 0) ctx.absorbTermination()
+          else ctx.finish()
+        }
+      }
+
+      new Setup(infinite :: Nil) {
+        processor.subscribe(down)
+        val sub = down.expectSubscription()
+        up.subscribe(processor)
+        val upsub = up.expectSubscription()
+        sub.request(100000000)
+        upsub.sendComplete()
+        latch.countDown()
+        for (i ← 1 to N) withClue(s"iteration $i: ") {
+          try down.expectNext(42) catch { case e: Throwable ⇒ fail(e) }
+        }
+        down.expectComplete()
+      }
+    }
+
+    "satisfy large demand" in largeDemand(0)
+    "satisfy larger demand" in largeDemand(1)
+
+    def largeDemand(extra: Int): Unit = {
+      val N = 3 * system.settings.config.getInt("akka.stream.materializer.output-burst-limit")
+      val large = new PushPullStage[Int, Int] {
+        private var remaining = N
+        override def onPush(elem: Int, ctx: Context[Int]) = ???
+        override def onPull(ctx: Context[Int]) = {
+          remaining -= 1
+          if (remaining >= 0) ctx.push(42)
+          else ctx.finish()
+        }
+      }
+
+      new Setup(large :: Nil) {
+        processor.subscribe(down)
+        val sub = down.expectSubscription()
+        up.subscribe(processor)
+        val upsub = up.expectSubscription()
+        sub.request(100000000)
+        watch(actor)
+        for (i ← 1 to N) withClue(s"iteration $i: ") {
+          try down.expectNext(42) catch { case e: Throwable ⇒ fail(e) }
+        }
+        down.expectComplete()
+        upsub.expectCancellation()
+        expectTerminated(actor)
+      }
+    }
+
+  }
+
+}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/InterpreterSpec.scala
deleted file mode 100644
index 69b92cc393..0000000000
--- a/akka-stream-tests/src/test/scala/akka/stream/impl/InterpreterSpec.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (C) 2015 Typesafe Inc.
- */
-package akka.stream.impl
-
-import akka.stream.Supervision._
-import akka.stream.testkit.AkkaSpec
-import akka.stream._
-import akka.stream.scaladsl._
-import akka.stream.testkit.StreamTestKit._
-import akka.stream.impl.fusing.ActorInterpreter
-
-class InterpreterSpec extends AkkaSpec {
-  import FlowGraph.Implicits._
-
-  implicit val mat = ActorFlowMaterializer()
-
-  class Setup {
-    val up = PublisherProbe[Int]
-    val down = SubscriberProbe[Int]
-    private val props = ActorInterpreter.props(mat.settings, List(fusing.Map({ x: Any ⇒ x }, stoppingDecider))).withDispatcher("akka.test.stream-dispatcher")
-    val processor = ActorProcessorFactory[Int, Int](system.actorOf(props))
-  }
-
-  "An ActorInterpreter" must {
-
-    "pass along early cancellation" in new Setup {
-      processor.subscribe(down)
-      val sub = down.expectSubscription()
-      sub.cancel()
-      up.subscribe(processor)
-      val upsub = up.expectSubscription()
-      upsub.expectCancellation()
-    }
-
-  }
-
-}
\ No newline at end of file
diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf
index a635da2ee2..695fefeaf0 100644
--- a/akka-stream/src/main/resources/reference.conf
+++ b/akka-stream/src/main/resources/reference.conf
@@ -37,6 +37,9 @@ akka {
 
       # Enable additional troubleshooting logging at DEBUG log level
       debug-logging = off
+
+      # Maximum number of elements emitted in batch if downstream signals large demand
+      output-burst-limit = 1000
     }
   }
diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala
index 2043656d99..15f78e31c5 100644
--- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala
+++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala
@@ -171,6 +171,7 @@ object ActorFlowMaterializerSettings {
     supervisionDecider = Supervision.stoppingDecider,
     subscriptionTimeoutSettings = StreamSubscriptionTimeoutSettings(config),
     debugLogging = config.getBoolean("debug-logging"),
+    outputBurstLimit = config.getInt("output-burst-limit"),
     optimizations = Optimizations.none)
 
 /**
@@ -205,6 +206,7 @@ final case class ActorFlowMaterializerSettings(
   supervisionDecider: Supervision.Decider,
   subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
   debugLogging: Boolean,
+  outputBurstLimit: Int,
   optimizations: Optimizations) {
 
   require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala
index a97aa4458e..4b92ed8f11 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala
@@ -65,8 +65,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
 
   def takePendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] = {
     val pending = pendingSubscribers.getAndSet(Nil)
-    assert(pending ne null, "takePendingSubscribers must not be called after shutdown")
-    pending.reverse
+    if (pending eq null) Nil else pending.reverse
   }
 
   def shutdown(reason: Option[Throwable]): Unit = {
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala
index 35bd75bdf7..7223f6d1a3 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala
@@ -16,6 +16,7 @@ import scala.util.control.NonFatal
 import akka.actor.Props
 import akka.actor.ActorLogging
 import akka.event.LoggingAdapter
+import akka.actor.DeadLetterSuppression
 
 /**
  * INTERNAL API
@@ -40,6 +41,8 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
 
   val subreceive: SubReceive = new SubReceive(waitingForUpstream)
 
+  def isFinished = (upstream ne null) && upstreamCompleted
+
   private def dequeue(): Any = {
     val elem = inputBuffer(nextInputElementCursor)
     assert(elem ne null)
@@ -82,7 +85,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
 
     override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = {
       cancel()
-      ctx.exit()
+      ctx.finish()
     }
 
     def cancel(): Unit = {
@@ -102,7 +105,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
 
   private def onComplete(): Unit =
     if (!upstreamCompleted) {
       upstreamCompleted = true
-      subreceive.become(completed)
+      // onUpstreamFinish is not back-pressured, stages need to deal with this
       if (inputBufferElements == 0) enter().finish()
     }
@@ -119,7 +122,6 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
 
   private def onError(e: Throwable): Unit = {
     upstreamCompleted = true
-    subreceive.become(completed)
     enter().fail(e)
   }
 
@@ -142,18 +144,25 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
     case OnSubscribe(subscription) ⇒ subscription.cancel() // spec rule 2.5
   }
 
-  private def completed: Actor.Receive = {
-    case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber")
-  }
+}
+private[akka] object ActorOutputBoundary {
+  /**
+   * INTERNAL API.
+   */
+  private case object ContinuePulling extends DeadLetterSuppression
 }
 
 /**
  * INTERNAL API
 */
-private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boolean, log: LoggingAdapter)
+private[akka] class ActorOutputBoundary(val actor: ActorRef,
+                                        val debugLogging: Boolean,
+                                        val log: LoggingAdapter,
+                                        val outputBurstLimit: Int)
   extends BoundaryStage {
   import ReactiveStreamsCompliance._
+  import ActorOutputBoundary._
 
   private var exposedPublisher: ActorPublisher[Any] = _
@@ -162,7 +171,31 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole
   // This flag is only used if complete/fail is called externally since this op turns into a Finished one inside the
   // interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked)
   private var downstreamCompleted = false
+  // this is true while we “hold the ball”; while “false” incoming demand will just be queued up
   private var upstreamWaiting = true
+  // the number of elements emitted during a single execution is bounded
+  private var burstRemaining = outputBurstLimit
+
+  private def tryBounceBall(ctx: BoundaryContext) = {
+    burstRemaining -= 1
+    if (burstRemaining > 0) ctx.pull()
+    else {
+      actor ! ContinuePulling
+      takeBallOut(ctx)
+    }
+  }
+
+  private def takeBallOut(ctx: BoundaryContext) = {
+    upstreamWaiting = true
+    ctx.exit()
+  }
+
+  private def tryPutBallIn() =
+    if (upstreamWaiting) {
+      burstRemaining = outputBurstLimit
+      upstreamWaiting = false
+      enter().pull()
+    }
 
   val subreceive = new SubReceive(waitingExposedPublisher)
@@ -191,12 +224,9 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole
 
     override def onPush(elem: Any, ctx: BoundaryContext): Directive = {
       onNext(elem)
-      if (downstreamDemand > 0) ctx.pull()
-      else if (downstreamCompleted) ctx.finish()
-      else {
-        upstreamWaiting = true
-        ctx.exit()
-      }
+      if (downstreamCompleted) ctx.finish()
+      else if (downstreamDemand > 0) tryBounceBall(ctx)
+      else takeBallOut(ctx)
     }
 
     override def onPull(ctx: BoundaryContext): Directive =
@@ -240,12 +270,12 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef, debugLogging: Boole
         downstreamDemand += elements
         if (downstreamDemand < 0)
          downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
-        if (upstreamWaiting) {
-          upstreamWaiting = false
-          enter().pull()
-        }
+        tryPutBallIn()
       }
 
+    case ContinuePulling ⇒
+      if (!downstreamCompleted && downstreamDemand > 0) tryPutBallIn()
+
     case Cancel(subscription) ⇒
       downstreamCompleted = true
       subscriber = null
@@ -270,7 +300,7 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings
   extends Actor with ActorLogging {
 
   private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize)
-  private val downstream = new ActorOutputBoundary(self, settings.debugLogging, log)
+  private val downstream = new ActorOutputBoundary(self, settings.debugLogging, log, settings.outputBurstLimit)
   private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream)
   interpreter.init()
 
@@ -278,7 +308,7 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings
 
   override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
     super.aroundReceive(receive, msg)
-    if (interpreter.isFinished) context.stop(self)
+    if (interpreter.isFinished && upstream.isFinished) context.stop(self)
   }
 
   override def postStop(): Unit = {
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala
index b48c341436..0f40bd0bd7 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala
@@ -170,7 +170,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
   // Points to the current point of execution inside the pipeline
   private var activeOpIndex = -1
   // The current interpreter state that decides what happens at the next round
-  private var state: State = Pushing
+  private var state: State = _
 
   // Counter that keeps track of the depth of recursive forked executions
   private var forkCount = 0
@@ -179,6 +179,23 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
 
   private var lastOpFailing: Int = -1
 
+  private def pipeName(op: UntypedOp): String = {
+    val o = (op: AbstractStage[_, _, _, _, _])
+    (o match {
+      case Finished               ⇒ "finished"
+      case _: BoundaryStage       ⇒ "boundary"
+      case _: StatefulStage[_, _] ⇒ "stateful"
+      case _: PushStage[_, _]     ⇒ "push"
+      case _: PushPullStage[_, _] ⇒ "pushpull"
+      case _: DetachedStage[_, _] ⇒ "detached"
+      case _                      ⇒ "other"
+    }) + s"(${o.allowedToPush},${o.holding},${o.terminationPending})"
+  }
+  override def toString =
+    s"""|OneBoundedInterpreter
+        | pipeline = ${pipeline map pipeName mkString ":"}
+        | activeOp=$activeOpIndex state=$state elem=$elementInFlight forks=$forkCount""".stripMargin
+
   @inline private def currentOp: UntypedOp = pipeline(activeOpIndex)
 
   // see the jumpBacks variable for explanation
@@ -298,6 +315,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
   private final val Pushing: State = new State {
     override def advance(): Unit = activeOpIndex += 1
     override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this)
+
+    override def toString = "Pushing"
   }
 
   private final val PushFinish: State = new State {
@@ -315,6 +334,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
       state = Completing
      null
     }
+
+    override def toString = "PushFinish"
   }
 
   private final val Pulling: State = new State {
@@ -330,6 +351,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
       super.hold()
     }
 
+    override def toString = "Pulling"
   }
 
   private final val Completing: State = new State {
@@ -357,6 +379,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
       else exit()
       null
     }
+
+    override def toString = "Completing"
  }
 
   private final val Cancelling: State = new State {
@@ -375,6 +399,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
       state = Cancelling
       null
     }
+
+    override def toString = "Cancelling"
   }
 
   private final case class Failing(cause: Throwable) extends State {
@@ -545,6 +571,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
           execute()
           null
         }
+
+        override def toString = s"boundary($op)"
       }
     }
     op += 1
diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala
index bfc80dfe83..1f9d714b82 100644
--- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala
+++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala
@@ -60,6 +60,9 @@ private[stream] abstract class AbstractStage[-In, Out, PushD <: Directive, PullD
    * with [[akka.stream.stage.Context#isFinishing]].
    *
    * By default the finish signal is immediately propagated with [[akka.stream.stage.Context#finish]].
+   *
+   * *IMPORTANT NOTICE:* this signal is not back-pressured, it might arrive from upstream even though
+   * the last action by this stage was a “push”.
   */
   def onUpstreamFinish(ctx: Ctx): TerminationDirective = ctx.finish()