From 6ba5c0b4c69f4984f6a8336e7dda80c1f04b5c07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 4 May 2015 16:05:36 +0200 Subject: [PATCH] =str #17351: Fix pushAndFinish improper stage termination --- .../stream/impl/fusing/InterpreterSpec.scala | 74 ++++++++++++++++++- .../impl/fusing/InterpreterSpecKit.scala | 5 ++ .../akka/stream/scaladsl/FlowStageSpec.scala | 30 ++++++++ .../akka/stream/impl/fusing/Interpreter.scala | 46 ++++++++++-- 4 files changed, 146 insertions(+), 9 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 9fe4787199..eae8ac1856 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl.fusing +import akka.stream.stage.{ TerminationDirective, SyncDirective, Context, PushStage } + import scala.util.control.NoStackTrace import akka.stream.Supervision @@ -460,6 +462,52 @@ class InterpreterSpec extends InterpreterSpecKit { } + // This test is related to issue #17351 + class PushFinishStage extends PushStage[Any, Any] { + override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pushAndFinish(elem) + override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = + ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen")) + } + + "work with pushAndFinish if upstream completes with pushAndFinish" in new TestSetup(Seq( + new PushFinishStage)) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNextAndComplete("foo") + lastEvents() should be(Set(OnNext("foo"), OnComplete)) + } + + "work with pushAndFinish if indirect upstream completes with pushAndFinish" in new TestSetup(Seq( + Map((x: Any) ⇒ x, stoppingDecider), + new PushFinishStage, + Map((x: Any) ⇒ x, stoppingDecider))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNextAndComplete("foo") + lastEvents() should be(Set(OnNext("foo"), OnComplete)) + } + + "work with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new TestSetup(Seq( + new PushFinishStage, + Fold("", (x: String, y: String) ⇒ x + y, stoppingDecider))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNextAndComplete("foo") + lastEvents() should be(Set(OnNext("foo"), OnComplete)) + } + "implement expand-filter" in pending "implement take-conflate" in pending @@ -470,7 +518,31 @@ class InterpreterSpec extends InterpreterSpecKit { "implement expand-take" in pending - "implement take-take" in pending + "implement take-take" in new TestSetup(Seq( + Take(1), + Take(1))) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext("foo") + lastEvents() should be(Set(OnNext("foo"), OnComplete, Cancel)) + + } + + "implement take-take with pushAndFinish from upstream" in new TestSetup(Seq( + Take(1), + Take(1))) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNextAndComplete("foo") + lastEvents() should be(Set(OnNext("foo"), OnComplete)) + + } "implement take-drop" in pending diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala index 07b2e08580..a089c77406 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala @@ -92,6 +92,11 @@ trait InterpreterSpecKit extends AkkaSpec { def onNext(elem: Any): Unit = enterAndPush(elem) def onComplete(): Unit = enterAndFinish() + def onNextAndComplete(elem: Any): Unit = { + context.enter() + context.pushAndFinish(elem) + context.execute() + } def onError(cause: Throwable): Unit = enterAndFail(cause) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala index c252842be4..6a7cfa3113 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala @@ -424,6 +424,36 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug upsub.expectCancellation() } + "not trigger onUpstreamFinished after pushAndFinish" in assertAllStagesStopped { + val in = TestPublisher.manualProbe[Int]() + val flow = + Source(in) + .transform(() ⇒ new StatefulStage[Int, Int] { + + def initial: StageState[Int, Int] = new State { + override def onPush(element: Int, ctx: Context[Int]) = + ctx.pushAndFinish(element) + } + override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective = + terminationEmit(Iterator(42), ctx) + }) + .runWith(Sink.publisher) + + val inSub = in.expectSubscription() + + val out = TestSubscriber.manualProbe[Int]() + flow.subscribe(out) + val outSub = out.expectSubscription() + + inSub.sendNext(23) + inSub.sendComplete() + + outSub.request(1) // it depends on this line, i.e. generating backpressure between buffer and stage execution + + out.expectNext(23) + out.expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index 26e25412f3..ed4bbc1963 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -37,6 +37,25 @@ private[akka] object OneBoundedInterpreter { override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = ctx.exit() override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = ctx.exit() } + + /** + * INTERNAL API + * + * This artificial op is used as a boundary to prevent the first forked onPush of execution of a pushFinish to enter + * the originating stage again. This stage allows the forked upstream onUpstreamFinish to pass through if there was + * no onPull called on the stage. Calling onPull on this op makes it a Finished op, which absorbs the + * onUpstreamTermination, but otherwise onUpstreamTermination results in calling finish() + */ + private[akka] object PushFinished extends BoundaryStage { + override def onPush(elem: Any, ctx: BoundaryContext): UpstreamDirective = ctx.finish() + override def onPull(ctx: BoundaryContext): DownstreamDirective = ctx.finish() + // This allows propagation of an onUpstreamFinish call. Note that if onPull has been called on this stage + // before, then the call ctx.finish() in onPull already turned this op to a normal Finished, i.e. it will no longer + // propagate onUpstreamFinish. + override def onUpstreamFinish(ctx: BoundaryContext): TerminationDirective = ctx.finish() + override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = ctx.exit() + override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = ctx.exit() + } } /** @@ -200,7 +219,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], private def updateJumpBacks(lastNonCompletedIndex: Int): Unit = { var pos = lastNonCompletedIndex // For every jump that would jump over us we change them to jump into us - while (jumpBacks(pos) < lastNonCompletedIndex && pos < pipeline.length) { + while (pos < pipeline.length && jumpBacks(pos) < lastNonCompletedIndex) { jumpBacks(pos) = lastNonCompletedIndex pos += 1 } @@ -290,6 +309,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], } override def finish(): FreeDirective = { + finishCurrentOp() fork(Completing) state = Cancelling null @@ -297,13 +317,19 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], def isFinishing: Boolean = hasBits(TerminationPending) - override def pushAndFinish(elem: Any): DownstreamDirective = { + final protected def pushAndFinishCommon(elem: Any): Unit = { ReactiveStreamsCompliance.requireNonNullElement(elem) if (currentOp.isDetached) { mustHave(DownstreamBall) } removeBits(DownstreamBall | PrecedingWasPull) + } + + override def pushAndFinish(elem: Any): DownstreamDirective = { + pushAndFinishCommon(elem) + // Spit the execution domain in two, and invoke op postStop callbacks if there are any finishCurrentOp() + // This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution // path. Other forks are not order dependent because they execute on isolated execution domains which cannot // "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream @@ -311,8 +337,12 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], // It might be that there are some degenerate cases where this can blow up the stack with a very long chain but I // am not aware of such scenario yet. If you know one, put it in InterpreterStressSpec :) unsafeFork(PushFinish, elem) + + // Same as finish, without calling finishCurrentOp elementInFlight = null - finish() + fork(Completing) + state = Cancelling + null } override def fail(cause: Throwable): FreeDirective = { @@ -397,11 +427,11 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this) override def pushAndFinish(elem: Any): DownstreamDirective = { - ReactiveStreamsCompliance.requireNonNullElement(elem) - /* - * FIXME (RK) please someone explain why this works: the stage already - * terminated, but eventually it will see another onPull because nobody noticed. - */ + pushAndFinishCommon(elem) + // Put an isolation barrier that will prevent the onPull of this op to be called again. This barrier + // is different from simple Finished that it allows onUpstreamTerminated to pass through, unless onPull + // has been called on the stage + pipeline(activeOpIndex) = PushFinished.asInstanceOf[UntypedOp] elementInFlight = elem state = PushFinish null