#21446: Completion events must not be swallowed if chasing

This commit is contained in:
Endre Sándor Varga 2016-09-12 16:50:02 +02:00
parent 4821fb158e
commit b2f0ca6750
4 changed files with 1329 additions and 1169 deletions

View file

@ -176,10 +176,29 @@ trait GraphInterpreterSpecKit extends StreamSpec {
}
abstract class PortTestSetup extends TestSetup {
abstract class PortTestSetup(chasing: Boolean = false) extends TestSetup {
val out = new UpstreamPortProbe[Int]
val in = new DownstreamPortProbe[Int]
class EventPropagateStage extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("Propagate.in")
val out = Outlet[Int]("Propagate.out")
override val shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = complete(out)
override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
override def onDownstreamFinish(): Unit = cancel(in)
setHandlers(in, out, this)
}
}
// step() means different depending whether we have a stage between the two probes or not
override def step(): Unit = interpreter.execute(eventLimit = if (!chasing) 1 else 2)
class UpstreamPortProbe[T] extends UpstreamProbe[T]("upstreamPort") {
def isAvailable: Boolean = isAvailable(out)
def isClosed: Boolean = isClosed(out)
@ -215,16 +234,27 @@ trait GraphInterpreterSpecKit extends StreamSpec {
})
}
private val assembly = new GraphAssembly(
stages = Array.empty,
originalAttributes = Array.empty,
ins = Array(null),
inOwners = Array(-1),
outs = Array(null),
outOwners = Array(-1))
private val assembly = if (!chasing) {
new GraphAssembly(
stages = Array.empty,
originalAttributes = Array.empty,
ins = Array(null),
inOwners = Array(-1),
outs = Array(null),
outOwners = Array(-1))
} else {
val propagateStage = new EventPropagateStage
new GraphAssembly(
stages = Array(propagateStage),
originalAttributes = Array(Attributes.none),
ins = Array(propagateStage.in, null),
inOwners = Array(0, -1),
outs = Array(null, propagateStage.out),
outOwners = Array(-1, 0))
}
manualInit(assembly)
interpreter.attachDownstreamBoundary(interpreter.connections(0), in)
interpreter.attachDownstreamBoundary(interpreter.connections(if (chasing) 1 else 0), in)
interpreter.attachUpstreamBoundary(interpreter.connections(0), out)
interpreter.init(null)
}