Merge pull request #16398 from drewhk/wip-16345-interpreter-overflow-drewhk

=str: #16345: fix IndexOOB, and error handling in the interpreter
This commit is contained in:
drewhk 2014-11-27 11:33:29 +01:00
commit 86337e22ce
3 changed files with 94 additions and 69 deletions

View file

@ -50,7 +50,7 @@ trait InterpreterSpecKit extends AkkaSpec {
override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = {
lastEvent += Cancel
ctx.exit()
ctx.finish()
}
override def onPull(ctx: BoundaryContext): Directive = {
@ -75,12 +75,12 @@ trait InterpreterSpecKit extends AkkaSpec {
override def onUpstreamFinish(ctx: BoundaryContext): TerminationDirective = {
lastEvent += OnComplete
ctx.exit()
ctx.finish()
}
override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = {
lastEvent += OnError(cause)
ctx.exit()
ctx.finish()
}
override def onPull(ctx: BoundaryContext): Directive =

View file

@ -6,8 +6,7 @@ package akka.stream.scaladsl
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.{ OverflowStrategy, FlowMaterializer, MaterializerSettings }
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
class FlowConflateSpec extends AkkaSpec {
@ -93,6 +92,14 @@ class FlowConflateSpec extends AkkaSpec {
}
"work with a buffer and fold" in {
val future = Source(1 to 50)
.conflate(seed = i i)(aggregate = (sum, i) sum + i)
.buffer(50, OverflowStrategy.backpressure)
.fold(0)(_ + _)
Await.result(future, 3.seconds) should be((1 to 50).sum)
}
}
}

View file

@ -161,7 +161,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
// make it possible for the Pushing state to be an object
private var elementInFlight: Any = _
// Points to the current point of execution inside the pipeline
private var activeOp = -1
private var activeOpIndex = -1
// The current interpreter state that decides what happens at the next round
private var state: State = Pushing
@ -170,6 +170,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
// List that is used as an auxiliary stack if fork recursion depth reaches forkLimit
private var overflowStack = List.empty[(Int, State, Any)]
private var lastOpFailing: Int = -1
@inline private def currentOp: UntypedOp = pipeline(activeOpIndex)
// see the jumpBacks variable for explanation
private def calculateJumpBacks: Array[Int] = {
val table = Array.ofDim[Int](pipeline.length)
@ -182,19 +186,36 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
private sealed trait State extends DetachedContext[Any] with BoundaryContext {
final def progress(): Unit = {
advance()
if (inside) run()
else exit()
}
/**
* Override this method to do execution steps necessary after executing an op, and advance the activeOpIndex
* to another value (next or previous steps). Do NOT put code that invokes the next op, override run instead.
*/
def advance(): Unit
/**
* Override this method to enter the current op and execute it. Do NOT put code that should be executed after the
* op has been invoked, that should be in the advance() method of the next state resulting from the invokation of
* the op.
*/
def run(): Unit
override def push(elem: Any): DownstreamDirective = {
if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull")
pipeline(activeOp).allowedToPush = false
if (currentOp.holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull")
currentOp.allowedToPush = false
elementInFlight = elem
state = Pushing
PhantomDirective
}
override def pull(): UpstreamDirective = {
if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot pull while holding, only pushAndPull")
pipeline(activeOp).allowedToPush = !pipeline(activeOp).isInstanceOf[DetachedStage[_, _]]
if (currentOp.holding) throw new IllegalStateException("Cannot pull while holding, only pushAndPull")
currentOp.allowedToPush = !currentOp.isInstanceOf[DetachedStage[_, _]]
state = Pulling
PhantomDirective
}
@ -205,10 +226,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
PhantomDirective
}
def isFinishing: Boolean = pipeline(activeOp).terminationPending
def isFinishing: Boolean = currentOp.terminationPending
override def pushAndFinish(elem: Any): DownstreamDirective = {
pipeline(activeOp) = Finished.asInstanceOf[UntypedOp]
pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
// This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution
// path. Other forks are not order dependent because they execute on isolated execution domains which cannot
// "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream
@ -227,45 +248,41 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
override def hold(): FreeDirective = {
if (pipeline(activeOp).holding) throw new IllegalStateException("Cannot hold while already holding")
pipeline(activeOp).holding = true
if (currentOp.holding) throw new IllegalStateException("Cannot hold while already holding")
currentOp.holding = true
exit()
}
override def isHolding: Boolean = pipeline(activeOp).holding
override def isHolding: Boolean = currentOp.holding
override def pushAndPull(elem: Any): FreeDirective = {
if (!pipeline(activeOp).holding) throw new IllegalStateException("Cannot pushAndPull without holding first")
pipeline(activeOp).holding = false
if (!currentOp.holding) throw new IllegalStateException("Cannot pushAndPull without holding first")
currentOp.holding = false
fork(Pushing, elem)
state = Pulling
PhantomDirective
}
override def absorbTermination(): TerminationDirective = {
pipeline(activeOp).holding = false
currentOp.holding = false
finish()
}
override def exit(): FreeDirective = {
elementInFlight = null
activeOp = -1
activeOpIndex = -1
PhantomDirective
}
}
private object Pushing extends State {
override def advance(): Unit = {
activeOp += 1
pipeline(activeOp).onPush(elementInFlight, ctx = this)
}
override def advance(): Unit = activeOpIndex += 1
override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this)
}
private object PushFinish extends State {
override def advance(): Unit = {
activeOp += 1
pipeline(activeOp).onPush(elementInFlight, ctx = this)
}
override def advance(): Unit = activeOpIndex += 1
override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this)
override def pushAndFinish(elem: Any): DownstreamDirective = {
elementInFlight = elem
@ -282,25 +299,26 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
private object Pulling extends State {
override def advance(): Unit = {
elementInFlight = null
activeOp = jumpBacks(activeOp)
pipeline(activeOp).onPull(ctx = this)
activeOpIndex = jumpBacks(activeOpIndex)
}
override def run(): Unit = currentOp.onPull(ctx = this)
override def hold(): FreeDirective = {
currentOp.allowedToPush = true
super.hold()
pipeline(activeOp).allowedToPush = true
PhantomDirective
}
}
private object Completing extends State {
override def advance(): Unit = {
elementInFlight = null
pipeline(activeOp) = Finished.asInstanceOf[UntypedOp]
activeOp += 1
pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
activeOpIndex += 1
}
// FIXME issue #16345, ArrayIndexOutOfBoundsException
if (!pipeline(activeOp).terminationPending) pipeline(activeOp).onUpstreamFinish(ctx = this)
override def run(): Unit = {
if (!currentOp.terminationPending) currentOp.onUpstreamFinish(ctx = this)
else exit()
}
@ -310,10 +328,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
override def absorbTermination(): TerminationDirective = {
pipeline(activeOp).terminationPending = true
pipeline(activeOp).holding = false
currentOp.terminationPending = true
currentOp.holding = false
// FIXME: This state is potentially corrupted by the jumpBackTable (not updated when jumping over)
if (pipeline(activeOp).allowedToPush) pipeline(activeOp).onPull(ctx = Pulling)
if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling)
else exit()
PhantomDirective
}
@ -322,11 +340,12 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
private object Cancelling extends State {
override def advance(): Unit = {
elementInFlight = null
pipeline(activeOp) = Finished.asInstanceOf[UntypedOp]
activeOp -= 1
pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
activeOpIndex -= 1
}
// FIXME issue #16345, ArrayIndexOutOfBoundsException
if (!pipeline(activeOp).terminationPending) pipeline(activeOp).onDownstreamFinish(ctx = this)
def run(): Unit = {
if (!currentOp.terminationPending) currentOp.onDownstreamFinish(ctx = this)
else exit()
}
@ -339,40 +358,38 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
private final case class Failing(cause: Throwable) extends State {
override def advance(): Unit = {
elementInFlight = null
pipeline(activeOp) = Finished.asInstanceOf[UntypedOp]
activeOp += 1
pipeline(activeOp).onUpstreamFailure(cause, ctx = this)
pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
activeOpIndex += 1
}
def run(): Unit = currentOp.onUpstreamFailure(cause, ctx = this)
override def absorbTermination(): TerminationDirective = {
pipeline(activeOp).terminationPending = true
pipeline(activeOp).holding = false
if (pipeline(activeOp).allowedToPush) pipeline(activeOp).onPull(ctx = Pulling)
currentOp.terminationPending = true
currentOp.holding = false
if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling)
else exit()
PhantomDirective
}
}
private def inside: Boolean = activeOpIndex > -1 && activeOpIndex < pipeline.length
@tailrec private def execute(): Unit = {
while (activeOp > -1 && activeOp < pipeline.length) {
while (inside) {
try {
state.advance()
state.progress()
} catch {
case NonFatal(e)
try {
state.fail(e)
} catch {
case NonFatal(_)
// TODO: Make pipeline all failed
throw new IllegalStateException("Double Fault: Failure while handling failure", e)
}
case NonFatal(e) if lastOpFailing != activeOpIndex
lastOpFailing = activeOpIndex
state.fail(e)
}
}
// Execute all delayed forks that were put on the heap if the fork limit has been reached
if (overflowStack.nonEmpty) {
val memo = overflowStack.head
activeOp = memo._1
activeOpIndex = memo._1
state = memo._2
elementInFlight = memo._3
overflowStack = overflowStack.tail
@ -391,7 +408,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
forkCount += 1
if (forkCount == forkLimit) {
if (!overflowToHeap) throw new IllegalStateException("Fork limit reached")
else overflowStack ::= ((activeOp, forkState, elem))
else overflowStack ::= ((activeOpIndex, forkState, elem))
} else unsafeFork(forkState, elem)
forkCount -= 1
}
@ -401,11 +418,11 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
* is order dependent since the push and complete events travel in the same direction and not isolated by a boundary
*/
private def unsafeFork(forkState: State, elem: Any = null): Unit = {
val savePos = activeOp
val savePos = activeOpIndex
elementInFlight = elem
state = forkState
execute()
activeOp = savePos
activeOpIndex = savePos
}
def init(): Unit = {
@ -427,45 +444,46 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
pipeline(op).asInstanceOf[BoundaryStage].bctx = new State {
val entryPoint = op
override def run(): Unit = ()
override def advance(): Unit = ()
override def push(elem: Any): DownstreamDirective = {
activeOp = entryPoint
activeOpIndex = entryPoint
super.push(elem)
execute()
PhantomDirective
}
override def pull(): UpstreamDirective = {
activeOp = entryPoint
activeOpIndex = entryPoint
super.pull()
execute()
PhantomDirective
}
override def finish(): FreeDirective = {
activeOp = entryPoint
activeOpIndex = entryPoint
super.finish()
execute()
PhantomDirective
}
override def fail(cause: Throwable): FreeDirective = {
activeOp = entryPoint
activeOpIndex = entryPoint
super.fail(cause)
execute()
PhantomDirective
}
override def hold(): FreeDirective = {
activeOp = entryPoint
activeOpIndex = entryPoint
super.hold()
execute()
PhantomDirective
}
override def pushAndPull(elem: Any): FreeDirective = {
activeOp = entryPoint
activeOpIndex = entryPoint
super.pushAndPull(elem)
execute()
PhantomDirective
@ -486,7 +504,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
var op = pipeline.length - 1
while (op >= 0) {
if (pipeline(op).isInstanceOf[DetachedStage[_, _]]) {
activeOp = op
activeOpIndex = op
state = Pulling
execute()
}