diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala index 575f1c715c..2981b6e55f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpecKit.scala @@ -50,7 +50,7 @@ trait InterpreterSpecKit extends AkkaSpec { override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = { lastEvent += Cancel - ctx.exit() + ctx.finish() } override def onPull(ctx: BoundaryContext): Directive = { @@ -75,12 +75,12 @@ trait InterpreterSpecKit extends AkkaSpec { override def onUpstreamFinish(ctx: BoundaryContext): TerminationDirective = { lastEvent += OnComplete - ctx.exit() + ctx.finish() } override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = { lastEvent += OnError(cause) - ctx.exit() + ctx.finish() } override def onPull(ctx: BoundaryContext): Directive = diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index 4574c7c4d4..ae012f6724 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -6,8 +6,7 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.stream.FlowMaterializer -import akka.stream.MaterializerSettings +import akka.stream.{ OverflowStrategy, FlowMaterializer, MaterializerSettings } import akka.stream.testkit.{ StreamTestKit, AkkaSpec } class FlowConflateSpec extends AkkaSpec { @@ -93,6 +92,14 @@ class FlowConflateSpec extends AkkaSpec { } + "work with a buffer and fold" in { + val future = Source(1 to 50) + .conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) + .buffer(50, OverflowStrategy.backpressure) + .fold(0)(_ + _) + Await.result(future, 3.seconds) should be((1 to 50).sum) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index c9c1c3fa28..f10fc03b65 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -161,7 +161,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: // make it possible for the Pushing state to be an object private var elementInFlight: Any = _ // Points to the current point of execution inside the pipeline - private var activeOp = -1 + private var activeOpIndex = -1 // The current interpreter state that decides what happens at the next round private var state: State = Pushing @@ -170,6 +170,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: // List that is used as an auxiliary stack if fork recursion depth reaches forkLimit private var overflowStack = List.empty[(Int, State, Any)] + private var lastOpFailing: Int = -1 + + @inline private def currentOp: UntypedOp = pipeline(activeOpIndex) + // see the jumpBacks variable for explanation private def calculateJumpBacks: Array[Int] = { val table = Array.ofDim[Int](pipeline.length) @@ -182,19 +186,36 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: } private sealed trait State extends DetachedContext[Any] with BoundaryContext { + final def progress(): Unit = { + advance() + if (inside) run() + else exit() + } + + /** + * Override this method to do execution steps necessary after executing an op, and advance the activeOpIndex + * to another value (next or previous steps). Do NOT put code that invokes the next op, override run instead. + */ def advance(): Unit + /** + * Override this method to enter the current op and execute it. Do NOT put code that should be executed after the + * op has been invoked, that should be in the advance() method of the next state resulting from the invokation of + * the op. + */ + def run(): Unit + override def push(elem: Any): DownstreamDirective = { - if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull") - pipeline(activeOp).allowedToPush = false + if (currentOp.holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull") + currentOp.allowedToPush = false elementInFlight = elem state = Pushing PhantomDirective } override def pull(): UpstreamDirective = { - if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot pull while holding, only pushAndPull") - pipeline(activeOp).allowedToPush = !pipeline(activeOp).isInstanceOf[DetachedStage[_, _]] + if (currentOp.holding) throw new IllegalStateException("Cannot pull while holding, only pushAndPull") + currentOp.allowedToPush = !currentOp.isInstanceOf[DetachedStage[_, _]] state = Pulling PhantomDirective } @@ -205,10 +226,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: PhantomDirective } - def isFinishing: Boolean = pipeline(activeOp).terminationPending + def isFinishing: Boolean = currentOp.terminationPending override def pushAndFinish(elem: Any): DownstreamDirective = { - pipeline(activeOp) = Finished.asInstanceOf[UntypedOp] + pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] // This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution // path. Other forks are not order dependent because they execute on isolated execution domains which cannot // "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream @@ -227,45 +248,41 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: } override def hold(): FreeDirective = { - if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot hold while already holding") - pipeline(activeOp).holding = true + if (currentOp.holding) throw new IllegalStateException("Cannot hold while already holding") + currentOp.holding = true exit() } - override def isHolding: Boolean = pipeline(activeOp).holding + override def isHolding: Boolean = currentOp.holding override def pushAndPull(elem: Any): FreeDirective = { - if (!pipeline(activeOp).holding) throw new IllegalStateException("Cannot pushAndPull without holding first") - pipeline(activeOp).holding = false + if (!currentOp.holding) throw new IllegalStateException("Cannot pushAndPull without holding first") + currentOp.holding = false fork(Pushing, elem) state = Pulling PhantomDirective } override def absorbTermination(): TerminationDirective = { - pipeline(activeOp).holding = false + currentOp.holding = false finish() } override def exit(): FreeDirective = { elementInFlight = null - activeOp = -1 + activeOpIndex = -1 PhantomDirective } } private object Pushing extends State { - override def advance(): Unit = { - activeOp += 1 - pipeline(activeOp).onPush(elementInFlight, ctx = this) - } + override def advance(): Unit = activeOpIndex += 1 + override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this) } private object PushFinish extends State { - override def advance(): Unit = { - activeOp += 1 - pipeline(activeOp).onPush(elementInFlight, ctx = this) - } + override def advance(): Unit = activeOpIndex += 1 + override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this) override def pushAndFinish(elem: Any): DownstreamDirective = { elementInFlight = elem @@ -282,25 +299,26 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: private object Pulling extends State { override def advance(): Unit = { elementInFlight = null - activeOp = jumpBacks(activeOp) - pipeline(activeOp).onPull(ctx = this) + activeOpIndex = jumpBacks(activeOpIndex) } + override def run(): Unit = currentOp.onPull(ctx = this) + override def hold(): FreeDirective = { + currentOp.allowedToPush = true super.hold() - pipeline(activeOp).allowedToPush = true - PhantomDirective } } private object Completing extends State { override def advance(): Unit = { elementInFlight = null - pipeline(activeOp) = Finished.asInstanceOf[UntypedOp] - activeOp += 1 + pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + activeOpIndex += 1 + } - // FIXME issue #16345, ArrayIndexOutOfBoundsException - if (!pipeline(activeOp).terminationPending) pipeline(activeOp).onUpstreamFinish(ctx = this) + override def run(): Unit = { + if (!currentOp.terminationPending) currentOp.onUpstreamFinish(ctx = this) else exit() } @@ -310,10 +328,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: } override def absorbTermination(): TerminationDirective = { - pipeline(activeOp).terminationPending = true - pipeline(activeOp).holding = false + currentOp.terminationPending = true + currentOp.holding = false // FIXME: This state is potentially corrupted by the jumpBackTable (not updated when jumping over) - if (pipeline(activeOp).allowedToPush) pipeline(activeOp).onPull(ctx = Pulling) + if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling) else exit() PhantomDirective } @@ -322,11 +340,12 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: private object Cancelling extends State { override def advance(): Unit = { elementInFlight = null - pipeline(activeOp) = Finished.asInstanceOf[UntypedOp] - activeOp -= 1 + pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + activeOpIndex -= 1 + } - // FIXME issue #16345, ArrayIndexOutOfBoundsException - if (!pipeline(activeOp).terminationPending) pipeline(activeOp).onDownstreamFinish(ctx = this) + def run(): Unit = { + if (!currentOp.terminationPending) currentOp.onDownstreamFinish(ctx = this) else exit() } @@ -339,40 +358,38 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: private final case class Failing(cause: Throwable) extends State { override def advance(): Unit = { elementInFlight = null - pipeline(activeOp) = Finished.asInstanceOf[UntypedOp] - activeOp += 1 - pipeline(activeOp).onUpstreamFailure(cause, ctx = this) + pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp] + activeOpIndex += 1 } + def run(): Unit = currentOp.onUpstreamFailure(cause, ctx = this) + override def absorbTermination(): TerminationDirective = { - pipeline(activeOp).terminationPending = true - pipeline(activeOp).holding = false - if (pipeline(activeOp).allowedToPush) pipeline(activeOp).onPull(ctx = Pulling) + currentOp.terminationPending = true + currentOp.holding = false + if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling) else exit() PhantomDirective } } + private def inside: Boolean = activeOpIndex > -1 && activeOpIndex < pipeline.length + @tailrec private def execute(): Unit = { - while (activeOp > -1 && activeOp < pipeline.length) { + while (inside) { try { - state.advance() + state.progress() } catch { - case NonFatal(e) ⇒ - try { - state.fail(e) - } catch { - case NonFatal(_) ⇒ - // TODO: Make pipeline all failed - throw new IllegalStateException("Double Fault: Failure while handling failure", e) - } + case NonFatal(e) if lastOpFailing != activeOpIndex ⇒ + lastOpFailing = activeOpIndex + state.fail(e) } } // Execute all delayed forks that were put on the heap if the fork limit has been reached if (overflowStack.nonEmpty) { val memo = overflowStack.head - activeOp = memo._1 + activeOpIndex = memo._1 state = memo._2 elementInFlight = memo._3 overflowStack = overflowStack.tail @@ -391,7 +408,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: forkCount += 1 if (forkCount == forkLimit) { if (!overflowToHeap) throw new IllegalStateException("Fork limit reached") - else overflowStack ::= ((activeOp, forkState, elem)) + else overflowStack ::= ((activeOpIndex, forkState, elem)) } else unsafeFork(forkState, elem) forkCount -= 1 } @@ -401,11 +418,11 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: * is order dependent since the push and complete events travel in the same direction and not isolated by a boundary */ private def unsafeFork(forkState: State, elem: Any = null): Unit = { - val savePos = activeOp + val savePos = activeOpIndex elementInFlight = elem state = forkState execute() - activeOp = savePos + activeOpIndex = savePos } def init(): Unit = { @@ -427,45 +444,46 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: pipeline(op).asInstanceOf[BoundaryStage].bctx = new State { val entryPoint = op + override def run(): Unit = () override def advance(): Unit = () override def push(elem: Any): DownstreamDirective = { - activeOp = entryPoint + activeOpIndex = entryPoint super.push(elem) execute() PhantomDirective } override def pull(): UpstreamDirective = { - activeOp = entryPoint + activeOpIndex = entryPoint super.pull() execute() PhantomDirective } override def finish(): FreeDirective = { - activeOp = entryPoint + activeOpIndex = entryPoint super.finish() execute() PhantomDirective } override def fail(cause: Throwable): FreeDirective = { - activeOp = entryPoint + activeOpIndex = entryPoint super.fail(cause) execute() PhantomDirective } override def hold(): FreeDirective = { - activeOp = entryPoint + activeOpIndex = entryPoint super.hold() execute() PhantomDirective } override def pushAndPull(elem: Any): FreeDirective = { - activeOp = entryPoint + activeOpIndex = entryPoint super.pushAndPull(elem) execute() PhantomDirective @@ -486,7 +504,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: var op = pipeline.length - 1 while (op >= 0) { if (pipeline(op).isInstanceOf[DetachedStage[_, _]]) { - activeOp = op + activeOpIndex = op state = Pulling execute() }