diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ChasingEventsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ChasingEventsSpec.scala new file mode 100644 index 0000000000..8f4f69f77f --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ChasingEventsSpec.scala @@ -0,0 +1,115 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.impl.fusing + +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream._ +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.{ TestPublisher, TestSubscriber } +import akka.testkit.AkkaSpec + +class ChasingEventsSpec extends AkkaSpec { + + implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withFuzzing(false)) + + class CancelInChasedPull extends GraphStage[FlowShape[Int, Int]] { + val in = Inlet[Int]("Propagate.in") + val out = Outlet[Int]("Propagate.out") + override val shape: FlowShape[Int, Int] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + private var first = true + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = { + pull(in) + if (!first) cancel(in) + first = false + } + + setHandlers(in, out, this) + } + } + + class CompleteInChasedPush extends GraphStage[FlowShape[Int, Int]] { + val in = Inlet[Int]("Propagate.in") + val out = Outlet[Int]("Propagate.out") + override val shape: FlowShape[Int, Int] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + private var first = true + override def onPush(): Unit = { + push(out, grab(in)) + complete(out) + } + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } + } + + class FailureInChasedPush extends GraphStage[FlowShape[Int, Int]] { + val in = Inlet[Int]("Propagate.in") + val out = Outlet[Int]("Propagate.out") + override val shape: FlowShape[Int, Int] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + private var first = true + override def onPush(): Unit = { + push(out, grab(in)) + fail(out, TE("test failure")) + } + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } + } + + class ChasableSink extends GraphStage[SinkShape[Int]] { + val in = Inlet[Int]("Chaseable.in") + override val shape: SinkShape[Int] = SinkShape(in) + + @throws(classOf[Exception]) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler { + override def preStart(): Unit = pull(in) + override def onPush(): Unit = pull(in) + setHandler(in, this) + } + } + + "Event chasing" must { + + "propagate cancel if enqueued immediately after pull" in { + val upstream = TestPublisher.probe[Int]() + + Source.fromPublisher(upstream).via(new CancelInChasedPull).runWith(Sink.ignore) + + upstream.sendNext(0) + upstream.expectCancellation() + + } + + "propagate complete if enqueued immediately after push" in { + val downstream = TestSubscriber.probe[Int]() + + Source(1 to 10).via(new CompleteInChasedPush).runWith(Sink.fromSubscriber(downstream)) + + downstream.requestNext(1) + downstream.expectComplete() + + } + + "propagate failure if enqueued immediately after push" in { + val downstream = TestSubscriber.probe[Int]() + + Source(1 to 10).via(new FailureInChasedPush).runWith(Sink.fromSubscriber(downstream)) + + downstream.requestNext(1) + downstream.expectError() + + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala index 0bc6b81c37..37e7754190 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala @@ -12,1166 +12,1169 @@ class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit // FIXME test failure scenarios - "properly transition on push and pull" in new PortTestSetup { - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.pull() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - stepAll() - - lastEvents() should be(Set(RequestOne(out))) - out.isAvailable should be(true) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.push(0) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - stepAll() - - lastEvents() should be(Set(OnNext(in, 0))) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(true) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - in.grab() should ===(0) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - // Cycle completed - } - - "drop ungrabbed element on pull" in new PortTestSetup { - in.pull() - step() - clearEvents() - out.push(0) - step() - - lastEvents() should be(Set(OnNext(in, 0))) - - in.pull() - - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate complete while downstream is active" in new PortTestSetup { - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - - out.complete() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - stepAll() - - lastEvents() should be(Set(OnComplete(in))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate complete while upstream is active" in new PortTestSetup { - in.pull() - stepAll() - - lastEvents() should be(Set(RequestOne(out))) - out.isAvailable should be(true) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - out.complete() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - stepAll() - - lastEvents() should be(Set(OnComplete(in))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - } - - "propagate complete while pull is in flight" in new PortTestSetup { - in.pull() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - out.complete() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - stepAll() - - lastEvents() should be(Set(OnComplete(in))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate complete while push is in flight" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - out.complete() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - step() - - lastEvents() should be(Set(OnNext(in, 0))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(true) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - in.grab() should ===(0) - an[IllegalArgumentException] should be thrownBy { in.grab() } - - step() - - lastEvents() should be(Set(OnComplete(in))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate complete while push is in flight and keep ungrabbed element" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - out.complete() - step() - - lastEvents() should be(Set(OnNext(in, 0))) - step() - - lastEvents() should be(Set(OnComplete(in))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(true) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - in.grab() should ===(0) - } - - "propagate complete while push is in flight and pulled after the push" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - out.complete() - step() - - lastEvents() should be(Set(OnNext(in, 0))) - in.grab() should ===(0) - - in.pull() - stepAll() - - lastEvents() should be(Set(OnComplete(in))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore pull while completing" in new PortTestSetup { - out.complete() - in.pull() - // While the pull event is not enqueued at this point, we should still report the state correctly - in.hasBeenPulled should be(true) - - stepAll() - - lastEvents() should be(Set(OnComplete(in))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate cancel while downstream is active" in new PortTestSetup { - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - - in.cancel() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - stepAll() - - lastEvents() should be(Set(Cancel(out))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate cancel while upstream is active" in new PortTestSetup { - in.pull() - stepAll() - - lastEvents() should be(Set(RequestOne(out))) - out.isAvailable should be(true) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - in.cancel() - - lastEvents() should be(Set.empty) - out.isAvailable should be(true) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - stepAll() - - lastEvents() should be(Set(Cancel(out))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - } - - "propagate cancel while pull is in flight" in new PortTestSetup { - in.pull() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - in.cancel() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - stepAll() - - lastEvents() should be(Set(Cancel(out))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate cancel while push is in flight" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - in.cancel() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - - stepAll() - - lastEvents() should be(Set(Cancel(out))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore push while cancelling" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - in.cancel() - out.push(0) - // While the push event is not enqueued at this point, we should still report the state correctly - out.isAvailable should be(false) - - stepAll() - - lastEvents() should be(Set(Cancel(out))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "clear ungrabbed element even when cancelled" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - out.push(0) - stepAll() - - lastEvents() should be(Set(OnNext(in, 0))) - - in.cancel() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - stepAll() - lastEvents() should be(Set(Cancel(out))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore any completion if they are concurrent (cancel first)" in new PortTestSetup { - in.cancel() - out.complete() - - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore any completion if they are concurrent (complete first)" in new PortTestSetup { - out.complete() - in.cancel() - - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore completion from a push-complete if cancelled while in flight" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - out.complete() - in.cancel() - - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore completion from a push-complete if cancelled after onPush" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - out.complete() - - step() - - lastEvents() should be(Set(OnNext(in, 0))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(true) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - in.grab() should ===(0) - - in.cancel() - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "not allow to grab element before it arrives" in new PortTestSetup { - in.pull() - stepAll() - out.push(0) - - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "not allow to grab element if already cancelled" in new PortTestSetup { - in.pull() - stepAll() - - out.push(0) - in.cancel() - - stepAll() - - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate failure while downstream is active" in new PortTestSetup { - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - - out.fail(TE("test")) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - stepAll() - - lastEvents() should be(Set(OnError(in, TE("test")))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate failure while upstream is active" in new PortTestSetup { - in.pull() - stepAll() - - lastEvents() should be(Set(RequestOne(out))) - out.isAvailable should be(true) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - out.fail(TE("test")) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - stepAll() - - lastEvents() should be(Set(OnError(in, TE("test")))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - } - - "propagate failure while pull is in flight" in new PortTestSetup { - in.pull() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - out.fail(TE("test")) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - stepAll() - - lastEvents() should be(Set(OnError(in, TE("test")))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - in.cancel() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate failure while push is in flight" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(false) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - - out.fail(TE("test")) - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(true) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - - step() - - lastEvents() should be(Set(OnNext(in, 0))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(true) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - in.grab() should ===(0) - an[IllegalArgumentException] should be thrownBy { in.grab() } - - step() - - lastEvents() should be(Set(OnError(in, TE("test")))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - - out.complete() // This should have no effect now - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "propagate failure while push is in flight and keep ungrabbed element" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - out.fail(TE("test")) - step() - - lastEvents() should be(Set(OnNext(in, 0))) - step() - - lastEvents() should be(Set(OnError(in, TE("test")))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(true) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - in.grab() should ===(0) - } - - "ignore pull while failing" in new PortTestSetup { - out.fail(TE("test")) - in.pull() - in.hasBeenPulled should be(true) - - stepAll() - - lastEvents() should be(Set(OnError(in, TE("test")))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore any failure completion if they are concurrent (cancel first)" in new PortTestSetup { - in.cancel() - out.fail(TE("test")) - - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore any failure completion if they are concurrent (complete first)" in new PortTestSetup { - out.fail(TE("test")) - in.cancel() - - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore failure from a push-then-fail if cancelled while in flight" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - out.fail(TE("test")) - in.cancel() - - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } - } - - "ignore failure from a push-then-fail if cancelled after onPush" in new PortTestSetup { - in.pull() - stepAll() - clearEvents() - - out.push(0) - out.fail(TE("test")) - - step() - - lastEvents() should be(Set(OnNext(in, 0))) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(true) - in.hasBeenPulled should be(false) - in.isClosed should be(false) - an[IllegalArgumentException] should be thrownBy { out.push(0) } - in.grab() should ===(0) - - in.cancel() - stepAll() - - lastEvents() should be(Set.empty) - out.isAvailable should be(false) - out.isClosed should be(true) - in.isAvailable should be(false) - in.hasBeenPulled should be(false) - in.isClosed should be(true) - an[IllegalArgumentException] should be thrownBy { in.pull() } - an[IllegalArgumentException] should be thrownBy { out.push(0) } - an[IllegalArgumentException] should be thrownBy { in.grab() } + for (chasing ← List(false, true)) { + + s"properly transition on push and pull (chasing = $chasing)" in new PortTestSetup(chasing) { + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.pull() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(RequestOne(out))) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.push(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + in.grab() should ===(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + // Cycle completed + } + + s"drop ungrabbed element on pull (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + step() + clearEvents() + out.push(0) + step() + + lastEvents() should be(Set(OnNext(in, 0))) + + in.pull() + + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate complete while downstream is active (chasing = $chasing)" in new PortTestSetup(chasing) { + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + + out.complete() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate complete while upstream is active (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + + lastEvents() should be(Set(RequestOne(out))) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.complete() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + } + + s"propagate complete while pull is in flight (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.complete() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate complete while push is in flight (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.complete() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + step() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + in.grab() should ===(0) + an[IllegalArgumentException] should be thrownBy { in.grab() } + + step() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate complete while push is in flight and keep ungrabbed element (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.complete() + step() + + lastEvents() should be(Set(OnNext(in, 0))) + step() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + in.grab() should ===(0) + } + + s"propagate complete while push is in flight and pulled after the push (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.complete() + step() + + lastEvents() should be(Set(OnNext(in, 0))) + in.grab() should ===(0) + + in.pull() + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore pull while completing (chasing = $chasing)" in new PortTestSetup(chasing) { + out.complete() + in.pull() + // While the pull event is not enqueued at this point, we should still report the state correctly + in.hasBeenPulled should be(true) + + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate cancel while downstream is active (chasing = $chasing)" in new PortTestSetup(chasing) { + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate cancel while upstream is active (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + + lastEvents() should be(Set(RequestOne(out))) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + } + + s"propagate cancel while pull is in flight (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate cancel while push is in flight (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore push while cancelling (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + in.cancel() + out.push(0) + // While the push event is not enqueued at this point, we should still report the state correctly + out.isAvailable should be(false) + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"clear ungrabbed element even when cancelled (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + out.push(0) + stepAll() + + lastEvents() should be(Set(OnNext(in, 0))) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore any completion if they are concurrent (cancel first) (chasing = $chasing)" in new PortTestSetup(chasing) { + in.cancel() + out.complete() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore any completion if they are concurrent (complete first) (chasing = $chasing)" in new PortTestSetup(chasing) { + out.complete() + in.cancel() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore completion from a push-complete if cancelled while in flight (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.complete() + in.cancel() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore completion from a push-complete if cancelled after onPush (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.complete() + + step() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + in.grab() should ===(0) + + in.cancel() + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"not allow to grab element before it arrives (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + out.push(0) + + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"not allow to grab element if already cancelled (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + + out.push(0) + in.cancel() + + stepAll() + + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate failure while downstream is active (chasing = $chasing)" in new PortTestSetup(chasing) { + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + + out.fail(TE("test")) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate failure while upstream is active (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + + lastEvents() should be(Set(RequestOne(out))) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.fail(TE("test")) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + } + + s"propagate failure while pull is in flight (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.fail(TE("test")) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate failure while push is in flight (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.fail(TE("test")) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + step() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + in.grab() should ===(0) + an[IllegalArgumentException] should be thrownBy { in.grab() } + + step() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"propagate failure while push is in flight and keep ungrabbed element (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.fail(TE("test")) + step() + + lastEvents() should be(Set(OnNext(in, 0))) + step() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + in.grab() should ===(0) + } + + s"ignore pull while failing (chasing = $chasing)" in new PortTestSetup(chasing) { + out.fail(TE("test")) + in.pull() + in.hasBeenPulled should be(true) + + stepAll() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore any failure completion if they are concurrent (cancel first) (chasing = $chasing)" in new PortTestSetup(chasing) { + in.cancel() + out.fail(TE("test")) + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore any failure completion if they are concurrent (complete first) (chasing = $chasing)" in new PortTestSetup(chasing) { + out.fail(TE("test")) + in.cancel() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore failure from a push-then-fail if cancelled while in flight (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.fail(TE("test")) + in.cancel() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + s"ignore failure from a push-then-fail if cancelled after onPush (chasing = $chasing)" in new PortTestSetup(chasing) { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.fail(TE("test")) + + step() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + in.grab() should ===(0) + + in.cancel() + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index c0bd29dd10..f571886fe4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -176,10 +176,29 @@ trait GraphInterpreterSpecKit extends StreamSpec { } - abstract class PortTestSetup extends TestSetup { + abstract class PortTestSetup(chasing: Boolean = false) extends TestSetup { val out = new UpstreamPortProbe[Int] val in = new DownstreamPortProbe[Int] + class EventPropagateStage extends GraphStage[FlowShape[Int, Int]] { + val in = Inlet[Int]("Propagate.in") + val out = Outlet[Int]("Propagate.out") + override val shape: FlowShape[Int, Int] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + override def onUpstreamFinish(): Unit = complete(out) + override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex) + override def onDownstreamFinish(): Unit = cancel(in) + + setHandlers(in, out, this) + } + } + + // step() means different depending whether we have a stage between the two probes or not + override def step(): Unit = interpreter.execute(eventLimit = if (!chasing) 1 else 2) + class UpstreamPortProbe[T] extends UpstreamProbe[T]("upstreamPort") { def isAvailable: Boolean = isAvailable(out) def isClosed: Boolean = isClosed(out) @@ -215,16 +234,27 @@ trait GraphInterpreterSpecKit extends StreamSpec { }) } - private val assembly = new GraphAssembly( - stages = Array.empty, - originalAttributes = Array.empty, - ins = Array(null), - inOwners = Array(-1), - outs = Array(null), - outOwners = Array(-1)) + private val assembly = if (!chasing) { + new GraphAssembly( + stages = Array.empty, + originalAttributes = Array.empty, + ins = Array(null), + inOwners = Array(-1), + outs = Array(null), + outOwners = Array(-1)) + } else { + val propagateStage = new EventPropagateStage + new GraphAssembly( + stages = Array(propagateStage), + originalAttributes = Array(Attributes.none), + ins = Array(propagateStage.in, null), + inOwners = Array(0, -1), + outs = Array(null, propagateStage.out), + outOwners = Array(-1, 0)) + } manualInit(assembly) - interpreter.attachDownstreamBoundary(interpreter.connections(0), in) + interpreter.attachDownstreamBoundary(interpreter.connections(if (chasing) 1 else 0), in) interpreter.attachUpstreamBoundary(interpreter.connections(0), out) interpreter.init(null) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 57a1d6ea5c..dfc8fc8d6c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -842,6 +842,12 @@ final class GraphInterpreter( connection.portState = currentState | (OutClosed | InFailed) connection.slot = Failed(ex, connection.slot) if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection) + else if (chasedPush eq connection) { + // Abort chasing so Failure is not lost (chasing does NOT decode the event but assumes it to be a PUSH + // but we just changed the event!) + chasedPush = NoEvent + enqueue(connection) + } } if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId) } @@ -853,6 +859,12 @@ final class GraphInterpreter( if ((currentState & OutClosed) == 0) { connection.slot = Empty if ((currentState & (Pulling | Pushing | InClosed)) == 0) enqueue(connection) + else if (chasedPull eq connection) { + // Abort chasing so Cancel is not lost (chasing does NOT decode the event but assumes it to be a PULL + // but we just changed the event!) + chasedPull = NoEvent + enqueue(connection) + } } if ((currentState & InClosed) == 0) completeConnection(connection.inOwnerId) }