=str #19299 Performance Flow.flatMapMerge

Alexander Golubev 2016-02-02 16:39:47 -05:00
parent ad8ab128c4
commit b2b2ce44b5
19 changed files with 224 additions and 134 deletions

View file

@ -56,7 +56,7 @@ class ScheduleBenchmark {
var promise: Promise[Any] = _ var promise: Promise[Any] = _
@Setup(Level.Iteration) @Setup(Level.Iteration)
def setup():Unit = { def setup():Unit = {
winner = (to * ratio + 1).toInt winner = (to * ratio + 1).toInt
promise = Promise[Any]() promise = Promise[Any]()
} }

View file

@ -53,7 +53,7 @@ class HttpBenchmark {
} }
@TearDown @TearDown
def shutdown():Unit ={ def shutdown():Unit = {
Await.ready(Http().shutdownAllConnectionPools(), 1.second) Await.ready(Http().shutdownAllConnectionPools(), 1.second)
binding.unbind() binding.unbind()
Await.result(system.terminate(), 5.seconds) Await.result(system.terminate(), 5.seconds)

View file

@ -117,5 +117,4 @@ class MaterializationBenchmark {
@Benchmark @Benchmark
def graph_with_imported_flow():Unit = graphWithImportedFlow.run() def graph_with_imported_flow():Unit = graphWithImportedFlow.run()
} }

View file

@ -443,23 +443,22 @@ public class GraphStageDocTest extends AbstractJavaTest {
public void demonstrateAnAsynchronousSideChannel() throws Exception{ public void demonstrateAnAsynchronousSideChannel() throws Exception{
// tests: // tests:
TestSubscriber.Probe<Integer> out = TestSubscriber.probe(system);
TestPublisher.Probe<Integer> in = TestPublisher.probe(0, system);
CompletableFuture<Done> switchF = new CompletableFuture<>(); CompletableFuture<Done> switchF = new CompletableFuture<>();
Graph<FlowShape<Integer, Integer>, NotUsed> killSwitch = Graph<FlowShape<Integer, Integer>, NotUsed> killSwitch =
Flow.fromGraph(new KillSwitch<>(switchF)); Flow.fromGraph(new KillSwitch<>(switchF));
ExecutionContext ec = system.dispatcher(); Source.fromPublisher(in).via(killSwitch).to(Sink.fromSubscriber(out)).run(mat);
CompletionStage<Integer> valueAfterKill = switchF.thenApply(in -> 4); out.request(1);
in.sendNext(1);
out.expectNext(1);
CompletionStage<Integer> result =
Source.from(Arrays.asList(1, 2, 3)).concat(Source.fromCompletionStage(valueAfterKill))
.via(killSwitch)
.runFold(0, (n, sum) -> n + sum, mat);
switchF.complete(Done.getInstance()); switchF.complete(Done.getInstance());
assertEquals(new Integer(6), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); out.expectComplete();
} }

View file

@ -272,7 +272,6 @@ class GraphStageDocSpec extends AkkaSpec {
"Demonstrate an asynchronous side channel" in { "Demonstrate an asynchronous side channel" in {
import system.dispatcher import system.dispatcher
//#async-side-channel //#async-side-channel
// will close upstream in all materializations of the graph stage instance // will close upstream in all materializations of the graph stage instance
// when the future completes // when the future completes

View file

@ -57,7 +57,6 @@ object WSClientAutobahnTest extends App {
import Console._ import Console._
info.flatMap { i info.flatMap { i
val prefix = f"$YELLOW${i.caseInfo.id}%-7s$RESET - $RESET${i.caseInfo.description}$RESET ... " val prefix = f"$YELLOW${i.caseInfo.id}%-7s$RESET - $RESET${i.caseInfo.description}$RESET ... "
//println(prefix)
status.onComplete { status.onComplete {
case Success((CaseStatus(status), millis)) case Success((CaseStatus(status), millis))

View file

@ -61,7 +61,7 @@ interface AsyncWritePlugin {
* result `Iterable` for the happy path, i.e. when no messages are rejected. * result `Iterable` for the happy path, i.e. when no messages are rejected.
* *
* Calls to this method are serialized by the enclosing journal actor. If you spawn * Calls to this method are serialized by the enclosing journal actor. If you spawn
* work in asyncronous tasks it is alright that they complete the futures in any order, * work in asynchronous tasks it is alright that they complete the futures in any order,
* but the actual writes for a specific persistenceId should be serialized to avoid * but the actual writes for a specific persistenceId should be serialized to avoid
* issues such as events of a later write are visible to consumers (query side, or replay) * issues such as events of a later write are visible to consumers (query side, or replay)
* before the events of an earlier write are visible. This can also be done with * before the events of an earlier write are visible. This can also be done with

View file

@ -212,7 +212,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
* `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected. * `Future.successful(Nil)` for the happy path, i.e. when no messages are rejected.
* *
* Calls to this method are serialized by the enclosing journal actor. If you spawn * Calls to this method are serialized by the enclosing journal actor. If you spawn
* work in asyncronous tasks it is alright that they complete the futures in any order, * work in asynchronous tasks it is alright that they complete the futures in any order,
* but the actual writes for a specific persistenceId should be serialized to avoid * but the actual writes for a specific persistenceId should be serialized to avoid
* issues such as events of a later write are visible to consumers (query side, or replay) * issues such as events of a later write are visible to consumers (query side, or replay)
* before the events of an earlier write are visible. * before the events of an earlier write are visible.
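To make the contract described above concrete, a plugin can key its in-flight writes by persistenceId and chain each new write for an id behind the previous one, while still letting the returned futures complete in any order. The following is only a sketch under assumed names: the SerializedWrites class and its writeToStore parameter are hypothetical and not part of Akka, and calls to write are assumed to be serialized by the enclosing journal actor, as stated in the contract.

import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

// writeToStore is a hypothetical storage call persisting one batch of events.
class SerializedWrites(writeToStore: immutable.Seq[String] => Future[Unit])(implicit ec: ExecutionContext) {

  // Last pending write per persistenceId; later writes for the same id are chained behind it.
  private var lastWrite = Map.empty[String, Future[Unit]]

  // Calls are expected to come from the single journal actor, so no extra synchronization here.
  def write(persistenceId: String, events: immutable.Seq[String]): Future[Try[Unit]] = {
    val previous = lastWrite.getOrElse(persistenceId, Future.successful(()))
    val chained  = previous.flatMap(_ => writeToStore(events))
    lastWrite += persistenceId -> chained
    // Completions may be observed in any order; only writes for the same
    // persistenceId are forced to happen one after another.
    chained.map(done => Success(done): Try[Unit]).recover { case e => Failure(e) }
  }
}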

View file

@ -448,7 +448,6 @@ public class SourceTest extends StreamTest {
@Test @Test
public void mustWorkFromFuture() throws Exception { public void mustWorkFromFuture() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input = Arrays.asList("A", "B", "C"); final Iterable<String> input = Arrays.asList("A", "B", "C");
CompletionStage<String> future1 = Source.from(input).runWith(Sink.<String>head(), materializer); CompletionStage<String> future1 = Source.from(input).runWith(Sink.<String>head(), materializer);
CompletionStage<String> future2 = Source.fromCompletionStage(future1).runWith(Sink.<String>head(), materializer); CompletionStage<String> future2 = Source.fromCompletionStage(future1).runWith(Sink.<String>head(), materializer);

View file

@ -74,7 +74,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
val (inHandlers, outHandlers, logics) = val (inHandlers, outHandlers, logics) =
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ()) assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ())
_interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics,
(_, _, _) (), fuzzingMode = false) (_, _, _) (), fuzzingMode = false, null)
for ((upstream, i) upstreams.zipWithIndex) { for ((upstream, i) upstreams.zipWithIndex) {
_interpreter.attachUpstreamBoundary(i, upstream._1) _interpreter.attachUpstreamBoundary(i, upstream._1)
@ -92,7 +92,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec {
val (inHandlers, outHandlers, logics) = val (inHandlers, outHandlers, logics) =
assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ()) assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ())
_interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics,
(_, _, _) (), fuzzingMode = false) (_, _, _) (), fuzzingMode = false, null)
} }
def builder(stages: GraphStageWithMaterializedValue[_ <: Shape, _]*): AssemblyBuilder = new AssemblyBuilder(stages) def builder(stages: GraphStageWithMaterializedValue[_ <: Shape, _]*): AssemblyBuilder = new AssemblyBuilder(stages)

View file

@ -5,7 +5,7 @@ package akka.stream.scaladsl
import akka.NotUsed import akka.NotUsed
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.stream.ActorMaterializer import akka.stream.{ ActorMaterializerSettings, ActorMaterializer }
import scala.concurrent._ import scala.concurrent._
import scala.concurrent.duration._ import scala.concurrent.duration._
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
@ -122,23 +122,22 @@ class FlowFlattenMergeSpec extends AkkaSpec {
} }
"cancel substreams when failing map function" in { "cancel substreams when failing map function" in {
val p1, p2 = TestPublisher.probe[Int]() val settings = ActorMaterializerSettings(system).withSyncProcessingLimit(1).withInputBuffer(1, 1)
val mat = ActorMaterializer(settings)
val p = TestPublisher.probe[Int]()
val ex = new Exception("buh") val ex = new Exception("buh")
val latch = TestLatch() val latch = TestLatch()
Source(1 to 3) Source(1 to 3)
.flatMapMerge(10, { .flatMapMerge(10, {
case 1 Source.fromPublisher(p1) case 1 Source.fromPublisher(p)
case 2 Source.fromPublisher(p2) case 2
case 3
Await.ready(latch, 3.seconds) Await.ready(latch, 3.seconds)
throw ex throw ex
}) })
.runWith(Sink.head) .runWith(Sink.head)(mat)
p1.expectRequest() p.expectRequest()
p2.expectRequest()
latch.countDown() latch.countDown()
p1.expectCancellation() p.expectCancellation()
p2.expectCancellation()
} }
"cancel substreams when being cancelled" in { "cancel substreams when being cancelled" in {

View file

@ -45,17 +45,22 @@ akka {
# this may cause an initial runtime overhead, but most of the time fusing is # this may cause an initial runtime overhead, but most of the time fusing is
# desirable since it reduces the number of Actors that are created. # desirable since it reduces the number of Actors that are created.
auto-fusing = on auto-fusing = on
# Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered, # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
# buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed # buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
# buffer upon stream materialization if the requested buffer size is less than this # buffer upon stream materialization if the requested buffer size is less than this
# configuration parameter. The default is very high because failing early is better # configuration parameter. The default is very high because failing early is better
# than failing under load. # than failing under load.
# #
# Buffers sized larger than this will dynamically grow/shrink and consume more memory # Buffers sized larger than this will dynamically grow/shrink and consume more memory
# per element than the fixed size buffers. # per element than the fixed size buffers.
max-fixed-buffer-size = 1000000000 max-fixed-buffer-size = 1000000000
# Maximum number of sync messages that an actor can process for stream-to-substream communication.
# This parameter allows synchronous processing to be interrupted so that upstream/downstream messages get a chance to be handled.
# It speeds up message processing that happens within the same actor while keeping the system responsive.
sync-processing-limit = 1000
debug { debug {
# Enables the fuzzing mode which increases the chance of race conditions # Enables the fuzzing mode which increases the chance of race conditions
# by aggressively reordering events and making certain operations more # by aggressively reordering events and making certain operations more
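The sync-processing-limit key added in the hunk above can also be set programmatically, which is how the updated FlowFlattenMergeSpec in this commit exercises it. Below is a minimal sketch under assumed names: the example object and the stream inside it are illustrative only and not part of the commit.

import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.scaladsl.{ Sink, Source }

object SyncProcessingLimitExample extends App {
  implicit val system = ActorSystem("sync-limit-example")

  // Cap how many events one stream actor may process synchronously per message,
  // mirroring the sync-processing-limit configuration key.
  val settings = ActorMaterializerSettings(system).withSyncProcessingLimit(100)
  implicit val materializer = ActorMaterializer(settings)

  // Illustrative stream only; flatMapMerge is the operator this commit tunes.
  Source(1 to 10)
    .flatMapMerge(4, i => Source(List(i, i * 10)))
    .runWith(Sink.foreach(println))
}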

View file

@ -9,6 +9,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean }
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream.ActorMaterializerSettings.defaultMaxFixedBufferSize
import akka.stream.impl._ import akka.stream.impl._
import com.typesafe.config.Config import com.typesafe.config.Config
@ -224,7 +225,7 @@ object ActorMaterializerSettings {
* Create [[ActorMaterializerSettings]] from a Config subsection (Scala). * Create [[ActorMaterializerSettings]] from a Config subsection (Scala).
*/ */
def apply(config: Config): ActorMaterializerSettings = def apply(config: Config): ActorMaterializerSettings =
ActorMaterializerSettings( new ActorMaterializerSettings(
initialInputBufferSize = config.getInt("initial-input-buffer-size"), initialInputBufferSize = config.getInt("initial-input-buffer-size"),
maxInputBufferSize = config.getInt("max-input-buffer-size"), maxInputBufferSize = config.getInt("max-input-buffer-size"),
dispatcher = config.getString("dispatcher"), dispatcher = config.getString("dispatcher"),
@ -234,7 +235,8 @@ object ActorMaterializerSettings {
outputBurstLimit = config.getInt("output-burst-limit"), outputBurstLimit = config.getInt("output-burst-limit"),
fuzzingMode = config.getBoolean("debug.fuzzing-mode"), fuzzingMode = config.getBoolean("debug.fuzzing-mode"),
autoFusing = config.getBoolean("auto-fusing"), autoFusing = config.getBoolean("auto-fusing"),
maxFixedBufferSize = config.getInt("max-fixed-buffer-size")) maxFixedBufferSize = config.getInt("max-fixed-buffer-size"),
syncProcessingLimit = config.getInt("sync-processing-limit"))
/** /**
* Create [[ActorMaterializerSettings]] from individual settings (Java). * Create [[ActorMaterializerSettings]] from individual settings (Java).
@ -266,13 +268,14 @@ object ActorMaterializerSettings {
def create(config: Config): ActorMaterializerSettings = def create(config: Config): ActorMaterializerSettings =
apply(config) apply(config)
private val defaultMaxFixedBufferSize = 1000
} }
/** /**
* This class describes the configurable properties of the [[ActorMaterializer]]. * This class describes the configurable properties of the [[ActorMaterializer]].
* Please refer to the `withX` methods for descriptions of the individual settings. * Please refer to the `withX` methods for descriptions of the individual settings.
*/ */
final class ActorMaterializerSettings( final class ActorMaterializerSettings private (
val initialInputBufferSize: Int, val initialInputBufferSize: Int,
val maxInputBufferSize: Int, val maxInputBufferSize: Int,
val dispatcher: String, val dispatcher: String,
@ -282,9 +285,25 @@ final class ActorMaterializerSettings(
val outputBurstLimit: Int, val outputBurstLimit: Int,
val fuzzingMode: Boolean, val fuzzingMode: Boolean,
val autoFusing: Boolean, val autoFusing: Boolean,
val maxFixedBufferSize: Int) { val maxFixedBufferSize: Int,
val syncProcessingLimit: Int) {
def this(initialInputBufferSize: Int,
maxInputBufferSize: Int,
dispatcher: String,
supervisionDecider: Supervision.Decider,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
debugLogging: Boolean,
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int) {
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize)
}
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
require(syncProcessingLimit > 0, "syncProcessingLimit must be > 0")
requirePowerOfTwo(maxInputBufferSize, "maxInputBufferSize") requirePowerOfTwo(maxInputBufferSize, "maxInputBufferSize")
require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)") require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)")
@ -299,10 +318,12 @@ final class ActorMaterializerSettings(
outputBurstLimit: Int = this.outputBurstLimit, outputBurstLimit: Int = this.outputBurstLimit,
fuzzingMode: Boolean = this.fuzzingMode, fuzzingMode: Boolean = this.fuzzingMode,
autoFusing: Boolean = this.autoFusing, autoFusing: Boolean = this.autoFusing,
maxFixedBufferSize: Int = this.maxFixedBufferSize) = maxFixedBufferSize: Int = this.maxFixedBufferSize,
syncProcessingLimit: Int = this.syncProcessingLimit) = {
new ActorMaterializerSettings( new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize) outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit)
}
/** /**
* Each asynchronous piece of a materialized stream topology is executed by one Actor * Each asynchronous piece of a materialized stream topology is executed by one Actor
@ -366,6 +387,13 @@ final class ActorMaterializerSettings(
if (limit == this.outputBurstLimit) this if (limit == this.outputBurstLimit) this
else copy(outputBurstLimit = limit) else copy(outputBurstLimit = limit)
/**
* Limit for the number of messages that can be processed synchronously in stream-to-substream communication
*/
def withSyncProcessingLimit(limit: Int): ActorMaterializerSettings =
if (limit == this.syncProcessingLimit) this
else copy(syncProcessingLimit = limit)
/** /**
* Enable to log all elements that are dropped due to failures (at DEBUG level). * Enable to log all elements that are dropped due to failures (at DEBUG level).
*/ */
@ -413,12 +441,13 @@ final class ActorMaterializerSettings(
s.subscriptionTimeoutSettings == subscriptionTimeoutSettings && s.subscriptionTimeoutSettings == subscriptionTimeoutSettings &&
s.debugLogging == debugLogging && s.debugLogging == debugLogging &&
s.outputBurstLimit == outputBurstLimit && s.outputBurstLimit == outputBurstLimit &&
s.syncProcessingLimit == syncProcessingLimit &&
s.fuzzingMode == fuzzingMode && s.fuzzingMode == fuzzingMode &&
s.autoFusing == autoFusing s.autoFusing == autoFusing
case _ false case _ false
} }
override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize,$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit,$fuzzingMode,$autoFusing)" override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize,$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit,$syncProcessingLimit,$fuzzingMode,$autoFusing)"
} }
object StreamSubscriptionTimeoutSettings { object StreamSubscriptionTimeoutSettings {

View file

@ -3,14 +3,15 @@
*/ */
package akka.stream.impl.fusing package akka.stream.impl.fusing
import java.util
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.actor._ import akka.actor._
import akka.event.Logging import akka.event.Logging
import akka.stream._ import akka.stream._
import akka.stream.impl._
import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module } import akka.stream.impl.StreamLayout.{ CompositeModule, CopiedModule, Module }
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly } import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic, GraphAssembly }
import akka.stream.impl.{ ActorPublisher, ReactiveStreamsCompliance }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.forkjoin.ThreadLocalRandom
@ -18,7 +19,6 @@ import scala.util.control.NonFatal
import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.ActorMaterializerImpl
import akka.stream.impl.SubFusingActorMaterializerImpl import akka.stream.impl.SubFusingActorMaterializerImpl
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.stream.impl.StreamSupervisor
/** /**
* INTERNAL API * INTERNAL API
@ -316,8 +316,16 @@ private[stream] final class GraphInterpreterShell(
private var self: ActorRef = _ private var self: ActorRef = _
lazy val log = Logging(mat.system.eventStream, self) lazy val log = Logging(mat.system.eventStream, self)
lazy val interpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics, private var enqueueToShortCircuit: (Any) Unit = _
(logic, event, handler) self ! AsyncInput(this, logic, event, handler), settings.fuzzingMode)
lazy val interpreter: GraphInterpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics,
(logic, event, handler) {
val asyncInput = AsyncInput(this, logic, event, handler)
val currentInterpreter = GraphInterpreter.currentInterpreterOrNull
if (currentInterpreter == null || (currentInterpreter.context ne self))
self ! asyncInput
else enqueueToShortCircuit(asyncInput)
}, settings.fuzzingMode, self)
private val inputs = new Array[BatchingActorInputBoundary](shape.inlets.size) private val inputs = new Array[BatchingActorInputBoundary](shape.inlets.size)
private val outputs = new Array[ActorOutputBoundary](shape.outlets.size) private val outputs = new Array[ActorOutputBoundary](shape.outlets.size)
@ -332,18 +340,24 @@ private[stream] final class GraphInterpreterShell(
* a self-message for fairness with other actors. The basic assumption here is * a self-message for fairness with other actors. The basic assumption here is
* to give each input buffer slot a chance to run through the whole pipeline * to give each input buffer slot a chance to run through the whole pipeline
* and back (for the demand). * and back (for the demand).
*
* Considered use case:
* - assume a composite Sink of one expand and one fold
* - assume an infinitely fast source of data
* - assume maxInputBufferSize == 1
* - if the event limit is greater than maxInputBufferSize * (ins + outs) then there will always be expand activity
* because no data can enter fast enough from the outside
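* For example (illustrative numbers, not from the commit): with maxInputBufferSize = 1 and a shape of two inlets and one outlet, the limit is 1 * (2 + 1) = 3 events per batch.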
*/ */
private val eventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length) val shellEventLimit = settings.maxInputBufferSize * (assembly.ins.length + assembly.outs.length)
// Limits the number of events processed by the interpreter on an abort event. // Limits the number of events processed by the interpreter on an abort event.
// TODO: Better heuristic here // TODO: Better heuristic here
private val abortLimit = eventLimit * 2 private val abortLimit = shellEventLimit * 2
private var resumeScheduled = false private var resumeScheduled = false
def isInitialized: Boolean = self != null def isInitialized: Boolean = self != null
def init(self: ActorRef, subMat: SubFusingActorMaterializerImpl, enqueueToShortCircuit: (Any) Unit, eventLimit: Int): Int = {
def init(self: ActorRef, subMat: SubFusingActorMaterializerImpl): Unit = {
this.self = self this.self = self
this.enqueueToShortCircuit = enqueueToShortCircuit
var i = 0 var i = 0
while (i < inputs.length) { while (i < inputs.length) {
val in = new BatchingActorInputBoundary(settings.maxInputBufferSize, i) val in = new BatchingActorInputBoundary(settings.maxInputBufferSize, i)
@ -360,66 +374,74 @@ private[stream] final class GraphInterpreterShell(
i += 1 i += 1
} }
interpreter.init(subMat) interpreter.init(subMat)
runBatch() runBatch(eventLimit)
} }
def receive(event: BoundaryEvent): Unit = def receive(event: BoundaryEvent, eventLimit: Int): Int = {
if (waitingForShutdown) event match { resumeScheduled = false
case ExposedPublisher(_, id, publisher) if (waitingForShutdown) {
outputs(id).exposedPublisher(publisher) event match {
publishersPending -= 1 case ExposedPublisher(_, id, publisher)
if (canShutDown) _isTerminated = true outputs(id).exposedPublisher(publisher)
case OnSubscribe(_, _, sub) publishersPending -= 1
tryCancel(sub) if (canShutDown) _isTerminated = true
subscribesPending -= 1 case OnSubscribe(_, _, sub)
if (canShutDown) _isTerminated = true tryCancel(sub)
case Abort(_) subscribesPending -= 1
tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " + if (canShutDown) _isTerminated = true
s"inputs or outputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now.")) case Abort(_)
case _ // Ignore, there is nothing to do anyway tryAbort(new TimeoutException("Streaming actor has been already stopped processing (normally), but not all of its " +
} s"inputs or outputs have been subscribed in [${settings.subscriptionTimeoutSettings.timeout}}]. Aborting actor now."))
else event match { case _ // Ignore, there is nothing to do anyway
}
eventLimit
} else event match {
// Cases that are most likely on the hot path, in decreasing order of frequency // Cases that are most likely on the hot path, in decreasing order of frequency
case OnNext(_, id: Int, e: Any) case OnNext(_, id: Int, e: Any)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} onNext $e id=$id")
inputs(id).onNext(e) inputs(id).onNext(e)
runBatch() runBatch(eventLimit)
case RequestMore(_, id: Int, demand: Long) case RequestMore(_, id: Int, demand: Long)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} request $demand id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} request $demand id=$id")
outputs(id).requestMore(demand) outputs(id).requestMore(demand)
runBatch() runBatch(eventLimit)
case Resume(_) case Resume(_)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume") if (GraphInterpreter.Debug) println(s"${interpreter.Name} resume")
resumeScheduled = false if (interpreter.isSuspended) runBatch(eventLimit) else eventLimit
if (interpreter.isSuspended) runBatch()
case AsyncInput(_, logic, event, handler) case AsyncInput(_, logic, event, handler)
interpreter.runAsyncInput(logic, event, handler) interpreter.runAsyncInput(logic, event, handler)
runBatch() if (eventLimit == 1 && interpreter.isSuspended) {
sendResume(true)
0
} else runBatch(eventLimit - 1)
// Initialization and completion messages // Initialization and completion messages
case OnError(_, id: Int, cause: Throwable) case OnError(_, id: Int, cause: Throwable)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} onError id=$id")
inputs(id).onError(cause) inputs(id).onError(cause)
runBatch() runBatch(eventLimit)
case OnComplete(_, id: Int) case OnComplete(_, id: Int)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} onComplete id=$id")
inputs(id).onComplete() inputs(id).onComplete()
runBatch() runBatch(eventLimit)
case OnSubscribe(_, id: Int, subscription: Subscription) case OnSubscribe(_, id: Int, subscription: Subscription)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} onSubscribe id=$id")
subscribesPending -= 1 subscribesPending -= 1
inputs(id).onSubscribe(subscription) inputs(id).onSubscribe(subscription)
runBatch() runBatch(eventLimit)
case Cancel(_, id: Int) case Cancel(_, id: Int)
if (GraphInterpreter.Debug) println(s"${interpreter.Name} cancel id=$id") if (GraphInterpreter.Debug) println(s"${interpreter.Name} cancel id=$id")
outputs(id).cancel() outputs(id).cancel()
runBatch() runBatch(eventLimit)
case SubscribePending(_, id: Int) case SubscribePending(_, id: Int)
outputs(id).subscribePending() outputs(id).subscribePending()
eventLimit
case ExposedPublisher(_, id, publisher) case ExposedPublisher(_, id, publisher)
publishersPending -= 1 publishersPending -= 1
outputs(id).exposedPublisher(publisher) outputs(id).exposedPublisher(publisher)
eventLimit
} }
}
private var _isTerminated = false private var _isTerminated = false
def isTerminated: Boolean = _isTerminated def isTerminated: Boolean = _isTerminated
@ -430,16 +452,16 @@ private[stream] final class GraphInterpreterShell(
private val resume = Resume(this) private val resume = Resume(this)
private def runBatch(): Unit = { def sendResume(sendResume: Boolean): Unit = {
resumeScheduled = true
if (sendResume) self ! resume
else enqueueToShortCircuit(resume)
}
def runBatch(actorEventLimit: Int): Int = {
try { try {
val effectiveLimit = { val usingShellLimit = shellEventLimit < actorEventLimit
if (!settings.fuzzingMode) eventLimit val remainingQuota = interpreter.execute(Math.min(actorEventLimit, shellEventLimit))
else {
if (ThreadLocalRandom.current().nextBoolean()) Thread.`yield`()
ThreadLocalRandom.current().nextInt(2) // 1 or zero events to be processed
}
}
interpreter.execute(effectiveLimit)
if (interpreter.isCompleted) { if (interpreter.isCompleted) {
// Cannot stop right away if not completely subscribed // Cannot stop right away if not completely subscribed
if (canShutDown) _isTerminated = true if (canShutDown) _isTerminated = true
@ -449,13 +471,13 @@ private[stream] final class GraphInterpreterShell(
override def run(): Unit = self ! Abort(GraphInterpreterShell.this) override def run(): Unit = self ! Abort(GraphInterpreterShell.this)
}) })
} }
} else if (interpreter.isSuspended && !resumeScheduled) { } else if (interpreter.isSuspended && !resumeScheduled) sendResume(!usingShellLimit)
resumeScheduled = true
self ! resume if (usingShellLimit) actorEventLimit - shellEventLimit + remainingQuota else remainingQuota
}
} catch { } catch {
case NonFatal(e) case NonFatal(e)
tryAbort(e) tryAbort(e)
actorEventLimit - 1
} }
} }
@ -498,7 +520,7 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext
def tryInit(shell: GraphInterpreterShell): Boolean = def tryInit(shell: GraphInterpreterShell): Boolean =
try { try {
shell.init(self, subFusingMaterializerImpl) currentLimit = shell.init(self, subFusingMaterializerImpl, enqueueToShortCircuit(_), currentLimit)
if (GraphInterpreter.Debug) println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}") if (GraphInterpreter.Debug) println(s"registering new shell in ${_initial}\n ${shell.toString.replace("\n", "\n ")}")
if (shell.isTerminated) false if (shell.isTerminated) false
else { else {
@ -511,9 +533,20 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext
false false
} }
// this limits the number of messages that can be processed synchronously during one actor receive.
private val eventLimit: Int = _initial.mat.settings.syncProcessingLimit
private var currentLimit: Int = eventLimit
//this is a var in order to save the allocation when no short-circuiting actually happens
private var shortCircuitBuffer: util.LinkedList[Any] = null
def enqueueToShortCircuit(input: Any): Unit = {
if (shortCircuitBuffer == null) shortCircuitBuffer = new util.LinkedList[Any]()
shortCircuitBuffer.add(input)
}
def registerShell(shell: GraphInterpreterShell): ActorRef = { def registerShell(shell: GraphInterpreterShell): ActorRef = {
newShells ::= shell newShells ::= shell
self ! Resume enqueueToShortCircuit(Resume)
self self
} }
@ -537,35 +570,57 @@ private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) ext
override def preStart(): Unit = { override def preStart(): Unit = {
tryInit(_initial) tryInit(_initial)
if (activeInterpreters.isEmpty) context.stop(self) if (activeInterpreters.isEmpty) context.stop(self)
else if (shortCircuitBuffer != null) shortCircuitBatch()
}
private def shortCircuitBatch(): Unit = {
while (!shortCircuitBuffer.isEmpty && currentLimit > 0 && activeInterpreters.nonEmpty)
shortCircuitBuffer.poll() match {
case b: BoundaryEvent processEvent(b)
case Resume finishShellRegistration()
}
if (!shortCircuitBuffer.isEmpty && currentLimit == 0) self ! Resume
}
private def processEvent(b: BoundaryEvent): Unit = {
val shell = b.shell
if (!shell.isTerminated && (shell.isInitialized || tryInit(shell))) {
currentLimit = shell.receive(b, currentLimit)
if (shell.isTerminated) {
activeInterpreters -= shell
if (activeInterpreters.isEmpty && newShells.isEmpty) context.stop(self)
}
}
} }
override def receive: Receive = { override def receive: Receive = {
case b: BoundaryEvent case b: BoundaryEvent
val shell = b.shell currentLimit = eventLimit
if (!shell.isTerminated && (shell.isInitialized || tryInit(shell))) { processEvent(b)
shell.receive(b) if (shortCircuitBuffer != null) shortCircuitBatch()
if (shell.isTerminated) {
activeInterpreters -= shell case Resume
if (activeInterpreters.isEmpty && newShells.isEmpty) context.stop(self) currentLimit = eventLimit
} if (shortCircuitBuffer != null) shortCircuitBatch()
}
case Resume finishShellRegistration()
case StreamSupervisor.PrintDebugDump case StreamSupervisor.PrintDebugDump
println(s"activeShells:") val builder = new java.lang.StringBuilder(s"activeShells (actor: $self):\n")
activeInterpreters.foreach { shell activeInterpreters.foreach { shell
println(" " + shell.toString.replace("\n", "\n ")) builder.append(" " + shell.toString.replace("\n", "\n "))
shell.interpreter.dumpWaits() builder.append(shell.interpreter.toString)
} }
println(s"newShells:") builder.append(s"newShells:")
newShells.foreach { shell newShells.foreach { shell
println(" " + shell.toString.replace("\n", "\n ")) builder.append(" " + shell.toString.replace("\n", "\n "))
shell.interpreter.dumpWaits() builder.append(shell.interpreter.toString)
} }
println(builder)
} }
override def postStop(): Unit = { override def postStop(): Unit = {
val ex = AbruptTerminationException(self) val ex = AbruptTerminationException(self)
activeInterpreters.foreach(_.tryAbort(ex)) activeInterpreters.foreach(_.tryAbort(ex))
activeInterpreters = Set.empty[GraphInterpreterShell]
newShells.foreach(s if (tryInit(s)) s.tryAbort(ex)) newShells.foreach(s if (tryInit(s)) s.tryAbort(ex))
} }
} }

View file

@ -4,13 +4,13 @@
package akka.stream.impl.fusing package akka.stream.impl.fusing
import java.util.Arrays import java.util.Arrays
import akka.actor.ActorRef
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream.stage._ import akka.stream.stage._
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout._ import akka.stream.impl.StreamLayout._
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal import scala.util.control.NonFatal
import java.{ util ju } import java.{ util ju }
@ -193,12 +193,12 @@ private[akka] object GraphInterpreter {
override def toString: String = override def toString: String =
"GraphAssembly\n " + "GraphAssembly\n " +
stages.mkString("[", ",", "]") + "\n " + stages.mkString("Stages: [", ",", "]") + "\n " +
originalAttributes.mkString("[", ",", "]") + "\n " + originalAttributes.mkString("Attributes: [", ",", "]") + "\n " +
ins.mkString("[", ",", "]") + "\n " + ins.mkString("Inlets: [", ",", "]") + "\n " +
inOwners.mkString("[", ",", "]") + "\n " + inOwners.mkString("InOwners: [", ",", "]") + "\n " +
outs.mkString("[", ",", "]") + "\n " + outs.mkString("Outlets: [", ",", "]") + "\n " +
outOwners.mkString("[", ",", "]") outOwners.mkString("OutOwners: [", ",", "]")
} }
object GraphAssembly { object GraphAssembly {
@ -348,7 +348,8 @@ private[stream] final class GraphInterpreter(
val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection
val logics: Array[GraphStageLogic], // Array of stage logics val logics: Array[GraphStageLogic], // Array of stage logics
val onAsyncInput: (GraphStageLogic, Any, (Any) Unit) Unit, val onAsyncInput: (GraphStageLogic, Any, (Any) Unit) Unit,
val fuzzingMode: Boolean) { val fuzzingMode: Boolean,
val context: ActorRef) {
import GraphInterpreter._ import GraphInterpreter._
// Maintains additional information for events, basically elements in-flight, or failure. // Maintains additional information for events, basically elements in-flight, or failure.
@ -523,13 +524,13 @@ private[stream] final class GraphInterpreter(
* Executes pending events until the given limit is met. If there were remaining events, isSuspended will return * Executes pending events until the given limit is met. If there were remaining events, isSuspended will return
* true. * true.
*/ */
def execute(eventLimit: Int): Unit = { def execute(eventLimit: Int): Int = {
if (Debug) println(s"$Name ---------------- EXECUTE $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") if (Debug) println(s"$Name ---------------- EXECUTE $queueStatus (running=$runningStages, shutdown=$shutdownCounters)")
val currentInterpreterHolder = _currentInterpreter.get() val currentInterpreterHolder = _currentInterpreter.get()
val previousInterpreter = currentInterpreterHolder(0) val previousInterpreter = currentInterpreterHolder(0)
currentInterpreterHolder(0) = this currentInterpreterHolder(0) = this
var eventsRemaining = eventLimit
try { try {
var eventsRemaining = eventLimit
while (eventsRemaining > 0 && queueTail != queueHead) { while (eventsRemaining > 0 && queueTail != queueHead) {
val connection = dequeue() val connection = dequeue()
try processEvent(connection) try processEvent(connection)
@ -551,6 +552,7 @@ private[stream] final class GraphInterpreter(
} }
if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=$shutdownCounters)") if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=$shutdownCounters)")
// TODO: deadlock detection // TODO: deadlock detection
eventsRemaining
} }
def runAsyncInput(logic: GraphStageLogic, evt: Any, handler: (Any) Unit): Unit = def runAsyncInput(logic: GraphStageLogic, evt: Any, handler: (Any) Unit): Unit =
@ -654,7 +656,7 @@ private[stream] final class GraphInterpreter(
finalizeStage(logic) finalizeStage(logic)
} }
// Returns true if the given stage is alredy completed // Returns true if the given stage is already completed
def isStageCompleted(stage: GraphStageLogic): Boolean = stage != null && shutdownCounter(stage.stageId) == 0 def isStageCompleted(stage: GraphStageLogic): Boolean = stage != null && shutdownCounter(stage.stageId) == 0
// Register that a connection in which the given stage participated has been completed and therefore the stage // Register that a connection in which the given stage participated has been completed and therefore the stage
@ -734,12 +736,13 @@ private[stream] final class GraphInterpreter(
* Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very * Only invoke this after the interpreter completely settled, otherwise the results might be off. This is a very
* simplistic tool, make sure you are understanding what you are doing and then it will serve you well. * simplistic tool, make sure you are understanding what you are doing and then it will serve you well.
*/ */
def dumpWaits(): Unit = { def dumpWaits(): Unit = println(toString)
println("digraph waits {")
for (i assembly.stages.indices) { override def toString: String = {
println(s"""N$i [label="${assembly.stages(i)}"]""") val builder = new StringBuilder("digraph waits {\n")
}
for (i assembly.stages.indices)
builder.append(s"""N$i [label="${assembly.stages(i)}"]""" + "\n")
def nameIn(port: Int): String = { def nameIn(port: Int): String = {
val owner = assembly.inOwners(port) val owner = assembly.inOwners(port)
@ -756,17 +759,18 @@ private[stream] final class GraphInterpreter(
for (i portStates.indices) { for (i portStates.indices) {
portStates(i) match { portStates(i) match {
case InReady case InReady
println(s""" ${nameIn(i)} -> ${nameOut(i)} [label="shouldPull"; color=blue]; """) builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [label=shouldPull; color=blue]""")
case OutReady case OutReady
println(s""" ${nameOut(i)} -> ${nameIn(i)} [label="shouldPush"; color=red]; """) builder.append(s""" ${nameOut(i)} -> ${nameIn(i)} [label=shouldPush; color=red];""")
case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) case x if (x | InClosed | OutClosed) == (InClosed | OutClosed)
println(s""" ${nameIn(i)} -> ${nameOut(i)} [style=dotted; label="closed" dir=both]; """) builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [style=dotted; label=closed dir=both];""")
case _ case _
} }
builder.append("\n")
} }
println("}") builder.append("}\n")
println(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})") builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})")
builder.toString()
} }
} }

View file

@ -145,7 +145,8 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S
outHandlers, outHandlers,
logics, logics,
(_, _, _) throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."), (_, _, _) throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."),
fuzzingMode = false) fuzzingMode = false,
null)
interpreter.attachUpstreamBoundary(0, upstream) interpreter.attachUpstreamBoundary(0, upstream)
interpreter.attachDownstreamBoundary(ops.length, downstream) interpreter.attachDownstreamBoundary(ops.length, downstream)
interpreter.init(null) interpreter.init(null)

View file

@ -77,9 +77,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[
q.enqueue(sinkIn) q.enqueue(sinkIn)
} }
} }
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn)
if (!sinkIn.isAvailable) removeSource(sinkIn)
}
}) })
sinkIn.pull() sinkIn.pull()
sources += sinkIn sources += sinkIn
@ -93,9 +91,8 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[
if (activeSources == 0 && isClosed(in)) completeStage() if (activeSources == 0 && isClosed(in)) completeStage()
} }
override def postStop(): Unit = { override def postStop(): Unit = sources.foreach(_.cancel())
sources.foreach(_.cancel())
}
} }
override def toString: String = s"FlattenMerge($breadth)" override def toString: String = s"FlattenMerge($breadth)"

View file

@ -11,7 +11,8 @@ import akka.japi.function.{ Effect, Procedure }
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSource, SubSink } import akka.stream.impl.fusing.{ GraphInterpreter, GraphStageModule, SubSource, SubSink }
import akka.stream.impl.{ ReactiveStreamsCompliance} import akka.stream.impl.ReactiveStreamsCompliance
import scala.collection.mutable.ArrayBuffer
import scala.collection.{ immutable, mutable } import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.stream.actor.ActorSubscriberMessage import akka.stream.actor.ActorSubscriberMessage

View file

@ -646,6 +646,10 @@ object MiMa extends AutoPlugin {
), ),
"2.4.2" -> Seq( "2.4.2" -> Seq(
FilterAnyProblemStartingWith("akka.stream.impl.VirtualProcessor"), FilterAnyProblemStartingWith("akka.stream.impl.VirtualProcessor"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.fusing.GraphInterpreter.execute"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.init"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell.receive"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.assignPort"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.MaterializerSession.assignPort"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop$"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$Drop$"),