Merge pull request #21447 from drewhk/wip-21446-harden-event-chasing-drewhk

#21446: Completion events must not be swallowed if chasing
This commit is contained in:
drewhk 2016-09-13 12:17:02 +02:00 committed by GitHub
commit ec50bd1441
4 changed files with 1329 additions and 1169 deletions

View file

@ -0,0 +1,115 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.impl.fusing
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.testkit.Utils.TE
import akka.stream.testkit.{ TestPublisher, TestSubscriber }
import akka.testkit.AkkaSpec
class ChasingEventsSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withFuzzing(false))
class CancelInChasedPull extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("Propagate.in")
val out = Outlet[Int]("Propagate.out")
override val shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var first = true
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = {
pull(in)
if (!first) cancel(in)
first = false
}
setHandlers(in, out, this)
}
}
class CompleteInChasedPush extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("Propagate.in")
val out = Outlet[Int]("Propagate.out")
override val shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var first = true
override def onPush(): Unit = {
push(out, grab(in))
complete(out)
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
class FailureInChasedPush extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("Propagate.in")
val out = Outlet[Int]("Propagate.out")
override val shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var first = true
override def onPush(): Unit = {
push(out, grab(in))
fail(out, TE("test failure"))
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
class ChasableSink extends GraphStage[SinkShape[Int]] {
val in = Inlet[Int]("Chaseable.in")
override val shape: SinkShape[Int] = SinkShape(in)
@throws(classOf[Exception])
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler {
override def preStart(): Unit = pull(in)
override def onPush(): Unit = pull(in)
setHandler(in, this)
}
}
"Event chasing" must {
"propagate cancel if enqueued immediately after pull" in {
val upstream = TestPublisher.probe[Int]()
Source.fromPublisher(upstream).via(new CancelInChasedPull).runWith(Sink.ignore)
upstream.sendNext(0)
upstream.expectCancellation()
}
"propagate complete if enqueued immediately after push" in {
val downstream = TestSubscriber.probe[Int]()
Source(1 to 10).via(new CompleteInChasedPush).runWith(Sink.fromSubscriber(downstream))
downstream.requestNext(1)
downstream.expectComplete()
}
"propagate failure if enqueued immediately after push" in {
val downstream = TestSubscriber.probe[Int]()
Source(1 to 10).via(new FailureInChasedPush).runWith(Sink.fromSubscriber(downstream))
downstream.requestNext(1)
downstream.expectError()
}
}
}

View file

@ -178,10 +178,29 @@ trait GraphInterpreterSpecKit extends StreamSpec {
}
abstract class PortTestSetup extends TestSetup {
abstract class PortTestSetup(chasing: Boolean = false) extends TestSetup {
val out = new UpstreamPortProbe[Int]
val in = new DownstreamPortProbe[Int]
class EventPropagateStage extends GraphStage[FlowShape[Int, Int]] {
val in = Inlet[Int]("Propagate.in")
val out = Outlet[Int]("Propagate.out")
override val shape: FlowShape[Int, Int] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = complete(out)
override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex)
override def onDownstreamFinish(): Unit = cancel(in)
setHandlers(in, out, this)
}
}
// step() means different depending whether we have a stage between the two probes or not
override def step(): Unit = interpreter.execute(eventLimit = if (!chasing) 1 else 2)
class UpstreamPortProbe[T] extends UpstreamProbe[T]("upstreamPort") {
def isAvailable: Boolean = isAvailable(out)
def isClosed: Boolean = isClosed(out)
@ -217,16 +236,27 @@ trait GraphInterpreterSpecKit extends StreamSpec {
})
}
private val assembly = new GraphAssembly(
stages = Array.empty,
originalAttributes = Array.empty,
ins = Array(null),
inOwners = Array(-1),
outs = Array(null),
outOwners = Array(-1))
private val assembly = if (!chasing) {
new GraphAssembly(
stages = Array.empty,
originalAttributes = Array.empty,
ins = Array(null),
inOwners = Array(-1),
outs = Array(null),
outOwners = Array(-1))
} else {
val propagateStage = new EventPropagateStage
new GraphAssembly(
stages = Array(propagateStage),
originalAttributes = Array(Attributes.none),
ins = Array(propagateStage.in, null),
inOwners = Array(0, -1),
outs = Array(null, propagateStage.out),
outOwners = Array(-1, 0))
}
manualInit(assembly)
interpreter.attachDownstreamBoundary(interpreter.connections(0), in)
interpreter.attachDownstreamBoundary(interpreter.connections(if (chasing) 1 else 0), in)
interpreter.attachUpstreamBoundary(interpreter.connections(0), out)
interpreter.init(null)
}

View file

@ -842,6 +842,12 @@ final class GraphInterpreter(
connection.portState = currentState | (OutClosed | InFailed)
connection.slot = Failed(ex, connection.slot)
if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection)
else if (chasedPush eq connection) {
// Abort chasing so Failure is not lost (chasing does NOT decode the event but assumes it to be a PUSH
// but we just changed the event!)
chasedPush = NoEvent
enqueue(connection)
}
}
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId)
}
@ -853,6 +859,12 @@ final class GraphInterpreter(
if ((currentState & OutClosed) == 0) {
connection.slot = Empty
if ((currentState & (Pulling | Pushing | InClosed)) == 0) enqueue(connection)
else if (chasedPull eq connection) {
// Abort chasing so Cancel is not lost (chasing does NOT decode the event but assumes it to be a PULL
// but we just changed the event!)
chasedPull = NoEvent
enqueue(connection)
}
}
if ((currentState & InClosed) == 0) completeConnection(connection.inOwnerId)
}