diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index f8b03045c2..918c9d753d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -206,8 +206,8 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit { .connect(passThrough.out, Downstream) .init() - interpreter.complete(0) - interpreter.cancel(1) + interpreter.complete(interpreter.connections(0)) + interpreter.cancel(interpreter.connections(1)) interpreter.execute(2) expectMsg("postStop2") diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index cf0a8f788c..c0bd29dd10 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -71,17 +71,17 @@ trait GraphInterpreterSpecKit extends StreamSpec { def init(): Unit = { val assembly = buildAssembly() - val (inHandlers, outHandlers, logics) = + val (conns, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, + _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, logics, conns, (_, _, _) ⇒ (), fuzzingMode = false, null) for ((upstream, i) ← upstreams.zipWithIndex) { - _interpreter.attachUpstreamBoundary(i, upstream._1) + _interpreter.attachUpstreamBoundary(conns(i), upstream._1) } for ((downstream, i) ← downstreams.zipWithIndex) { - _interpreter.attachDownstreamBoundary(i + upstreams.size + connections.size, downstream._2) + _interpreter.attachDownstreamBoundary(conns(i + upstreams.size + connections.size), downstream._2) } _interpreter.init(null) @@ -89,9 +89,9 @@ trait GraphInterpreterSpecKit extends StreamSpec { } def manualInit(assembly: GraphAssembly): Unit = { - val (inHandlers, outHandlers, logics) = + val (connections, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, inHandlers, outHandlers, logics, + _interpreter = new GraphInterpreter(assembly, NoMaterializer, logger, logics, connections, (_, _, _) ⇒ (), fuzzingMode = false, null) } @@ -202,7 +202,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { // Modified onPush that does not grab() automatically the element. This accesses some internals. override def onPush(): Unit = { - val internalEvent = interpreter.connectionSlots(portToConn(in.id)) + val internalEvent = portToConn(in.id).slot internalEvent match { case Failed(_, elem) ⇒ lastEvent += OnNext(DownstreamPortProbe.this, elem) @@ -224,8 +224,8 @@ trait GraphInterpreterSpecKit extends StreamSpec { outOwners = Array(-1)) manualInit(assembly) - interpreter.attachDownstreamBoundary(0, in) - interpreter.attachUpstreamBoundary(0, out) + interpreter.attachDownstreamBoundary(interpreter.connections(0), in) + interpreter.attachUpstreamBoundary(interpreter.connections(0), out) interpreter.init(null) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 10a6da89f8..de3c8019f9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -65,10 +65,10 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re Array(null, stage.shape.out), Array(-1, 0)) - val (inHandlers, outHandlers, logics) = + val (connections, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new java.util.HashMap, _ ⇒ ()) - val shell = new GraphInterpreterShell(assembly, inHandlers, outHandlers, logics, stage.shape, settings, + val shell = new GraphInterpreterShell(assembly, connections, logics, stage.shape, settings, materializer.asInstanceOf[ActorMaterializerImpl]) val props = Props(new BrokenActorInterpreter(shell, "a3")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 8c2b997042..1f9b53e632 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -211,9 +211,9 @@ private[akka] case class ActorMaterializerImpl( private def matGraph(graph: GraphModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = { val calculatedSettings = effectiveSettings(effectiveAttributes) - val (inHandlers, outHandlers, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc) + val (connections, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc) - val shell = new GraphInterpreterShell(graph.assembly, inHandlers, outHandlers, logics, graph.shape, + val shell = new GraphInterpreterShell(graph.assembly, connections, logics, graph.shape, calculatedSettings, ActorMaterializerImpl.this) val impl = diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 3b14652c29..403513cda9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -12,7 +12,7 @@ import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.impl.StreamLayout.{ AtomicModule, CompositeModule, CopiedModule, Module } import akka.stream.impl.{ SubFusingActorMaterializerImpl, _ } -import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, GraphAssembly, UpstreamBoundaryStageLogic } +import akka.stream.impl.fusing.GraphInterpreter.{ Connection, DownstreamBoundaryStageLogic, GraphAssembly, UpstreamBoundaryStageLogic } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import org.reactivestreams.{ Subscriber, Subscription } @@ -308,8 +308,7 @@ object ActorGraphInterpreter { */ final class GraphInterpreterShell( assembly: GraphAssembly, - inHandlers: Array[InHandler], - outHandlers: Array[OutHandler], + connections: Array[Connection], logics: Array[GraphStageLogic], shape: Shape, settings: ActorMaterializerSettings, @@ -322,7 +321,7 @@ final class GraphInterpreterShell( private var enqueueToShortCircuit: (Any) ⇒ Unit = _ - lazy val interpreter: GraphInterpreter = new GraphInterpreter(assembly, mat, log, inHandlers, outHandlers, logics, + lazy val interpreter: GraphInterpreter = new GraphInterpreter(assembly, mat, log, logics, connections, (logic, event, handler) ⇒ { val asyncInput = AsyncInput(this, logic, event, handler) val currentInterpreter = GraphInterpreter.currentInterpreterOrNull @@ -366,7 +365,7 @@ final class GraphInterpreterShell( while (i < inputs.length) { val in = new BatchingActorInputBoundary(settings.maxInputBufferSize, i) inputs(i) = in - interpreter.attachUpstreamBoundary(i, in) + interpreter.attachUpstreamBoundary(connections(i), in) i += 1 } val offset = assembly.connectionCount - outputs.length @@ -374,7 +373,7 @@ final class GraphInterpreterShell( while (i < outputs.length) { val out = new ActorOutputBoundary(self, this, i) outputs(i) = out - interpreter.attachDownstreamBoundary(i + offset, out) + interpreter.attachDownstreamBoundary(connections(i + offset), out) i += 1 } interpreter.init(subMat) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 3cba149fb0..1ca60cb2ab 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -27,7 +27,7 @@ object GraphInterpreter { */ final val Debug = false - final val NoEvent = -1 + final val NoEvent = null final val Boundary = -1 final val InReady = 1 @@ -66,6 +66,37 @@ object GraphInterpreter { val singleNoAttribute: Array[Attributes] = Array(Attributes.none) + /** + * INERNAL API + * + * Contains all the necessary information for the GraphInterpreter to be able to implement a connection + * between an output and input ports. + * + * @param id Identifier of the connection. Corresponds to the array slot in the [[GraphAssembly]] + * @param inOwnerId Identifier of the owner of the input side of the connection. Corresponds to the array slot in + * the [[GraphAssembly]] + * @param inOwner The stage logic that corresponds to the input side of the connection. + * @param outOwnerId Identifier of the owner of the output side of the connection. Corresponds to the array slot + * in the [[GraphAssembly]] + * @param outOwner The stage logic that corresponds to the output side of the connection. + * @param inHandler The handler that contains the callback for input events. + * @param outHandler The handler that contains the callback for output events. + */ + final class Connection( + val id: Int, + val inOwnerId: Int, + val inOwner: GraphStageLogic, + val outOwnerId: Int, + val outOwner: GraphStageLogic, + var inHandler: InHandler, + var outHandler: OutHandler + ) { + var portState: Int = InReady + var slot: Any = Empty + + override def toString = s"Connection($id, $portState, $slot, $inHandler, $outHandler)" + } + /** * INTERNAL API * @@ -117,16 +148,14 @@ object GraphInterpreter { * handlers and the stage logic instances. * * Returns a tuple of - * - lookup table for InHandlers - * - lookup table for OutHandlers + * - lookup table for Connections * - array of the logics - * - materialized value */ def materialize( inheritedAttributes: Attributes, copiedModules: Array[Module], matVal: ju.Map[Module, Any], - register: MaterializedValueSource[Any] ⇒ Unit): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic]) = { + register: MaterializedValueSource[Any] ⇒ Unit): (Array[Connection], Array[GraphStageLogic]) = { val logics = Array.ofDim[GraphStageLogic](stages.length) var i = 0 @@ -167,32 +196,43 @@ object GraphInterpreter { i += 1 } - val inHandlers = Array.ofDim[InHandler](connectionCount) - val outHandlers = Array.ofDim[OutHandler](connectionCount) + val connections = Array.ofDim[Connection](connectionCount) i = 0 while (i < connectionCount) { + connections(i) = new Connection( + id = i, + inOwner = if (inOwners(i) == Boundary) null else logics(inOwners(i)), + inOwnerId = inOwners(i), + outOwner = if (outOwners(i) == Boundary) null else logics(outOwners(i)), + outOwnerId = outOwners(i), + inHandler = null, + outHandler = null + ) + if (ins(i) ne null) { val logic = logics(inOwners(i)) logic.handlers(ins(i).id) match { - case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${ins(i)}") - case h: InHandler ⇒ inHandlers(i) = h + case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${ins(i)}") + case h: InHandler ⇒ + connections(i).inHandler = h } - logics(inOwners(i)).portToConn(ins(i).id) = i + logics(inOwners(i)).portToConn(ins(i).id) = connections(i) } if (outs(i) ne null) { val logic = logics(outOwners(i)) val inCount = logic.inCount logic.handlers(outs(i).id + inCount) match { - case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${outs(i)}") - case h: OutHandler ⇒ outHandlers(i) = h + case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${outs(i)}") + case h: OutHandler ⇒ + connections(i).outHandler = h } - logic.portToConn(outs(i).id + inCount) = i + logic.portToConn(outs(i).id + inCount) = connections(i) } i += 1 } - (inHandlers, outHandlers, logics) + (connections, logics) } override def toString: String = { @@ -293,25 +333,27 @@ object GraphInterpreter { * The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations * on the hot paths. * - * One of the basic abstractions inside the interpreter is the notion of *connection*. In the abstract sense a - * connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair), - * while in the practical sense a connection is a number which represents slots in certain arrays. + * One of the basic abstractions inside the interpreter is the [[akka.stream.impl.fusing.GraphInterpreter.Connection]]. + * A connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair). + * The Connection object contains all the necessary data for the interpreter to pass elements, demand, completion + * or errors across the Connection. * In particular * - portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this * connection. This bitfield is used to decode the event that is in-flight. - * - connectionSlots is a mapping from a connection id to a potential element or exception that accompanies the + * - connectionSlot contains a potential element or exception that accompanies the * event encoded in the portStates bitfield - * - inHandlers is a mapping from a connection id to the [[InHandler]] instance that handles the events corresponding + * - inHandler contains the [[InHandler]] instance that handles the events corresponding * to the input port of the connection - * - outHandlers is a mapping from a connection id to the [[OutHandler]] instance that handles the events corresponding + * - outHandler contains the [[OutHandler]] instance that handles the events corresponding * to the output port of the connection * - * On top of these lookup tables there is an eventQueue, represented as a circular buffer of integers. The integers - * it contains represents connections that have pending events to be processed. The pending event itself is encoded - * in the portStates bitfield. This implies that there can be only one event in flight for a given connection, which - * is true in almost all cases, except a complete-after-push or fail-after-push. + * On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue + * contains the Connections that have pending events to be processed. The pending event itself is encoded + * in the portState bitfield of the Connection. This implies that there can be only one event in flight for a given + * Connection, which is true in almost all cases, except a complete-after-push or fail-after-push which has to + * be decoded accordingly. * - * The layout of the portStates bitfield is the following: + * The layout of the portState bitfield is the following: * * |- state machn.-| Only one bit is hot among these bits * 64 32 16 | 8 4 2 1 | @@ -335,10 +377,10 @@ object GraphInterpreter { * Sending an event is usually the following sequence: * - An action is requested by a stage logic (push, pull, complete, etc.) * - the state machine in portStates is transitioned from a ready state to a pending event - * - the id of the affected connection is enqueued + * - the affected Connection is enqueued * * Receiving an event is usually the following sequence: - * - id of connection to be processed is dequeued + * - the Connection to be processed is dequeued * - the type of the event is determined from the bits set on portStates * - the state machine in portStates is transitioned to a ready state * - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic. @@ -352,9 +394,8 @@ final class GraphInterpreter( private val assembly: GraphInterpreter.GraphAssembly, val materializer: Materializer, val log: LoggingAdapter, - val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection - val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection val logics: Array[GraphStageLogic], // Array of stage logics + val connections: Array[GraphInterpreter.Connection], val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, val fuzzingMode: Boolean, val context: ActorRef) { @@ -362,11 +403,11 @@ final class GraphInterpreter( // Maintains additional information for events, basically elements in-flight, or failure. // Other events are encoded in the portStates bitfield. - val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty) + //val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty) // Bitfield encoding pending events and various states for efficient querying and updates. See the documentation // of the class for a full description. - val portStates = Array.fill[Int](assembly.connectionCount)(InReady) + //val portStates = Array.fill[Int](assembly.connectionCount)(InReady) /** * INTERNAL API @@ -388,19 +429,19 @@ final class GraphInterpreter( // An event queue implemented as a circular buffer // FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue - private[this] val eventQueue = Array.ofDim[Int](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1))) + private[this] val eventQueue = Array.ofDim[Connection](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1))) private[this] val mask = eventQueue.length - 1 private[this] var queueHead: Int = 0 private[this] var queueTail: Int = 0 private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased - private[this] var chasedPush: Int = NoEvent - private[this] var chasedPull: Int = NoEvent + private[this] var chasedPush: Connection = NoEvent + private[this] var chasedPull: Connection = NoEvent private def queueStatus: String = { val contents = (queueHead until queueTail).map(idx ⇒ { val conn = eventQueue(idx & mask) - (conn, portStates(conn), connectionSlots(conn)) + conn }) s"(${eventQueue.length}, $queueHead, $queueTail)(${contents.mkString(", ")})" } @@ -420,36 +461,42 @@ final class GraphInterpreter( * Assign the boundary logic to a given connection. This will serve as the interface to the external world * (outside the interpreter) to process and inject events. */ - def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit = { + def attachUpstreamBoundary(connection: Connection, logic: UpstreamBoundaryStageLogic[_]): Unit = { logic.portToConn(logic.out.id + logic.inCount) = connection logic.interpreter = this - outHandlers(connection) = logic.handlers(0).asInstanceOf[OutHandler] + connection.outHandler = logic.handlers(0).asInstanceOf[OutHandler] } + def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit = + attachUpstreamBoundary(connections(connection), logic) + /** * Assign the boundary logic to a given connection. This will serve as the interface to the external world * (outside the interpreter) to process and inject events. */ - def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit = { + def attachDownstreamBoundary(connection: Connection, logic: DownstreamBoundaryStageLogic[_]): Unit = { logic.portToConn(logic.in.id) = connection logic.interpreter = this - inHandlers(connection) = logic.handlers(0).asInstanceOf[InHandler] + connection.inHandler = logic.handlers(0).asInstanceOf[InHandler] } + def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit = + attachDownstreamBoundary(connections(connection), logic) + /** * Dynamic handler changes are communicated from a GraphStageLogic by this method. */ - def setHandler(connection: Int, handler: InHandler): Unit = { + def setHandler(connection: Connection, handler: InHandler): Unit = { if (Debug) println(s"$Name SETHANDLER ${inOwnerName(connection)} (in) $handler") - inHandlers(connection) = handler + connection.inHandler = handler } /** * Dynamic handler changes are communicated from a GraphStageLogic by this method. */ - def setHandler(connection: Int, handler: OutHandler): Unit = { + def setHandler(connection: Connection, handler: OutHandler): Unit = { if (Debug) println(s"$Name SETHANDLER ${outOwnerName(connection)} (out) $handler") - outHandlers(connection) = handler + connection.outHandler = handler } /** @@ -502,29 +549,29 @@ final class GraphInterpreter( } // Debug name for a connections input part - private def inOwnerName(connection: Int): String = - assembly.inOwners(connection) match { + private def inOwnerName(connection: Connection): String = + assembly.inOwners(connection.id) match { case Boundary ⇒ "DownstreamBoundary" case owner ⇒ assembly.stages(owner).toString } // Debug name for a connections output part - private def outOwnerName(connection: Int): String = - assembly.outOwners(connection) match { + private def outOwnerName(connection: Connection): String = + assembly.outOwners(connection.id) match { case Boundary ⇒ "UpstreamBoundary" case owner ⇒ assembly.stages(owner).toString } // Debug name for a connections input part - private def inLogicName(connection: Int): String = - assembly.inOwners(connection) match { + private def inLogicName(connection: Connection): String = + assembly.inOwners(connection.id) match { case Boundary ⇒ "DownstreamBoundary" case owner ⇒ logics(owner).toString } // Debug name for a connections output part - private def outLogicName(connection: Int): String = - assembly.outOwners(connection) match { + private def outLogicName(connection: Connection): String = + assembly.outOwners(connection.id) match { case Boundary ⇒ "UpstreamBoundary" case owner ⇒ logics(owner).toString } @@ -558,23 +605,51 @@ final class GraphInterpreter( // Abort chasing chaseCounter = 0 - if (chasedPush != NoEvent) { + if (chasedPush ne NoEvent) { enqueue(chasedPush) chasedPush = NoEvent } - if (chasedPull != NoEvent) { + if (chasedPull ne NoEvent) { enqueue(chasedPull) chasedPull = NoEvent } } } + /* + * This is the "normal" event processing code which dequeues directly from the internal event queue. Since + * most execution paths tend to produce either a Push that will be propagated along a longer chain we take + * extra steps below to make this more efficient. + */ try processEvent(connection) catch { case NonFatal(e) ⇒ reportStageError(e) } afterStageHasRun(activeStage) + /* + * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or + * Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event + * dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path + * instead. Looking at the scenario of a Push, the following events will happen. + * - "normal" dispatch executes an onPush event + * - stage eventually calls push() + * - code inside the push() method checks the validity of the call, and also if it can be safely ignored + * (because the target stage already completed we just have not been notified yet) + * - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush + * variable + * - the loop below immediately captures this push and dispatches it + * + * What is saved by this optimization is three steps: + * - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing + * pressure on array load-store + * - no need to dequeue the Connection from the queue, similar to above + * - no need to decode the event, we know it is a Push already + * - no need to check for validity of the event because we already checked at the push() call, and there + * can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is + * called in the target stage just before the onPush() arrives). This avoids unnecessary branching. + */ + // Chasing PUSH events while (chasedPush != NoEvent) { val connection = chasedPush @@ -629,17 +704,13 @@ final class GraphInterpreter( } finally currentInterpreterHolder(0) = previousInterpreter } - private def safeLogics(id: Int) = - if (id == Boundary) null - else logics(id) - // Decodes and processes a single event for the given connection - private def processEvent(connection: Int): Unit = { + private def processEvent(connection: Connection): Unit = { // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage // (this can happen if a stage completes voluntarily while connection close events are still queued) activeStage = null - val code = portStates(connection) + val code = connection.portState // Manual fast decoding, fast paths are PUSH and PULL // PUSH @@ -652,24 +723,22 @@ final class GraphInterpreter( // CANCEL } else if ((code & (OutClosed | InClosed)) == InClosed) { - val stageId = assembly.outOwners(connection) - activeStage = safeLogics(stageId) - if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]") - portStates(connection) |= OutClosed - completeConnection(stageId) - outHandlers(connection).onDownstreamFinish() + activeStage = connection.outOwner + if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") + connection.portState |= OutClosed + completeConnection(connection.outOwnerId) + connection.outHandler.onDownstreamFinish() } else if ((code & (OutClosed | InClosed)) == OutClosed) { // COMPLETIONS if ((code & Pushing) == 0) { // Normal completion (no push pending) - if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]") - portStates(connection) |= InClosed - val stageId = assembly.inOwners(connection) - activeStage = safeLogics(stageId) - completeConnection(stageId) - if ((portStates(connection) & InFailed) == 0) inHandlers(connection).onUpstreamFinish() - else inHandlers(connection).onUpstreamFailure(connectionSlots(connection).asInstanceOf[Failed].ex) + if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]") + connection.portState |= InClosed + activeStage = connection.inOwner + completeConnection(connection.inOwnerId) + if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish() + else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex) } else { // Push is pending, first process push, then re-enqueue closing event processPush(connection) @@ -679,21 +748,21 @@ final class GraphInterpreter( } } - private def processPush(connection: Int): Unit = { - if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connectionSlots(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]") - activeStage = safeLogics(assembly.inOwners(connection)) - portStates(connection) ^= PushEndFlip - inHandlers(connection).onPush() + private def processPush(connection: Connection): Unit = { + if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connection.slot} (${connection.inHandler}) [${inLogicName(connection)}]") + activeStage = connection.inOwner + connection.portState ^= PushEndFlip + connection.inHandler.onPush() } - private def processPull(connection: Int): Unit = { - if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]") - activeStage = safeLogics(assembly.outOwners(connection)) - portStates(connection) ^= PullEndFlip - outHandlers(connection).onPull() + private def processPull(connection: Connection): Unit = { + if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") + activeStage = connection.outOwner + connection.portState ^= PullEndFlip + connection.outHandler.onPull() } - private def dequeue(): Int = { + private def dequeue(): Connection = { val idx = queueHead & mask if (fuzzingMode) { val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask @@ -707,7 +776,7 @@ final class GraphInterpreter( elem } - def enqueue(connection: Int): Unit = { + def enqueue(connection: Connection): Unit = { if (Debug) if (queueTail - queueHead > mask) new Exception(s"$Name internal queue full ($queueStatus) + $connection").printStackTrace() eventQueue(queueTail & mask) = connection queueTail += 1 @@ -745,24 +814,24 @@ final class GraphInterpreter( } } - private[stream] def chasePush(connection: Int): Unit = { + private[stream] def chasePush(connection: Connection): Unit = { if (chaseCounter > 0 && chasedPush == NoEvent) { chaseCounter -= 1 chasedPush = connection } else enqueue(connection) } - private[stream] def chasePull(connection: Int): Unit = { + private[stream] def chasePull(connection: Connection): Unit = { if (chaseCounter > 0 && chasedPull == NoEvent) { chaseCounter -= 1 chasedPull = connection } else enqueue(connection) } - private[stream] def complete(connection: Int): Unit = { - val currentState = portStates(connection) + private[stream] def complete(connection: Connection): Unit = { + val currentState = connection.portState if (Debug) println(s"$Name complete($connection) [$currentState]") - portStates(connection) = currentState | OutClosed + connection.portState = currentState | OutClosed // Push-Close needs special treatment, cannot be chased, convert back to ordinary event if (chasedPush == connection) { @@ -770,30 +839,30 @@ final class GraphInterpreter( enqueue(connection) } else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection) - if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) + if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId) } - private[stream] def fail(connection: Int, ex: Throwable): Unit = { - val currentState = portStates(connection) + private[stream] def fail(connection: Connection, ex: Throwable): Unit = { + val currentState = connection.portState if (Debug) println(s"$Name fail($connection, $ex) [$currentState]") - portStates(connection) = currentState | OutClosed + connection.portState = currentState | OutClosed if ((currentState & (InClosed | OutClosed)) == 0) { - portStates(connection) = currentState | (OutClosed | InFailed) - connectionSlots(connection) = Failed(ex, connectionSlots(connection)) + connection.portState = currentState | (OutClosed | InFailed) + connection.slot = Failed(ex, connection.slot) if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection) } - if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection)) + if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId) } - private[stream] def cancel(connection: Int): Unit = { - val currentState = portStates(connection) + private[stream] def cancel(connection: Connection): Unit = { + val currentState = connection.portState if (Debug) println(s"$Name cancel($connection) [$currentState]") - portStates(connection) = currentState | InClosed + connection.portState = currentState | InClosed if ((currentState & OutClosed) == 0) { - connectionSlots(connection) = Empty + connection.slot = Empty if ((currentState & (Pulling | Pushing | InClosed)) == 0) enqueue(connection) } - if ((currentState & InClosed) == 0) completeConnection(assembly.inOwners(connection)) + if ((currentState & InClosed) == 0) completeConnection(connection.inOwnerId) } /** @@ -822,8 +891,8 @@ final class GraphInterpreter( else "N" + owner } - for (i ← portStates.indices) { - portStates(i) match { + for (i ← connections.indices) { + connections(i).portState match { case InReady ⇒ builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [label=shouldPull; color=blue]""") case OutReady ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index 263cece018..cb011795b3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -137,20 +137,19 @@ private[akka] class IteratorInterpreter[I, O]( } val assembly = new GraphAssembly(stagesArray, attributes, ins, inOwners, outs, outOwners) - val (inHandlers, outHandlers, logics) = + val (connections, logics) = assembly.materialize(Attributes.none, assembly.stages.map(_.module), new ju.HashMap, _ ⇒ ()) val interpreter = new GraphInterpreter( assembly, NoMaterializer, NoLogging, - inHandlers, - outHandlers, logics, + connections, (_, _, _) ⇒ throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."), fuzzingMode = false, null) - interpreter.attachUpstreamBoundary(0, upstream) - interpreter.attachDownstreamBoundary(length, downstream) + interpreter.attachUpstreamBoundary(connections(0), upstream) + interpreter.attachDownstreamBoundary(connections(length), downstream) interpreter.init(null) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 1bf2d3a7ee..b810f192ec 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -220,7 +220,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * INTERNAL API */ // Using common array to reduce overhead for small port counts - private[stream] val portToConn = Array.ofDim[Int](handlers.length) + private[stream] val portToConn = Array.ofDim[Connection](handlers.length) /** * INTERNAL API @@ -318,8 +318,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: if (_interpreter != null) _interpreter.setHandler(conn(out), handler) } - private def conn(in: Inlet[_]): Int = portToConn(in.id) - private def conn(out: Outlet[_]): Int = portToConn(out.id + inCount) + private def conn(in: Inlet[_]): Connection = portToConn(in.id) + private def conn(out: Outlet[_]): Connection = portToConn(out.id + inCount) /** * Retrieves the current callback for the events on the given [[Outlet]] @@ -341,11 +341,11 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def pull[T](in: Inlet[T]): Unit = { val connection = conn(in) - val portState = interpreter.portStates(connection) + val portState = connection.portState val it = interpreter if ((portState & (InReady | InClosed | OutClosed)) == InReady) { - it.portStates(connection) = portState ^ PullStartFlip + connection.portState = portState ^ PullStartFlip it.chasePull(connection) } else { // Detailed error information should not add overhead to the hot path @@ -354,7 +354,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: // There were no errors, the pull was simply ignored as the target stage already closed its port. We // still need to track proper state though. - it.portStates(connection) = portState ^ PullStartFlip + connection.portState = portState ^ PullStartFlip } } @@ -381,18 +381,18 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: final protected def grab[T](in: Inlet[T]): T = { val connection = conn(in) val it = interpreter - val elem = it.connectionSlots(connection) + val elem = connection.slot // Fast path - if ((it.portStates(connection) & (InReady | InFailed)) == InReady && (elem.asInstanceOf[AnyRef] ne Empty)) { - it.connectionSlots(connection) = Empty + if ((connection.portState & (InReady | InFailed)) == InReady && (elem.asInstanceOf[AnyRef] ne Empty)) { + connection.slot = Empty elem.asInstanceOf[T] } else { // Slow path require(isAvailable(in), s"Cannot get element from already empty input port ($in)") - val failed = it.connectionSlots(connection).asInstanceOf[Failed] + val failed = connection.slot.asInstanceOf[Failed] val elem = failed.previousElem.asInstanceOf[T] - it.connectionSlots(connection) = Failed(failed.ex, Empty) + connection.slot = Failed(failed.ex, Empty) elem } } @@ -401,7 +401,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Indicates whether there is already a pending pull for the given input port. If this method returns true * then [[isAvailable()]] must return false for that same port. */ - final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & (InReady | InClosed)) == 0 + final protected def hasBeenPulled[T](in: Inlet[T]): Boolean = (conn(in).portState & (InReady | InClosed)) == 0 /** * Indicates whether there is an element waiting at the given input port. [[grab()]] can be used to retrieve the @@ -412,14 +412,14 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: final protected def isAvailable[T](in: Inlet[T]): Boolean = { val connection = conn(in) - val normalArrived = (interpreter.portStates(conn(in)) & (InReady | InFailed)) == InReady + val normalArrived = (conn(in).portState & (InReady | InFailed)) == InReady // Fast path - if (normalArrived) interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty + if (normalArrived) connection.slot.asInstanceOf[AnyRef] ne Empty else { // Slow path on failure - if ((interpreter.portStates(conn(in)) & (InReady | InFailed)) == (InReady | InFailed)) { - interpreter.connectionSlots(connection) match { + if ((connection.portState & (InReady | InFailed)) == (InReady | InFailed)) { + connection.slot match { case Failed(_, elem) ⇒ elem.asInstanceOf[AnyRef] ne Empty case _ ⇒ false // This can only be Empty actually (if a cancel was concurrent with a failure) } @@ -430,7 +430,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Indicates whether the port has been closed. A closed port cannot be pulled. */ - final protected def isClosed[T](in: Inlet[T]): Boolean = (interpreter.portStates(conn(in)) & InClosed) != 0 + final protected def isClosed[T](in: Inlet[T]): Boolean = (conn(in).portState & InClosed) != 0 /** * Emits an element through the given output port. Calling this method twice before a [[pull()]] has been arrived @@ -439,17 +439,17 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def push[T](out: Outlet[T], elem: T): Unit = { val connection = conn(out) - val portState = interpreter.portStates(connection) + val portState = connection.portState val it = interpreter - it.portStates(connection) = portState ^ PushStartFlip + connection.portState = portState ^ PushStartFlip if ((portState & (OutReady | OutClosed | InClosed)) == OutReady && (elem != null)) { - it.connectionSlots(connection) = elem + connection.slot = elem it.chasePush(connection) } else { // Restore state for the error case - it.portStates(connection) = portState + connection.portState = portState // Detailed error information should not add overhead to the hot path ReactiveStreamsCompliance.requireNonNullElement(elem) @@ -457,7 +457,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: require(!isClosed(out), s"Cannot pull closed port ($out)") // No error, just InClosed caused the actual pull to be ignored, but the status flag still needs to be flipped - it.portStates(connection) = portState ^ PushStartFlip + connection.portState = portState ^ PushStartFlip } } @@ -523,12 +523,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * Return true if the given output port is ready to be pushed. */ final def isAvailable[T](out: Outlet[T]): Boolean = - (interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady + (conn(out).portState & (OutReady | OutClosed)) == OutReady /** * Indicates whether the port has been closed. A closed port cannot be pushed. */ - final protected def isClosed[T](out: Outlet[T]): Boolean = (interpreter.portStates(conn(out)) & OutClosed) != 0 + final protected def isClosed[T](out: Outlet[T]): Boolean = (conn(out).portState & OutClosed) != 0 /** * Read a number of elements from the given inlet and continue with the given function,