From 17733e5a54a672b107bc875ea92a90a40f1e42a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 8 May 2015 12:47:49 +0200 Subject: [PATCH] +str #17418: Improved error handling for termination cases in Interpreter --- .../stream/impl/fusing/InterpreterSpec.scala | 22 +++++++++++++ .../akka/stream/impl/fusing/Interpreter.scala | 32 +++++++++++-------- .../main/scala/akka/stream/stage/Stage.scala | 5 +-- 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index c6a3865b53..852a6f3e8e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -3,6 +3,9 @@ */ package akka.stream.impl.fusing +import akka.stream.stage._ + +import scala.util.control.NoStackTrace import akka.stream.Supervision class InterpreterSpec extends InterpreterSpecKit { @@ -496,6 +499,25 @@ class InterpreterSpec extends InterpreterSpecKit { lastEvents() should be(Set(OnNext("foo"), OnComplete)) } + "report error if pull is called while op is terminating" in new TestSetup(Seq(new PushPullStage[Any, Any] { + override def onPull(ctx: Context[Any]): SyncDirective = ctx.pull() + override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pull() + override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = ctx.absorbTermination() + })) { + lastEvents() should be(Set.empty) + + downstream.requestOne() + lastEvents() should be(Set(RequestOne)) + + upstream.onComplete() + val ev = lastEvents() + ev.nonEmpty should be(true) + ev.forall { + case OnError(_: IllegalStateException) ⇒ true + case _ ⇒ false + } should be(true) + } + "implement expand-filter" in pending "implement take-conflate" in pending diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala index c0e275e32a..ea4f862bb2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala @@ -270,14 +270,17 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], protected def mustHave(b: Int): Unit = if (!hasBits(b)) { - def format(b: Int) = - (b & BothBalls: @switch) match { + def format(b: Int) = { + val ballStatus = (b & BothBalls: @switch) match { case 0 ⇒ "no balls" case UpstreamBall ⇒ "upstream ball" case DownstreamBall ⇒ "downstream ball" case BothBalls ⇒ "upstream & downstream balls" } - throw new IllegalStateException(s"operation requires ${format(b)} while holding ${format(currentOp.bits)} and receiving ${format(incomingBall)}") + if ((b & NoTerminationPending) > 0) ballStatus + " and not isFinishing" + else ballStatus + " and isFinishing" + } + throw new IllegalStateException(s"operation requires [${format(b)}] while holding [${format(currentOp.bits)}] and receiving [${format(incomingBall)}]") } override def push(elem: Any): DownstreamDirective = { @@ -294,11 +297,13 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], } override def pull(): UpstreamDirective = { + var requirements = NoTerminationPending if (currentOp.isDetached) { if (incomingBall == DownstreamBall) throw new IllegalStateException("Cannot pull during onPull, only push, pushAndPull or holdDownstreamAndPull") - mustHave(UpstreamBall) + requirements |= UpstreamBall } + mustHave(requirements) removeBits(UpstreamBall) addBits(PrecedingWasPull) state = Pulling @@ -325,7 +330,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], null } - def isFinishing: Boolean = hasBits(TerminationPending) + def isFinishing: Boolean = !hasBits(NoTerminationPending) final protected def pushAndFinishCommon(elem: Any, finishState: UntypedOp): Unit = { finishCurrentOp(finishState) @@ -362,6 +367,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], } override def holdUpstream(): UpstreamDirective = { + mustHave(NoTerminationPending) removeBits(PrecedingWasPull) addBits(UpstreamBall) exit() @@ -371,7 +377,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], ReactiveStreamsCompliance.requireNonNullElement(elem) if (incomingBall != UpstreamBall) throw new IllegalStateException("can only holdUpstreamAndPush from onPush") - mustHave(BothBalls) + mustHave(BothBallsAndNoTerminationPending) removeBits(PrecedingWasPull | DownstreamBall) addBits(UpstreamBall) elementInFlight = elem @@ -389,7 +395,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], override def holdDownstreamAndPull(): DownstreamDirective = { if (incomingBall != DownstreamBall) throw new IllegalStateException("can only holdDownstreamAndPull from onPull") - mustHave(BothBalls) + mustHave(BothBallsAndNoTerminationPending) addBits(PrecedingWasPull | DownstreamBall) removeBits(UpstreamBall) state = Pulling @@ -400,7 +406,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], override def pushAndPull(elem: Any): FreeDirective = { ReactiveStreamsCompliance.requireNonNullElement(elem) - mustHave(BothBalls) + mustHave(BothBallsAndNoTerminationPending) addBits(PrecedingWasPull) removeBits(BothBalls) fork(Pushing, elem) @@ -410,7 +416,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], override def absorbTermination(): TerminationDirective = { updateJumpBacks(activeOpIndex) - removeBits(BothBalls) + removeBits(BothBallsAndNoTerminationPending) finish() } @@ -479,7 +485,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], } override def run(): Unit = { - if (hasBits(TerminationPending)) exit() + if (!hasBits(NoTerminationPending)) exit() else currentOp.onUpstreamFinish(ctx = this) } @@ -489,7 +495,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], } override def absorbTermination(): TerminationDirective = { - addBits(TerminationPending) + removeBits(NoTerminationPending) removeBits(UpstreamBall) updateJumpBacks(activeOpIndex) if (hasBits(DownstreamBall) || (!currentOp.isDetached && hasBits(PrecedingWasPull))) { @@ -512,7 +518,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], } def run(): Unit = { - if (hasBits(TerminationPending)) exit() + if (!hasBits(NoTerminationPending)) exit() else currentOp.onDownstreamFinish(ctx = this) } @@ -536,7 +542,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], def run(): Unit = currentOp.onUpstreamFailure(cause, ctx = this) override def absorbTermination(): TerminationDirective = { - addBits(TerminationPending) + removeBits(NoTerminationPending) removeBits(UpstreamBall) updateJumpBacks(activeOpIndex) if (hasBits(DownstreamBall) || (!currentOp.isDetached && hasBits(PrecedingWasPull))) { diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 1fee01963b..2e58a33ca0 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -34,15 +34,16 @@ private[stream] object AbstractStage { final val UpstreamBall = 1 final val DownstreamBall = 2 final val BothBalls = UpstreamBall | DownstreamBall + final val BothBallsAndNoTerminationPending = UpstreamBall | DownstreamBall | NoTerminationPending final val PrecedingWasPull = 0x4000 - final val TerminationPending = 0x8000 + final val NoTerminationPending = 0x8000 } abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] { /** * INTERNAL API */ - private[stream] var bits = 0 + private[stream] var bits = AbstractStage.NoTerminationPending /** * INTERNAL API