=str #17351: Fix pushAndFinish improper stage termination
This commit is contained in:
parent
035037dd24
commit
6ba5c0b4c6
4 changed files with 146 additions and 9 deletions
|
|
@ -3,6 +3,8 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.impl.fusing
|
package akka.stream.impl.fusing
|
||||||
|
|
||||||
|
import akka.stream.stage.{ TerminationDirective, SyncDirective, Context, PushStage }
|
||||||
|
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
import akka.stream.Supervision
|
import akka.stream.Supervision
|
||||||
|
|
||||||
|
|
@ -460,6 +462,52 @@ class InterpreterSpec extends InterpreterSpecKit {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This test is related to issue #17351
|
||||||
|
class PushFinishStage extends PushStage[Any, Any] {
|
||||||
|
override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pushAndFinish(elem)
|
||||||
|
override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective =
|
||||||
|
ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen"))
|
||||||
|
}
|
||||||
|
|
||||||
|
"work with pushAndFinish if upstream completes with pushAndFinish" in new TestSetup(Seq(
|
||||||
|
new PushFinishStage)) {
|
||||||
|
|
||||||
|
lastEvents() should be(Set.empty)
|
||||||
|
|
||||||
|
downstream.requestOne()
|
||||||
|
lastEvents() should be(Set(RequestOne))
|
||||||
|
|
||||||
|
upstream.onNextAndComplete("foo")
|
||||||
|
lastEvents() should be(Set(OnNext("foo"), OnComplete))
|
||||||
|
}
|
||||||
|
|
||||||
|
"work with pushAndFinish if indirect upstream completes with pushAndFinish" in new TestSetup(Seq(
|
||||||
|
Map((x: Any) ⇒ x, stoppingDecider),
|
||||||
|
new PushFinishStage,
|
||||||
|
Map((x: Any) ⇒ x, stoppingDecider))) {
|
||||||
|
|
||||||
|
lastEvents() should be(Set.empty)
|
||||||
|
|
||||||
|
downstream.requestOne()
|
||||||
|
lastEvents() should be(Set(RequestOne))
|
||||||
|
|
||||||
|
upstream.onNextAndComplete("foo")
|
||||||
|
lastEvents() should be(Set(OnNext("foo"), OnComplete))
|
||||||
|
}
|
||||||
|
|
||||||
|
"work with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new TestSetup(Seq(
|
||||||
|
new PushFinishStage,
|
||||||
|
Fold("", (x: String, y: String) ⇒ x + y, stoppingDecider))) {
|
||||||
|
|
||||||
|
lastEvents() should be(Set.empty)
|
||||||
|
|
||||||
|
downstream.requestOne()
|
||||||
|
lastEvents() should be(Set(RequestOne))
|
||||||
|
|
||||||
|
upstream.onNextAndComplete("foo")
|
||||||
|
lastEvents() should be(Set(OnNext("foo"), OnComplete))
|
||||||
|
}
|
||||||
|
|
||||||
"implement expand-filter" in pending
|
"implement expand-filter" in pending
|
||||||
|
|
||||||
"implement take-conflate" in pending
|
"implement take-conflate" in pending
|
||||||
|
|
@ -470,7 +518,31 @@ class InterpreterSpec extends InterpreterSpecKit {
|
||||||
|
|
||||||
"implement expand-take" in pending
|
"implement expand-take" in pending
|
||||||
|
|
||||||
"implement take-take" in pending
|
"implement take-take" in new TestSetup(Seq(
|
||||||
|
Take(1),
|
||||||
|
Take(1))) {
|
||||||
|
lastEvents() should be(Set.empty)
|
||||||
|
|
||||||
|
downstream.requestOne()
|
||||||
|
lastEvents() should be(Set(RequestOne))
|
||||||
|
|
||||||
|
upstream.onNext("foo")
|
||||||
|
lastEvents() should be(Set(OnNext("foo"), OnComplete, Cancel))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
"implement take-take with pushAndFinish from upstream" in new TestSetup(Seq(
|
||||||
|
Take(1),
|
||||||
|
Take(1))) {
|
||||||
|
lastEvents() should be(Set.empty)
|
||||||
|
|
||||||
|
downstream.requestOne()
|
||||||
|
lastEvents() should be(Set(RequestOne))
|
||||||
|
|
||||||
|
upstream.onNextAndComplete("foo")
|
||||||
|
lastEvents() should be(Set(OnNext("foo"), OnComplete))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
"implement take-drop" in pending
|
"implement take-drop" in pending
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -92,6 +92,11 @@ trait InterpreterSpecKit extends AkkaSpec {
|
||||||
|
|
||||||
def onNext(elem: Any): Unit = enterAndPush(elem)
|
def onNext(elem: Any): Unit = enterAndPush(elem)
|
||||||
def onComplete(): Unit = enterAndFinish()
|
def onComplete(): Unit = enterAndFinish()
|
||||||
|
def onNextAndComplete(elem: Any): Unit = {
|
||||||
|
context.enter()
|
||||||
|
context.pushAndFinish(elem)
|
||||||
|
context.execute()
|
||||||
|
}
|
||||||
def onError(cause: Throwable): Unit = enterAndFail(cause)
|
def onError(cause: Throwable): Unit = enterAndFail(cause)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -424,6 +424,36 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
|
||||||
upsub.expectCancellation()
|
upsub.expectCancellation()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not trigger onUpstreamFinished after pushAndFinish" in assertAllStagesStopped {
|
||||||
|
val in = TestPublisher.manualProbe[Int]()
|
||||||
|
val flow =
|
||||||
|
Source(in)
|
||||||
|
.transform(() ⇒ new StatefulStage[Int, Int] {
|
||||||
|
|
||||||
|
def initial: StageState[Int, Int] = new State {
|
||||||
|
override def onPush(element: Int, ctx: Context[Int]) =
|
||||||
|
ctx.pushAndFinish(element)
|
||||||
|
}
|
||||||
|
override def onUpstreamFinish(ctx: Context[Int]): TerminationDirective =
|
||||||
|
terminationEmit(Iterator(42), ctx)
|
||||||
|
})
|
||||||
|
.runWith(Sink.publisher)
|
||||||
|
|
||||||
|
val inSub = in.expectSubscription()
|
||||||
|
|
||||||
|
val out = TestSubscriber.manualProbe[Int]()
|
||||||
|
flow.subscribe(out)
|
||||||
|
val outSub = out.expectSubscription()
|
||||||
|
|
||||||
|
inSub.sendNext(23)
|
||||||
|
inSub.sendComplete()
|
||||||
|
|
||||||
|
outSub.request(1) // it depends on this line, i.e. generating backpressure between buffer and stage execution
|
||||||
|
|
||||||
|
out.expectNext(23)
|
||||||
|
out.expectComplete()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,25 @@ private[akka] object OneBoundedInterpreter {
|
||||||
override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = ctx.exit()
|
override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = ctx.exit()
|
||||||
override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = ctx.exit()
|
override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = ctx.exit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*
|
||||||
|
* This artificial op is used as a boundary to prevent the first forked onPush of execution of a pushFinish to enter
|
||||||
|
* the originating stage again. This stage allows the forked upstream onUpstreamFinish to pass through if there was
|
||||||
|
* no onPull called on the stage. Calling onPull on this op makes it a Finished op, which absorbs the
|
||||||
|
* onUpstreamTermination, but otherwise onUpstreamTermination results in calling finish()
|
||||||
|
*/
|
||||||
|
private[akka] object PushFinished extends BoundaryStage {
|
||||||
|
override def onPush(elem: Any, ctx: BoundaryContext): UpstreamDirective = ctx.finish()
|
||||||
|
override def onPull(ctx: BoundaryContext): DownstreamDirective = ctx.finish()
|
||||||
|
// This allows propagation of an onUpstreamFinish call. Note that if onPull has been called on this stage
|
||||||
|
// before, then the call ctx.finish() in onPull already turned this op to a normal Finished, i.e. it will no longer
|
||||||
|
// propagate onUpstreamFinish.
|
||||||
|
override def onUpstreamFinish(ctx: BoundaryContext): TerminationDirective = ctx.finish()
|
||||||
|
override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = ctx.exit()
|
||||||
|
override def onUpstreamFailure(cause: Throwable, ctx: BoundaryContext): TerminationDirective = ctx.exit()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -200,7 +219,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
|
||||||
private def updateJumpBacks(lastNonCompletedIndex: Int): Unit = {
|
private def updateJumpBacks(lastNonCompletedIndex: Int): Unit = {
|
||||||
var pos = lastNonCompletedIndex
|
var pos = lastNonCompletedIndex
|
||||||
// For every jump that would jump over us we change them to jump into us
|
// For every jump that would jump over us we change them to jump into us
|
||||||
while (jumpBacks(pos) < lastNonCompletedIndex && pos < pipeline.length) {
|
while (pos < pipeline.length && jumpBacks(pos) < lastNonCompletedIndex) {
|
||||||
jumpBacks(pos) = lastNonCompletedIndex
|
jumpBacks(pos) = lastNonCompletedIndex
|
||||||
pos += 1
|
pos += 1
|
||||||
}
|
}
|
||||||
|
|
@ -290,6 +309,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
|
||||||
}
|
}
|
||||||
|
|
||||||
override def finish(): FreeDirective = {
|
override def finish(): FreeDirective = {
|
||||||
|
finishCurrentOp()
|
||||||
fork(Completing)
|
fork(Completing)
|
||||||
state = Cancelling
|
state = Cancelling
|
||||||
null
|
null
|
||||||
|
|
@ -297,13 +317,19 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
|
||||||
|
|
||||||
def isFinishing: Boolean = hasBits(TerminationPending)
|
def isFinishing: Boolean = hasBits(TerminationPending)
|
||||||
|
|
||||||
override def pushAndFinish(elem: Any): DownstreamDirective = {
|
final protected def pushAndFinishCommon(elem: Any): Unit = {
|
||||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
||||||
if (currentOp.isDetached) {
|
if (currentOp.isDetached) {
|
||||||
mustHave(DownstreamBall)
|
mustHave(DownstreamBall)
|
||||||
}
|
}
|
||||||
removeBits(DownstreamBall | PrecedingWasPull)
|
removeBits(DownstreamBall | PrecedingWasPull)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def pushAndFinish(elem: Any): DownstreamDirective = {
|
||||||
|
pushAndFinishCommon(elem)
|
||||||
|
// Spit the execution domain in two, and invoke op postStop callbacks if there are any
|
||||||
finishCurrentOp()
|
finishCurrentOp()
|
||||||
|
|
||||||
// This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution
|
// This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution
|
||||||
// path. Other forks are not order dependent because they execute on isolated execution domains which cannot
|
// path. Other forks are not order dependent because they execute on isolated execution domains which cannot
|
||||||
// "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream
|
// "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream
|
||||||
|
|
@ -311,8 +337,12 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
|
||||||
// It might be that there are some degenerate cases where this can blow up the stack with a very long chain but I
|
// It might be that there are some degenerate cases where this can blow up the stack with a very long chain but I
|
||||||
// am not aware of such scenario yet. If you know one, put it in InterpreterStressSpec :)
|
// am not aware of such scenario yet. If you know one, put it in InterpreterStressSpec :)
|
||||||
unsafeFork(PushFinish, elem)
|
unsafeFork(PushFinish, elem)
|
||||||
|
|
||||||
|
// Same as finish, without calling finishCurrentOp
|
||||||
elementInFlight = null
|
elementInFlight = null
|
||||||
finish()
|
fork(Completing)
|
||||||
|
state = Cancelling
|
||||||
|
null
|
||||||
}
|
}
|
||||||
|
|
||||||
override def fail(cause: Throwable): FreeDirective = {
|
override def fail(cause: Throwable): FreeDirective = {
|
||||||
|
|
@ -397,11 +427,11 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
|
||||||
override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this)
|
override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this)
|
||||||
|
|
||||||
override def pushAndFinish(elem: Any): DownstreamDirective = {
|
override def pushAndFinish(elem: Any): DownstreamDirective = {
|
||||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
pushAndFinishCommon(elem)
|
||||||
/*
|
// Put an isolation barrier that will prevent the onPull of this op to be called again. This barrier
|
||||||
* FIXME (RK) please someone explain why this works: the stage already
|
// is different from simple Finished that it allows onUpstreamTerminated to pass through, unless onPull
|
||||||
* terminated, but eventually it will see another onPull because nobody noticed.
|
// has been called on the stage
|
||||||
*/
|
pipeline(activeOpIndex) = PushFinished.asInstanceOf[UntypedOp]
|
||||||
elementInFlight = elem
|
elementInFlight = elem
|
||||||
state = PushFinish
|
state = PushFinish
|
||||||
null
|
null
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue