From b4983479336d4de37471324d2efa26d984158756 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 20 Jul 2016 12:08:30 +0200 Subject: [PATCH 1/7] baseline benchmark --- .../akka/stream/FusedGraphsBenchmark.scala | 322 ++++++++++++++++++ 1 file changed, 322 insertions(+) create mode 100644 akka-bench-jmh/src/main/scala/akka/stream/FusedGraphsBenchmark.scala diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FusedGraphsBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FusedGraphsBenchmark.scala new file mode 100644 index 0000000000..ba8e12a2a0 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/stream/FusedGraphsBenchmark.scala @@ -0,0 +1,322 @@ +/** + * Copyright (C) 2014-2016 Lightbend Inc. + */ + +package akka.stream + +import java.util.concurrent.{ CountDownLatch, TimeUnit } + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.impl.fusing.GraphStages +import akka.stream.scaladsl._ +import akka.stream.stage._ +import org.openjdk.jmh.annotations.{ OperationsPerInvocation, _ } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object FusedGraphsBenchmark { + val ElementCount = 100 * 1000 + + @volatile var blackhole: org.openjdk.jmh.infra.Blackhole = _ +} + +// Just to avoid allocations and still have a way to do some work in stages. The value itself does not matter +// so no issues with sharing (the result does not make any sense, but hey) +class MutableElement(var value: Int) + +class TestSource(elems: Array[MutableElement]) extends GraphStage[SourceShape[MutableElement]] { + val out = Outlet[MutableElement]("TestSource.out") + override val shape = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { + private[this] var left = FusedGraphsBenchmark.ElementCount - 1 + + override def onPull(): Unit = { + if (left >= 0) { + push(out, elems(left)) + left -= 1 + } else completeStage() + } + + setHandler(out, this) + } +} + +class JitSafeCompletionLatch extends GraphStageWithMaterializedValue[SinkShape[MutableElement], CountDownLatch] { + val in = Inlet[MutableElement]("JitSafeCompletionLatch.in") + override val shape = SinkShape(in) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, CountDownLatch) = { + val latch = new CountDownLatch(1) + val logic = new GraphStageLogic(shape) with InHandler { + private[this] var sum = 0 + + override def preStart(): Unit = pull(in) + override def onPush(): Unit = { + sum += grab(in).value + pull(in) + } + + override def onUpstreamFinish(): Unit = { + // Do not ignore work along the chain + FusedGraphsBenchmark.blackhole.consume(sum) + latch.countDown() + completeStage() + } + + setHandler(in, this) + } + + (logic, latch) + } +} + +class IdentityStage extends GraphStage[FlowShape[MutableElement, MutableElement]] { + val in = Inlet[MutableElement]("Identity.in") + val out = Outlet[MutableElement]("Identity.out") + override val shape = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class FusedGraphsBenchmark { + import FusedGraphsBenchmark._ + + implicit val system = ActorSystem("test") + var materializer: ActorMaterializer = _ + var testElements: Array[MutableElement] = _ + + var singleIdentity: RunnableGraph[CountDownLatch] = _ + var chainOfIdentities: RunnableGraph[CountDownLatch] = _ + var singleMap: RunnableGraph[CountDownLatch] = _ + var chainOfMaps: RunnableGraph[CountDownLatch] = _ + var repeatTakeMapAndFold: RunnableGraph[CountDownLatch] = _ + var singleBuffer: RunnableGraph[CountDownLatch] = _ + var chainOfBuffers: RunnableGraph[CountDownLatch] = _ + var broadcastZip: RunnableGraph[CountDownLatch] = _ + var balanceMerge: RunnableGraph[CountDownLatch] = _ + var broadcastZipBalanceMerge: RunnableGraph[CountDownLatch] = _ + + @Setup + def setup(): Unit = { + val settings = ActorMaterializerSettings(system) + .withFuzzing(false) + .withSyncProcessingLimit(Int.MaxValue) + .withAutoFusing(false) // We fuse manually in this test in the setup + + materializer = ActorMaterializer(settings) + testElements = Array.fill(ElementCount)(new MutableElement(0)) + val addFunc = (x: MutableElement) => { x.value += 1; x } + + val testSource = Source.fromGraph(new TestSource(testElements)) + val testSink = Sink.fromGraph(new JitSafeCompletionLatch) + + def fuse(r: RunnableGraph[CountDownLatch]): RunnableGraph[CountDownLatch] = { + RunnableGraph.fromGraph(Fusing.aggressive(r)) + } + + val identityStage = new IdentityStage + + singleIdentity = + fuse( + testSource + .via(identityStage) + .toMat(testSink)(Keep.right) + ) + + chainOfIdentities = + fuse( + testSource + .via(identityStage) + .via(identityStage) + .via(identityStage) + .via(identityStage) + .via(identityStage) + .via(identityStage) + .via(identityStage) + .via(identityStage) + .via(identityStage) + .via(identityStage) + .toMat(testSink)(Keep.right) + ) + + singleMap = + fuse( + testSource + .map(addFunc) + .toMat(testSink)(Keep.right) + ) + + chainOfMaps = + fuse( + testSource + .map(addFunc) + .toMat(testSink)(Keep.right) + ) + + repeatTakeMapAndFold = + fuse( + Source.repeat(new MutableElement(0)) + .take(ElementCount) + .map(addFunc) + .map(addFunc) + .fold(new MutableElement(0))((acc, x) => { acc.value += x.value; acc }) + .toMat(testSink)(Keep.right) + ) + + singleBuffer = + fuse( + testSource + .buffer(10, OverflowStrategy.backpressure) + .toMat(testSink)(Keep.right) + ) + + chainOfBuffers = + fuse( + testSource + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .buffer(10, OverflowStrategy.backpressure) + .toMat(testSink)(Keep.right) + ) + + val broadcastZipFlow: Flow[MutableElement, MutableElement, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val bcast = b.add(Broadcast[MutableElement](2)) + val zip = b.add(Zip[MutableElement, MutableElement]()) + + bcast ~> zip.in0 + bcast ~> zip.in1 + + FlowShape(bcast.in, zip.out.map(_._1).outlet) + }) + + val balanceMergeFlow: Flow[MutableElement, MutableElement, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val balance = b.add(Balance[MutableElement](2)) + val merge = b.add(Merge[MutableElement](2)) + + balance ~> merge + balance ~> merge + + FlowShape(balance.in, merge.out) + }) + + broadcastZip = + fuse( + testSource + .via(broadcastZipFlow) + .toMat(testSink)(Keep.right) + ) + + balanceMerge = + fuse( + testSource + .via(balanceMergeFlow) + .toMat(testSink)(Keep.right) + ) + + broadcastZipBalanceMerge = + fuse( + testSource + .via(broadcastZipFlow) + .via(balanceMergeFlow) + .toMat(testSink)(Keep.right) + ) + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def single_identity(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + singleIdentity.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def chain_of_identities(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + chainOfIdentities.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def single_map(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + singleMap.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def chain_of_maps(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + chainOfMaps.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def repeat_take_map_and_fold(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + repeatTakeMapAndFold.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def single_buffer(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + singleBuffer.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def chain_of_buffers(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + chainOfBuffers.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def broadcast_zip(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + broadcastZip.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def balance_merge(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + balanceMerge.run()(materializer).await() + } + + @Benchmark + @OperationsPerInvocation(100 * 1000) + def boradcast_zip_balance_merge(blackhole: org.openjdk.jmh.infra.Blackhole): Unit = { + FusedGraphsBenchmark.blackhole = blackhole + broadcastZipBalanceMerge.run()(materializer).await() + } + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } + +} From 5382014133d01cd38729d731763841680f93a42c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 20 Jul 2016 13:26:27 +0200 Subject: [PATCH 2/7] make Map a GraphStage --- .../impl/fusing/GraphInterpreterSpecKit.scala | 2 - .../stream/impl/fusing/InterpreterSpec.scala | 77 +++++++++++-------- .../impl/fusing/InterpreterStressSpec.scala | 2 +- .../fusing/InterpreterSupervisionSpec.scala | 47 ++++++----- .../impl/fusing/IteratorInterpreterSpec.scala | 10 +-- .../fusing/LifecycleInterpreterSpec.scala | 8 +- .../scala/akka/stream/scaladsl/FlowSpec.scala | 2 +- .../main/scala/akka/stream/impl/Stages.scala | 4 - .../scala/akka/stream/impl/fusing/Ops.scala | 29 ++++++- .../scala/akka/stream/scaladsl/Flow.scala | 2 +- 10 files changed, 108 insertions(+), 75 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index a6f060be2d..cf0a8f788c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -310,8 +310,6 @@ trait GraphInterpreterSpecKit extends StreamSpec { abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder { val ops = _ops.toArray - def this(op: Seq[Stage[_, _]], dummy: Int = 42) = this(op.map(_.toGS): _*) - val upstream = new UpstreamOneBoundedProbe[T] val downstream = new DownstreamOneBoundedPortProbe[T] var lastEvent = Set.empty[TestEvent] diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index f8b3526713..61d982801d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -7,8 +7,8 @@ import akka.stream.impl.ConstantFun import akka.stream.stage._ import akka.stream.testkit.StreamSpec import akka.testkit.EventFilter - import akka.stream._ +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { import Supervision.stoppingDecider @@ -24,7 +24,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "Interpreter" must { - "implement map correctly" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) { + "implement map correctly" in new OneBoundedSetup[Int](Map((x: Int) ⇒ x + 1)) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -43,10 +43,10 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnComplete)) } - "implement chain of maps correctly" in new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, stoppingDecider), - Map((x: Int) ⇒ x * 2, stoppingDecider), - Map((x: Int) ⇒ x + 1, stoppingDecider))) { + "implement chain of maps correctly" in new OneBoundedSetup[Int]( + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ x * 2), + Map((x: Int) ⇒ x + 1)) { lastEvents() should be(Set.empty) @@ -66,7 +66,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(Cancel)) } - "work with only boundary ops" in new OneBoundedSetup[Int](Seq.empty) { + "work with only boundary ops" in new OneBoundedSetup[Int]() { lastEvents() should be(Set.empty) downstream.requestOne() @@ -149,7 +149,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "implement take inside a chain" in new OneBoundedSetup[Int]( Filter((x: Int) ⇒ x != 0), takeTwo, - Map((x: Int) ⇒ x + 1, stoppingDecider).toGS) { + Map((x: Int) ⇒ x + 1)) { lastEvents() should be(Set.empty) @@ -433,11 +433,11 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { // Note, the new interpreter has no jumpback table, still did not want to remove the test "work with jumpback table and completed elements" in new OneBoundedSetup[Int]( - Map((x: Int) ⇒ x, stoppingDecider).toGS, - Map((x: Int) ⇒ x, stoppingDecider).toGS, + Map((x: Int) ⇒ x), + Map((x: Int) ⇒ x), KeepGoing(), - Map((x: Int) ⇒ x, stoppingDecider).toGS, - Map((x: Int) ⇒ x, stoppingDecider).toGS) { + Map((x: Int) ⇒ x), + Map((x: Int) ⇒ x)) { lastEvents() should be(Set.empty) @@ -464,8 +464,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } - "work with pushAndFinish if upstream completes with pushAndFinish" in new OneBoundedSetup[Int](Seq( - new PushFinishStage)) { + "work with pushAndFinish if upstream completes with pushAndFinish" in new OneBoundedSetup[Int](new PushFinishStage) { lastEvents() should be(Set.empty) @@ -476,10 +475,10 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(0), OnComplete)) } - "work with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[Int](Seq( - Map((x: Any) ⇒ x, stoppingDecider), + "work with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[Int]( + Map((x: Any) ⇒ x), new PushFinishStage, - Map((x: Any) ⇒ x, stoppingDecider))) { + Map((x: Any) ⇒ x)) { lastEvents() should be(Set.empty) @@ -491,7 +490,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } "work with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup[Int]( - (new PushFinishStage).toGS, + new PushFinishStage, Fold(0, (x: Int, y: Int) ⇒ x + y)) { lastEvents() should be(Set.empty) @@ -503,11 +502,18 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(1), OnComplete)) } - "report error if pull is called while op is terminating" in new OneBoundedSetup[Int](Seq(new PushPullStage[Any, Any] { - override def onPull(ctx: Context[Any]): SyncDirective = ctx.pull() - override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = ctx.pull() - override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = ctx.absorbTermination() - })) { + "report error if pull is called while op is terminating" in new OneBoundedSetup[Int]( + new SimpleLinearGraphStage[Any] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = pull(in) + override def onPull(): Unit = pull(in) + override def onUpstreamFinish(): Unit = if (!hasBeenPulled(in)) pull(in) + + setHandlers(in, out, this) + } + } + ) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -558,8 +564,8 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = ctx.absorbTermination() } - "not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int](Seq( - new InvalidAbsorbTermination)) { + // This test must be kept since it tests the compatibility layer, which while is deprecated it is still here. + "not allow absorbTermination from onDownstreamFinish()" in new OneBoundedSetup[Int]((new InvalidAbsorbTermination).toGS) { lastEvents() should be(Set.empty) EventFilter[UnsupportedOperationException]("It is not allowed to call absorbTermination() from onDownstreamFinish.", occurrences = 1).intercept { @@ -635,16 +641,19 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { override val shape: FlowShape[T, T] = FlowShape(in, out) } - // This test is related to issue #17351 - private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends PushStage[Any, Any] { - override def onPush(elem: Any, ctx: Context[Any]): SyncDirective = - ctx.pushAndFinish(elem) + private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends SimpleLinearGraphStage[Any] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = { + push(out, grab(in)) + completeStage() + } + override def onPull(): Unit = pull(in) + override def onUpstreamFinish(): Unit = failStage(akka.stream.testkit.Utils.TE("Cannot happen")) + override def postStop(): Unit = onPostStop() - override def onUpstreamFinish(ctx: Context[Any]): TerminationDirective = - ctx.fail(akka.stream.testkit.Utils.TE("Cannot happen")) - - override def postStop(): Unit = - onPostStop() + setHandlers(in, out, this) + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala index d7ef645b1a..ff7895fe71 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -14,7 +14,7 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit { val halfLength = chainLength / 2 val repetition = 100 - val map = Map((x: Int) ⇒ x + 1, stoppingDecider).toGS + val map = Map((x: Int) ⇒ x + 1) // GraphStages can be reused val dropOne = Drop(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index 5e40d02b04..355c6453b7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -6,11 +6,9 @@ package akka.stream.impl.fusing import akka.stream.testkit.StreamSpec import scala.util.control.NoStackTrace -import akka.stream.Supervision -import akka.stream.stage.Context -import akka.stream.stage.PushPullStage -import akka.stream.stage.Stage -import akka.stream.stage.SyncDirective +import akka.stream.{ ActorAttributes, Attributes, Supervision } +import akka.stream.stage._ +import akka.testkit.AkkaSpec class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit { import Supervision.stoppingDecider @@ -21,16 +19,22 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit override def toString = "TE" } + class ResumingMap[In, Out](_f: In ⇒ Out) extends Map(_f) { + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + super.createLogic(inheritedAttributes.and(ActorAttributes.supervisionStrategy(resumingDecider))) + } + "Interpreter error handling" must { - "handle external failure" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ x + 1, stoppingDecider))) { + "handle external failure" in new OneBoundedSetup[Int](Map((x: Int) ⇒ x + 1)) { lastEvents() should be(Set.empty) upstream.onError(TE) lastEvents() should be(Set(OnError(TE))) } - "emit failure when op throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, stoppingDecider))) { + "emit failure when op throws" in new OneBoundedSetup[Int](Map((x: Int) ⇒ if (x == 0) throw TE else x)) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) upstream.onNext(2) @@ -42,10 +46,10 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(Cancel, OnError(TE))) } - "emit failure when op throws in middle of the chain" in new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, stoppingDecider), - Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, stoppingDecider), - Map((x: Int) ⇒ x + 100, stoppingDecider))) { + "emit failure when op throws in middle of the chain" in new OneBoundedSetup[Int]( + Map((x: Int) ⇒ x + 1), + Map((x: Int) ⇒ if (x == 0) throw TE else x + 10), + Map((x: Int) ⇒ x + 100)) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) @@ -58,7 +62,9 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(Cancel, OnError(TE))) } - "resume when Map throws" in new OneBoundedSetup[Int](Seq(Map((x: Int) ⇒ if (x == 0) throw TE else x, resumingDecider))) { + "resume when Map throws" in new OneBoundedSetup[Int]( + new ResumingMap((x: Int) ⇒ if (x == 0) throw TE else x) + ) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) upstream.onNext(2) @@ -82,10 +88,11 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(OnNext(4))) } - "resume when Map throws in middle of the chain" in new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x + 1, resumingDecider), - Map((x: Int) ⇒ if (x == 0) throw TE else x + 10, resumingDecider), - Map((x: Int) ⇒ x + 100, resumingDecider))) { + "resume when Map throws in middle of the chain" in new OneBoundedSetup[Int]( + new ResumingMap((x: Int) ⇒ x + 1), + new ResumingMap((x: Int) ⇒ if (x == 0) throw TE else x + 10), + new ResumingMap((x: Int) ⇒ x + 100) + ) { downstream.requestOne() lastEvents() should be(Set(RequestOne)) @@ -102,8 +109,8 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit } "resume when Map throws before Grouped" in new OneBoundedSetup[Int]( - Map((x: Int) ⇒ x + 1, resumingDecider).toGS, - Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider).toGS, + new ResumingMap((x: Int) ⇒ x + 1), + new ResumingMap((x: Int) ⇒ if (x <= 0) throw TE else x + 10), Grouped(3)) { downstream.requestOne() @@ -122,8 +129,8 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit } "complete after resume when Map throws before Grouped" in new OneBoundedSetup[Int]( - Map((x: Int) ⇒ x + 1, resumingDecider).toGS, - Map((x: Int) ⇒ if (x <= 0) throw TE else x + 10, resumingDecider).toGS, + new ResumingMap((x: Int) ⇒ x + 1), + new ResumingMap((x: Int) ⇒ if (x <= 0) throw TE else x + 10), Grouped(1000)) { downstream.requestOne() diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala index d991673ed0..c5e1906a2e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/IteratorInterpreterSpec.scala @@ -16,14 +16,14 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "work in the happy case" in { val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq( - Map((x: Int) ⇒ x + 1, stoppingDecider).toGS)).iterator + Map((x: Int) ⇒ x + 1))).iterator itr.toSeq should be(2 to 11) } "hasNext should not affect elements" in { val itr = new IteratorInterpreter[Int, Int]((1 to 10).iterator, Seq( - Map((x: Int) ⇒ x, stoppingDecider).toGS)).iterator + Map((x: Int) ⇒ x))).iterator itr.hasNext should be(true) itr.hasNext should be(true) @@ -42,7 +42,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "throw exceptions on empty iterator" in { val itr = new IteratorInterpreter[Int, Int](List(1).iterator, Seq( - Map((x: Int) ⇒ x, stoppingDecider).toGS)).iterator + Map((x: Int) ⇒ x))).iterator itr.next() should be(1) a[NoSuchElementException] should be thrownBy { itr.next() } @@ -50,7 +50,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "throw exceptions when op in chain throws" in { val itr = new IteratorInterpreter[Int, Int](List(1, 2, 3).iterator, Seq( - Map((n: Int) ⇒ if (n == 2) throw new ArithmeticException() else n, stoppingDecider).toGS)).iterator + Map((n: Int) ⇒ if (n == 2) throw new ArithmeticException() else n))).iterator itr.next() should be(1) itr.hasNext should be(true) @@ -60,7 +60,7 @@ class IteratorInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "work with an empty iterator" in { val itr = new IteratorInterpreter[Int, Int](Iterator.empty, Seq( - Map((x: Int) ⇒ x + 1, stoppingDecider).toGS)).iterator + Map((x: Int) ⇒ x + 1))).iterator itr.hasNext should be(false) a[NoSuchElementException] should be thrownBy { itr.next() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala index 9b3959c6a2..7faf0e68c9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala @@ -84,9 +84,9 @@ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } "onError when preStart fails with stages after" in new OneBoundedSetup[String]( - Map((x: Int) ⇒ x, stoppingDecider).toGS, + Map((x: Int) ⇒ x), PreStartFailer(() ⇒ throw TE("Boom!")), - Map((x: Int) ⇒ x, stoppingDecider).toGS) { + Map((x: Int) ⇒ x)) { lastEvents() should ===(Set(Cancel, OnError(TE("Boom!")))) } @@ -112,9 +112,9 @@ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } "postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup[String]( - Map((x: Any) ⇒ x, stoppingDecider).toGS, + Map((x: Any) ⇒ x), new PushFinishStage(onPostStop = () ⇒ testActor ! "stop"), - Map((x: Any) ⇒ x, stoppingDecider).toGS) { + Map((x: Any) ⇒ x)) { lastEvents() should be(Set.empty) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 98b1032fe2..10a6da89f8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -55,7 +55,7 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re } val faultyFlow: Flow[Any, Any, NotUsed] ⇒ Flow[Any, Any, NotUsed] = in ⇒ in.via({ - val stage = new PushPullGraphStage((_) ⇒ fusing.Map({ x: Any ⇒ x }, stoppingDecider), Attributes.none) + val stage = fusing.Map({ x: Any ⇒ x }) val assembly = new GraphAssembly( Array(stage), diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index c5c8371cd6..961a3b8a5a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -156,10 +156,6 @@ object Stages { } - final case class Map[In, Out](f: In ⇒ Out, attributes: Attributes = map) extends SymbolicStage[In, Out] { - override def create(attr: Attributes): Stage[In, Out] = fusing.Map(f, supervision(attr)) - } - final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy, attributes: Attributes = buffer) extends SymbolicStage[T, T] { require(size > 0, s"Buffer size must be larger than zero but was [$size]") override def create(attr: Attributes): Stage[T, T] = fusing.Buffer(size, overflowStrategy) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6fcbbb7c00..6336133c6d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -25,10 +25,33 @@ import akka.stream.impl.Stages.DefaultAttributes /** * INTERNAL API */ -final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] { - override def onPush(elem: In, ctx: Context[Out]): SyncDirective = ctx.push(f(elem)) +// FIXME: Not final because InterpreterSupervisionSpec. Some better option is needed here +case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] { + val in = Inlet[In]("Map.in") + val out = Outlet[Out]("Map.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.map - override def decide(t: Throwable): Supervision.Directive = decider(t) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private def decider = + inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + + override def onPush(): Unit = { + try { + push(out, f(grab(in))) + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case _ ⇒ pull(in) + } + } + } + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index de56a2966f..de991e41a1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -475,7 +475,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * */ - def map[T](f: Out ⇒ T): Repr[T] = andThen(Map(f)) + def map[T](f: Out ⇒ T): Repr[T] = via(Map(f)) /** * Transform each input element into an `Iterable` of output elements that is From a6cf6c646eae9858ef3ca60cea43a4b07ee81ec3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 20 Jul 2016 13:39:23 +0200 Subject: [PATCH 3/7] Optimize Fold supervision --- .../scala/akka/stream/impl/fusing/Ops.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 6336133c6d..3103712952 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -379,19 +379,24 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta override val initialAttributes = DefaultAttributes.fold override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { + new GraphStageLogic(shape) with InHandler with OutHandler { private var aggregator: Out = zero - override def onResume(t: Throwable): Unit = { - aggregator = zero - } + private def decider = + inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) - override def onPush(): Unit = withSupervision(() ⇒ grab(in)) match { - case Some(elem) ⇒ { - aggregator = f(aggregator, elem) + override def onPush(): Unit = { + try { + aggregator = f(aggregator, grab(in)) pull(in) + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case _ ⇒ + aggregator = zero + pull(in) + } } - case None ⇒ pull(in) } override def onPull(): Unit = { From b01acfd8d6994fb8daacde0e29c12f5051e67afc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 20 Jul 2016 13:53:53 +0200 Subject: [PATCH 4/7] Microoptimizaitons in the interpreter --- .../stream/impl/fusing/GraphInterpreter.scala | 19 +------- .../scala/akka/stream/stage/GraphStage.scala | 43 ++++++++++++++----- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index da05746a19..919b3018cb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -650,7 +650,7 @@ final class GraphInterpreter( elem } - private def enqueue(connection: Int): Unit = { + def enqueue(connection: Int): Unit = { if (Debug) if (queueTail - queueHead > mask) new Exception(s"$Name internal queue full ($queueStatus) + $connection").printStackTrace() eventQueue(queueTail & mask) = connection queueTail += 1 @@ -688,23 +688,6 @@ final class GraphInterpreter( } } - private[stream] def push(connection: Int, elem: Any): Unit = { - val currentState = portStates(connection) - portStates(connection) = currentState ^ PushStartFlip - if ((currentState & InClosed) == 0) { - connectionSlots(connection) = elem - enqueue(connection) - } - } - - private[stream] def pull(connection: Int): Unit = { - val currentState = portStates(connection) - portStates(connection) = currentState ^ PullStartFlip - if ((currentState & OutClosed) == 0) { - enqueue(connection) - } - } - private[stream] def complete(connection: Int): Unit = { val currentState = portStates(connection) if (Debug) println(s"$Name complete($connection) [$currentState]") diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index a66f8fe2d5..4b46f87f11 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -340,12 +340,21 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * query whether pull is allowed to be called or not. This method will also fail if the port is already closed. */ final protected def pull[T](in: Inlet[T]): Unit = { - if ((interpreter.portStates(conn(in)) & (InReady | InClosed)) == InReady) { - interpreter.pull(conn(in)) + val connection = conn(in) + val portState = interpreter.portStates(connection) + val it = interpreter + + if ((portState & (InReady | InClosed | OutClosed)) == InReady) { + it.portStates(connection) = portState ^ PullStartFlip + it.enqueue(connection) } else { // Detailed error information should not add overhead to the hot path require(!isClosed(in), s"Cannot pull closed port ($in)") require(!hasBeenPulled(in), s"Cannot pull port ($in) twice") + + // There were no errors, the pull was simply ignored as the target stage already closed its port. We + // still need to track proper state though. + it.portStates(connection) = portState ^ PullStartFlip } } @@ -371,18 +380,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def grab[T](in: Inlet[T]): T = { val connection = conn(in) + val it = interpreter + val elem = it.connectionSlots(connection) + // Fast path - if ((interpreter.portStates(connection) & (InReady | InFailed)) == InReady && - (interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty)) { - val elem = interpreter.connectionSlots(connection) - interpreter.connectionSlots(connection) = Empty + if ((it.portStates(connection) & (InReady | InFailed)) == InReady && (elem.asInstanceOf[AnyRef] ne Empty)) { + it.connectionSlots(connection) = Empty elem.asInstanceOf[T] } else { // Slow path require(isAvailable(in), s"Cannot get element from already empty input port ($in)") - val failed = interpreter.connectionSlots(connection).asInstanceOf[Failed] + val failed = it.connectionSlots(connection).asInstanceOf[Failed] val elem = failed.previousElem.asInstanceOf[T] - interpreter.connectionSlots(connection) = Failed(failed.ex, Empty) + it.connectionSlots(connection) = Failed(failed.ex, Empty) elem } } @@ -428,13 +438,26 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * used to check if the port is ready to be pushed or not. */ final protected def push[T](out: Outlet[T], elem: T): Unit = { - if ((interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady && (elem != null)) { - interpreter.push(conn(out), elem) + val connection = conn(out) + val portState = interpreter.portStates(connection) + val it = interpreter + + it.portStates(connection) = portState ^ PushStartFlip + + if ((portState & (OutReady | OutClosed | InClosed)) == OutReady && (elem != null)) { + it.connectionSlots(connection) = elem + it.enqueue(connection) } else { + // Restore state for the error case + it.portStates(connection) = portState + // Detailed error information should not add overhead to the hot path ReactiveStreamsCompliance.requireNonNullElement(elem) require(isAvailable(out), s"Cannot push port ($out) twice") require(!isClosed(out), s"Cannot pull closed port ($out)") + + // No error, just InClosed caused the actual pull to be ignored, but the status flag still needs to be flipped + it.portStates(connection) = portState ^ PushStartFlip } } From 3a1a0cc4c4d3bbefbc04ae1d8edb1456083b8329 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 20 Jul 2016 17:10:50 +0200 Subject: [PATCH 5/7] Event chasing implemented --- .../akka/stream/FusedGraphsBenchmark.scala | 9 ++ .../impl/fusing/GraphInterpreterSpec.scala | 7 +- .../stream/impl/fusing/GraphInterpreter.scala | 131 ++++++++++++++---- .../scala/akka/stream/stage/GraphStage.scala | 4 +- 4 files changed, 117 insertions(+), 34 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FusedGraphsBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FusedGraphsBenchmark.scala index ba8e12a2a0..f2dea5401f 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FusedGraphsBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FusedGraphsBenchmark.scala @@ -160,6 +160,15 @@ class FusedGraphsBenchmark { chainOfMaps = fuse( testSource + .map(addFunc) + .map(addFunc) + .map(addFunc) + .map(addFunc) + .map(addFunc) + .map(addFunc) + .map(addFunc) + .map(addFunc) + .map(addFunc) .map(addFunc) .toMat(testSink)(Keep.right) ) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index 9d525ec353..07327b37d3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -368,11 +368,8 @@ class GraphInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { sink.requestOne(eventLimit = 0) source.onComplete(eventLimit = 3) - lastEvents() should ===(Set(OnNext(sink, "C"))) - - sink.requestOne() - lastEvents() should ===(Set(OnComplete(sink))) - + // OnComplete arrives early due to push chasing + lastEvents() should ===(Set(OnNext(sink, "C"), OnComplete(sink))) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 919b3018cb..3cba149fb0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -47,6 +47,8 @@ object GraphInterpreter { final val KeepGoingFlag = 0x4000000 final val KeepGoingMask = 0x3ffffff + final val ChaseLimit = 16 + /** * Marker object that indicates that a port holds no element since it was already grabbed. The port is still pullable, * but there is no more element to grab. @@ -381,7 +383,7 @@ final class GraphInterpreter( shape.inlets.size + shape.outlets.size } - private var _subFusingMaterializer: Materializer = _ + private[this] var _subFusingMaterializer: Materializer = _ def subFusingMaterializer: Materializer = _subFusingMaterializer // An event queue implemented as a circular buffer @@ -391,6 +393,10 @@ final class GraphInterpreter( private[this] var queueHead: Int = 0 private[this] var queueTail: Int = 0 + private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased + private[this] var chasedPush: Int = NoEvent + private[this] var chasedPull: Int = NoEvent + private def queueStatus: String = { val contents = (queueHead until queueTail).map(idx ⇒ { val conn = eventQueue(idx & mask) @@ -539,20 +545,66 @@ final class GraphInterpreter( try { while (eventsRemaining > 0 && queueTail != queueHead) { val connection = dequeue() + eventsRemaining -= 1 + chaseCounter = math.min(ChaseLimit, eventsRemaining) + + def reportStageError(e: Throwable): Unit = { + if (activeStage == null) throw e + else { + val stage = assembly.stages(activeStage.stageId) + + log.error(e, "Error in stage [{}]: {}", stage, e.getMessage) + activeStage.failStage(e) + + // Abort chasing + chaseCounter = 0 + if (chasedPush != NoEvent) { + enqueue(chasedPush) + chasedPush = NoEvent + } + if (chasedPull != NoEvent) { + enqueue(chasedPull) + chasedPull = NoEvent + } + } + } + try processEvent(connection) catch { - case NonFatal(e) ⇒ - if (activeStage == null) throw e - else { - val stage = assembly.stages(activeStage.stageId) - - log.error(e, "Error in stage [{}]: {}", stage, e.getMessage) - activeStage.failStage(e) - } + case NonFatal(e) ⇒ reportStageError(e) } afterStageHasRun(activeStage) - eventsRemaining -= 1 + + // Chasing PUSH events + while (chasedPush != NoEvent) { + val connection = chasedPush + chasedPush = NoEvent + try processPush(connection) + catch { + case NonFatal(e) ⇒ reportStageError(e) + } + afterStageHasRun(activeStage) + } + + // Chasing PULL events + while (chasedPull != NoEvent) { + val connection = chasedPull + chasedPull = NoEvent + try processPull(connection) + catch { + case NonFatal(e) ⇒ reportStageError(e) + } + afterStageHasRun(activeStage) + } + + if (chasedPush != NoEvent) { + enqueue(chasedPush) + chasedPush = NoEvent + } + } + // Event *must* be enqueued while not in the execute loop (events enqueued from external, possibly async events) + chaseCounter = 0 } finally { currentInterpreterHolder(0) = previousInterpreter } @@ -577,18 +629,12 @@ final class GraphInterpreter( } finally currentInterpreterHolder(0) = previousInterpreter } + private def safeLogics(id: Int) = + if (id == Boundary) null + else logics(id) + // Decodes and processes a single event for the given connection private def processEvent(connection: Int): Unit = { - def safeLogics(id: Int) = - if (id == Boundary) null - else logics(id) - - def processElement(): Unit = { - if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connectionSlots(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]") - activeStage = safeLogics(assembly.inOwners(connection)) - portStates(connection) ^= PushEndFlip - inHandlers(connection).onPush() - } // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage // (this can happen if a stage completes voluntarily while connection close events are still queued) @@ -598,14 +644,11 @@ final class GraphInterpreter( // Manual fast decoding, fast paths are PUSH and PULL // PUSH if ((code & (Pushing | InClosed | OutClosed)) == Pushing) { - processElement() + processPush(connection) // PULL } else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) { - if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]") - portStates(connection) ^= PullEndFlip - activeStage = safeLogics(assembly.outOwners(connection)) - outHandlers(connection).onPull() + processPull(connection) // CANCEL } else if ((code & (OutClosed | InClosed)) == InClosed) { @@ -629,13 +672,27 @@ final class GraphInterpreter( else inHandlers(connection).onUpstreamFailure(connectionSlots(connection).asInstanceOf[Failed].ex) } else { // Push is pending, first process push, then re-enqueue closing event - processElement() + processPush(connection) enqueue(connection) } } } + private def processPush(connection: Int): Unit = { + if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connectionSlots(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]") + activeStage = safeLogics(assembly.inOwners(connection)) + portStates(connection) ^= PushEndFlip + inHandlers(connection).onPush() + } + + private def processPull(connection: Int): Unit = { + if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]") + activeStage = safeLogics(assembly.outOwners(connection)) + portStates(connection) ^= PullEndFlip + outHandlers(connection).onPull() + } + private def dequeue(): Int = { val idx = queueHead & mask if (fuzzingMode) { @@ -688,11 +745,31 @@ final class GraphInterpreter( } } + private[stream] def chasePush(connection: Int): Unit = { + if (chaseCounter > 0 && chasedPush == NoEvent) { + chaseCounter -= 1 + chasedPush = connection + } else enqueue(connection) + } + + private[stream] def chasePull(connection: Int): Unit = { + if (chaseCounter > 0 && chasedPull == NoEvent) { + chaseCounter -= 1 + chasedPull = connection + } else enqueue(connection) + } + private[stream] def complete(connection: Int): Unit = { val currentState = portStates(connection) if (Debug) println(s"$Name complete($connection) [$currentState]") portStates(connection) = currentState | OutClosed - if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection) + + // Push-Close needs special treatment, cannot be chased, convert back to ordinary event + if (chasedPush == connection) { + chasedPush = NoEvent + enqueue(connection) + } else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection) + if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 4b46f87f11..1bf2d3a7ee 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -346,7 +346,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: if ((portState & (InReady | InClosed | OutClosed)) == InReady) { it.portStates(connection) = portState ^ PullStartFlip - it.enqueue(connection) + it.chasePull(connection) } else { // Detailed error information should not add overhead to the hot path require(!isClosed(in), s"Cannot pull closed port ($in)") @@ -446,7 +446,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: if ((portState & (OutReady | OutClosed | InClosed)) == OutReady && (elem != null)) { it.connectionSlots(connection) = elem - it.enqueue(connection) + it.chasePush(connection) } else { // Restore state for the error case it.portStates(connection) = portState From b8ceb863c9ca10b8168895c7c1ce9e88147b41b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 22 Jul 2016 15:39:37 +0200 Subject: [PATCH 6/7] reducing array load pressure by making connections an explicit object --- .../stream/impl/GraphStageLogicSpec.scala | 4 +- .../impl/fusing/GraphInterpreterSpecKit.scala | 18 +- .../scala/akka/stream/scaladsl/FlowSpec.scala | 4 +- .../stream/impl/ActorMaterializerImpl.scala | 4 +- .../impl/fusing/ActorGraphInterpreter.scala | 11 +- .../stream/impl/fusing/GraphInterpreter.scala | 275 +++++++++++------- .../impl/fusing/IteratorInterpreter.scala | 9 +- .../scala/akka/stream/stage/GraphStage.scala | 48 +-- 8 files changed, 220 insertions(+), 153 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index f8b03045c2..918c9d753d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -206,8 +206,8 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit { .connect(passThrough.out, Downstream) .init() - interpreter.complete(0) - interpreter.cancel(1) + interpreter.complete(interpreter.connections(0)) + interpreter.cancel(interpreter.connections(1)) interpreter.execute(2) expectMsg("postStop2") diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index cf0a8f788c..c0bd29dd10 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -71,17 +71,17 @@ trait GraphInterpreterSpecKit extends StreamSpec { def init(): Unit = { val assembly = buildAssembly() - val (inHandlers, outHandlers, logics) = + val (conns, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, + _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, logics, conns, (_, _, _) ⇒ (), fuzzingMode = false, null) for ((upstream, i) ← upstreams.zipWithIndex) { - _interpreter.attachUpstreamBoundary(i, upstream._1) + _interpreter.attachUpstreamBoundary(conns(i), upstream._1) } for ((downstream, i) ← downstreams.zipWithIndex) { - _interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2) + _interpreter.attachDownstreamBoundary(conns(i + upstreams.size + connections.size), downstream._2) } _interpreter.init(null) @@ -89,9 +89,9 @@ trait GraphInterpreterSpecKit extends StreamSpec { } def manualInit(assembly: GraphAssembly): Unit = { - val (inHandlers, outHandlers, logics) = + val (connections, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, + _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, logics, connections, (_, _, _) ⇒ (), fuzzingMode = false, null) } @@ -202,7 +202,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { // Modified onPush that does not grab() automatically the element. This accesses some internals. override def onPush(): Unit = { - val internalEvent = interpreter.connectionSlots(portToConn(in.id)) + val internalEvent = portToConn(in.id).slot internalEvent match { case Failed(_, elem) ⇒ lastEvent += OnNext(DownstreamPortProbe.this, elem) @@ -224,8 +224,8 @@ trait GraphInterpreterSpecKit extends StreamSpec { outOwners = Array(-1)) manualInit(assembly) - interpreter.attachDownstreamBoundary(0, in) - interpreter.attachUpstreamBoundary(0, out) + interpreter.attachDownstreamBoundary(interpreter.connections(0), in) + interpreter.attachUpstreamBoundary(interpreter.connections(0), out) interpreter.init(null) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 10a6da89f8..de3c8019f9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -65,10 +65,10 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re Array(null, stage.shape.out), Array(-1, 0)) - val (inHandlers, outHandlers, logics) = + val (connections, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - val shell = new GraphInterpreterShell(assembly, inHandlers, outHandlers, logics, stage.shape, settings, + val shell = new GraphInterpreterShell(assembly, connections, logics, stage.shape, settings, materializer.asInstanceOf[ActorMaterializerImpl]) val props = Props(new BrokenActorInterpreter(shell, "a3")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 8c2b997042..1f9b53e632 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -211,9 +211,9 @@ private[akka] case class ActorMaterializerImpl( private def matGraph(graph: GraphModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = { val calculatedSettings = effectiveSettings(effectiveAttributes) - val (inHandlers, outHandlers, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc) + val (connections, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc) - val shell = new GraphInterpreterShell(graph.assembly, inHandlers, outHandlers, logics, graph.shape, + val shell = new GraphInterpreterShell(graph.assembly, connections, logics, graph.shape, calculatedSettings, ActorMaterializerImpl.this) val impl = diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 3b14652c29..403513cda9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -12,7 +12,7 @@ import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.StreamLayout.{ AtomicModule, CompositeModule, CopiedModule, Module } import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ } -import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, GraphAssembly, UpstreamBoundaryStageLogic } +import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, GraphAssembly, UpstreamBoundaryStageLogic } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import org.reactivestreams.{ Subscriber, Subscription } @@ -308,8 +308,7 @@ object ActorGraphInterpreter { */ final class GraphInterpreterShell( assembly: GraphAssembly, - inHandlers: Array[InHandler], - outHandlers: Array[OutHandler], + connections: Array[Connection], logics: Array[GraphStageLogic], shape: Shape, settings: ActorMaterializerSettings, @@ -322,7 +321,7 @@ final class GraphInterpreterShell( private var enqueueToShortCircuit: (Any) ⇒ Unit = _ - lazy val interpreter: GraphInterpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics, + lazy val interpreter: GraphInterpreter = new GraphInterpreter(assembly, mat, log, logics, connections, (logic, event, handler) ⇒ { val asyncInput = AsyncInput(this, logic, event, handler) val currentInterpreter = GraphInterpreter.currentInterpreterOrNull @@ -366,7 +365,7 @@ final class GraphInterpreterShell( while (i < inputs.length) { val in = new BatchingActorInputBoundary(settings.maxInputBufferSize, i) inputs(i) = in - interpreter.attachUpstreamBoundary(i, in) + interpreter.attachUpstreamBoundary(connections(i), in) i += 1 } val offset = assembly.connectionCount - outputs.length @@ -374,7 +373,7 @@ final class GraphInterpreterShell( while (i < outputs.length) { val out = new ActorOutputBoundary(self, this, i) outputs(i) = out - interpreter.attachDownstreamBoundary(i + offset, out) + interpreter.attachDownstreamBoundary(connections(i + offset), out) i += 1 } interpreter.init(subMat) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 3cba149fb0..1ca60cb2ab 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -27,7 +27,7 @@ object GraphInterpreter { */ final val Debug = false - final val NoEvent = -1 + final val NoEvent = null final val Boundary = -1 final val InReady = 1 @@ -66,6 +66,37 @@ object GraphInterpreter { val singleNoAttribute: Array[Attributes] = Array(Attributes.none) + /** + * INERNAL API + * + * Contains all the necessary information for the GraphInterpreter to be able to implement a connection + * between an output and input ports. + * + * @param id Identifier of the connection. Corresponds to the array slot in the [[GraphAssembly]] + * @param inOwnerId Identifier of the owner of the input side of the connection. Corresponds to the array slot in + * the [[GraphAssembly]] + * @param inOwner The stage logic that corresponds to the input side of the connection. + * @param outOwnerId Identifier of the owner of the output side of the connection. Corresponds to the array slot + * in the [[GraphAssembly]] + * @param outOwner The stage logic that corresponds to the output side of the connection. + * @param inHandler The handler that contains the callback for input events. + * @param outHandler The handler that contains the callback for output events. + */ + final class Connection( + val id: Int, + val inOwnerId: Int, + val inOwner: GraphStageLogic, + val outOwnerId: Int, + val outOwner: GraphStageLogic, + var inHandler: InHandler, + var outHandler: OutHandler + ) { + var portState: Int = InReady + var slot: Any = Empty + + override def toString = s"Connection($id, $portState, $slot, $inHandler, $outHandler)" + } + /** * INTERNAL API * @@ -117,16 +148,14 @@ object GraphInterpreter { * handlers and the stage logic instances. * * Returns a tuple of - * - lookup table for InHandlers - * - lookup table for OutHandlers + * - lookup table for Connections * - array of the logics - * - materialized value */ def materialize( inheritedAttributes: Attributes, copiedModules: Array[Module], matVal: ju.Map[Module, Any], - register: MaterializedValueSource[Any] ⇒ Unit): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic]) = { + register: MaterializedValueSource[Any] ⇒ Unit): (Array[Connection], Array[GraphStageLogic]) = { val logics = Array.ofDim[GraphStageLogic](stages.length) var i = 0 @@ -167,32 +196,43 @@ object GraphInterpreter { i += 1 } - val inHandlers = Array.ofDim[InHandler](connectionCount) - val outHandlers = Array.ofDim[OutHandler](connectionCount) + val connections = Array.ofDim[Connection](connectionCount) i = 0 while (i < connectionCount) { + connections(i) = new Connection( + id = i, + inOwner = if (inOwners(i) == Boundary) null else logics(inOwners(i)), + inOwnerId = inOwners(i), + outOwner = if (outOwners(i) == Boundary) null else logics(outOwners(i)), + outOwnerId = outOwners(i), + inHandler = null, + outHandler = null + ) + if (ins(i) ne null) { val logic = logics(inOwners(i)) logic.handlers(ins(i).id) match { - case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${ins(i)}") - case h: InHandler ⇒ inHandlers(i) = h + case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${ins(i)}") + case h: InHandler ⇒ + connections(i).inHandler = h } - logics(inOwners(i)).portToConn(ins(i).id) = i + logics(inOwners(i)).portToConn(ins(i).id) = connections(i) } if (outs(i) ne null) { val logic = logics(outOwners(i)) val inCount = logic.inCount logic.handlers(outs(i).id + inCount) match { - case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${outs(i)}") - case h: OutHandler ⇒ outHandlers(i) = h + case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${outs(i)}") + case h: OutHandler ⇒ + connections(i).outHandler = h } - logic.portToConn(outs(i).id + inCount) = i + logic.portToConn(outs(i).id + inCount) = connections(i) } i += 1 } - (inHandlers, outHandlers, logics) + (connections, logics) } override def toString: String = { @@ -293,25 +333,27 @@ object GraphInterpreter { * The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations * on the hot paths. * - * One of the basic abstractions inside the interpreter is the notion of *connection*. In the abstract sense a - * connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair), - * while in the practical sense a connection is a number which represents slots in certain arrays. + * One of the basic abstractions inside the interpreter is the [[akka.stream.impl.fusing.GraphInterpreter.Connection]]. + * A connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair). + * The Connection object contains all the necessary data for the interpreter to pass elements, demand, completion + * or errors across the Connection. * In particular * - portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this * connection. This bitfield is used to decode the event that is in-flight. - * - connectionSlots is a mapping from a connection id to a potential element or exception that accompanies the + * - connectionSlot contains a potential element or exception that accompanies the * event encoded in the portStates bitfield - * - inHandlers is a mapping from a connection id to the [[InHandler]] instance that handles the events corresponding + * - inHandler contains the [[InHandler]] instance that handles the events corresponding * to the input port of the connection - * - outHandlers is a mapping from a connection id to the [[OutHandler]] instance that handles the events corresponding + * - outHandler contains the [[OutHandler]] instance that handles the events corresponding * to the output port of the connection * - * On top of these lookup tables there is an eventQueue, represented as a circular buffer of integers. The integers - * it contains represents connections that have pending events to be processed. The pending event itself is encoded - * in the portStates bitfield. This implies that there can be only one event in flight for a given connection, which - * is true in almost all cases, except a complete-after-push or fail-after-push. + * On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue + * contains the Connections that have pending events to be processed. The pending event itself is encoded + * in the portState bitfield of the Connection. This implies that there can be only one event in flight for a given + * Connection, which is true in almost all cases, except a complete-after-push or fail-after-push which has to + * be decoded accordingly. * - * The layout of the portStates bitfield is the following: + * The layout of the portState bitfield is the following: * * |- state machn.-| Only one bit is hot among these bits * 64 32 16 | 8 4 2 1 | @@ -335,10 +377,10 @@ object GraphInterpreter { * Sending an event is usually the following sequence: * - An action is requested by a stage logic (push, pull, complete, etc.) * - the state machine in portStates is transitioned from a ready state to a pending event - * - the id of the affected connection is enqueued + * - the affected Connection is enqueued * * Receiving an event is usually the following sequence: - * - id of connection to be processed is dequeued + * - the Connection to be processed is dequeued * - the type of the event is determined from the bits set on portStates * - the state machine in portStates is transitioned to a ready state * - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic. @@ -352,9 +394,8 @@ final class GraphInterpreter( private val assembly: GraphInterpreter.GraphAssembly, val materializer: Materializer, val log: LoggingAdapter, - val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection - val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection val logics: Array[GraphStageLogic], // Array of stage logics + val connections: Array[GraphInterpreter.Connection], val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, val fuzzingMode: Boolean, val context: ActorRef) { @@ -362,11 +403,11 @@ final class GraphInterpreter( // Maintains additional information for events, basically elements in-flight, or failure. // Other events are encoded in the portStates bitfield. - val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty) + //val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty) // Bitfield encoding pending events and various states for efficient querying and updates. See the documentation // of the class for a full description. - val portStates = Array.fill[Int](assembly.connectionCount)(InReady) + //val portStates = Array.fill[Int](assembly.connectionCount)(InReady) /** * INTERNAL API @@ -388,19 +429,19 @@ final class GraphInterpreter( // An event queue implemented as a circular buffer // FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue - private[this] val eventQueue = Array.ofDim[Int](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1))) + private[this] val eventQueue = Array.ofDim[Connection](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1))) private[this] val mask = eventQueue.length - 1 private[this] var queueHead: Int = 0 private[this] var queueTail: Int = 0 private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased - private[this] var chasedPush: Int = NoEvent - private[this] var chasedPull: Int = NoEvent + private[this] var chasedPush: Connection = NoEvent + private[this] var chasedPull: Connection = NoEvent private def queueStatus: String = { val contents = (queueHead until queueTail).map(idx ⇒ { val conn = eventQueue(idx & mask) - (conn, portStates(conn), connectionSlots(conn)) + conn }) s"(${eventQueue.length}, $queueHead, $queueTail)(${contents.mkString(", ")})" } @@ -420,36 +461,42 @@ final class GraphInterpreter( * Assign the boundary logic to a given connection. This will serve as the interface to the external world * (outside the interpreter) to process and inject events. */ - def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit = { + def attachUpstreamBoundary(connection: Connection, logic: UpstreamBoundaryStageLogic[_]): Unit = { logic.portToConn(logic.out.id + logic.inCount) = connection logic.interpreter = this - outHandlers(connection) = logic.handlers(0).asInstanceOf[OutHandler] + connection.outHandler = logic.handlers(0).asInstanceOf[OutHandler] } + def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit = + attachUpstreamBoundary(connections(connection), logic) + /** * Assign the boundary logic to a given connection. This will serve as the interface to the external world * (outside the interpreter) to process and inject events. */ - def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit = { + def attachDownstreamBoundary(connection: Connection, logic: DownstreamBoundaryStageLogic[_]): Unit = { logic.portToConn(logic.in.id) = connection logic.interpreter = this - inHandlers(connection) = logic.handlers(0).asInstanceOf[InHandler] + connection.inHandler = logic.handlers(0).asInstanceOf[InHandler] } + def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit = + attachDownstreamBoundary(connections(connection), logic) + /** * Dynamic handler changes are communicated from a GraphStageLogic by this method. */ - def setHandler(connection: Int, handler: InHandler): Unit = { + def setHandler(connection: Connection, handler: InHandler): Unit = { if (Debug) println(s"$Name SETHANDLER ${inOwnerName(connection)} (in) $handler") - inHandlers(connection) = handler + connection.inHandler = handler } /** * Dynamic handler changes are communicated from a GraphStageLogic by this method. */ - def setHandler(connection: Int, handler: OutHandler): Unit = { + def setHandler(connection: Connection, handler: OutHandler): Unit = { if (Debug) println(s"$Name SETHANDLER ${outOwnerName(connection)} (out) $handler") - outHandlers(connection) = handler + connection.outHandler = handler } /** @@ -502,29 +549,29 @@ final class GraphInterpreter( } // Debug name for a connections input part - private def inOwnerName(connection: Int): String = - assembly.inOwners(connection) match { + private def inOwnerName(connection: Connection): String = + assembly.inOwners(connection.id) match { case Boundary ⇒ "DownstreamBoundary" case owner ⇒ assembly.stages(owner).toString } // Debug name for a connections output part - private def outOwnerName(connection: Int): String = - assembly.outOwners(connection) match { + private def outOwnerName(connection: Connection): String = + assembly.outOwners(connection.id) match { case Boundary ⇒ "UpstreamBoundary" case owner ⇒ assembly.stages(owner).toString } // Debug name for a connections input part - private def inLogicName(connection: Int): String = - assembly.inOwners(connection) match { + private def inLogicName(connection: Connection): String = + assembly.inOwners(connection.id) match { case Boundary ⇒ "DownstreamBoundary" case owner ⇒ logics(owner).toString } // Debug name for a connections output part - private def outLogicName(connection: Int): String = - assembly.outOwners(connection) match { + private def outLogicName(connection: Connection): String = + assembly.outOwners(connection.id) match { case Boundary ⇒ "UpstreamBoundary" case owner ⇒ logics(owner).toString } @@ -558,23 +605,51 @@ final class GraphInterpreter( // Abort chasing chaseCounter = 0 - if (chasedPush != NoEvent) { + if (chasedPush ne NoEvent) { enqueue(chasedPush) chasedPush = NoEvent } - if (chasedPull != NoEvent) { + if (chasedPull ne NoEvent) { enqueue(chasedPull) chasedPull = NoEvent } } } + /* + * This is the "normal" event processing code which dequeues directly from the internal event queue. Since + * most execution paths tend to produce either a Push that will be propagated along a longer chain we take + * extra steps below to make this more efficient. + */ try processEvent(connection) catch { case NonFatal(e) ⇒ reportStageError(e) } afterStageHasRun(activeStage) + /* + * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or + * Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event + * dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path + * instead. Looking at the scenario of a Push, the following events will happen. + * - "normal" dispatch executes an onPush event + * - stage eventually calls push() + * - code inside the push() method checks the validity of the call, and also if it can be safely ignored + * (because the target stage already completed we just have not been notified yet) + * - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush + * variable + * - the loop below immediately captures this push and dispatches it + * + * What is saved by this optimization is three steps: + * - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing + * pressure on array load-store + * - no need to dequeue the Connection from the queue, similar to above + * - no need to decode the event, we know it is a Push already + * - no need to check for validity of the event because we already checked at the push() call, and there + * can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is + * called in the target stage just before the onPush() arrives). This avoids unnecessary branching. + */ + // Chasing PUSH events while (chasedPush != NoEvent) { val connection = chasedPush @@ -629,17 +704,13 @@ final class GraphInterpreter( } finally currentInterpreterHolder(0) = previousInterpreter } - private def safeLogics(id: Int) = - if (id == Boundary) null - else logics(id) - // Decodes and processes a single event for the given connection - private def processEvent(connection: Int): Unit = { + private def processEvent(connection: Connection): Unit = { // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage // (this can happen if a stage completes voluntarily while connection close events are still queued) activeStage = null - val code = portStates(connection) + val code = connection.portState // Manual fast decoding, fast paths are PUSH and PULL // PUSH @@ -652,24 +723,22 @@ final class GraphInterpreter( // CANCEL } else if ((code & (OutClosed | InClosed)) == InClosed) { - val stageId = assembly.outOwners(connection) - activeStage = safeLogics(stageId) - if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]") - portStates(connection) |= OutClosed - completeConnection(stageId) - outHandlers(connection).onDownstreamFinish() + activeStage = connection.outOwner + if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") + connection.portState |= OutClosed + completeConnection(connection.outOwnerId) + connection.outHandler.onDownstreamFinish() } else if ((code & (OutClosed | InClosed)) == OutClosed) { // COMPLETIONS if ((code & Pushing) == 0) { // Normal completion (no push pending) - if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]") - portStates(connection) |= InClosed - val stageId = assembly.inOwners(connection) - activeStage = safeLogics(stageId) - completeConnection(stageId) - if ((portStates(connection) & InFailed) == 0) inHandlers(connection).onUpstreamFinish() - else inHandlers(connection).onUpstreamFailure(connectionSlots(connection).asInstanceOf[Failed].ex) + if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]") + connection.portState |= InClosed + activeStage = connection.inOwner + completeConnection(connection.inOwnerId) + if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish() + else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex) } else { // Push is pending, first process push, then re-enqueue closing event processPush(connection) @@ -679,21 +748,21 @@ final class GraphInterpreter( } } - private def processPush(connection: Int): Unit = { - if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connectionSlots(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]") - activeStage = safeLogics(assembly.inOwners(connection)) - portStates(connection) ^= PushEndFlip - inHandlers(connection).onPush() + private def processPush(connection: Connection): Unit = { + if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connection.slot} (${connection.inHandler}) [${inLogicName(connection)}]") + activeStage = connection.inOwner + connection.portState ^= PushEndFlip + connection.inHandler.onPush() } - private def processPull(connection: Int): Unit = { - if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]") - activeStage = safeLogics(assembly.outOwners(connection)) - portStates(connection) ^= PullEndFlip - outHandlers(connection).onPull() + private def processPull(connection: Connection): Unit = { + if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") + activeStage = connection.outOwner + connection.portState ^= PullEndFlip + connection.outHandler.onPull() } - private def dequeue(): Int = { + private def dequeue(): Connection = { val idx = queueHead & mask if (fuzzingMode) { val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask @@ -707,7 +776,7 @@ final class GraphInterpreter( elem } - def enqueue(connection: Int): Unit = { + def enqueue(connection: Connection): Unit = { if (Debug) if (queueTail - queueHead > mask) new Exception(s"$Name internal queue full ($queueStatus) + $connection").printStackTrace() eventQueue(queueTail & mask) = connection queueTail += 1 @@ -745,24 +814,24 @@ final class GraphInterpreter( } } - private[stream] def chasePush(connection: Int): Unit = { + private[stream] def chasePush(connection: Connection): Unit = { if (chaseCounter > 0 && chasedPush == NoEvent) { chaseCounter -= 1 chasedPush = connection } else enqueue(connection) } - private[stream] def chasePull(connection: Int): Unit = { + private[stream] def chasePull(connection: Connection): Unit = { if (chaseCounter > 0 && chasedPull == NoEvent) { chaseCounter -= 1 chasedPull = connection } else enqueue(connection) } - private[stream] def complete(connection: Int): Unit = { - val currentState = portStates(connection) + private[stream] def complete(connection: Connection): Unit = { + val currentState = connection.portState if (Debug) println(s"$Name complete($connection) [$currentState]") - portStates(connection) = currentState | OutClosed + connection.portState = currentState | OutClosed // Push-Close needs special treatment, cannot be chased, convert back to ordinary event if (chasedPush == connection) { @@ -770,30 +839,30 @@ final class GraphInterpreter( enqueue(connection) } else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection) - if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) + if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId) } - private[stream] def fail(connection: Int, ex: Throwable): Unit = { - val currentState = portStates(connection) + private[stream] def fail(connection: Connection, ex: Throwable): Unit = { + val currentState = connection.portState if (Debug) println(s"$Name fail($connection, $ex) [$currentState]") - portStates(connection) = currentState | OutClosed + connection.portState = currentState | OutClosed if ((currentState & (InClosed | OutClosed)) == 0) { - portStates(connection) = currentState | (OutClosed | InFailed) - connectionSlots(connection) = Failed(ex, connectionSlots(connection)) + connection.portState = currentState | (OutClosed | InFailed) + connection.slot = Failed(ex, connection.slot) if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection) } - if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) + if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId) } - private[stream] def cancel(connection: Int): Unit = { - val currentState = portStates(connection) + private[stream] def cancel(connection: Connection): Unit = { + val currentState = connection.portState if (Debug) println(s"$Name cancel($connection) [$currentState]") - portStates(connection) = currentState | InClosed + connection.portState = currentState | InClosed if ((currentState & OutClosed) == 0) { - connectionSlots(connection) = Empty + connection.slot = Empty if ((currentState & (Pulling | Pushing | InClosed)) == 0) enqueue(connection) } - if ((currentState & InClosed) == 0) completeConnection(assembly.inOwners(connection)) + if ((currentState & InClosed) == 0) completeConnection(connection.inOwnerId) } /** @@ -822,8 +891,8 @@ final class GraphInterpreter( else "N" + owner } - for (i ← portStates.indices) { - portStates(i) match { + for (i ← connections.indices) { + connections(i).portState match { case InReady ⇒ builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [label=shouldPull; color=blue]""") case OutReady ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index 263cece018..cb011795b3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -137,20 +137,19 @@ private[akka] class IteratorInterpreter[I, O]( } val assembly = new GraphAssembly(stagesArray, attributes, ins, inOwners, outs, outOwners) - val (inHandlers, outHandlers, logics) = + val (connections, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new ju.HashMap, _ ⇒ ()) val interpreter = new GraphInterpreter( assembly, NoMaterializer, NoLogging, - inHandlers, - outHandlers, logics, + connections, (_, _, _) ⇒ throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."), fuzzingMode = false, null) - interpreter.attachUpstreamBoundary(0, upstream) - interpreter.attachDownstreamBoundary(length, downstream) + interpreter.attachUpstreamBoundary(connections(0), upstream) + interpreter.attachDownstreamBoundary(connections(length), downstream) interpreter.init(null) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 1bf2d3a7ee..b810f192ec 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -220,7 +220,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * INTERNAL API */ // Using common array to reduce overhead for small port counts - private[stream] val portToConn = Array.ofDim[Int](handlers.length) + private[stream] val portToConn = Array.ofDim[Connection](handlers.length) /** * INTERNAL API @@ -318,8 +318,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: if (_interpreter != null) _interpreter.setHandler(conn(out), handler) } - private def conn(in: Inlet[_]): Int = portToConn(in.id) - private def conn(out: Outlet[_]): Int = portToConn(out.id + inCount) + private def conn(in: Inlet[_]): Connection = portToConn(in.id) + private def conn(out: Outlet[_]): Connection = portToConn(out.id + inCount) /** * Retrieves the current callback for the events on the given [[Outlet]] @@ -341,11 +341,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def pull[T](in: Inlet[T]): Unit = { val connection = conn(in) - val portState = interpreter.portStates(connection) + val portState = connection.portState val it = interpreter if ((portState & (InReady | InClosed | OutClosed)) == InReady) { - it.portStates(connection) = portState ^ PullStartFlip + connection.portState = portState ^ PullStartFlip it.chasePull(connection) } else { // Detailed error information should not add overhead to the hot path @@ -354,7 +354,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: // There were no errors, the pull was simply ignored as the target stage already closed its port. We // still need to track proper state though. - it.portStates(connection) = portState ^ PullStartFlip + connection.portState = portState ^ PullStartFlip } } @@ -381,18 +381,18 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: final protected def grab[T](in: Inlet[T]): T = { val connection = conn(in) val it = interpreter - val elem = it.connectionSlots(connection) + val elem = connection.slot // Fast path - if ((it.portStates(connection) & (InReady | InFailed)) == InReady && (elem.asInstanceOf[AnyRef] ne Empty)) { - it.connectionSlots(connection) = Empty + if ((connection.portState & (InReady | InFailed)) == InReady && (elem.asInstanceOf[AnyRef] ne Empty)) { + connection.slot = Empty elem.asInstanceOf[T] } else { // Slow path require(isAvailable(in), s"Cannot get element from already empty input port ($in)") - val failed = it.connectionSlots(connection).asInstanceOf[Failed] + val failed = connection.slot.asInstanceOf[Failed] val elem = failed.previousElem.asInstanceOf[T] - it.connectionSlots(connection) = Failed(failed.ex, Empty) + connection.slot = Failed(failed.ex, Empty) elem } } @@ -401,7 +401,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Indicates whether there is already a pending pull for the given input port. If this method returns true * then [[isAvailable()]] must return false for that same port. */ - final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & (InReady | InClosed)) == 0 + final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (conn(in).portState & (InReady | InClosed)) == 0 /** * Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the @@ -412,14 +412,14 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: final protected def isAvailable[T](in: Inlet[T]): Boolean = { val connection = conn(in) - val normalArrived = (interpreter.portStates(conn(in)) & (InReady | InFailed)) == InReady + val normalArrived = (conn(in).portState & (InReady | InFailed)) == InReady // Fast path - if (normalArrived) interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty + if (normalArrived) connection.slot.asInstanceOf[AnyRef] ne Empty else { // Slow path on failure - if ((interpreter.portStates(conn(in)) & (InReady | InFailed)) == (InReady | InFailed)) { - interpreter.connectionSlots(connection) match { + if ((connection.portState & (InReady | InFailed)) == (InReady | InFailed)) { + connection.slot match { case Failed(_, elem) ⇒ elem.asInstanceOf[AnyRef] ne Empty case _ ⇒ false // This can only be Empty actually (if a cancel was concurrent with a failure) } @@ -430,7 +430,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Indicates whether the port has been closed. A closed port cannot be pulled. */ - final protected def isClosed[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & InClosed) != 0 + final protected def isClosed[T](in: Inlet[T]): Boolean = (conn(in).portState & InClosed) != 0 /** * Emits an element through the given output port. Calling this method twice before a [[pull()]] has been arrived @@ -439,17 +439,17 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def push[T](out: Outlet[T], elem: T): Unit = { val connection = conn(out) - val portState = interpreter.portStates(connection) + val portState = connection.portState val it = interpreter - it.portStates(connection) = portState ^ PushStartFlip + connection.portState = portState ^ PushStartFlip if ((portState & (OutReady | OutClosed | InClosed)) == OutReady && (elem != null)) { - it.connectionSlots(connection) = elem + connection.slot = elem it.chasePush(connection) } else { // Restore state for the error case - it.portStates(connection) = portState + connection.portState = portState // Detailed error information should not add overhead to the hot path ReactiveStreamsCompliance.requireNonNullElement(elem) @@ -457,7 +457,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: require(!isClosed(out), s"Cannot pull closed port ($out)") // No error, just InClosed caused the actual pull to be ignored, but the status flag still needs to be flipped - it.portStates(connection) = portState ^ PushStartFlip + connection.portState = portState ^ PushStartFlip } } @@ -523,12 +523,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Return true if the given output port is ready to be pushed. */ final def isAvailable[T](out: Outlet[T]): Boolean = - (interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady + (conn(out).portState & (OutReady | OutClosed)) == OutReady /** * Indicates whether the port has been closed. A closed port cannot be pushed. */ - final protected def isClosed[T](out: Outlet[T]): Boolean = (interpreter.portStates(conn(out)) & OutClosed) != 0 + final protected def isClosed[T](out: Outlet[T]): Boolean = (conn(out).portState & OutClosed) != 0 /** * Read a number of elements from the given inlet and continue with the given function, From 083bfb66926471f6ae46e5055de18d6eeb01d9bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 1 Aug 2016 12:20:30 +0200 Subject: [PATCH 7/7] Fixed bug in Chunker, exposed by interpreter changes --- .../stream/javadsl/cookbook/RecipeByteStrings.java | 11 +++++++++-- .../code/docs/stream/cookbook/RecipeByteStrings.scala | 11 +++++++++-- project/MiMa.scala | 5 ++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java index 81ff223327..f9c7255473 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeByteStrings.java @@ -100,8 +100,15 @@ public class RecipeByteStrings extends RecipeTest { @Override public void onUpstreamFinish() throws Exception { if (buffer.isEmpty()) completeStage(); - // elements left in buffer, keep accepting downstream pulls - // and push from buffer until buffer is emitted + else { + // There are elements left in buffer, so + // we keep accepting downstream pulls and push from buffer until emptied. + // + // It might be though, that the upstream finished while it was pulled, in which + // case we will not get an onPull from the downstream, because we already had one. + // In that case we need to emit from the buffer. + if (isAvailable(out)) emitChunk(); + } } }); } diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala index 4c5f1d356e..8cea3af8d8 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala @@ -42,8 +42,15 @@ class RecipeByteStrings extends RecipeSpec { override def onUpstreamFinish(): Unit = { if (buffer.isEmpty) completeStage() - // elements left in buffer, keep accepting downstream pulls - // and push from buffer until buffer is emitted + else { + // There are elements left in buffer, so + // we keep accepting downstream pulls and push from buffer until emptied. + // + // It might be though, that the upstream finished while it was pulled, in which + // case we will not get an onPull from the downstream, because we already had one. + // In that case we need to emit from the buffer. + if (isAvailable(out)) emitChunk() + } } }) diff --git a/project/MiMa.scala b/project/MiMa.scala index 07112a6c53..4d61603f6d 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -903,7 +903,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.protobuf.msg.DistributedPubSubMessages#StatusOrBuilder.hasReplyToStatus"), // #20543 GraphStage subtypes should not be private to akka - ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf") + ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf"), + + // Interpreter internals change + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.stage.GraphStageLogic.portToConn") ), "2.4.9" -> Seq( // #20994 adding new decode method, since we're on JDK7+ now