diff --git a/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala index 0d17030901..4097f3a678 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/ScheduleBenchmark.scala @@ -56,7 +56,7 @@ class ScheduleBenchmark { var promise: Promise[Any] = _ @Setup(Level.Iteration) - def setup():Unit = { + def setup():Unit = { winner = (to * ratio + 1).toInt promise = Promise[Any]() } diff --git a/akka-bench-jmh/src/main/scala/akka/http/HttpBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/http/HttpBenchmark.scala index 061abd488e..c0718b9b1c 100644 --- a/akka-bench-jmh/src/main/scala/akka/http/HttpBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/http/HttpBenchmark.scala @@ -53,7 +53,7 @@ class HttpBenchmark { } @TearDown - def shutdown():Unit ={ + def shutdown():Unit = { Await.ready(Http().shutdownAllConnectionPools(), 1.second) binding.unbind() Await.result(system.terminate(), 5.seconds) diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 3addd7cf97..76e5e562a6 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -117,5 +117,4 @@ class MaterializationBenchmark { @Benchmark def graph_with_imported_flow():Unit = graphWithImportedFlow.run() - } diff --git a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java index c3e525c161..1bce706cc9 100644 --- a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java @@ -443,23 +443,22 @@ public class GraphStageDocTest extends AbstractJavaTest { public void demonstrateAnAsynchronousSideChannel() throws Exception{ // tests: + TestSubscriber.Probe out = TestSubscriber.probe(system); + TestPublisher.Probe in = TestPublisher.probe(0, system); + CompletableFuture switchF = new CompletableFuture<>(); Graph, NotUsed> killSwitch = Flow.fromGraph(new KillSwitch<>(switchF)); - ExecutionContext ec = system.dispatcher(); + Source.fromPublisher(in).via(killSwitch).to(Sink.fromSubscriber(out)).run(mat); - CompletionStage valueAfterKill = switchF.thenApply(in -> 4); - - - CompletionStage result = - Source.from(Arrays.asList(1, 2, 3)).concat(Source.fromCompletionStage(valueAfterKill)) - .via(killSwitch) - .runFold(0, (n, sum) -> n + sum, mat); + out.request(1); + in.sendNext(1); + out.expectNext(1); switchF.complete(Done.getInstance()); - assertEquals(new Integer(6), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + out.expectComplete(); } diff --git a/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala index 70b87d3913..fee76a9bcb 100644 --- a/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala @@ -272,7 +272,6 @@ class GraphStageDocSpec extends AkkaSpec { "Demonstrate an asynchronous side channel" in { import system.dispatcher - //#async-side-channel // will close upstream in all materializations of the graph stage instance // when the future completes diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala index 44c4dbe9b6..5f68cafeec 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala @@ -57,7 +57,6 @@ object WSClientAutobahnTest extends App { import Console._ info.flatMap { i ⇒ val prefix = f"$YELLOW${i.caseInfo.id}%-7s$RESET - $RESET${i.caseInfo.description}$RESET ... " - //println(prefix) status.onComplete { case Success((CaseStatus(status), millis)) ⇒ diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 360bcd63a7..088176b9ad 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -61,7 +61,7 @@ interface AsyncWritePlugin { * result `Iterable` for the happy path, i.e. when no messages are rejected. * * Calls to this method are serialized by the enclosing journal actor. If you spawn - * work in asyncronous tasks it is alright that they complete the futures in any order, + * work in asynchronous tasks it is alright that they complete the futures in any order, * but the actual writes for a specific persistenceId should be serialized to avoid * issues such as events of a later write are visible to consumers (query side, or replay) * before the events of an earlier write are visible. This can also be done with diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index b53cca90a9..9e067c70be 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -212,7 +212,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected. * * Calls to this method are serialized by the enclosing journal actor. If you spawn - * work in asyncronous tasks it is alright that they complete the futures in any order, + * work in asynchronous tasks it is alright that they complete the futures in any order, * but the actual writes for a specific persistenceId should be serialized to avoid * issues such as events of a later write are visible to consumers (query side, or replay) * before the events of an earlier write are visible. diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index af02af9684..17aef36bfe 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -448,7 +448,6 @@ public class SourceTest extends StreamTest { @Test public void mustWorkFromFuture() throws Exception { - final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList("A", "B", "C"); CompletionStage future1 = Source.from(input).runWith(Sink.head(), materializer); CompletionStage future2 = Source.fromCompletionStage(future1).runWith(Sink.head(), materializer); diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index a544e62656..6dafb8e1a8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -74,7 +74,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { val (inHandlers, outHandlers, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, - (_, _, _) ⇒ (), fuzzingMode = false) + (_, _, _) ⇒ (), fuzzingMode = false, null) for ((upstream, i) ← upstreams.zipWithIndex) { _interpreter.attachUpstreamBoundary(i, upstream._1) @@ -92,7 +92,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { val (inHandlers, outHandlers, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, - (_, _, _) ⇒ (), fuzzingMode = false) + (_, _, _) ⇒ (), fuzzingMode = false, null) } def builder(stages: GraphStageWithMaterializedValue[_ <: Shape, _]*): AssemblyBuilder = new AssemblyBuilder(stages) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala index 367fe31f73..3b428ee79a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -5,7 +5,7 @@ package akka.stream.scaladsl import akka.NotUsed import akka.testkit.AkkaSpec -import akka.stream.ActorMaterializer +import akka.stream.{ ActorMaterializerSettings, ActorMaterializer } import scala.concurrent._ import scala.concurrent.duration._ import org.scalatest.concurrent.ScalaFutures @@ -122,23 +122,22 @@ class FlowFlattenMergeSpec extends AkkaSpec { } "cancel substreams when failing map function" in { - val p1, p2 = TestPublisher.probe[Int]() + val settings = ActorMaterializerSettings(system).withSyncProcessingLimit(1).withInputBuffer(1, 1) + val mat = ActorMaterializer(settings) + val p = TestPublisher.probe[Int]() val ex = new Exception("buh") val latch = TestLatch() Source(1 to 3) .flatMapMerge(10, { - case 1 ⇒ Source.fromPublisher(p1) - case 2 ⇒ Source.fromPublisher(p2) - case 3 ⇒ + case 1 ⇒ Source.fromPublisher(p) + case 2 ⇒ Await.ready(latch, 3.seconds) throw ex }) - .runWith(Sink.head) - p1.expectRequest() - p2.expectRequest() + .runWith(Sink.head)(mat) + p.expectRequest() latch.countDown() - p1.expectCancellation() - p2.expectCancellation() + p.expectCancellation() } "cancel substreams when being cancelled" in { diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 1d4396b7e5..d0e736ce55 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -45,17 +45,22 @@ akka { # this may cause an initial runtime overhead, but most of the time fusing is # desirable since it reduces the number of Actors that are created. auto-fusing = on - + # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered, # buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed # buffer upon stream materialization if the requested buffer size is less than this # configuration parameter. The default is very high because failing early is better # than failing under load. # - # Buffers sized larger than this will dynamically grow/shrink and consume more memory + # Buffers sized larger than this will dynamically grow/shrink and consume more memory # per element than the fixed size buffers. max-fixed-buffer-size = 1000000000 + # Maximum number of sync messages that actor can process for stream to substream communication. + # Parameter allows to interrupt synchronous processing to get upsteam/downstream messages. + # Allows to accelerate message processing that happening withing same actor but keep system responsive. + sync-processing-limit = 1000 + debug { # Enables the fuzzing mode which increases the chance of race conditions # by aggressively reordering events and making certain operations more diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 730cb14977..00211faec2 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean } import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.event.LoggingAdapter +import akka.stream.ActorMaterializerSettings.defaultMaxFixedBufferSize import akka.stream.impl._ import com.typesafe.config.Config @@ -224,7 +225,7 @@ object ActorMaterializerSettings { * Create [[ActorMaterializerSettings]] from a Config subsection (Scala). */ def apply(config: Config): ActorMaterializerSettings = - ActorMaterializerSettings( + new ActorMaterializerSettings( initialInputBufferSize = config.getInt("initial-input-buffer-size"), maxInputBufferSize = config.getInt("max-input-buffer-size"), dispatcher = config.getString("dispatcher"), @@ -234,7 +235,8 @@ object ActorMaterializerSettings { outputBurstLimit = config.getInt("output-burst-limit"), fuzzingMode = config.getBoolean("debug.fuzzing-mode"), autoFusing = config.getBoolean("auto-fusing"), - maxFixedBufferSize = config.getInt("max-fixed-buffer-size")) + maxFixedBufferSize = config.getInt("max-fixed-buffer-size"), + syncProcessingLimit = config.getInt("sync-processing-limit")) /** * Create [[ActorMaterializerSettings]] from individual settings (Java). @@ -266,13 +268,14 @@ object ActorMaterializerSettings { def create(config: Config): ActorMaterializerSettings = apply(config) + private val defaultMaxFixedBufferSize = 1000 } /** * This class describes the configurable properties of the [[ActorMaterializer]]. * Please refer to the `withX` methods for descriptions of the individual settings. */ -final class ActorMaterializerSettings( +final class ActorMaterializerSettings private ( val initialInputBufferSize: Int, val maxInputBufferSize: Int, val dispatcher: String, @@ -282,9 +285,25 @@ final class ActorMaterializerSettings( val outputBurstLimit: Int, val fuzzingMode: Boolean, val autoFusing: Boolean, - val maxFixedBufferSize: Int) { + val maxFixedBufferSize: Int, + val syncProcessingLimit: Int) { + + def this(initialInputBufferSize: Int, + maxInputBufferSize: Int, + dispatcher: String, + supervisionDecider: Supervision.Decider, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + debugLogging: Boolean, + outputBurstLimit: Int, + fuzzingMode: Boolean, + autoFusing: Boolean, + maxFixedBufferSize: Int) { + this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize) + } require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") + require(syncProcessingLimit > 0, "syncProcessingLimit must be > 0") requirePowerOfTwo(maxInputBufferSize, "maxInputBufferSize") require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)") @@ -299,10 +318,12 @@ final class ActorMaterializerSettings( outputBurstLimit: Int = this.outputBurstLimit, fuzzingMode: Boolean = this.fuzzingMode, autoFusing: Boolean = this.autoFusing, - maxFixedBufferSize: Int = this.maxFixedBufferSize) = + maxFixedBufferSize: Int = this.maxFixedBufferSize, + syncProcessingLimit: Int = this.syncProcessingLimit) = { new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize) + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit) + } /** * Each asynchronous piece of a materialized stream topology is executed by one Actor @@ -366,6 +387,13 @@ final class ActorMaterializerSettings( if (limit == this.outputBurstLimit) this else copy(outputBurstLimit = limit) + /** + * Limit for number of messages that can be processed synchronously in stream to substream communication + */ + def withSyncProcessingLimit(limit: Int): ActorMaterializerSettings = + if (limit == this.syncProcessingLimit) this + else copy(syncProcessingLimit = limit) + /** * Enable to log all elements that are dropped due to failures (at DEBUG level). */ @@ -413,12 +441,13 @@ final class ActorMaterializerSettings( s.subscriptionTimeoutSettings == subscriptionTimeoutSettings && s.debugLogging == debugLogging && s.outputBurstLimit == outputBurstLimit && + s.syncProcessingLimit == syncProcessingLimit && s.fuzzingMode == fuzzingMode && s.autoFusing == autoFusing case _ ⇒ false } - override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize,$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit,$fuzzingMode,$autoFusing)" + override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize,$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit,$syncProcessingLimit,$fuzzingMode,$autoFusing)" } object StreamSubscriptionTimeoutSettings { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 61f6170767..dbb16227aa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -3,14 +3,15 @@ */ package akka.stream.impl.fusing +import java.util import java.util.concurrent.TimeoutException import akka.actor._ import akka.event.Logging import akka.stream._ +import akka.stream.impl._ import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module } import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly } -import akka.stream.impl.{ ActorPublisher, ReactiveStreamsCompliance } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import org.reactivestreams.{ Subscriber, Subscription } import scala.concurrent.forkjoin.ThreadLocalRandom @@ -18,7 +19,6 @@ import scala.util.control.NonFatal import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.SubFusingActorMaterializerImpl import scala.annotation.tailrec -import akka.stream.impl.StreamSupervisor /** * INTERNAL API @@ -316,8 +316,16 @@ private[stream] final class GraphInterpreterShell( private var self: ActorRef = _ lazy val log = Logging(mat.system.eventStream, self) - lazy val interpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics, - (logic, event, handler) ⇒ self ! AsyncInput(this, logic, event, handler), settings.fuzzingMode) + private var enqueueToShortCircuit: (Any) ⇒ Unit = _ + + lazy val interpreter: GraphInterpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics, + (logic, event, handler) ⇒ { + val asyncInput = AsyncInput(this, logic, event, handler) + val currentInterpreter = GraphInterpreter.currentInterpreterOrNull + if (currentInterpreter == null || (currentInterpreter.context ne self)) + self ! asyncInput + else enqueueToShortCircuit(asyncInput) + }, settings.fuzzingMode, self) private val inputs = new Array[BatchingActorInputBoundary](shape.inlets.size) private val outputs = new Array[ActorOutputBoundary](shape.outlets.size) @@ -332,18 +340,24 @@ private[stream] final class GraphInterpreterShell( * a self-message for fairness with other actors. The basic assumption here is * to give each input buffer slot a chance to run through the whole pipeline * and back (for the demand). + * + * Considered use case: + * - assume a composite Sink of one expand and one fold + * - assume an infinitely fast source of data + * - assume maxInputBufferSize == 1 + * - if the event limit is greater than maxInputBufferSize * (ins + outs) than there will always be expand activity + * because no data can enter “fast enough” from the outside */ - private val eventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length) - + val shellEventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length) // Limits the number of events processed by the interpreter on an abort event. // TODO: Better heuristic here - private val abortLimit = eventLimit * 2 + private val abortLimit = shellEventLimit * 2 private var resumeScheduled = false def isInitialized: Boolean = self != null - - def init(self: ActorRef, subMat: SubFusingActorMaterializerImpl): Unit = { + def init(self: ActorRef, subMat: SubFusingActorMaterializerImpl, enqueueToShortCircuit: (Any) ⇒ Unit, eventLimit: Int): Int = { this.self = self + this.enqueueToShortCircuit = enqueueToShortCircuit var i = 0 while (i < inputs.length) { val in = new BatchingActorInputBoundary(settings.maxInputBufferSize, i) @@ -360,66 +374,74 @@ private[stream] final class GraphInterpreterShell( i += 1 } interpreter.init(subMat) - runBatch() + runBatch(eventLimit) } - def receive(event: BoundaryEvent): Unit = - if (waitingForShutdown) event match { - case ExposedPublisher(_, id, publisher) ⇒ - outputs(id).exposedPublisher(publisher) - publishersPending -= 1 - if (canShutDown) _isTerminated = true - case OnSubscribe(_, _, sub) ⇒ - tryCancel(sub) - subscribesPending -= 1 - if (canShutDown) _isTerminated = true - case Abort(_) ⇒ - tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " + - s"inputs or outputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now.")) - case _ ⇒ // Ignore, there is nothing to do anyway - } - else event match { + def receive(event: BoundaryEvent, eventLimit: Int): Int = { + resumeScheduled = false + if (waitingForShutdown) { + event match { + case ExposedPublisher(_, id, publisher) ⇒ + outputs(id).exposedPublisher(publisher) + publishersPending -= 1 + if (canShutDown) _isTerminated = true + case OnSubscribe(_, _, sub) ⇒ + tryCancel(sub) + subscribesPending -= 1 + if (canShutDown) _isTerminated = true + case Abort(_) ⇒ + tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " + + s"inputs or outputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now.")) + case _ ⇒ // Ignore, there is nothing to do anyway + } + eventLimit + } else event match { // Cases that are most likely on the hot path, in decreasing order of frequency case OnNext(_, id: Int, e: Any) ⇒ if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e id=$id") inputs(id).onNext(e) - runBatch() + runBatch(eventLimit) case RequestMore(_, id: Int, demand: Long) ⇒ if (GraphInterpreter.Debug) println(s"${interpreter.Name} request $demand id=$id") outputs(id).requestMore(demand) - runBatch() + runBatch(eventLimit) case Resume(_) ⇒ if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") - resumeScheduled = false - if (interpreter.isSuspended) runBatch() + if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit case AsyncInput(_, logic, event, handler) ⇒ interpreter.runAsyncInput(logic, event, handler) - runBatch() + if (eventLimit == 1 && interpreter.isSuspended) { + sendResume(true) + 0 + } else runBatch(eventLimit - 1) // Initialization and completion messages case OnError(_, id: Int, cause: Throwable) ⇒ if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError id=$id") inputs(id).onError(cause) - runBatch() + runBatch(eventLimit) case OnComplete(_, id: Int) ⇒ if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete id=$id") inputs(id).onComplete() - runBatch() + runBatch(eventLimit) case OnSubscribe(_, id: Int, subscription: Subscription) ⇒ if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe id=$id") subscribesPending -= 1 inputs(id).onSubscribe(subscription) - runBatch() + runBatch(eventLimit) case Cancel(_, id: Int) ⇒ if (GraphInterpreter.Debug) println(s"${interpreter.Name} cancel id=$id") outputs(id).cancel() - runBatch() + runBatch(eventLimit) case SubscribePending(_, id: Int) ⇒ outputs(id).subscribePending() + eventLimit case ExposedPublisher(_, id, publisher) ⇒ publishersPending -= 1 outputs(id).exposedPublisher(publisher) + eventLimit } + } private var _isTerminated = false def isTerminated: Boolean = _isTerminated @@ -430,16 +452,16 @@ private[stream] final class GraphInterpreterShell( private val resume = Resume(this) - private def runBatch(): Unit = { + def sendResume(sendResume: Boolean): Unit = { + resumeScheduled = true + if (sendResume) self ! resume + else enqueueToShortCircuit(resume) + } + + def runBatch(actorEventLimit: Int): Int = { try { - val effectiveLimit = { - if (!settings.fuzzingMode) eventLimit - else { - if (ThreadLocalRandom.current().nextBoolean()) Thread.`yield`() - ThreadLocalRandom.current().nextInt(2) // 1 or zero events to be processed - } - } - interpreter.execute(effectiveLimit) + val usingShellLimit = shellEventLimit < actorEventLimit + val remainingQuota = interpreter.execute(Math.min(actorEventLimit, shellEventLimit)) if (interpreter.isCompleted) { // Cannot stop right away if not completely subscribed if (canShutDown) _isTerminated = true @@ -449,13 +471,13 @@ private[stream] final class GraphInterpreterShell( override def run(): Unit = self ! Abort(GraphInterpreterShell.this) }) } - } else if (interpreter.isSuspended && !resumeScheduled) { - resumeScheduled = true - self ! resume - } + } else if (interpreter.isSuspended && !resumeScheduled) sendResume(!usingShellLimit) + + if (usingShellLimit) actorEventLimit - shellEventLimit + remainingQuota else remainingQuota } catch { case NonFatal(e) ⇒ tryAbort(e) + actorEventLimit - 1 } } @@ -498,7 +520,7 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext def tryInit(shell: GraphInterpreterShell): Boolean = try { - shell.init(self, subFusingMaterializerImpl) + currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit(_), currentLimit) if (GraphInterpreter.Debug) println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}") if (shell.isTerminated) false else { @@ -511,9 +533,20 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext false } + //this limits number of messages that can be processed synchronously during one actor receive. + private val eventLimit: Int = _initial.mat.settings.syncProcessingLimit + private var currentLimit: Int = eventLimit + //this is a var in order to save the allocation when no short-circuiting actually happens + private var shortCircuitBuffer: util.LinkedList[Any] = null + + def enqueueToShortCircuit(input: Any): Unit = { + if (shortCircuitBuffer == null) shortCircuitBuffer = new util.LinkedList[Any]() + shortCircuitBuffer.add(input) + } + def registerShell(shell: GraphInterpreterShell): ActorRef = { newShells ::= shell - self ! Resume + enqueueToShortCircuit(Resume) self } @@ -537,35 +570,57 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext override def preStart(): Unit = { tryInit(_initial) if (activeInterpreters.isEmpty) context.stop(self) + else if (shortCircuitBuffer != null) shortCircuitBatch() + } + + private def shortCircuitBatch(): Unit = { + while (!shortCircuitBuffer.isEmpty && currentLimit > 0 && activeInterpreters.nonEmpty) + shortCircuitBuffer.poll() match { + case b: BoundaryEvent ⇒ processEvent(b) + case Resume ⇒ finishShellRegistration() + } + if (!shortCircuitBuffer.isEmpty && currentLimit == 0) self ! Resume + } + + private def processEvent(b: BoundaryEvent): Unit = { + val shell = b.shell + if (!shell.isTerminated && (shell.isInitialized || tryInit(shell))) { + currentLimit = shell.receive(b, currentLimit) + if (shell.isTerminated) { + activeInterpreters -= shell + if (activeInterpreters.isEmpty && newShells.isEmpty) context.stop(self) + } + } } override def receive: Receive = { case b: BoundaryEvent ⇒ - val shell = b.shell - if (!shell.isTerminated && (shell.isInitialized || tryInit(shell))) { - shell.receive(b) - if (shell.isTerminated) { - activeInterpreters -= shell - if (activeInterpreters.isEmpty && newShells.isEmpty) context.stop(self) - } - } - case Resume ⇒ finishShellRegistration() + currentLimit = eventLimit + processEvent(b) + if (shortCircuitBuffer != null) shortCircuitBatch() + + case Resume ⇒ + currentLimit = eventLimit + if (shortCircuitBuffer != null) shortCircuitBatch() + case StreamSupervisor.PrintDebugDump ⇒ - println(s"activeShells:") + val builder = new java.lang.StringBuilder(s"activeShells (actor: $self):\n") activeInterpreters.foreach { shell ⇒ - println(" " + shell.toString.replace("\n", "\n ")) - shell.interpreter.dumpWaits() + builder.append(" " + shell.toString.replace("\n", "\n ")) + builder.append(shell.interpreter.toString) } - println(s"newShells:") + builder.append(s"newShells:") newShells.foreach { shell ⇒ - println(" " + shell.toString.replace("\n", "\n ")) - shell.interpreter.dumpWaits() + builder.append(" " + shell.toString.replace("\n", "\n ")) + builder.append(shell.interpreter.toString) } + println(builder) } override def postStop(): Unit = { val ex = AbruptTerminationException(self) activeInterpreters.foreach(_.tryAbort(ex)) + activeInterpreters = Set.empty[GraphInterpreterShell] newShells.foreach(s ⇒ if (tryInit(s)) s.tryAbort(ex)) } -} +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 04821b1721..07bac7504b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -4,13 +4,13 @@ package akka.stream.impl.fusing import java.util.Arrays +import akka.actor.ActorRef import akka.event.LoggingAdapter import akka.stream.stage._ import scala.annotation.tailrec import scala.collection.immutable import akka.stream._ import akka.stream.impl.StreamLayout._ -import akka.stream.impl.fusing.GraphStages.MaterializedValueSource import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal import java.{ util ⇒ ju } @@ -193,12 +193,12 @@ private[akka] object GraphInterpreter { override def toString: String = "GraphAssembly\n " + - stages.mkString("[", ",", "]") + "\n " + - originalAttributes.mkString("[", ",", "]") + "\n " + - ins.mkString("[", ",", "]") + "\n " + - inOwners.mkString("[", ",", "]") + "\n " + - outs.mkString("[", ",", "]") + "\n " + - outOwners.mkString("[", ",", "]") + stages.mkString("Stages: [", ",", "]") + "\n " + + originalAttributes.mkString("Attributes: [", ",", "]") + "\n " + + ins.mkString("Inlets: [", ",", "]") + "\n " + + inOwners.mkString("InOwners: [", ",", "]") + "\n " + + outs.mkString("Outlets: [", ",", "]") + "\n " + + outOwners.mkString("OutOwners: [", ",", "]") } object GraphAssembly { @@ -348,7 +348,8 @@ private[stream] final class GraphInterpreter( val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection val logics: Array[GraphStageLogic], // Array of stage logics val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, - val fuzzingMode: Boolean) { + val fuzzingMode: Boolean, + val context: ActorRef) { import GraphInterpreter._ // Maintains additional information for events, basically elements in-flight, or failure. @@ -523,13 +524,13 @@ private[stream] final class GraphInterpreter( * Executes pending events until the given limit is met. If there were remaining events, isSuspended will return * true. */ - def execute(eventLimit: Int): Unit = { + def execute(eventLimit: Int): Int = { if (Debug) println(s"$Name ---------------- EXECUTE $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") val currentInterpreterHolder = _currentInterpreter.get() val previousInterpreter = currentInterpreterHolder(0) currentInterpreterHolder(0) = this + var eventsRemaining = eventLimit try { - var eventsRemaining = eventLimit while (eventsRemaining > 0 && queueTail != queueHead) { val connection = dequeue() try processEvent(connection) @@ -551,6 +552,7 @@ private[stream] final class GraphInterpreter( } if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") // TODO: deadlock detection + eventsRemaining } def runAsyncInput(logic: GraphStageLogic, evt: Any, handler: (Any) ⇒ Unit): Unit = @@ -654,7 +656,7 @@ private[stream] final class GraphInterpreter( finalizeStage(logic) } - // Returns true if the given stage is alredy completed + // Returns true if the given stage is already completed def isStageCompleted(stage: GraphStageLogic): Boolean = stage != null && shutdownCounter(stage.stageId) == 0 // Register that a connection in which the given stage participated has been completed and therefore the stage @@ -734,12 +736,13 @@ private[stream] final class GraphInterpreter( * Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very * simplistic tool, make sure you are understanding what you are doing and then it will serve you well. */ - def dumpWaits(): Unit = { - println("digraph waits {") + def dumpWaits(): Unit = println(toString) - for (i ← assembly.stages.indices) { - println(s"""N$i [label="${assembly.stages(i)}"]""") - } + override def toString: String = { + val builder = new StringBuilder("digraph waits {\n") + + for (i ← assembly.stages.indices) + builder.append(s"""N$i [label="${assembly.stages(i)}"]""" + "\n") def nameIn(port: Int): String = { val owner = assembly.inOwners(port) @@ -756,17 +759,18 @@ private[stream] final class GraphInterpreter( for (i ← portStates.indices) { portStates(i) match { case InReady ⇒ - println(s""" ${nameIn(i)} -> ${nameOut(i)} [label="shouldPull"; color=blue]; """) + builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [label=shouldPull; color=blue]""") case OutReady ⇒ - println(s""" ${nameOut(i)} -> ${nameIn(i)} [label="shouldPush"; color=red]; """) + builder.append(s""" ${nameOut(i)} -> ${nameIn(i)} [label=shouldPush; color=red];""") case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) ⇒ - println(s""" ${nameIn(i)} -> ${nameOut(i)} [style=dotted; label="closed" dir=both]; """) + builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [style=dotted; label=closed dir=both];""") case _ ⇒ } - + builder.append("\n") } - println("}") - println(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})") + builder.append("}\n") + builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})") + builder.toString() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index 32c7aee126..acd43f87c1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -145,7 +145,8 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S outHandlers, logics, (_, _, _) ⇒ throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."), - fuzzingMode = false) + fuzzingMode = false, + null) interpreter.attachUpstreamBoundary(0, upstream) interpreter.attachDownstreamBoundary(ops.length, downstream) interpreter.init(null) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index e2791f917c..13627664e8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -77,9 +77,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[ q.enqueue(sinkIn) } } - override def onUpstreamFinish(): Unit = { - if (!sinkIn.isAvailable) removeSource(sinkIn) - } + override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn) }) sinkIn.pull() sources += sinkIn @@ -93,9 +91,8 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[ if (activeSources == 0 && isClosed(in)) completeStage() } - override def postStop(): Unit = { - sources.foreach(_.cancel()) - } + override def postStop(): Unit = sources.foreach(_.cancel()) + } override def toString: String = s"FlattenMerge($breadth)" diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 68417be70e..3b0ce2f82d 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -11,7 +11,8 @@ import akka.japi.function.{ Effect, Procedure } import akka.stream._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSource, SubSink } -import akka.stream.impl.{ ReactiveStreamsCompliance} +import akka.stream.impl.ReactiveStreamsCompliance +import scala.collection.mutable.ArrayBuffer import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration import akka.stream.actor.ActorSubscriberMessage diff --git a/project/MiMa.scala b/project/MiMa.scala index 4d47c4757e..8df235435f 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -646,6 +646,10 @@ object MiMa extends AutoPlugin { ), "2.4.2" -> Seq( FilterAnyProblemStartingWith("akka.stream.impl.VirtualProcessor"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.fusing.GraphInterpreter.execute"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.init"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.receive"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.assignPort"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop$"),