+str #17418: Improved error handling for termination cases in Interpreter

This commit is contained in:
Endre Sándor Varga 2015-05-08 12:47:49 +02:00
parent ba3a69369f
commit 17733e5a54
3 changed files with 44 additions and 15 deletions

View file

@ -3,6 +3,9 @@
*/ */
package akka.stream.impl.fusing package akka.stream.impl.fusing
import akka.stream.stage._
import scala.util.control.NoStackTrace
import akka.stream.Supervision import akka.stream.Supervision
class InterpreterSpec extends InterpreterSpecKit { class InterpreterSpec extends InterpreterSpecKit {
@ -496,6 +499,25 @@ class InterpreterSpec extends InterpreterSpecKit {
lastEvents() should be(Set(OnNext("foo"), OnComplete)) lastEvents() should be(Set(OnNext("foo"), OnComplete))
} }
"report error if pull is called while op is terminating" in new TestSetup(Seq(new PushPullStage[Any, Any] {
override def onPull(ctx: Context[Any]): SyncDirective = ctx.pull()
override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pull()
override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = ctx.absorbTermination()
})) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onComplete()
val ev = lastEvents()
ev.nonEmpty should be(true)
ev.forall {
case OnError(_: IllegalStateException) true
case _ false
} should be(true)
}
"implement expand-filter" in pending "implement expand-filter" in pending
"implement take-conflate" in pending "implement take-conflate" in pending

View file

