diff --git a/akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala b/akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala index 184b220786..44e5856ecb 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/BenchRunner.scala @@ -8,10 +8,11 @@ object BenchRunner { def main(args: Array[String]) = { import scala.collection.JavaConversions._ - val args2 = args.toList match { - case "quick" :: tail => "-i 1 -wi 1 -f1 -t1".split(" ").toList ::: tail - case "full" :: tail => "-i 10 -wi 4 -f3 -t1".split(" ").toList ::: tail - case other => other + val args2 = args.toList.flatMap { + case "quick" => "-i 1 -wi 1 -f1 -t1".split(" ").toList + case "full" => "-i 10 -wi 4 -f3 -t1".split(" ").toList + case "jitwatch" => "-jvmArgs=-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation" :: Nil + case other => other :: Nil } val opts = new CommandLineOptions(args2: _*) diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala index 729be6dc1c..8271c61ce5 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/FlowMapBenchmark.scala @@ -12,6 +12,7 @@ import org.openjdk.jmh.annotations._ import scala.concurrent.Lock import scala.util.Success import akka.stream.impl.fusing.GraphStages +import org.reactivestreams._ @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @@ -25,6 +26,15 @@ class FlowMapBenchmark { log-dead-letters-during-shutdown = off loglevel = "WARNING" + actor.default-dispatcher { + #executor = "thread-pool-executor" + throughput = 1024 + } + + actor.default-mailbox { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + } + test { timefactor = 1.0 filter-leeway = 3s @@ -40,10 +50,8 @@ class FlowMapBenchmark { var materializer: ActorMaterializer = _ - final val UseGraphStageIdentity = false - - // manual, and not via @Param, because we want @OperationsPerInvocation on our tests - final val data100k = (1 to 100000).toVector + @Param(Array("true", "false")) + val UseGraphStageIdentity = false final val successMarker = Success(1) final val successFailure = Success(new Exception) @@ -51,7 +59,7 @@ class FlowMapBenchmark { // safe to be benchmark scoped because the flows we construct in this bench are stateless var flow: Source[Int, Unit] = _ - @Param(Array("2", "8")) // todo + @Param(Array("8", "32", "128")) val initialInputBufferSize = 0 @Param(Array("1", "5", "10")) @@ -60,11 +68,38 @@ class FlowMapBenchmark { @Setup def setup() { val settings = ActorMaterializerSettings(system) - .withInputBuffer(initialInputBufferSize, 16) + .withInputBuffer(initialInputBufferSize, initialInputBufferSize) materializer = ActorMaterializer(settings) - flow = mkMaps(Source(data100k), numberOfMapOps) { + // Important to use a synchronous, zero overhead source, otherwise the slowness of the source + // might bias the benchmark, since the stream always adjusts the rate to the slowest stage. + val syncTestPublisher = new Publisher[Int] { + override def subscribe(s: Subscriber[_ >: Int]): Unit = { + val sub = new Subscription { + var counter = 0 // Piggyback on caller thread, no need for volatile + + override def request(n: Long): Unit = { + var i = n + while (i > 0) { + s.onNext(counter) + counter += 1 + if (counter == 100000) { + s.onComplete() + return + } + i -= 1 + } + } + + override def cancel(): Unit = () + } + + s.onSubscribe(sub) + } + } + + flow = mkMaps(Source(syncTestPublisher), numberOfMapOps) { if (UseGraphStageIdentity) new GraphStages.Identity[Int] else diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala index 370a54f7a2..c3af314393 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala @@ -51,9 +51,13 @@ class InterpreterBenchmark { @Benchmark @OperationsPerInvocation(100000) def onebounded_interpreter_100k_elements() { + val lock = new Lock() + lock.acquire() val sink = OneBoundedDataSink(data100k.size) - // FIXME: This should not be here, this is pure setup overhead - val ops = Vector.fill(numberOfIds)(MapStage(identity[Int], Supervision.stoppingDecider)) + val ops = Vector.fill(numberOfIds)(new PushPullStage[Int, Int] { + override def onPull(ctx: _root_.akka.stream.stage.Context[Int]) = ctx.pull() + override def onPush(elem: Int, ctx: _root_.akka.stream.stage.Context[Int]) = ctx.push(elem) + }) val interpreter = new OneBoundedInterpreter(OneBoundedDataSource(data100k) +: ops :+ sink, (op, ctx, event) ⇒ (), Logging(NoopBus, classOf[InterpreterBenchmark]), @@ -70,6 +74,7 @@ object InterpreterBenchmark { case class GraphDataSource[T](override val toString: String, data: Vector[T]) extends UpstreamBoundaryStageLogic[T] { var idx = 0 val out = Outlet[T]("out") + out.id = 0 setHandler(out, new OutHandler { override def onPull(): Unit = { @@ -87,6 +92,7 @@ object InterpreterBenchmark { case class GraphDataSink[T](override val toString: String, var expected: Int) extends DownstreamBoundaryStageLogic[T] { val in = Inlet[T]("in") + in.id = 0 setHandler(in, new InHandler { override def onPush(): Unit = { diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index 4d90d50aa3..de25d0ee5b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -138,7 +138,7 @@ package util { val out = Outlet[HttpEntity.Strict]("out") override val shape = FlowShape(in, out) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { var bytes = ByteString.newBuilder private var emptyStream = false diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index c204319cb2..3a761595aa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -47,7 +47,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec { val out2 = Outlet[Int]("out2") val shape = BidiShape(in1, out1, in2, out2) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { setHandler(in1, new InHandler { override def onPush(): Unit = push(out1, grab(in1)) override def onUpstreamFinish(): Unit = complete(out1) @@ -88,7 +88,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec { val out2 = Outlet[Int]("out2") val shape = BidiShape(in1, out1, in2, out2) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { setHandler(in1, new InHandler { override def onPush(): Unit = push(out1, grab(in1)) @@ -134,7 +134,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec { val out2 = Outlet[Int]("out2") val shape = BidiShape(in1, out1, in2, out2) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { setHandler(in1, new InHandler { override def onPush(): Unit = push(out1, grab(in1)) @@ -183,7 +183,7 @@ class ActorGraphInterpreterSpec extends AkkaSpec { val out2 = Outlet[Int]("out2") val shape = BidiShape(in1, out1, in2, out2) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { setHandler(in1, new InHandler { override def onPush(): Unit = push(out2, grab(in1)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterFailureModesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterFailureModesSpec.scala new file mode 100644 index 0000000000..a1b0d01f59 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterFailureModesSpec.scala @@ -0,0 +1,110 @@ +package akka.stream.impl.fusing + +import akka.stream.testkit.Utils.TE +import akka.testkit.EventFilter + +class GraphInterpreterFailureModesSpec extends GraphInterpreterSpecKit { + + "GraphInterpreter" must { + + "handle failure on onPull" in new FailingStageSetup { + lastEvents() should be(Set(PreStart(stage))) + + downstream.pull() + failOnNextEvent() + stepAll() + + lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(stage))) + } + + "handle failure on onPush" in new FailingStageSetup { + lastEvents() should be(Set(PreStart(stage))) + + downstream.pull() + stepAll() + clearEvents() + upstream.push(0) + failOnNextEvent() + stepAll() + + lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(stage))) + } + + "handle failure on onPull while cancel is pending" in new FailingStageSetup { + lastEvents() should be(Set(PreStart(stage))) + + downstream.pull() + downstream.cancel() + failOnNextEvent() + stepAll() + + lastEvents() should be(Set(Cancel(upstream), PostStop(stage))) + } + + "handle failure on onPush while complete is pending" in new FailingStageSetup { + lastEvents() should be(Set(PreStart(stage))) + + downstream.pull() + stepAll() + clearEvents() + upstream.push(0) + upstream.complete() + failOnNextEvent() + stepAll() + + lastEvents() should be(Set(OnError(downstream, testException), PostStop(stage))) + } + + "handle failure on onUpstreamFinish" in new FailingStageSetup { + lastEvents() should be(Set(PreStart(stage))) + + upstream.complete() + failOnNextEvent() + stepAll() + + lastEvents() should be(Set(OnError(downstream, testException), PostStop(stage))) + } + + "handle failure on onUpstreamFailure" in new FailingStageSetup { + lastEvents() should be(Set(PreStart(stage))) + + upstream.fail(TE("another exception")) // this is not the exception that will be propagated + failOnNextEvent() + stepAll() + + lastEvents() should be(Set(OnError(downstream, testException), PostStop(stage))) + } + + "handle failure on onDownstreamFinish" in new FailingStageSetup { + lastEvents() should be(Set(PreStart(stage))) + + downstream.cancel() + failOnNextEvent() + stepAll() + + lastEvents() should be(Set(Cancel(upstream), PostStop(stage))) + } + + "handle failure in preStart" in new FailingStageSetup(initFailOnNextEvent = true) { + stepAll() + + lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(stage))) + } + + "handle failure in postStop" in new FailingStageSetup { + lastEvents() should be(Set(PreStart(stage))) + + upstream.complete() + downstream.cancel() + failOnPostStop() + + EventFilter.error("Error during postStop in [stage]").intercept { + stepAll() + lastEvents() should be(Set.empty) + } + + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala index d32c98146c..88a9a1a527 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala @@ -4,11 +4,14 @@ package akka.stream.impl.fusing import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.Utils._ -class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit { +class GraphInterpreterPortsSpec extends GraphInterpreterSpecKit { "Port states" must { + // FIXME test failure scenarios + "properly transition on push and pull" in new PortTestSetup { lastEvents() should be(Set.empty) out.isAvailable should be(false) @@ -352,6 +355,32 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit { in.grab() should ===(0) } + "propagate complete while push is in flight and pulled after the push" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.complete() + step() + + lastEvents() should be(Set(OnNext(in, 0))) + in.grab() should ===(0) + + in.pull() + stepAll() + + lastEvents() should be(Set(OnComplete(in))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + "ignore pull while completing" in new PortTestSetup { out.complete() in.pull() @@ -754,6 +783,392 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit { an[IllegalArgumentException] should be thrownBy { in.grab() } } + "not allow to grab element before it arrives" in new PortTestSetup { + in.pull() + stepAll() + out.push(0) + + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "not allow to grab element if already cancelled" in new PortTestSetup { + in.pull() + stepAll() + + out.push(0) + in.cancel() + + stepAll() + + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate failure while downstream is active" in new PortTestSetup { + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + + out.fail(TE("test")) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate failure while upstream is active" in new PortTestSetup { + in.pull() + stepAll() + + lastEvents() should be(Set(RequestOne(out))) + out.isAvailable should be(true) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.fail(TE("test")) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + } + + "propagate failure while pull is in flight" in new PortTestSetup { + in.pull() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.fail(TE("test")) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + stepAll() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + in.cancel() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate failure while push is in flight" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(false) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + + out.fail(TE("test")) + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(true) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + + step() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + in.grab() should ===(0) + an[IllegalArgumentException] should be thrownBy { in.grab() } + + step() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + + out.complete() // This should have no effect now + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "propagate failure while push is in flight and keep ungrabbed element" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.fail(TE("test")) + step() + + lastEvents() should be(Set(OnNext(in, 0))) + step() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + in.grab() should ===(0) + } + + "ignore pull while failing" in new PortTestSetup { + out.fail(TE("test")) + in.pull() + + stepAll() + + lastEvents() should be(Set(OnError(in, TE("test")))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore any failure completion if they are concurrent (cancel first)" in new PortTestSetup { + in.cancel() + out.fail(TE("test")) + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore any failure completion if they are concurrent (complete first)" in new PortTestSetup { + out.fail(TE("test")) + in.cancel() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore failure from a push-then-fail if cancelled while in flight" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.fail(TE("test")) + in.cancel() + + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + + "ignore failure from a push-then-fail if cancelled after onPush" in new PortTestSetup { + in.pull() + stepAll() + clearEvents() + + out.push(0) + out.fail(TE("test")) + + step() + + lastEvents() should be(Set(OnNext(in, 0))) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(true) + in.hasBeenPulled should be(false) + in.isClosed should be(false) + an[IllegalArgumentException] should be thrownBy { out.push(0) } + in.grab() should ===(0) + + in.cancel() + stepAll() + + lastEvents() should be(Set.empty) + out.isAvailable should be(false) + out.isClosed should be(true) + in.isAvailable should be(false) + in.hasBeenPulled should be(false) + in.isClosed should be(true) + an[IllegalArgumentException] should be thrownBy { in.pull() } + an[IllegalArgumentException] should be thrownBy { out.push(0) } + an[IllegalArgumentException] should be thrownBy { in.grab() } + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index ae3944d14a..fed8396499 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -7,7 +7,7 @@ import akka.stream.testkit.AkkaSpec import akka.stream.scaladsl.{ Merge, Broadcast, Balance, Zip } import GraphInterpreter._ -class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { +class GraphInterpreterSpec extends GraphInterpreterSpecKit { import GraphStages._ "GraphInterpreter" must { @@ -273,8 +273,6 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { lastEvents() should ===(Set(OnNext(sink2, 2))) } - "implement bidi-stage" in pending - "implement non-divergent cycle" in new TestSetup { val source = new UpstreamProbe[Int]("source") val sink = new DownstreamProbe[Int]("sink") diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index d1b40d3324..273583ba91 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -3,11 +3,14 @@ */ package akka.stream.impl.fusing -import akka.stream.{ NoMaterializer, Outlet, Inlet, Shape } -import akka.stream.impl.fusing.GraphInterpreter.{ GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } +import akka.event.Logging +import akka.stream._ +import akka.stream.impl.fusing.GraphInterpreter.{ Failed, GraphAssembly, DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic } import akka.stream.stage.{ InHandler, OutHandler, GraphStage, GraphStageLogic } +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.Utils.TE -trait GraphInterpreterSpecKit { +trait GraphInterpreterSpecKit extends AkkaSpec { sealed trait TestEvent { def source: GraphStageLogic @@ -20,11 +23,17 @@ trait GraphInterpreterSpecKit { case class RequestOne(source: GraphStageLogic) extends TestEvent case class RequestAnother(source: GraphStageLogic) extends TestEvent + case class PreStart(source: GraphStageLogic) extends TestEvent + case class PostStop(source: GraphStageLogic) extends TestEvent + abstract class TestSetup { protected var lastEvent: Set[TestEvent] = Set.empty private var _interpreter: GraphInterpreter = _ protected def interpreter: GraphInterpreter = _interpreter + def stepAll(): Unit = interpreter.execute(eventLimit = Int.MaxValue) + def step(): Unit = interpreter.execute(eventLimit = 1) + class AssemblyBuilder(stages: Seq[GraphStage[_ <: Shape]]) { var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])] var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])] @@ -59,7 +68,7 @@ trait GraphInterpreterSpecKit { (Vector.fill(upstreams.size)(-1) ++ outOwners).toArray) val (inHandlers, outHandlers, logics, _) = assembly.materialize() - _interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) + _interpreter = new GraphInterpreter(assembly, NoMaterializer, Logging(system, classOf[TestSetup]), inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) for ((upstream, i) ← upstreams.zipWithIndex) { _interpreter.attachUpstreamBoundary(i, upstream._1) @@ -75,7 +84,7 @@ trait GraphInterpreterSpecKit { def manualInit(assembly: GraphAssembly): Unit = { val (inHandlers, outHandlers, logics, _) = assembly.materialize() - _interpreter = new GraphInterpreter(assembly, NoMaterializer, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) + _interpreter = new GraphInterpreter(assembly, NoMaterializer, Logging(system, classOf[TestSetup]), inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) } def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages.toSeq) @@ -90,10 +99,11 @@ trait GraphInterpreterSpecKit { class UpstreamProbe[T](override val toString: String) extends UpstreamBoundaryStageLogic[T] { val out = Outlet[T]("out") + out.id = 0 setHandler(out, new OutHandler { override def onPull(): Unit = lastEvent += RequestOne(UpstreamProbe.this) - override def onDownstreamFinish() = lastEvent += Cancel(UpstreamProbe.this) + override def onDownstreamFinish(): Unit = lastEvent += Cancel(UpstreamProbe.this) }) def onNext(elem: T, eventLimit: Int = Int.MaxValue): Unit = { @@ -105,11 +115,12 @@ trait GraphInterpreterSpecKit { class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] { val in = Inlet[T]("in") + in.id = 0 setHandler(in, new InHandler { override def onPush(): Unit = lastEvent += OnNext(DownstreamProbe.this, grab(in)) - override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamProbe.this) - override def onUpstreamFailure(ex: Throwable) = OnError(DownstreamProbe.this, ex) + override def onUpstreamFinish(): Unit = lastEvent += OnComplete(DownstreamProbe.this) + override def onUpstreamFailure(ex: Throwable): Unit = lastEvent += OnError(DownstreamProbe.this, ex) }) def requestOne(eventLimit: Int = Int.MaxValue): Unit = { @@ -146,20 +157,20 @@ trait GraphInterpreterSpecKit { setHandler(in, new InHandler { // Modified onPush that does not grab() automatically the element. This accesses some internals. - override def onPush(): Unit = - lastEvent += - OnNext( - DownstreamPortProbe.this, - interpreter.connectionStates(inToConn(in))) + override def onPush(): Unit = { + val internalEvent = interpreter.connectionSlots(inToConn(in.id)) + + internalEvent match { + case Failed(_, elem) ⇒ lastEvent += OnNext(DownstreamPortProbe.this, elem) + case elem ⇒ lastEvent += OnNext(DownstreamPortProbe.this, elem) + } + } override def onUpstreamFinish() = lastEvent += OnComplete(DownstreamPortProbe.this) - override def onUpstreamFailure(ex: Throwable) = OnError(DownstreamPortProbe.this, ex) + override def onUpstreamFailure(ex: Throwable) = lastEvent += OnError(DownstreamPortProbe.this, ex) }) } - def stepAll(): Unit = interpreter.execute(eventLimit = Int.MaxValue) - def step(): Unit = interpreter.execute(eventLimit = 1) - private val assembly = GraphAssembly( stages = Array.empty, ins = Array(null), @@ -173,4 +184,73 @@ trait GraphInterpreterSpecKit { interpreter.init() } + abstract class FailingStageSetup(initFailOnNextEvent: Boolean = false) extends TestSetup { + + val upstream = new UpstreamPortProbe[Int] + val downstream = new DownstreamPortProbe[Int] + + private var _failOnNextEvent: Boolean = initFailOnNextEvent + private var _failOnPostStop: Boolean = false + + def failOnNextEvent(): Unit = _failOnNextEvent = true + def failOnPostStop(): Unit = _failOnPostStop = true + + def testException = TE("test") + + private val stagein = Inlet[Int]("sandwitch.in") + private val stageout = Outlet[Int]("sandwitch.out") + private val stageshape = FlowShape(stagein, stageout) + + // Must be lazy because I turned this stage "inside-out" therefore changing initialization order + // to make tests a bit more readable + lazy val stage: GraphStageLogic = new GraphStageLogic(stageshape) { + private def mayFail(task: ⇒ Unit): Unit = { + if (!_failOnNextEvent) task + else { + _failOnNextEvent = false + throw testException + } + } + + setHandler(stagein, new InHandler { + override def onPush(): Unit = mayFail(push(stageout, grab(stagein))) + override def onUpstreamFinish(): Unit = mayFail(completeStage()) + override def onUpstreamFailure(ex: Throwable): Unit = mayFail(failStage(ex)) + }) + + setHandler(stageout, new OutHandler { + override def onPull(): Unit = mayFail(pull(stagein)) + override def onDownstreamFinish(): Unit = mayFail(completeStage()) + }) + + override def preStart(): Unit = mayFail(lastEvent += PreStart(stage)) + override def postStop(): Unit = + if (!_failOnPostStop) lastEvent += PostStop(stage) + else throw testException + + override def toString = "stage" + } + + private val sandwitchStage = new GraphStage[FlowShape[Int, Int]] { + override def shape = stageshape + override def createLogic: GraphStageLogic = stage + } + + class UpstreamPortProbe[T] extends UpstreamProbe[T]("upstreamPort") { + def push(elem: T): Unit = push(out, elem) + def complete(): Unit = complete(out) + def fail(ex: Throwable): Unit = fail(out, ex) + } + + class DownstreamPortProbe[T] extends DownstreamProbe[T]("upstreamPort") { + def pull(): Unit = pull(in) + def cancel(): Unit = cancel(in) + } + + builder(sandwitchStage) + .connect(upstream, stagein) + .connect(stageout, downstream) + .init() + } + } diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template index 80061988f9..10ac47fa63 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template @@ -49,7 +49,7 @@ class UnzipWith1[In, [#A1#]](unzipper: In ⇒ ([#A1#])) extends GraphStage[FanOu [#def out0: Outlet[A1] = shape.out0# ] - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { var pendingCount = 1 var downstreamRunning = 1 diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index 672dbb2466..9f40ba4c8b 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -30,7 +30,7 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape [#val in0: Inlet[A1] = shape.in0# ] - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { var pending = 1 private def pushAll(): Unit = push(out, zipper([#grab(in0)#])) diff --git a/akka-stream/src/main/scala/akka/stream/Shape.scala b/akka-stream/src/main/scala/akka/stream/Shape.scala index 63101e5689..a7ee5ffeee 100644 --- a/akka-stream/src/main/scala/akka/stream/Shape.scala +++ b/akka-stream/src/main/scala/akka/stream/Shape.scala @@ -15,8 +15,13 @@ import scala.collection.JavaConverters._ * for otherwise unreasonable existential types. */ sealed abstract class InPort { self: Inlet[_] ⇒ - final override def hashCode: Int = System.identityHashCode(this) + final override def hashCode: Int = super.hashCode final override def equals(that: Any): Boolean = this eq that.asInstanceOf[AnyRef] + + /** + * INTERNAL API + */ + private[stream] var id: Int = -1 } /** * An output port of a StreamLayout.Module. This type logically belongs @@ -25,8 +30,13 @@ sealed abstract class InPort { self: Inlet[_] ⇒ * for otherwise unreasonable existential types. */ sealed abstract class OutPort { self: Outlet[_] ⇒ - final override def hashCode: Int = System.identityHashCode(this) + final override def hashCode: Int = super.hashCode final override def equals(that: Any): Boolean = this eq that.asInstanceOf[AnyRef] + + /** + * INTERNAL API + */ + private[stream] var id: Int = -1 } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index bc41e2492a..2799439985 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -100,6 +100,7 @@ private[stream] object ActorGraphInterpreter { private var batchRemaining = requestBatchSize val out: Outlet[Any] = Outlet[Any]("UpstreamBoundary" + id) + out.id = 0 private def dequeue(): Any = { val elem = inputBuffer(nextInputElementCursor) @@ -195,6 +196,7 @@ private[stream] object ActorGraphInterpreter { class ActorOutputBoundary(actor: ActorRef, id: Int) extends DownstreamBoundaryStageLogic[Any] { val in: Inlet[Any] = Inlet[Any]("UpstreamBoundary" + id) + in.id = 0 private var exposedPublisher: ActorPublisher[Any] = _ @@ -299,6 +301,7 @@ private[stream] class ActorGraphInterpreter( val interpreter = new GraphInterpreter( assembly, mat, + Logging(this), inHandlers, outHandlers, logics, diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 84d472bdc2..611ddfd39e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl.fusing +import akka.event.LoggingAdapter +import akka.io.Tcp.Closed import akka.stream.stage._ import akka.stream.{ Materializer, Shape, Inlet, Outlet } @@ -19,37 +21,35 @@ private[stream] object GraphInterpreter { */ final val Debug = false + final val NoEvent = -1 + final val Boundary = -1 + + final val InReady = 1 + final val Pulling = 2 + final val Pushing = 4 + final val OutReady = 8 + + final val InClosed = 16 + final val OutClosed = 32 + final val InFailed = 64 + + final val PullStartFlip = 3 // 0011 + final val PullEndFlip = 10 // 1010 + final val PushStartFlip = 12 //1100 + final val PushEndFlip = 5 //0101 + /** * Marker object that indicates that a port holds no element since it was already grabbed. The port is still pullable, * but there is no more element to grab. */ case object Empty + final case class Failed(ex: Throwable, previousElem: Any) - sealed trait ConnectionState - case object Pulled extends ConnectionState - - sealed trait HasElementState - - sealed trait CompletingState extends ConnectionState - final case class CompletedHasElement(element: Any) extends CompletingState with HasElementState - final case class PushCompleted(element: Any) extends CompletingState with HasElementState - case object Completed extends CompletingState - case object Cancelled extends CompletingState - final case class Failed(ex: Throwable) extends CompletingState - - val NoEvent = -1 - val Boundary = -1 - - sealed trait PortState - case object InFlight extends PortState - case object Available extends PortState - case object Closed extends PortState - - abstract class UpstreamBoundaryStageLogic[T] extends GraphStageLogic { + abstract class UpstreamBoundaryStageLogic[T] extends GraphStageLogic(inCount = 0, outCount = 1) { def out: Outlet[T] } - abstract class DownstreamBoundaryStageLogic[T] extends GraphStageLogic { + abstract class DownstreamBoundaryStageLogic[T] extends GraphStageLogic(inCount = 1, outCount = 0) { def in: Inlet[T] } @@ -107,27 +107,52 @@ private[stream] object GraphInterpreter { val logics = Array.ofDim[GraphStageLogic](stages.length) var finalMat: Any = () - for (i ← stages.indices) { - // FIXME: Support for materialized values in fused islands is not yet figured out! - val (logic, mat) = stages(i).createLogicAndMaterializedValue - // FIXME: Current temporary hack to support non-fused stages. If there is one stage that will be under index 0. - if (i == 0) finalMat = mat + var i = 0 + while (i < stages.length) { + // Port initialization loops, these must come first + val shape = stages(i).asInstanceOf[GraphStageWithMaterializedValue[Shape, _]].shape - logics(i) = logic + var idx = 0 + val inletItr = shape.inlets.iterator + while (inletItr.hasNext) { + val inlet = inletItr.next() + require(inlet.id == -1 || inlet.id == idx, s"Inlet $inlet was shared among multiple stages. This is illegal.") + inlet.id = idx + idx += 1 + } + + idx = 0 + val outletItr = shape.outlets.iterator + while (outletItr.hasNext) { + val outlet = outletItr.next() + require(outlet.id == -1 || outlet.id == idx, s"Outlet $outlet was shared among multiple stages. This is illegal.") + outlet.id = idx + idx += 1 + } + + // FIXME: Support for materialized values in fused islands is not yet figured out! + val logicAndMat = stages(i).createLogicAndMaterializedValue + // FIXME: Current temporary hack to support non-fused stages. If there is one stage that will be under index 0. + if (i == 0) finalMat = logicAndMat._2 + + logics(i) = logicAndMat._1 + i += 1 } val inHandlers = Array.ofDim[InHandler](connectionCount) val outHandlers = Array.ofDim[OutHandler](connectionCount) - for (i ← 0 until connectionCount) { + i = 0 + while (i < connectionCount) { if (ins(i) ne null) { - inHandlers(i) = logics(inOwners(i)).inHandlers(ins(i)) - logics(inOwners(i)).inToConn += ins(i) -> i + inHandlers(i) = logics(inOwners(i)).inHandlers(ins(i).id) + logics(inOwners(i)).inToConn(ins(i).id) = i } if (outs(i) ne null) { - outHandlers(i) = logics(outOwners(i)).outHandlers(outs(i)) - logics(outOwners(i)).outToConn += outs(i) -> i + outHandlers(i) = logics(outOwners(i)).outHandlers(outs(i).id) + logics(outOwners(i)).outToConn(outs(i).id) = i } + i += 1 } (inHandlers, outHandlers, logics, finalMat) @@ -168,12 +193,10 @@ private[stream] object GraphInterpreter { * connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair), * while in the practical sense a connection is a number which represents slots in certain arrays. * In particular - * - connectionStates is a mapping from a connection id to a current (or future) state of the connection - * - inStates is a mapping from a connection to a [[akka.stream.impl.fusing.GraphInterpreter.PortState]] - * that indicates whether the input corresponding - * to the connection is currently pullable or completed - * - outStates is a mapping from a connection to a [[akka.stream.impl.fusing.GraphInterpreter.PortState]] - * that indicates whether the input corresponding to the connection is currently pushable or completed + * - portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this + * connection. This bitfield is used to decode the event that is in-flight. + * - connectionSlots is a mapping from a connection id to a potential element or exception that accompanies the + * event encoded in the portStates bitfield * - inHandlers is a mapping from a connection id to the [[InHandler]] instance that handles the events corresponding * to the input port of the connection * - outHandlers is a mapping from a connection id to the [[OutHandler]] instance that handles the events corresponding @@ -181,20 +204,39 @@ private[stream] object GraphInterpreter { * * On top of these lookup tables there is an eventQueue, represented as a circular buffer of integers. The integers * it contains represents connections that have pending events to be processed. The pending event itself is encoded - * in the connectionStates table. This implies that there can be only one event in flight for a given connection, which - * is true in almost all cases, except a complete-after-push which is therefore handled with a special event - * [[GraphInterpreter#PushCompleted]]. + * in the portStates bitfield. This implies that there can be only one event in flight for a given connection, which + * is true in almost all cases, except a complete-after-push or fail-after-push. + * + * The layout of the portStates bitfield is the following: + * + * |- state machn.-| Only one bit is hot among these bits + * 64 32 16 | 8 4 2 1 | + * +---+---+---|---+---+---+---| + * | | | | | | | + * | | | | | | | From the following flags only one is active in any given time. These bits encode + * | | | | | | | state machine states, and they are "moved" around using XOR masks to keep other bits + * | | | | | | | intact. + * | | | | | | | + * | | | | | | +- InReady: The input port is ready to be pulled + * | | | | | +----- Pulling: A pull is active, but have not arrived yet (queued) + * | | | | +--------- Pushing: A push is active, but have not arrived yet (queued) + * | | | +------------- OutReady: The output port is ready to be pushed + * | | | + * | | +----------------- InClosed: The input port is closed and will not receive any events. + * | | A push might be still in flight which will be then processed first. + * | +--------------------- OutClosed: The output port is closed and will not receive any events. + * +------------------------- InFailed: Always set in conjunction with InClosed. Indicates that the close event + * is a failure * * Sending an event is usually the following sequence: * - An action is requested by a stage logic (push, pull, complete, etc.) - * - the availability of the port is set on the sender side to Limbo (inStates or outStates) - * - the scheduled event is put in the slot of the connection in the connectionStates table + * - the state machine in portStates is transitioned from a ready state to a pending event * - the id of the affected connection is enqueued * * Receiving an event is usually the following sequence: * - id of connection to be processed is dequeued - * - the type of the event is determined by the object in the corresponding connectionStates slot - * - the availability of the port is set on the receiver side to be Available (inStates or outStates) + * - the type of the event is determined from the bits set on portStates + * - the state machine in portStates is transitioned to a ready state * - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic. * * Because of the FIFO construction of the queue the interpreter is fair, i.e. a pending event is always executed @@ -205,52 +247,48 @@ private[stream] object GraphInterpreter { private[stream] final class GraphInterpreter( private val assembly: GraphInterpreter.GraphAssembly, val materializer: Materializer, + val log: LoggingAdapter, val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection val logics: Array[GraphStageLogic], // Array of stage logics val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit) { import GraphInterpreter._ - // Maintains the next event (and state) of the connection. - // Technically the connection cannot be considered being in the state that is encoded here before the enqueued - // connection event has been processed. The inStates and outStates arrays usually protect access to this - // field while it is in transient state. - val connectionStates = Array.fill[Any](assembly.connectionCount)(Empty) + // Maintains additional information for events, basically elements in-flight, or failure. + // Other events are encoded in the portStates bitfield. + val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty) - // Indicates whether the input port is pullable. After pulling it becomes false - // Be aware that when inAvailable goes to false outAvailable does not become true immediately, only after - // the corresponding event in the queue has been processed - val inStates = Array.fill[PortState](assembly.connectionCount)(Available) + // Bitfield encoding pending events and various states for efficient querying and updates. See the documentation + // of the class for a full description. + val portStates = Array.fill[Int](assembly.connectionCount)(InReady) - // Indicates whether the output port is pushable. After pushing it becomes false - // Be aware that when inAvailable goes to false outAvailable does not become true immediately, only after - // the corresponding event in the queue has been processed - val outStates = Array.fill[PortState](assembly.connectionCount)(InFlight) + private[this] var activeStageId = Boundary // The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be // completed - private var runningStages = assembly.stages.length + private[this] var runningStages = assembly.stages.length // Counts how many active connections a stage has. Once it reaches zero, the stage is automatically stopped. - private val shutdownCounter = Array.tabulate(assembly.stages.length) { i ⇒ + private[this] val shutdownCounter = Array.tabulate(assembly.stages.length) { i ⇒ val shape = assembly.stages(i).shape.asInstanceOf[Shape] shape.inlets.size + shape.outlets.size } // An event queue implemented as a circular buffer - private val eventQueue = Array.ofDim[Int](256) - private val mask = eventQueue.length - 1 - private var queueHead: Int = 0 - private var queueTail: Int = 0 + // FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue + private[this] val eventQueue = Array.ofDim[Int](1 << Integer.highestOneBit(assembly.connectionCount)) + private[this] val mask = eventQueue.length - 1 + private[this] var queueHead: Int = 0 + private[this] var queueTail: Int = 0 /** * Assign the boundary logic to a given connection. This will serve as the interface to the external world * (outside the interpreter) to process and inject events. */ def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit = { - logic.outToConn += logic.out -> connection + logic.outToConn(logic.out.id) = connection logic.interpreter = this - outHandlers(connection) = logic.outHandlers.head._2 + outHandlers(connection) = logic.outHandlers(0) } /** @@ -258,9 +296,9 @@ private[stream] final class GraphInterpreter( * (outside the interpreter) to process and inject events. */ def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit = { - logic.inToConn += logic.in -> connection + logic.inToConn(logic.in.id) = connection logic.interpreter = this - inHandlers(connection) = logic.inHandlers.head._2 + inHandlers(connection) = logic.inHandlers(0) } /** @@ -279,10 +317,15 @@ private[stream] final class GraphInterpreter( def init(): Unit = { var i = 0 while (i < logics.length) { - logics(i).stageId = i - logics(i).interpreter = this - logics(i).beforePreStart() - logics(i).preStart() + val logic = logics(i) + logic.stageId = i + logic.interpreter = this + try { + logic.beforePreStart() + logic.preStart() + } catch { + case NonFatal(e) ⇒ logic.failStage(e) + } i += 1 } } @@ -293,10 +336,7 @@ private[stream] final class GraphInterpreter( def finish(): Unit = { var i = 0 while (i < logics.length) { - if (!isStageCompleted(i)) { - logics(i).postStop() - logics(i).afterPostStop() - } + if (!isStageCompleted(i)) finalizeStage(logics(i)) i += 1 } } @@ -323,16 +363,8 @@ private[stream] final class GraphInterpreter( try processEvent(connection) catch { case NonFatal(e) ⇒ - val stageId = connectionStates(connection) match { - case Failed(ex) ⇒ throw new IllegalStateException("Double fault. Failure while handling failure.", e) - case Pulled ⇒ assembly.outOwners(connection) - case Completed ⇒ assembly.inOwners(connection) - case Cancelled ⇒ assembly.outOwners(connection) - case PushCompleted(elem) ⇒ assembly.inOwners(connection) - case pushedElem ⇒ assembly.inOwners(connection) - } - if (stageId == Boundary) throw e - else logics(stageId).failStage(e) + if (activeStageId == Boundary) throw e + else logics(activeStageId).failStage(e) } eventsRemaining -= 1 if (eventsRemaining > 0) connection = dequeue() @@ -344,61 +376,62 @@ private[stream] final class GraphInterpreter( private def processEvent(connection: Int): Unit = { def processElement(elem: Any): Unit = { - if (!isStageCompleted(assembly.inOwners(connection))) { - if (GraphInterpreter.Debug) println(s"PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, $elem") - inStates(connection) = Available - inHandlers(connection).onPush() + if (GraphInterpreter.Debug) println(s"PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, $elem") + activeStageId = assembly.inOwners(connection) + portStates(connection) ^= PushEndFlip + inHandlers(connection).onPush() + } + + val code = portStates(connection) + + // Manual fast decoding, fast paths are PUSH and PULL + // PUSH + if ((code & (Pushing | InClosed | OutClosed)) == Pushing) { + processElement(connectionSlots(connection)) + + // PULL + } else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) { + if (GraphInterpreter.Debug) println(s"PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)}") + portStates(connection) ^= PullEndFlip + activeStageId = assembly.outOwners(connection) + outHandlers(connection).onPull() + + // CANCEL + } else if ((code & (OutClosed | InClosed)) == InClosed) { + val stageId = assembly.outOwners(connection) + if (GraphInterpreter.Debug) println(s"CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)}") + portStates(connection) |= OutClosed + activeStageId = assembly.outOwners(connection) + outHandlers(connection).onDownstreamFinish() + completeConnection(stageId) + } else if ((code & (OutClosed | InClosed)) == OutClosed) { + // COMPLETIONS + + val stageId = assembly.inOwners(connection) + + if ((code & Pushing) == 0) { + // Normal completion (no push pending) + if (GraphInterpreter.Debug) println(s"COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)}") + portStates(connection) |= InClosed + activeStageId = assembly.inOwners(connection) + if ((portStates(connection) & InFailed) == 0) inHandlers(connection).onUpstreamFinish() + else inHandlers(connection).onUpstreamFailure(connectionSlots(connection).asInstanceOf[Failed].ex) + completeConnection(stageId) + } else { + // Push is pending, first process push, then re-enqueue closing event + // Non-failure case + val code = portStates(connection) & (InClosed | InFailed) + if (code == 0) { + processElement(connectionSlots(connection)) + enqueue(connection) + } else if (code == InFailed) { + // Failure case + processElement(connectionSlots(connection).asInstanceOf[Failed].previousElem) + enqueue(connection) + } } - } - - connectionStates(connection) match { - case Pulled ⇒ - if (!isStageCompleted(assembly.outOwners(connection))) { - if (GraphInterpreter.Debug) println(s"PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)}") - outStates(connection) = Available - outHandlers(connection).onPull() - } - case Completed | CompletedHasElement(_) ⇒ - val stageId = assembly.inOwners(connection) - if (!isStageCompleted(stageId) && inStates(connection) != Closed) { - if (GraphInterpreter.Debug) println(s"COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)}") - inStates(connection) = Closed - inHandlers(connection).onUpstreamFinish() - completeConnection(stageId) - } - case Failed(ex) ⇒ - val stageId = assembly.inOwners(connection) - if (!isStageCompleted(stageId) && inStates(connection) != Closed) { - if (GraphInterpreter.Debug) println(s"FAIL ${outOwnerName(connection)} -> ${inOwnerName(connection)}") - inStates(connection) = Closed - inHandlers(connection).onUpstreamFailure(ex) - completeConnection(stageId) - } - case Cancelled ⇒ - val stageId = assembly.outOwners(connection) - if (!isStageCompleted(stageId) && outStates(connection) != Closed) { - if (GraphInterpreter.Debug) println(s"CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)}") - outStates(connection) = Closed - outHandlers(connection).onDownstreamFinish() - completeConnection(stageId) - } - case PushCompleted(elem) ⇒ - val stageId = assembly.inOwners(connection) - if (!isStageCompleted(stageId) && inStates(connection) != Closed) { - inStates(connection) = Available - connectionStates(connection) = elem - processElement(elem) - val elemAfter = connectionStates(connection) - if (elemAfter == Empty) enqueue(connection, Completed) - else enqueue(connection, CompletedHasElement(elemAfter)) - } else { - connectionStates(connection) = Completed - } - - case pushedElem ⇒ processElement(pushedElem) } - } private def dequeue(): Int = { @@ -412,29 +445,14 @@ private[stream] final class GraphInterpreter( } } - private def enqueue(connection: Int, event: Any): Unit = { - connectionStates(connection) = event + private def enqueue(connection: Int): Unit = { eventQueue(queueTail & mask) = connection queueTail += 1 } - // Returns true if a connection has been completed *or if the completion event is already enqueued*. This is useful - // to prevent redundant completion events in case of concurrent invocation on both sides of the connection. - // I.e. when one side already enqueued the completion event, then the other side will not enqueue the event since - // there is noone to process it anymore. - def isConnectionCompleting(connection: Int): Boolean = connectionStates(connection).isInstanceOf[CompletingState] - // Returns true if the given stage is alredy completed def isStageCompleted(stageId: Int): Boolean = stageId != Boundary && shutdownCounter(stageId) == 0 - private def isPushInFlight(connection: Int): Boolean = - (inStates(connection) == InFlight) && // Other side has not been notified - hasElement(connection) - - private def hasElement(connection: Int): Boolean = - !connectionStates(connection).isInstanceOf[ConnectionState] && - connectionStates(connection) != Empty - // Register that a connection in which the given stage participated has been completed and therefore the stage // itself might stop, too. private def completeConnection(stageId: Int): Unit = { @@ -445,53 +463,66 @@ private[stream] final class GraphInterpreter( // This was the last active connection keeping this stage alive if (activeConnections == 1) { runningStages -= 1 - logics(stageId).postStop() - logics(stageId).afterPostStop() + finalizeStage(logics(stageId)) } } } } + private def finalizeStage(logic: GraphStageLogic): Unit = { + try { + logic.postStop() + logic.afterPostStop() + } catch { + case NonFatal(e) ⇒ + log.error(s"Error during postStop in [${assembly.stages(logic.stageId)}]", e) + } + } + private[stream] def push(connection: Int, elem: Any): Unit = { - if (!(inStates(connection) eq Closed)) { - outStates(connection) = InFlight - enqueue(connection, elem) + if ((portStates(connection) & InClosed) == 0) { + portStates(connection) ^= PushStartFlip + connectionSlots(connection) = elem + enqueue(connection) } } private[stream] def pull(connection: Int): Unit = { - if (!(outStates(connection) eq Closed)) { - inStates(connection) = InFlight - enqueue(connection, Pulled) + if ((portStates(connection) & OutClosed) == 0) { + portStates(connection) ^= PullStartFlip + enqueue(connection) } } private[stream] def complete(connection: Int): Unit = { - outStates(connection) = Closed - if (!isConnectionCompleting(connection) && (inStates(connection) ne Closed)) { - if (hasElement(connection)) { - // There is a pending push, we change the signal to be a PushCompleted (there can be only one signal in flight - // for a connection) - if (inStates(connection) == InFlight) - connectionStates(connection) = PushCompleted(connectionStates(connection)) - else enqueue(connection, CompletedHasElement(connectionStates(connection))) - } else enqueue(connection, Completed) + val currentState = portStates(connection) + portStates(connection) = portStates(connection) | OutClosed + if ((currentState & InClosed) == 0) { + if ((currentState & Pushing) != 0) {} // FIXME: Fold into previous condition + else if (connectionSlots(connection) != Empty) + enqueue(connection) + else + enqueue(connection) } completeConnection(assembly.outOwners(connection)) } private[stream] def fail(connection: Int, ex: Throwable): Unit = { - outStates(connection) = Closed - if (!isConnectionCompleting(connection) && (inStates(connection) ne Closed)) - enqueue(connection, Failed(ex)) + portStates(connection) |= (OutClosed | InFailed) + if ((portStates(connection) & InClosed) == 0) { + connectionSlots(connection) = Failed(ex, connectionSlots(connection)) + enqueue(connection) + } completeConnection(assembly.outOwners(connection)) } private[stream] def cancel(connection: Int): Unit = { - inStates(connection) = Closed - if (!isConnectionCompleting(connection) && (outStates(connection) ne Closed)) - enqueue(connection, Cancelled) + portStates(connection) |= InClosed + if ((portStates(connection) & OutClosed) == 0) { + connectionSlots(connection) = Empty + enqueue(connection) + } completeConnection(assembly.inOwners(connection)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 8fe324ee1a..6370e2a6a2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -25,10 +25,9 @@ object GraphStages { val out = Outlet[T]("out") override val shape = FlowShape(in, out) - protected abstract class SimpleLinearStageLogic extends GraphStageLogic { + protected abstract class SimpleLinearStageLogic extends GraphStageLogic(shape) { setHandler(out, new OutHandler { override def onPull(): Unit = pull(in) - override def onDownstreamFinish(): Unit = completeStage() }) } @@ -36,11 +35,9 @@ object GraphStages { class Identity[T] extends SimpleLinearGraphStage[T] { - override def createLogic: GraphStageLogic = new SimpleLinearStageLogic { + override def createLogic: GraphStageLogic = new SimpleLinearStageLogic() { setHandler(in, new InHandler { override def onPush(): Unit = push(out, grab(in)) - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) } @@ -52,7 +49,7 @@ object GraphStages { val out = Outlet[T]("out") override val shape = FlowShape(in, out) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { var initialized = false setHandler(in, new InHandler { @@ -108,7 +105,7 @@ object GraphStages { val cancelled = new AtomicBoolean(false) val cancellable = new TickSourceCancellable(cancelled) - val logic = new GraphStageLogic { + val logic = new GraphStageLogic(shape) { override def preStart() = { schedulePeriodicallyWithInitialDelay("TickTimer", initialDelay, interval) val callback = getAsyncCallback[Unit]((_) ⇒ { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 4538f3a957..545104b56c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -744,7 +744,7 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS val out = Outlet[immutable.Seq[T]]("out") val shape = FlowShape(in, out) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { private val buf: VectorBuilder[T] = new VectorBuilder // True if: // - buf is nonEmpty diff --git a/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala b/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala index a373129a4d..3fd3716ce2 100644 --- a/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala +++ b/akka-stream/src/main/scala/akka/stream/io/Timeouts.scala @@ -129,7 +129,7 @@ object Timeouts { override def toString = "IdleTimeoutBidi" - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { private var nextDeadline: Deadline = Deadline.now + timeout setHandler(in1, new InHandler { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 59cc1a5adc..5eec534865 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -40,7 +40,7 @@ class Merge[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] val out: Outlet[T] = Outlet[T]("Merge.out") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { private var initialized = false private val pendingQueue = Array.ofDim[Inlet[T]](inputPorts) @@ -143,7 +143,7 @@ class MergePreferred[T] private (secondaryPorts: Int) extends GraphStage[MergePr def preferred: Inlet[T] = shape.preferred // FIXME: Factor out common stuff with Merge - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { private var initialized = false private val pendingQueue = Array.ofDim[Inlet[T]](secondaryPorts) @@ -252,7 +252,7 @@ class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends G val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Broadcast.out" + i)) override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { private var pendingCount = outputPorts private val pending = Array.fill[Boolean](outputPorts)(true) private var downstreamsRunning = outputPorts @@ -350,7 +350,7 @@ class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends G val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Balance.out" + i)) override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) - override def createLogic: GraphStageLogic = new GraphStageLogic { + override def createLogic: GraphStageLogic = new GraphStageLogic(shape) { private val pendingQueue = Array.ofDim[Outlet[T]](outputPorts) private var pendingHead: Int = 0 private var pendingTail: Int = 0 @@ -522,7 +522,7 @@ class Concat[T](inputCount: Int) extends GraphStage[UniformFanInShape[T, T]] { val out: Outlet[T] = Outlet[T]("Concat.out") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) - override def createLogic = new GraphStageLogic { + override def createLogic = new GraphStageLogic(shape) { var activeStream: Int = 0 { diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 1da600c696..3007504760 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -85,10 +85,12 @@ private object TimerMessages { * The stage logic is always stopped once all its input and output ports have been closed, i.e. it is not possible to * keep the stage alive for further processing once it does not have any open ports. */ -abstract class GraphStageLogic { +abstract class GraphStageLogic private[stream] (inCount: Int, outCount: Int) { import GraphInterpreter._ import TimerMessages._ + def this(shape: Shape) = this(shape.inlets.size, shape.outlets.size) + private val keyToTimers = mutable.Map[Any, Timer]() private val timerIdGen = Iterator from 1 private var queuedTimerEvents = List.empty[Queued] @@ -109,20 +111,20 @@ abstract class GraphStageLogic { /** * INTERNAL API */ - private[stream] var inHandlers = scala.collection.Map.empty[Inlet[_], InHandler] + private[stream] var inHandlers = Array.ofDim[InHandler](inCount) /** * INTERNAL API */ - private[stream] var outHandlers = scala.collection.Map.empty[Outlet[_], OutHandler] + private[stream] var outHandlers = Array.ofDim[OutHandler](outCount) /** * INTERNAL API */ - private[stream] var inToConn = scala.collection.Map.empty[Inlet[_], Int] + private[stream] var inToConn = Array.ofDim[Int](inHandlers.length) /** * INTERNAL API */ - private[stream] var outToConn = scala.collection.Map.empty[Outlet[_], Int] + private[stream] var outToConn = Array.ofDim[Int](outHandlers.length) /** * INTERNAL API @@ -134,18 +136,18 @@ abstract class GraphStageLogic { */ final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = { handler.ownerStageLogic = this - inHandlers += in -> handler + inHandlers(in.id) = handler } /** * Assigns callbacks for the events for an [[Outlet]] */ final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = { handler.ownerStageLogic = this - outHandlers += out -> handler + outHandlers(out.id) = handler } - private def conn[T](in: Inlet[T]): Int = inToConn(in) - private def conn[T](out: Outlet[T]): Int = outToConn(out) + private def conn[T](in: Inlet[T]): Int = inToConn(in.id) + private def conn[T](out: Outlet[T]): Int = outToConn(out.id) /** * Requests an element on the given port. Calling this method twice before an element arrived will fail. @@ -153,9 +155,13 @@ abstract class GraphStageLogic { * query whether pull is allowed to be called or not. */ final protected def pull[T](in: Inlet[T]): Unit = { - require(!hasBeenPulled(in), "Cannot pull port twice") - require(!isClosed(in), "Cannot pull closed port") - interpreter.pull(conn(in)) + if ((interpreter.portStates(conn(in)) & (InReady | InClosed)) == InReady) { + interpreter.pull(conn(in)) + } else { + // Detailed error information should not add overhead to the hot path + require(!isClosed(in), "Cannot pull closed port") + require(!hasBeenPulled(in), "Cannot pull port twice") + } } /** @@ -171,17 +177,20 @@ abstract class GraphStageLogic { * The method [[isAvailable()]] can be used to query if the port has an element that can be grabbed or not. */ final protected def grab[T](in: Inlet[T]): T = { - require(isAvailable(in), "Cannot get element from already empty input port") val connection = conn(in) - val elem = interpreter.connectionStates(connection) - - elem match { - case CompletedHasElement(realElem) ⇒ - interpreter.connectionStates(connection) = Completed - realElem.asInstanceOf[T] - case _ ⇒ - interpreter.connectionStates(connection) = Empty - elem.asInstanceOf[T] + // Fast path + if ((interpreter.portStates(connection) & (InReady | InFailed)) == InReady && + (interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty)) { + val elem = interpreter.connectionSlots(connection) + interpreter.connectionSlots(connection) = Empty + elem.asInstanceOf[T] + } else { + // Slow path + require(isAvailable(in), "Cannot get element from already empty input port") + val failed = interpreter.connectionSlots(connection).asInstanceOf[Failed] + val elem = failed.previousElem.asInstanceOf[T] + interpreter.connectionSlots(connection) = Failed(failed.ex, Empty) + elem } } @@ -189,7 +198,7 @@ abstract class GraphStageLogic { * Indicates whether there is already a pending pull for the given input port. If this method returns true * then [[isAvailable()]] must return false for that same port. */ - final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = interpreter.inStates(conn(in)) eq InFlight + final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & (InReady | InClosed)) == 0 /** * Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the @@ -199,19 +208,26 @@ abstract class GraphStageLogic { */ final protected def isAvailable[T](in: Inlet[T]): Boolean = { val connection = conn(in) - val state = interpreter.connectionStates(connection) - val arrived = interpreter.inStates(connection) ne InFlight - val hasElementState = state.isInstanceOf[HasElementState] - val rawElement = (state != Empty) && !state.isInstanceOf[ConnectionState] + val normalArrived = (interpreter.portStates(conn(in)) & (InReady | InFailed)) == InReady - arrived && (hasElementState || rawElement) + // Fast path + if (normalArrived) interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty + else { + // Slow path on failure + if ((interpreter.portStates(conn(in)) & (InReady | InFailed)) == (InReady | InFailed)) { + interpreter.connectionSlots(connection) match { + case Failed(_, elem) ⇒ elem.asInstanceOf[AnyRef] ne Empty + case _ ⇒ false // This can only be Empty actually (if a cancel was concurrent with a failure) + } + } else false + } } /** * Indicates whether the port has been closed. A closed port cannot be pulled. */ - final protected def isClosed[T](in: Inlet[T]): Boolean = interpreter.inStates(conn(in)) eq Closed + final protected def isClosed[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & InClosed) != 0 /** * Emits an element through the given output port. Calling this method twice before a [[pull()]] has been arrived @@ -219,9 +235,13 @@ abstract class GraphStageLogic { * used to check if the port is ready to be pushed or not. */ final protected def push[T](out: Outlet[T], elem: T): Unit = { - require(isAvailable(out), "Cannot push port twice") - require(!isClosed(out), "Cannot pull closed port") - interpreter.push(conn(out), elem) + if ((interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady) { + interpreter.push(conn(out), elem) + } else { + // Detailed error information should not add overhead to the hot path + require(isAvailable(out), "Cannot push port twice") + require(!isClosed(out), "Cannot pull closed port") + } } /** @@ -239,8 +259,16 @@ abstract class GraphStageLogic { * then stops the stage, then [[postStop()]] is called. */ final def completeStage(): Unit = { - inToConn.valuesIterator.foreach(interpreter.cancel) - outToConn.valuesIterator.foreach(interpreter.complete) + var i = 0 + while (i < inToConn.length) { + interpreter.cancel(inToConn(i)) + i += 1 + } + i = 0 + while (i < outToConn.length) { + interpreter.complete(outToConn(i)) + i += 1 + } } /** @@ -248,19 +276,28 @@ abstract class GraphStageLogic { * then stops the stage, then [[postStop()]] is called. */ final def failStage(ex: Throwable): Unit = { - inToConn.valuesIterator.foreach(interpreter.cancel) - outToConn.valuesIterator.foreach(interpreter.fail(_, ex)) + var i = 0 + while (i < inToConn.length) { + interpreter.cancel(inToConn(i)) + i += 1 + } + i = 0 + while (i < outToConn.length) { + interpreter.fail(outToConn(i), ex) + i += 1 + } } /** * Return true if the given output port is ready to be pushed. */ - final def isAvailable[T](out: Outlet[T]): Boolean = interpreter.outStates(conn(out)) eq Available + final def isAvailable[T](out: Outlet[T]): Boolean = + (interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady /** * Indicates whether the port has been closed. A closed port cannot be pushed. */ - final protected def isClosed[T](out: Outlet[T]): Boolean = interpreter.outStates(conn(out)) eq Closed + final protected def isClosed[T](out: Outlet[T]): Boolean = (interpreter.portStates(conn(out)) & OutClosed) != 0 /** * Obtain a callback object that can be used asynchronously to re-enter the