diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 2f8ba7542d..9ef8d12f5b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl.fusing +import akka.stream.impl.fusing.Map + import scala.util.control.NoStackTrace class InterpreterSpec extends InterpreterSpecKit { @@ -419,6 +421,38 @@ class InterpreterSpec extends InterpreterSpecKit { } + "work with jumpback table and completed elements" in new TestSetup(Seq( + Map((x: Int) ⇒ x), + Map((x: Int) ⇒ x), + KeepGoing(), + Map((x: Int) ⇒ x), + Map((x: Int) ⇒ x))) { + + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(1) + lastEvents() should be(Set(OnNext(1))) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onNext(2) + lastEvents() should be(Set(OnNext(2))) + + upstream.onComplete() + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(2))) + + downstream.requestOne() + lastEvents() should be(Set(OnNext(2))) + + } + "implement expand-filter" in pending "implement take-conflate" in pending diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala index 2981b6e55f..fa71665651 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala @@ -32,6 +32,23 @@ trait InterpreterSpecKit extends AkkaSpec { } } + private[akka] case class KeepGoing[T]() extends PushPullStage[T, T] { + var lastElem: T = _ + + override def onPush(elem: T, ctx: Context[T]): Directive = { + lastElem = elem + ctx.push(elem) + } + + override def onPull(ctx: Context[T]): Directive = { + if (ctx.isFinishing) { + ctx.push(lastElem) + } else ctx.pull() + } + + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = ctx.absorbTermination() + } + abstract class TestSetup(ops: Seq[Stage[_, _]], forkLimit: Int = 100, overflowToHeap: Boolean = false) { private var lastEvent: Set[Any] = Set.empty diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index 4bbb7482ee..cc328e3ab5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -185,6 +185,15 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: table } + private def updateJumpBacks(lastNonCompletedIndex: Int): Unit = { + var pos = lastNonCompletedIndex + // For every jump that would jump over us we change them to jump into us + while (jumpBacks(pos) < lastNonCompletedIndex && pos < pipeline.length) { + jumpBacks(pos) = lastNonCompletedIndex + pos += 1 + } + } + private sealed trait State extends DetachedContext[Any] with BoundaryContext { final def progress(): Unit = { advance() @@ -264,6 +273,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: } override def absorbTermination(): TerminationDirective = { + updateJumpBacks(activeOpIndex) currentOp.holding = false finish() } @@ -330,7 +340,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: override def absorbTermination(): TerminationDirective = { currentOp.terminationPending = true currentOp.holding = false - // FIXME: This state is potentially corrupted by the jumpBackTable (not updated when jumping over) + updateJumpBacks(activeOpIndex) if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling) else exit() null @@ -367,6 +377,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: override def absorbTermination(): TerminationDirective = { currentOp.terminationPending = true currentOp.holding = false + updateJumpBacks(activeOpIndex) if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling) else exit() null