diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala index 5d924bf16c..370a54f7a2 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala @@ -1,7 +1,7 @@ package akka.stream import akka.event._ -import akka.stream.impl.fusing.{ InterpreterSpecKit, GraphInterpreterSpec, GraphStages, Map => MapStage, OneBoundedInterpreter } +import akka.stream.impl.fusing.{ GraphInterpreterSpecKit, GraphStages, Map => MapStage, OneBoundedInterpreter } import akka.stream.impl.fusing.GraphStages.Identity import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } import akka.stream.stage._ @@ -26,34 +26,33 @@ class InterpreterBenchmark { @Benchmark @OperationsPerInvocation(100000) def graph_interpreter_100k_elements() { - val lock = new Lock() - lock.acquire() - new GraphInterpreterSpec.TestSetup() { - val identities = Vector.fill(numberOfIds)(new Identity[Int]) - val source = new GraphDataSource("source", data100k) - val sink = new GraphDataSink[Int]("sink", data100k.size, lock) + new GraphInterpreterSpecKit { + new TestSetup { + val identities = Vector.fill(numberOfIds)(new Identity[Int]) + val source = new GraphDataSource("source", data100k) + val sink = new GraphDataSink[Int]("sink", data100k.size) - val b = builder(identities:_*) - .connect(source, identities.head.in) - .connect(identities.last.out, sink) + val b = builder(identities: _*) + .connect(source, identities.head.in) + .connect(identities.last.out, sink) - for (i <- (0 until identities.size - 1)) { - b.connect(identities(i).out, identities(i + 1).in) + // FIXME: This should not be here, this is pure setup overhead + for (i <- (0 until identities.size - 1)) { + b.connect(identities(i).out, identities(i + 1).in) + } + + b.init() + sink.requestOne() + interpreter.execute(Int.MaxValue) } - - b.init() - sink.requestOne() - interpreter.execute(Int.MaxValue) } - lock.acquire() } @Benchmark @OperationsPerInvocation(100000) def onebounded_interpreter_100k_elements() { - val lock = new Lock() - lock.acquire() - val sink = OneBoundedDataSink(data100k.size, lock) + val sink = OneBoundedDataSink(data100k.size) + // FIXME: This should not be here, this is pure setup overhead val ops = Vector.fill(numberOfIds)(MapStage(identity[Int], Supervision.stoppingDecider)) val interpreter = new OneBoundedInterpreter(OneBoundedDataSource(data100k) +: ops :+ sink, (op, ctx, event) ⇒ (), @@ -63,7 +62,6 @@ class InterpreterBenchmark { forkLimit = 100, overflowToHeap = false) interpreter.init() sink.requestOne() - lock.acquire() } } @@ -87,16 +85,14 @@ object InterpreterBenchmark { }) } - case class GraphDataSink[T](override val toString: String, var expected: Int, completionLock: Lock) extends DownstreamBoundaryStageLogic[T] { + case class GraphDataSink[T](override val toString: String, var expected: Int) extends DownstreamBoundaryStageLogic[T] { val in = Inlet[T]("in") setHandler(in, new InHandler { override def onPush(): Unit = { expected -= 1 - pull(in) - if (expected == 0) { - completionLock.release() - } + if (expected > 0) pull(in) + // Otherwise do nothing, it will exit the interpreter } override def onUpstreamFinish(): Unit = completeStage() override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) @@ -126,13 +122,11 @@ object InterpreterBenchmark { throw new UnsupportedOperationException("Cannot push the boundary") } - case class OneBoundedDataSink(var expected: Int, completionLock: Lock) extends BoundaryStage { + case class OneBoundedDataSink(var expected: Int) extends BoundaryStage { override def onPush(elem: Any, ctx: BoundaryContext): Directive = { expected -= 1 - if (expected == 0) { - completionLock.release() - } - ctx.pull() + if (expected == 0) ctx.exit() + else ctx.pull() } override def onUpstreamFinish(ctx: BoundaryContext): TerminationDirective = { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala new file mode 100644 index 0000000000..d32c98146c --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala @@ -0,0 +1,759 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import akka.stream.testkit.AkkaSpec + +class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit { + + "Port states" must { + + "properly transition on push and pull" in new PortTestSetup { + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.pull() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(RequestOne(out))) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.push(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + in.grab() should ===(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + // Cycle completed + } + + "drop ungrabbed element on pull" in new PortTestSetup { + in.pull() + step() + clearEvents() + out.push(0) + step() + + lastEvents() should be(Set(OnNext(in, 0))) + + in.pull() + + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate complete while downstream is active" in new PortTestSetup { + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + + out.complete() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate complete while upstream is active" in new PortTestSetup { + in.pull() + stepAll() + + lastEvents() should be(Set(RequestOne(out))) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.complete() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + } + + "propagate complete while pull is in flight" in new PortTestSetup { + in.pull() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.complete() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate complete while push is in flight" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.complete() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + step() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + in.grab() should ===(0) + an[IllegalArgumentException] should be thrownBy { in.grab() } + + step() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate complete while push is in flight and keep ungrabbed element" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.complete() + step() + + lastEvents() should be(Set(OnNext(in, 0))) + step() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + in.grab() should ===(0) + } + + "ignore pull while completing" in new PortTestSetup { + out.complete() + in.pull() + + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate cancel while downstream is active" in new PortTestSetup { + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate cancel while upstream is active" in new PortTestSetup { + in.pull() + stepAll() + + lastEvents() should be(Set(RequestOne(out))) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + } + + "propagate cancel while pull is in flight" in new PortTestSetup { + in.pull() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate cancel while push is in flight" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore push while cancelling" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + in.cancel() + out.push(0) + + stepAll() + + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "clear ungrabbed element even when cancelled" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + out.push(0) + stepAll() + + lastEvents() should be(Set(OnNext(in, 0))) + + in.cancel() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + stepAll() + lastEvents() should be(Set(Cancel(out))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore any completion if they are concurrent (cancel first)" in new PortTestSetup { + in.cancel() + out.complete() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore any completion if they are concurrent (complete first)" in new PortTestSetup { + out.complete() + in.cancel() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore completion from a push-complete if cancelled while in flight" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.complete() + in.cancel() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore completion from a push-complete if cancelled after onPush" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.complete() + + step() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + in.grab() should ===(0) + + in.cancel() + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index 99256fa069..b57a247013 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -3,16 +3,10 @@ */ package akka.stream.impl.fusing -import akka.stream._ -import akka.stream.impl.fusing.GraphInterpreterSpec.TestSetup -import akka.stream.stage.{ InHandler, OutHandler, GraphStage, GraphStageLogic } import akka.stream.testkit.AkkaSpec import GraphInterpreter._ -import scala.collection.immutable - -class GraphInterpreterSpec extends AkkaSpec { - import GraphInterpreterSpec._ +class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { import GraphStages._ "GraphInterpreter" must { @@ -26,8 +20,8 @@ class GraphInterpreterSpec extends AkkaSpec { val balance = new Balance[Int](2) "implement identity" in new TestSetup { - val source = UpstreamProbe[Int]("source") - val sink = DownstreamProbe[Int]("sink") + val source = new UpstreamProbe[Int]("source") + val sink = new DownstreamProbe[Int]("sink") builder(identity) .connect(source, identity.in) @@ -70,8 +64,8 @@ class GraphInterpreterSpec extends AkkaSpec { } "implement detacher stage" in new TestSetup { - val source = UpstreamProbe[Int]("source") - val sink = DownstreamProbe[Int]("sink") + val source = new UpstreamProbe[Int]("source") + val sink = new DownstreamProbe[Int]("sink") builder(detacher) .connect(source, detacher.in) @@ -349,113 +343,3 @@ class GraphInterpreterSpec extends AkkaSpec { } } - -object GraphInterpreterSpec { - - sealed trait TestEvent { - def source: GraphStageLogic - } - - case class OnComplete(source: GraphStageLogic) extends TestEvent - case class Cancel(source: GraphStageLogic) extends TestEvent - case class OnError(source: GraphStageLogic, cause: Throwable) extends TestEvent - case class OnNext(source: GraphStageLogic, elem: Any) extends TestEvent - case class RequestOne(source: GraphStageLogic) extends TestEvent - case class RequestAnother(source: GraphStageLogic) extends TestEvent - - abstract class TestSetup { - private var lastEvent: Set[TestEvent] = Set.empty - private var _interpreter: GraphInterpreter = _ - protected def interpreter: GraphInterpreter = _interpreter - - class AssemblyBuilder(stages: Seq[GraphStage[_ <: Shape]]) { - var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])] - var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])] - var connections = Vector.empty[(Outlet[_], Inlet[_])] - - def connect[T](upstream: UpstreamBoundaryStageLogic[T], in: Inlet[T]): AssemblyBuilder = { - upstreams :+= upstream -> in - this - } - - def connect[T](out: Outlet[T], downstream: DownstreamBoundaryStageLogic[T]): AssemblyBuilder = { - downstreams :+= out -> downstream - this - } - - def connect[T](out: Outlet[T], in: Inlet[T]): AssemblyBuilder = { - connections :+= out -> in - this - } - - def init(): Unit = { - val ins = upstreams.map(_._2) ++ connections.map(_._2) - val outs = connections.map(_._1) ++ downstreams.map(_._1) - val inOwners = ins.map { in ⇒ stages.indexWhere(_.shape.inlets.contains(in)) } - val outOwners = outs.map { out ⇒ stages.indexWhere(_.shape.outlets.contains(out)) } - - val assembly = GraphAssembly( - stages.toArray, - (ins ++ Vector.fill(downstreams.size)(null)).toArray, - (inOwners ++ Vector.fill(downstreams.size)(-1)).toArray, - (Vector.fill(upstreams.size)(null) ++ outs).toArray, - (Vector.fill(upstreams.size)(-1) ++ outOwners).toArray) - - val (inHandlers, outHandlers, logics, _) = assembly.materialize() - _interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) - - for ((upstream, i) ← upstreams.zipWithIndex) { - _interpreter.attachUpstreamBoundary(i, upstream._1) - } - - for ((downstream, i) ← downstreams.zipWithIndex) { - _interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2) - } - - _interpreter.init() - } - } - - def manualInit(assembly: GraphAssembly): Unit = { - val (inHandlers, outHandlers, logics, _) = assembly.materialize() - _interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) - } - - def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq) - - def lastEvents(): Set[TestEvent] = { - val result = lastEvent - lastEvent = Set.empty - result - } - - case class UpstreamProbe[T](override val toString: String) extends UpstreamBoundaryStageLogic[T] { - val out = Outlet[T]("out") - - setHandler(out, new OutHandler { - override def onPull(): Unit = lastEvent += RequestOne(UpstreamProbe.this) - }) - - def onNext(elem: T, eventLimit: Int = Int.MaxValue): Unit = { - if (GraphInterpreter.Debug) println(s"----- NEXT: $this $elem") - push(out, elem) - interpreter.execute(eventLimit) - } - } - - case class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] { - val in = Inlet[T]("in") - - setHandler(in, new InHandler { - override def onPush(): Unit = lastEvent += OnNext(DownstreamProbe.this, grab(in)) - }) - - def requestOne(eventLimit: Int = Int.MaxValue): Unit = { - if (GraphInterpreter.Debug) println(s"----- REQ $this") - pull(in) - interpreter.execute(eventLimit) - } - } - - } -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala new file mode 100644 index 0000000000..d1b40d3324 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -0,0 +1,176 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.impl.fusing + +import akka.stream.{ NoMaterializer, Outlet, Inlet, Shape } +import akka.stream.impl.fusing.GraphInterpreter.{ GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } +import akka.stream.stage.{ InHandler, OutHandler, GraphStage, GraphStageLogic } + +trait GraphInterpreterSpecKit { + + sealed trait TestEvent { + def source: GraphStageLogic + } + + case class OnComplete(source: GraphStageLogic) extends TestEvent + case class Cancel(source: GraphStageLogic) extends TestEvent + case class OnError(source: GraphStageLogic, cause: Throwable) extends TestEvent + case class OnNext(source: GraphStageLogic, elem: Any) extends TestEvent + case class RequestOne(source: GraphStageLogic) extends TestEvent + case class RequestAnother(source: GraphStageLogic) extends TestEvent + + abstract class TestSetup { + protected var lastEvent: Set[TestEvent] = Set.empty + private var _interpreter: GraphInterpreter = _ + protected def interpreter: GraphInterpreter = _interpreter + + class AssemblyBuilder(stages: Seq[GraphStage[_ <: Shape]]) { + var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])] + var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])] + var connections = Vector.empty[(Outlet[_], Inlet[_])] + + def connect[T](upstream: UpstreamBoundaryStageLogic[T], in: Inlet[T]): AssemblyBuilder = { + upstreams :+= upstream -> in + this + } + + def connect[T](out: Outlet[T], downstream: DownstreamBoundaryStageLogic[T]): AssemblyBuilder = { + downstreams :+= out -> downstream + this + } + + def connect[T](out: Outlet[T], in: Inlet[T]): AssemblyBuilder = { + connections :+= out -> in + this + } + + def init(): Unit = { + val ins = upstreams.map(_._2) ++ connections.map(_._2) + val outs = connections.map(_._1) ++ downstreams.map(_._1) + val inOwners = ins.map { in ⇒ stages.indexWhere(_.shape.inlets.contains(in)) } + val outOwners = outs.map { out ⇒ stages.indexWhere(_.shape.outlets.contains(out)) } + + val assembly = GraphAssembly( + stages.toArray, + (ins ++ Vector.fill(downstreams.size)(null)).toArray, + (inOwners ++ Vector.fill(downstreams.size)(-1)).toArray, + (Vector.fill(upstreams.size)(null) ++ outs).toArray, + (Vector.fill(upstreams.size)(-1) ++ outOwners).toArray) + + val (inHandlers, outHandlers, logics, _) = assembly.materialize() + _interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) + + for ((upstream, i) ← upstreams.zipWithIndex) { + _interpreter.attachUpstreamBoundary(i, upstream._1) + } + + for ((downstream, i) ← downstreams.zipWithIndex) { + _interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2) + } + + _interpreter.init() + } + } + + def manualInit(assembly: GraphAssembly): Unit = { + val (inHandlers, outHandlers, logics, _) = assembly.materialize() + _interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) + } + + def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq) + + def lastEvents(): Set[TestEvent] = { + val result = lastEvent + clearEvents() + result + } + + def clearEvents(): Unit = lastEvent = Set.empty + + class UpstreamProbe[T](override val toString: String) extends UpstreamBoundaryStageLogic[T] { + val out = Outlet[T]("out") + + setHandler(out, new OutHandler { + override def onPull(): Unit = lastEvent += RequestOne(UpstreamProbe.this) + override def onDownstreamFinish() = lastEvent += Cancel(UpstreamProbe.this) + }) + + def onNext(elem: T, eventLimit: Int = Int.MaxValue): Unit = { + if (GraphInterpreter.Debug) println(s"----- NEXT: $this $elem") + push(out, elem) + interpreter.execute(eventLimit) + } + } + + class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] { + val in = Inlet[T]("in") + + setHandler(in, new InHandler { + override def onPush(): Unit = lastEvent += OnNext(DownstreamProbe.this, grab(in)) + override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamProbe.this) + override def onUpstreamFailure(ex: Throwable) = OnError(DownstreamProbe.this, ex) + }) + + def requestOne(eventLimit: Int = Int.MaxValue): Unit = { + if (GraphInterpreter.Debug) println(s"----- REQ $this") + pull(in) + interpreter.execute(eventLimit) + } + } + + } + + abstract class PortTestSetup extends TestSetup { + val out = new UpstreamPortProbe[Int] + val in = new DownstreamPortProbe[Int] + + class UpstreamPortProbe[T] extends UpstreamProbe[T]("upstreamPort") { + def isAvailable: Boolean = isAvailable(out) + def isClosed: Boolean = isClosed(out) + + def push(elem: T): Unit = push(out, elem) + def complete(): Unit = complete(out) + def fail(ex: Throwable): Unit = fail(out, ex) + } + + class DownstreamPortProbe[T] extends DownstreamProbe[T]("upstreamPort") { + def isAvailable: Boolean = isAvailable(in) + def hasBeenPulled: Boolean = hasBeenPulled(in) + def isClosed: Boolean = isClosed(in) + + def pull(): Unit = pull(in) + def cancel(): Unit = cancel(in) + def grab(): T = grab(in) + + setHandler(in, new InHandler { + + // Modified onPush that does not grab() automatically the element. This accesses some internals. + override def onPush(): Unit = + lastEvent += + OnNext( + DownstreamPortProbe.this, + interpreter.connectionStates(inToConn(in))) + + override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamPortProbe.this) + override def onUpstreamFailure(ex: Throwable) = OnError(DownstreamPortProbe.this, ex) + }) + } + + def stepAll(): Unit = interpreter.execute(eventLimit = Int.MaxValue) + def step(): Unit = interpreter.execute(eventLimit = 1) + + private val assembly = GraphAssembly( + stages = Array.empty, + ins = Array(null), + inOwners = Array(-1), + outs = Array(null), + outOwners = Array(-1)) + + manualInit(assembly) + interpreter.attachDownstreamBoundary(0, in) + interpreter.attachUpstreamBoundary(0, out) + interpreter.init() + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index bcd7652c44..bc41e2492a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl.fusing +import java.util.concurrent.TimeoutException + import akka.actor._ import akka.event.Logging import akka.stream._ @@ -120,6 +122,7 @@ private[stream] object ActorGraphInterpreter { } def cancel(): Unit = { + downstreamCanceled = true if (!upstreamCompleted) { upstreamCompleted = true if (upstream ne null) tryCancel(upstream) @@ -137,7 +140,7 @@ private[stream] object ActorGraphInterpreter { } def onError(e: Throwable): Unit = - if (!upstreamCompleted) { + if (!upstreamCompleted || !downstreamCanceled) { upstreamCompleted = true clear() fail(out, e) @@ -200,8 +203,6 @@ private[stream] object ActorGraphInterpreter { // This flag is only used if complete/fail is called externally since this op turns into a Finished one inside the // interpreter (i.e. inside this op this flag has no effects since if it is completed the op will not be invoked) private var downstreamCompleted = false - // this is true while we “hold the ball”; while “false” incoming demand will just be queued up - private var upstreamWaiting = true // when upstream failed before we got the exposed publisher private var upstreamFailed: Option[Throwable] = None @@ -267,7 +268,7 @@ private[stream] object ActorGraphInterpreter { downstreamDemand += elements if (downstreamDemand < 0) downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded - if (!hasBeenPulled(in)) pull(in) + if (!hasBeenPulled(in) && !isClosed(in)) pull(in) } } @@ -303,16 +304,19 @@ private[stream] class ActorGraphInterpreter( logics, (logic, event, handler) ⇒ self ! AsyncInput(logic, event, handler)) - val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _)) - val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _)) + private val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _)) + private val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _)) + + private var subscribesPending = inputs.length + // Limits the number of events processed by the interpreter before scheduling a self-message for fairness with other // actors. - // TODO: Better heuristic here - val eventLimit = settings.maxInputBufferSize * assembly.stages.length * 4 // Roughly 4 events per element transfer + // TODO: Better heuristic here (take into account buffer size, connection count, 4 events per element, have a max) + val eventLimit = settings.maxInputBufferSize * (inputs.length + outputs.length) * 2 // Limits the number of events processed by the interpreter on an abort event. // TODO: Better heuristic here - val abortLimit = eventLimit * 2 - var resumeScheduled = false + private val abortLimit = eventLimit * 2 + private var resumeScheduled = false override def preStart(): Unit = { var i = 0 @@ -350,7 +354,6 @@ private[stream] class ActorGraphInterpreter( case NonFatal(e) ⇒ logic.failStage(e) } } - runBatch() // Initialization and completion messages @@ -363,6 +366,7 @@ private[stream] class ActorGraphInterpreter( inputs(id).onComplete() runBatch() case OnSubscribe(id: Int, subscription: Subscription) ⇒ + subscribesPending -= 1 inputs(id).onSubscribe(subscription) case Cancel(id: Int) ⇒ if (GraphInterpreter.Debug) println(s" cancel id=$id") @@ -375,16 +379,28 @@ private[stream] class ActorGraphInterpreter( } - override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = { - super.aroundReceive(receive, msg) - + private def waitShutdown: Receive = { + case OnSubscribe(_, sub) ⇒ + tryCancel(sub) + subscribesPending -= 1 + if (subscribesPending == 0) context.stop(self) + case ReceiveTimeout ⇒ + tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " + + s"inputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now.")) + case _ ⇒ // Ignore, there is nothing to do anyway } private def runBatch(): Unit = { try { interpreter.execute(eventLimit) - if (interpreter.isCompleted) context.stop(self) - else if (interpreter.isSuspended && !resumeScheduled) { + if (interpreter.isCompleted) { + // Cannot stop right away if not completely subscribed + if (subscribesPending == 0) context.stop(self) + else { + context.become(waitShutdown) + context.setReceiveTimeout(settings.subscriptionTimeoutSettings.timeout) + } + } else if (interpreter.isSuspended && !resumeScheduled) { resumeScheduled = true self ! Resume } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index ab89994eac..84d472bdc2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -26,16 +26,25 @@ private[stream] object GraphInterpreter { case object Empty sealed trait ConnectionState - sealed trait CompletedState extends ConnectionState - case object Pushable extends ConnectionState - case object Completed extends CompletedState - final case class PushCompleted(element: Any) extends ConnectionState - case object Cancelled extends CompletedState - final case class Failed(ex: Throwable) extends CompletedState + case object Pulled extends ConnectionState + + sealed trait HasElementState + + sealed trait CompletingState extends ConnectionState + final case class CompletedHasElement(element: Any) extends CompletingState with HasElementState + final case class PushCompleted(element: Any) extends CompletingState with HasElementState + case object Completed extends CompletingState + case object Cancelled extends CompletingState + final case class Failed(ex: Throwable) extends CompletingState val NoEvent = -1 val Boundary = -1 + sealed trait PortState + case object InFlight extends PortState + case object Available extends PortState + case object Closed extends PortState + abstract class UpstreamBoundaryStageLogic[T] extends GraphStageLogic { def out: Outlet[T] } @@ -160,10 +169,11 @@ private[stream] object GraphInterpreter { * while in the practical sense a connection is a number which represents slots in certain arrays. * In particular * - connectionStates is a mapping from a connection id to a current (or future) state of the connection - * - inAvailable is a mapping from a connection to a boolean that indicates whether the input corresponding - * to the connection is currently pullable - * - outAvailable is a mapping from a connection to a boolean that indicates whether the input corresponding - * to the connection is currently pushable + * - inStates is a mapping from a connection to a [[akka.stream.impl.fusing.GraphInterpreter.PortState]] + * that indicates whether the input corresponding + * to the connection is currently pullable or completed + * - outStates is a mapping from a connection to a [[akka.stream.impl.fusing.GraphInterpreter.PortState]] + * that indicates whether the input corresponding to the connection is currently pushable or completed * - inHandlers is a mapping from a connection id to the [[InHandler]] instance that handles the events corresponding * to the input port of the connection * - outHandlers is a mapping from a connection id to the [[OutHandler]] instance that handles the events corresponding @@ -177,14 +187,14 @@ private[stream] object GraphInterpreter { * * Sending an event is usually the following sequence: * - An action is requested by a stage logic (push, pull, complete, etc.) - * - the availability of the port is set on the sender side to false (inAvailable or outAvailable) + * - the availability of the port is set on the sender side to Limbo (inStates or outStates) * - the scheduled event is put in the slot of the connection in the connectionStates table * - the id of the affected connection is enqueued * * Receiving an event is usually the following sequence: * - id of connection to be processed is dequeued * - the type of the event is determined by the object in the corresponding connectionStates slot - * - the availability of the port is set on the receiver side to be true (inAvailable or outAvailable) + * - the availability of the port is set on the receiver side to be Available (inStates or outStates) * - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic. * * Because of the FIFO construction of the queue the interpreter is fair, i.e. a pending event is always executed @@ -203,19 +213,19 @@ private[stream] final class GraphInterpreter( // Maintains the next event (and state) of the connection. // Technically the connection cannot be considered being in the state that is encoded here before the enqueued - // connection event has been processed. The inAvailable and outAvailable arrays usually protect access to this + // connection event has been processed. The inStates and outStates arrays usually protect access to this // field while it is in transient state. val connectionStates = Array.fill[Any](assembly.connectionCount)(Empty) // Indicates whether the input port is pullable. After pulling it becomes false // Be aware that when inAvailable goes to false outAvailable does not become true immediately, only after // the corresponding event in the queue has been processed - val inAvailable = Array.fill[Boolean](assembly.connectionCount)(true) + val inStates = Array.fill[PortState](assembly.connectionCount)(Available) // Indicates whether the output port is pushable. After pushing it becomes false // Be aware that when inAvailable goes to false outAvailable does not become true immediately, only after // the corresponding event in the queue has been processed - val outAvailable = Array.fill[Boolean](assembly.connectionCount)(false) + val outStates = Array.fill[PortState](assembly.connectionCount)(InFlight) // The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be // completed @@ -228,8 +238,8 @@ private[stream] final class GraphInterpreter( } // An event queue implemented as a circular buffer - private val mask = 255 private val eventQueue = Array.ofDim[Int](256) + private val mask = eventQueue.length - 1 private var queueHead: Int = 0 private var queueTail: Int = 0 @@ -306,6 +316,7 @@ private[stream] final class GraphInterpreter( * true. */ def execute(eventLimit: Int): Unit = { + if (GraphInterpreter.Debug) println("---------------- EXECUTE") var eventsRemaining = eventLimit var connection = dequeue() while (eventsRemaining > 0 && connection != NoEvent) { @@ -314,13 +325,14 @@ private[stream] final class GraphInterpreter( case NonFatal(e) ⇒ val stageId = connectionStates(connection) match { case Failed(ex) ⇒ throw new IllegalStateException("Double fault. Failure while handling failure.", e) - case Pushable ⇒ assembly.outOwners(connection) + case Pulled ⇒ assembly.outOwners(connection) case Completed ⇒ assembly.inOwners(connection) case Cancelled ⇒ assembly.outOwners(connection) case PushCompleted(elem) ⇒ assembly.inOwners(connection) case pushedElem ⇒ assembly.inOwners(connection) } - logics(stageId).failStage(e) + if (stageId == Boundary) throw e + else logics(stageId).failStage(e) } eventsRemaining -= 1 if (eventsRemaining > 0) connection = dequeue() @@ -334,47 +346,55 @@ private[stream] final class GraphInterpreter( def processElement(elem: Any): Unit = { if (!isStageCompleted(assembly.inOwners(connection))) { if (GraphInterpreter.Debug) println(s"PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, $elem") - inAvailable(connection) = true + inStates(connection) = Available inHandlers(connection).onPush() } } connectionStates(connection) match { - case Pushable ⇒ + case Pulled ⇒ if (!isStageCompleted(assembly.outOwners(connection))) { if (GraphInterpreter.Debug) println(s"PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)}") - outAvailable(connection) = true + outStates(connection) = Available outHandlers(connection).onPull() } - case Completed ⇒ + case Completed | CompletedHasElement(_) ⇒ val stageId = assembly.inOwners(connection) - if (!isStageCompleted(stageId)) { + if (!isStageCompleted(stageId) && inStates(connection) != Closed) { if (GraphInterpreter.Debug) println(s"COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)}") - inAvailable(connection) = false + inStates(connection) = Closed inHandlers(connection).onUpstreamFinish() completeConnection(stageId) } case Failed(ex) ⇒ val stageId = assembly.inOwners(connection) - if (!isStageCompleted(stageId)) { + if (!isStageCompleted(stageId) && inStates(connection) != Closed) { if (GraphInterpreter.Debug) println(s"FAIL ${outOwnerName(connection)} -> ${inOwnerName(connection)}") - inAvailable(connection) = false + inStates(connection) = Closed inHandlers(connection).onUpstreamFailure(ex) completeConnection(stageId) } case Cancelled ⇒ val stageId = assembly.outOwners(connection) - if (!isStageCompleted(stageId)) { + if (!isStageCompleted(stageId) && outStates(connection) != Closed) { if (GraphInterpreter.Debug) println(s"CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)}") - outAvailable(connection) = false + outStates(connection) = Closed outHandlers(connection).onDownstreamFinish() completeConnection(stageId) } case PushCompleted(elem) ⇒ - inAvailable(connection) = true - connectionStates(connection) = elem - processElement(elem) - enqueue(connection, Completed) + val stageId = assembly.inOwners(connection) + if (!isStageCompleted(stageId) && inStates(connection) != Closed) { + inStates(connection) = Available + connectionStates(connection) = elem + processElement(elem) + val elemAfter = connectionStates(connection) + if (elemAfter == Empty) enqueue(connection, Completed) + else enqueue(connection, CompletedHasElement(elemAfter)) + } else { + connectionStates(connection) = Completed + } + case pushedElem ⇒ processElement(pushedElem) } @@ -402,14 +422,17 @@ private[stream] final class GraphInterpreter( // to prevent redundant completion events in case of concurrent invocation on both sides of the connection. // I.e. when one side already enqueued the completion event, then the other side will not enqueue the event since // there is noone to process it anymore. - def isConnectionCompleted(connection: Int): Boolean = connectionStates(connection).isInstanceOf[CompletedState] + def isConnectionCompleting(connection: Int): Boolean = connectionStates(connection).isInstanceOf[CompletingState] // Returns true if the given stage is alredy completed def isStageCompleted(stageId: Int): Boolean = stageId != Boundary && shutdownCounter(stageId) == 0 private def isPushInFlight(connection: Int): Boolean = - !inAvailable(connection) && - !connectionStates(connection).isInstanceOf[ConnectionState] && + (inStates(connection) == InFlight) && // Other side has not been notified + hasElement(connection) + + private def hasElement(connection: Int): Boolean = + !connectionStates(connection).isInstanceOf[ConnectionState] && connectionStates(connection) != Empty // Register that a connection in which the given stage participated has been completed and therefore the stage @@ -430,37 +453,46 @@ private[stream] final class GraphInterpreter( } private[stream] def push(connection: Int, elem: Any): Unit = { - outAvailable(connection) = false - enqueue(connection, elem) + if (!(inStates(connection) eq Closed)) { + outStates(connection) = InFlight + enqueue(connection, elem) + } } private[stream] def pull(connection: Int): Unit = { - inAvailable(connection) = false - enqueue(connection, Pushable) + if (!(outStates(connection) eq Closed)) { + inStates(connection) = InFlight + enqueue(connection, Pulled) + } } private[stream] def complete(connection: Int): Unit = { - outAvailable(connection) = false - if (!isConnectionCompleted(connection)) { - // There is a pending push, we change the signal to be a PushCompleted (there can be only one signal in flight - // for a connection) - if (isPushInFlight(connection)) - connectionStates(connection) = PushCompleted(connectionStates(connection)) - else - enqueue(connection, Completed) + outStates(connection) = Closed + if (!isConnectionCompleting(connection) && (inStates(connection) ne Closed)) { + if (hasElement(connection)) { + // There is a pending push, we change the signal to be a PushCompleted (there can be only one signal in flight + // for a connection) + if (inStates(connection) == InFlight) + connectionStates(connection) = PushCompleted(connectionStates(connection)) + else enqueue(connection, CompletedHasElement(connectionStates(connection))) + } else enqueue(connection, Completed) } completeConnection(assembly.outOwners(connection)) } private[stream] def fail(connection: Int, ex: Throwable): Unit = { - outAvailable(connection) = false - if (!isConnectionCompleted(connection)) enqueue(connection, Failed(ex)) + outStates(connection) = Closed + if (!isConnectionCompleting(connection) && (inStates(connection) ne Closed)) + enqueue(connection, Failed(ex)) + completeConnection(assembly.outOwners(connection)) } private[stream] def cancel(connection: Int): Unit = { - inAvailable(connection) = false - if (!isConnectionCompleted(connection)) enqueue(connection, Cancelled) + inStates(connection) = Closed + if (!isConnectionCompleting(connection) && (outStates(connection) ne Closed)) + enqueue(connection, Cancelled) + completeConnection(assembly.inOwners(connection)) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 02c6092791..1da600c696 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -154,11 +154,12 @@ abstract class GraphStageLogic { */ final protected def pull[T](in: Inlet[T]): Unit = { require(!hasBeenPulled(in), "Cannot pull port twice") + require(!isClosed(in), "Cannot pull closed port") interpreter.pull(conn(in)) } /** - * Requests to stop receiving events from a given input port. + * Requests to stop receiving events from a given input port. Cancelling clears any ungrabbed elements from the port. */ final protected def cancel[T](in: Inlet[T]): Unit = interpreter.cancel(conn(in)) @@ -173,15 +174,22 @@ abstract class GraphStageLogic { require(isAvailable(in), "Cannot get element from already empty input port") val connection = conn(in) val elem = interpreter.connectionStates(connection) - interpreter.connectionStates(connection) = Empty - elem.asInstanceOf[T] + + elem match { + case CompletedHasElement(realElem) ⇒ + interpreter.connectionStates(connection) = Completed + realElem.asInstanceOf[T] + case _ ⇒ + interpreter.connectionStates(connection) = Empty + elem.asInstanceOf[T] + } } /** * Indicates whether there is already a pending pull for the given input port. If this method returns true * then [[isAvailable()]] must return false for that same port. */ - final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = !interpreter.inAvailable(conn(in)) + final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = interpreter.inStates(conn(in)) eq InFlight /** * Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the @@ -191,9 +199,20 @@ abstract class GraphStageLogic { */ final protected def isAvailable[T](in: Inlet[T]): Boolean = { val connection = conn(in) - interpreter.inAvailable(connection) && !(interpreter.connectionStates(connection) == Empty) + val state = interpreter.connectionStates(connection) + + val arrived = interpreter.inStates(connection) ne InFlight + val hasElementState = state.isInstanceOf[HasElementState] + val rawElement = (state != Empty) && !state.isInstanceOf[ConnectionState] + + arrived && (hasElementState || rawElement) } + /** + * Indicates whether the port has been closed. A closed port cannot be pulled. + */ + final protected def isClosed[T](in: Inlet[T]): Boolean = interpreter.inStates(conn(in)) eq Closed + /** * Emits an element through the given output port. Calling this method twice before a [[pull()]] has been arrived * will fail. There can be only one outstanding push request at any given time. The method [[isAvailable()]] can be @@ -201,6 +220,7 @@ abstract class GraphStageLogic { */ final protected def push[T](out: Outlet[T], elem: T): Unit = { require(isAvailable(out), "Cannot push port twice") + require(!isClosed(out), "Cannot pull closed port") interpreter.push(conn(out), elem) } @@ -235,7 +255,12 @@ abstract class GraphStageLogic { /** * Return true if the given output port is ready to be pushed. */ - final def isAvailable[T](out: Outlet[T]): Boolean = interpreter.outAvailable(conn(out)) + final def isAvailable[T](out: Outlet[T]): Boolean = interpreter.outStates(conn(out)) eq Available + + /** + * Indicates whether the port has been closed. A closed port cannot be pushed. + */ + final protected def isClosed[T](out: Outlet[T]): Boolean = interpreter.outStates(conn(out)) eq Closed /** * Obtain a callback object that can be used asynchronously to re-enter the