=str #16231: Protect jumpback table from corruption

This commit is contained in:
Endre Sándor Varga 2015-01-26 10:31:57 +01:00
parent 82195d63a3
commit 5202d59dda
3 changed files with 63 additions and 1 deletions

View file

@ -3,6 +3,8 @@
*/
package akka.stream.impl.fusing
import akka.stream.impl.fusing.Map
import scala.util.control.NoStackTrace
class InterpreterSpec extends InterpreterSpecKit {
@ -419,6 +421,38 @@ class InterpreterSpec extends InterpreterSpecKit {
}
"work with jumpback table and completed elements" in new TestSetup(Seq(
Map((x: Int) x),
Map((x: Int) x),
KeepGoing(),
Map((x: Int) x),
Map((x: Int) x))) {
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
upstream.onComplete()
lastEvents() should be(Set.empty)
downstream.requestOne()
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(2)))
}
"implement expand-filter" in pending
"implement take-conflate" in pending

View file

@ -32,6 +32,23 @@ trait InterpreterSpecKit extends AkkaSpec {
}
}
private[akka] case class KeepGoing[T]() extends PushPullStage[T, T] {
var lastElem: T = _
override def onPush(elem: T, ctx: Context[T]): Directive = {
lastElem = elem
ctx.push(elem)
}
override def onPull(ctx: Context[T]): Directive = {
if (ctx.isFinishing) {
ctx.push(lastElem)
} else ctx.pull()
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = ctx.absorbTermination()
}
abstract class TestSetup(ops: Seq[Stage[_, _]], forkLimit: Int = 100, overflowToHeap: Boolean = false) {
private var lastEvent: Set[Any] = Set.empty

View file

@ -185,6 +185,15 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
table
}
private def updateJumpBacks(lastNonCompletedIndex: Int): Unit = {
var pos = lastNonCompletedIndex
// For every jump that would jump over us we change them to jump into us
while (jumpBacks(pos) < lastNonCompletedIndex && pos < pipeline.length) {
jumpBacks(pos) = lastNonCompletedIndex
pos += 1
}
}
private sealed trait State extends DetachedContext[Any] with BoundaryContext {
final def progress(): Unit = {
advance()
@ -264,6 +273,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
override def absorbTermination(): TerminationDirective = {
updateJumpBacks(activeOpIndex)
currentOp.holding = false
finish()
}
@ -330,7 +340,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
override def absorbTermination(): TerminationDirective = {
currentOp.terminationPending = true
currentOp.holding = false
// FIXME: This state is potentially corrupted by the jumpBackTable (not updated when jumping over)
updateJumpBacks(activeOpIndex)
if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling)
else exit()
null
@ -367,6 +377,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
override def absorbTermination(): TerminationDirective = {
currentOp.terminationPending = true
currentOp.holding = false
updateJumpBacks(activeOpIndex)
if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling)
else exit()
null