@ -270,14 +270,17 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
protected def mustHave(b: Int): Unit = protected def mustHave(b: Int): Unit =
if (!hasBits(b)) { if (!hasBits(b)) {
def format(b: Int) = def format(b: Int) = {
(b & BothBalls: @switch) match { val ballStatus = (b & BothBalls: @switch) match {
case 0 "no balls" case 0 "no balls"
case UpstreamBall "upstream ball" case UpstreamBall "upstream ball"
case DownstreamBall "downstream ball" case DownstreamBall "downstream ball"
case BothBalls "upstream & downstream balls" case BothBalls "upstream & downstream balls"
} }
throw new IllegalStateException(s"operation requires ${format(b)} while holding ${format(currentOp.bits)} and receiving ${format(incomingBall)}") if ((b & NoTerminationPending) > 0) ballStatus + " and not isFinishing"
else ballStatus + " and isFinishing"
}
throw new IllegalStateException(s"operation requires [${format(b)}] while holding [${format(currentOp.bits)}] and receiving [${format(incomingBall)}]")
} }
override def push(elem: Any): DownstreamDirective = { override def push(elem: Any): DownstreamDirective = {
@ -294,11 +297,13 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
} }
override def pull(): UpstreamDirective = { override def pull(): UpstreamDirective = {
var requirements = NoTerminationPending
if (currentOp.isDetached) { if (currentOp.isDetached) {
if (incomingBall == DownstreamBall) if (incomingBall == DownstreamBall)
throw new IllegalStateException("Cannot pull during onPull, only push, pushAndPull or holdDownstreamAndPull") throw new IllegalStateException("Cannot pull during onPull, only push, pushAndPull or holdDownstreamAndPull")
mustHave(UpstreamBall) requirements |= UpstreamBall
} }
mustHave(requirements)
removeBits(UpstreamBall) removeBits(UpstreamBall)
addBits(PrecedingWasPull) addBits(PrecedingWasPull)
state = Pulling state = Pulling
@ -325,7 +330,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
null null
} }
def isFinishing: Boolean = hasBits(TerminationPending) def isFinishing: Boolean = !hasBits(NoTerminationPending)
final protected def pushAndFinishCommon(elem: Any, finishState: UntypedOp): Unit = { final protected def pushAndFinishCommon(elem: Any, finishState: UntypedOp): Unit = {
finishCurrentOp(finishState) finishCurrentOp(finishState)
@ -362,6 +367,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
} }
override def holdUpstream(): UpstreamDirective = { override def holdUpstream(): UpstreamDirective = {
mustHave(NoTerminationPending)
removeBits(PrecedingWasPull) removeBits(PrecedingWasPull)
addBits(UpstreamBall) addBits(UpstreamBall)
exit() exit()
@ -371,7 +377,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
ReactiveStreamsCompliance.requireNonNullElement(elem) ReactiveStreamsCompliance.requireNonNullElement(elem)
if (incomingBall != UpstreamBall) if (incomingBall != UpstreamBall)
throw new IllegalStateException("can only holdUpstreamAndPush from onPush") throw new IllegalStateException("can only holdUpstreamAndPush from onPush")
mustHave(BothBalls) mustHave(BothBallsAndNoTerminationPending)
removeBits(PrecedingWasPull | DownstreamBall) removeBits(PrecedingWasPull | DownstreamBall)
addBits(UpstreamBall) addBits(UpstreamBall)
elementInFlight = elem elementInFlight = elem
@ -389,7 +395,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
override def holdDownstreamAndPull(): DownstreamDirective = { override def holdDownstreamAndPull(): DownstreamDirective = {
if (incomingBall != DownstreamBall) if (incomingBall != DownstreamBall)
throw new IllegalStateException("can only holdDownstreamAndPull from onPull") throw new IllegalStateException("can only holdDownstreamAndPull from onPull")
mustHave(BothBalls) mustHave(BothBallsAndNoTerminationPending)
addBits(PrecedingWasPull | DownstreamBall) addBits(PrecedingWasPull | DownstreamBall)
removeBits(UpstreamBall) removeBits(UpstreamBall)
state = Pulling state = Pulling
@ -400,7 +406,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
override def pushAndPull(elem: Any): FreeDirective = { override def pushAndPull(elem: Any): FreeDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem) ReactiveStreamsCompliance.requireNonNullElement(elem)
mustHave(BothBalls) mustHave(BothBallsAndNoTerminationPending)
addBits(PrecedingWasPull) addBits(PrecedingWasPull)
removeBits(BothBalls) removeBits(BothBalls)
fork(Pushing, elem) fork(Pushing, elem)
@ -410,7 +416,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
override def absorbTermination(): TerminationDirective = { override def absorbTermination(): TerminationDirective = {
updateJumpBacks(activeOpIndex) updateJumpBacks(activeOpIndex)
removeBits(BothBalls) removeBits(BothBallsAndNoTerminationPending)
finish() finish()
} }
@ -479,7 +485,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
} }
override def run(): Unit = { override def run(): Unit = {
if (hasBits(TerminationPending)) exit() if (!hasBits(NoTerminationPending)) exit()
else currentOp.onUpstreamFinish(ctx = this) else currentOp.onUpstreamFinish(ctx = this)
} }
@ -489,7 +495,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
} }
override def absorbTermination(): TerminationDirective = { override def absorbTermination(): TerminationDirective = {
addBits(TerminationPending) removeBits(NoTerminationPending)
removeBits(UpstreamBall) removeBits(UpstreamBall)
updateJumpBacks(activeOpIndex) updateJumpBacks(activeOpIndex)
if (hasBits(DownstreamBall) || (!currentOp.isDetached && hasBits(PrecedingWasPull))) { if (hasBits(DownstreamBall) || (!currentOp.isDetached && hasBits(PrecedingWasPull))) {
@ -512,7 +518,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
} }
def run(): Unit = { def run(): Unit = {
if (hasBits(TerminationPending)) exit() if (!hasBits(NoTerminationPending)) exit()
else currentOp.onDownstreamFinish(ctx = this) else currentOp.onDownstreamFinish(ctx = this)
} }
@ -536,7 +542,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
def run(): Unit = currentOp.onUpstreamFailure(cause, ctx = this) def run(): Unit = currentOp.onUpstreamFailure(cause, ctx = this)
override def absorbTermination(): TerminationDirective = { override def absorbTermination(): TerminationDirective = {
addBits(TerminationPending) removeBits(NoTerminationPending)
removeBits(UpstreamBall) removeBits(UpstreamBall)
updateJumpBacks(activeOpIndex) updateJumpBacks(activeOpIndex)
if (hasBits(DownstreamBall) || (!currentOp.isDetached && hasBits(PrecedingWasPull))) { if (hasBits(DownstreamBall) || (!currentOp.isDetached && hasBits(PrecedingWasPull))) {

View file

@ -34,15 +34,16 @@ private[stream] object AbstractStage {
final val UpstreamBall = 1 final val UpstreamBall = 1
final val DownstreamBall = 2 final val DownstreamBall = 2
final val BothBalls = UpstreamBall | DownstreamBall final val BothBalls = UpstreamBall | DownstreamBall
final val BothBallsAndNoTerminationPending = UpstreamBall | DownstreamBall | NoTerminationPending
final val PrecedingWasPull = 0x4000 final val PrecedingWasPull = 0x4000
final val TerminationPending = 0x8000 final val NoTerminationPending = 0x8000
} }
abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] { abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[stream] var bits = 0 private[stream] var bits = AbstractStage.NoTerminationPending
/** /**
* INTERNAL API * INTERNAL API