2015-08-19 15:22:02 +02:00
|
|
|
/**
|
2017-01-04 17:37:10 +01:00
|
|
|
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
|
2015-08-19 15:22:02 +02:00
|
|
|
*/
|
|
|
|
|
package akka.stream.impl.fusing
|
|
|
|
|
|
2016-02-02 16:39:47 -05:00
|
|
|
import akka.actor.ActorRef
|
2015-09-25 16:36:53 +02:00
|
|
|
import akka.event.LoggingAdapter
|
2015-09-18 14:30:43 +02:00
|
|
|
import akka.stream.stage._
|
2015-10-31 14:46:10 +01:00
|
|
|
import akka.stream._
|
2016-07-05 14:59:48 +02:00
|
|
|
import java.util.concurrent.ThreadLocalRandom
|
2017-03-09 17:04:46 +01:00
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
import scala.util.control.NonFatal
|
2015-08-19 15:22:02 +02:00
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Companion of the [[GraphInterpreter]] class: holds the port-state bit constants, the
 * [[GraphInterpreter.Connection]] type and the thread-local "current interpreter" holder.
 *
 * (See the class for the documentation of the internals)
 */
object GraphInterpreter {
  /**
   * Compile time constant, enable it for debug logging to the console.
   */
  final val Debug = false

  // Null marker meaning "no connection"; written into vacated event queue slots and into the chased-event fields.
  final val NoEvent = null

  // State machine bits of Connection.portState
  final val InReady = 1
  final val Pulling = 2
  final val Pushing = 4
  final val OutReady = 8

  // Closed/failed flags of Connection.portState
  final val InClosed = 16
  final val OutClosed = 32
  final val InFailed = 64

  // XOR masks over the state machine bits; each one is the OR of exactly two of the state bits above
  final val PullStartFlip = 3 // 0011 = InReady | Pulling
  final val PullEndFlip = 10 // 1010 = Pulling | OutReady
  final val PushStartFlip = 12 // 1100 = Pushing | OutReady
  final val PushEndFlip = 5 // 0101 = Pushing | InReady

  // Bit 26 of a shutdown counter entry marks "keep going"; the mask selects the 26 bits below that flag
  final val KeepGoingFlag = 0x4000000
  final val KeepGoingMask = 0x3ffffff

  /**
   * Marker object that indicates that a port holds no element since it was already grabbed. The port is still pullable,
   * but there is no more element to grab.
   */
  case object Empty

  /**
   * Slot content of a failed connection: the failure itself together with whatever the slot held
   * at the moment the failure was recorded.
   */
  final case class Failed(ex: Throwable, previousElem: Any)

  /**
   * Base class for stage logics that have no input ports and exactly one output port.
   */
  abstract class UpstreamBoundaryStageLogic[T] extends GraphStageLogic(inCount = 0, outCount = 1) {
    def out: Outlet[T]
  }

  /**
   * Base class for stage logics that have exactly one input port and no output ports.
   */
  abstract class DownstreamBoundaryStageLogic[T] extends GraphStageLogic(inCount = 1, outCount = 0) {
    def in: Inlet[T]
  }

  // One-element array that contains only Attributes.none
  val singleNoAttribute: Array[Attributes] = Array(Attributes.none)

  /**
   * INTERNAL API
   *
   * Contains all the necessary information for the GraphInterpreter to be able to implement a connection
   * between an output and input ports.
   *
   * Besides the constructor parameters a connection carries two mutable fields: `portState`, the
   * bitfield built from the constants of this object (initially `InReady`), and `slot`, the element
   * or failure that travels across the connection (initially `Empty`).
   *
   * @param id         Identifier of the connection.
   * @param inOwner    The stage logic that corresponds to the input side of the connection.
   * @param outOwner   The stage logic that corresponds to the output side of the connection.
   * @param inHandler  The handler that contains the callback for input events.
   * @param outHandler The handler that contains the callback for output events.
   */
  final class Connection(
    var id:         Int,
    var inOwner:    GraphStageLogic,
    var outOwner:   GraphStageLogic,
    var inHandler:  InHandler,
    var outHandler: OutHandler) {
    var portState: Int = InReady
    var slot: Any = Empty

    // The debug rendering additionally includes both owning stage logics
    override def toString =
      if (GraphInterpreter.Debug) s"Connection($id, $inOwner, $outOwner, $inHandler, $outHandler, $portState, $slot)"
      else s"Connection($id, $portState, $slot, $inHandler, $outHandler)"
  }

  /**
   * INTERNAL API
   *
   * Per-thread holder whose single array cell contains the interpreter that is currently
   * executing on the thread (or null).
   */
  private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] {
    /*
     * Using an Object-array avoids holding on to the GraphInterpreter class
     * when this accidentally leaks onto threads that are not stopped when this
     * class should be unloaded.
     */
    override def initialValue = new Array(1)
  }

  /**
   * INTERNAL API
   *
   * The interpreter registered for the calling thread. Fails with a NullPointerException
   * when none is registered, because `nonNull` is invoked on the stored reference.
   */
  private[akka] def currentInterpreter: GraphInterpreter =
    _currentInterpreter.get()(0).asInstanceOf[GraphInterpreter].nonNull
  // nonNull is just a debug helper to find nulls more timely

  /**
   * INTERNAL API
   *
   * The interpreter registered for the calling thread, or null when there is none.
   */
  private[akka] def currentInterpreterOrNull: GraphInterpreter =
    _currentInterpreter.get()(0).asInstanceOf[GraphInterpreter]

}
|
|
|
|
|
|
|
|
|
|
/**
|
2016-02-24 22:00:32 -08:00
|
|
|
* INTERNAL API
|
2015-08-19 15:22:02 +02:00
|
|
|
*
|
|
|
|
|
* From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a
|
|
|
|
|
* [[GraphInterpreter#GraphAssembly]] object and provides facilities to execute and interact with this assembly.
|
2016-09-25 08:24:08 +09:00
|
|
|
* The lifecycle of the Interpreter is roughly the following:
|
2015-08-19 15:22:02 +02:00
|
|
|
* - [[init()]] is called
|
|
|
|
|
* - [[execute()]] is called whenever there is need for execution, providing an upper limit on the processed events
|
|
|
|
|
* - [[finish()]] is called before the interpreter is disposed, preferably after [[isCompleted]] returned true, although
|
|
|
|
|
* in abort cases this is not strictly necessary
|
|
|
|
|
*
|
|
|
|
|
* The [[execute()]] method of the interpreter accepts an upper bound on the events it will process. After this limit
|
|
|
|
|
* is reached or there are no more pending events to be processed, the call returns. It is possible to inspect
|
|
|
|
|
* if there are unprocessed events left via the [[isSuspended]] method. [[isCompleted]] returns true once all stages
|
|
|
|
|
* reported completion inside the interpreter.
|
|
|
|
|
*
|
|
|
|
|
* The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations
|
|
|
|
|
* on the hot paths.
|
|
|
|
|
*
|
2016-07-22 15:39:37 +02:00
|
|
|
* One of the basic abstractions inside the interpreter is the [[akka.stream.impl.fusing.GraphInterpreter.Connection]].
|
|
|
|
|
* A connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair).
|
|
|
|
|
* The Connection object contains all the necessary data for the interpreter to pass elements, demand, completion
|
|
|
|
|
* or errors across the Connection.
|
2015-08-19 15:22:02 +02:00
|
|
|
* In particular
|
2015-09-25 16:36:53 +02:00
|
|
|
* - portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this
|
|
|
|
|
* connection. This bitfield is used to decode the event that is in-flight.
|
2016-07-22 15:39:37 +02:00
|
|
|
* - connectionSlot contains a potential element or exception that accompanies the
|
2015-09-25 16:36:53 +02:00
|
|
|
* event encoded in the portStates bitfield
|
2016-07-22 15:39:37 +02:00
|
|
|
* - inHandler contains the [[InHandler]] instance that handles the events corresponding
|
2015-08-19 15:22:02 +02:00
|
|
|
* to the input port of the connection
|
2016-07-22 15:39:37 +02:00
|
|
|
* - outHandler contains the [[OutHandler]] instance that handles the events corresponding
|
2015-08-19 15:22:02 +02:00
|
|
|
* to the output port of the connection
|
|
|
|
|
*
|
2016-07-22 15:39:37 +02:00
|
|
|
* On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue
|
|
|
|
|
* contains the Connections that have pending events to be processed. The pending event itself is encoded
|
|
|
|
|
* in the portState bitfield of the Connection. This implies that there can be only one event in flight for a given
|
|
|
|
|
* Connection, which is true in almost all cases, except a complete-after-push or fail-after-push which has to
|
|
|
|
|
* be decoded accordingly.
|
2015-09-25 16:36:53 +02:00
|
|
|
*
|
2016-07-22 15:39:37 +02:00
|
|
|
* The layout of the portState bitfield is the following:
|
2015-09-25 16:36:53 +02:00
|
|
|
*
|
|
|
|
|
*              |- state machn.-| Only one bit is hot among these bits
*  64  32  16 | 8   4   2   1 |
* +---+---+---|---+---+---+---|
*   |   |   |   |   |   |   |
*   |   |   |   |   |   |   |  From the following flags only one is active in any given time. These bits encode
*   |   |   |   |   |   |   |  state machine states, and they are "moved" around using XOR masks to keep other bits
*   |   |   |   |   |   |   |  intact.
*   |   |   |   |   |   |   |
*   |   |   |   |   |   |   +- InReady:  The input port is ready to be pulled
*   |   |   |   |   |   +----- Pulling:  A pull is active, but has not arrived yet (queued)
*   |   |   |   |   +--------- Pushing:  A push is active, but has not arrived yet (queued)
*   |   |   |   +------------- OutReady: The output port is ready to be pushed
*   |   |   |
*   |   |   +----------------- InClosed:  The input port is closed and will not receive any events.
*   |   |                                 A push might be still in flight which will be then processed first.
*   |   +--------------------- OutClosed: The output port is closed and will not receive any events.
*   +------------------------- InFailed:  Always set in conjunction with InClosed. Indicates that the close event
*                                         is a failure
|
2015-08-19 15:22:02 +02:00
|
|
|
*
|
|
|
|
|
* Sending an event is usually the following sequence:
|
|
|
|
|
* - An action is requested by a stage logic (push, pull, complete, etc.)
|
2015-09-25 16:36:53 +02:00
|
|
|
* - the state machine in portStates is transitioned from a ready state to a pending event
|
2016-07-22 15:39:37 +02:00
|
|
|
* - the affected Connection is enqueued
|
2015-08-19 15:22:02 +02:00
|
|
|
*
|
|
|
|
|
* Receiving an event is usually the following sequence:
|
2016-07-22 15:39:37 +02:00
|
|
|
* - the Connection to be processed is dequeued
|
2015-09-25 16:36:53 +02:00
|
|
|
* - the type of the event is determined from the bits set on portStates
|
|
|
|
|
* - the state machine in portStates is transitioned to a ready state
|
2015-08-19 15:22:02 +02:00
|
|
|
* - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic.
|
|
|
|
|
*
|
|
|
|
|
* Because of the FIFO construction of the queue the interpreter is fair, i.e. a pending event is always executed
|
|
|
|
|
* after a bounded number of other events. This property, together with suspendability means that even infinite cycles can
|
2015-10-31 14:46:10 +01:00
|
|
|
* be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example the non-cycle
|
2015-08-19 15:22:02 +02:00
|
|
|
* edge of a balance is pulled, dissolving the original cycle).
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
final class GraphInterpreter(
|
2016-07-27 13:29:23 +02:00
|
|
|
val materializer: Materializer,
|
|
|
|
|
val log: LoggingAdapter,
|
|
|
|
|
val logics: Array[GraphStageLogic], // Array of stage logics
|
|
|
|
|
val connections: Array[GraphInterpreter.Connection],
|
|
|
|
|
val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit,
|
|
|
|
|
val fuzzingMode: Boolean,
|
|
|
|
|
val context: ActorRef) {
|
|
|
|
|
|
2015-08-19 15:22:02 +02:00
|
|
|
import GraphInterpreter._
|
|
|
|
|
|
2016-08-08 17:05:47 +02:00
|
|
|
// Upper bound for the chase counter that execute() resets for every dequeued event. It is zero in
// fuzzing mode, so that chasePush/chasePull always fall back to the event queue.
private[this] val ChaseLimit = if (fuzzingMode) 0 else 16

/**
 * INTERNAL API
 *
 * The stage logic whose callback is being run (or was run last); reset to null at the start of
 * every processEvent call.
 */
private[stream] var activeStage: GraphStageLogic = _

// The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be
// completed
private[this] var runningStages = logics.length

// Counts how many active connections a stage has. Once it reaches zero, the stage is automatically stopped.
// Every entry starts at the number of handlers of the stage and may additionally carry the KeepGoingFlag bit.
private[this] val shutdownCounter = Array.tabulate(logics.length) { i ⇒
  logics(i).handlers.length
}

// Assigned in init(): the materializer passed there, or `materializer` when null was passed
private[this] var _subFusingMaterializer: Materializer = _
def subFusingMaterializer: Materializer = _subFusingMaterializer

// An event queue implemented as a circular buffer
// FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue
// The capacity is a power of two, which lets `index & mask` wrap positions around the buffer.
private[this] val eventQueue = new Array[Connection](1 << (32 - Integer.numberOfLeadingZeros(connections.length - 1)))
private[this] val mask = eventQueue.length - 1
// Head and tail only ever grow; they are reduced to buffer positions with `& mask` on access
private[this] var queueHead: Int = 0
private[this] var queueTail: Int = 0

private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased
// At most one push and one pull can be held back for chasing; NoEvent means "nothing held"
private[this] var chasedPush: Connection = NoEvent
private[this] var chasedPull: Connection = NoEvent
|
2016-07-20 17:10:50 +02:00
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
// Debug rendering of the event queue: "(capacity, head, tail)(queued connections from head to tail)"
private def queueStatus: String = {
  val pending = for (position ← queueHead until queueTail) yield eventQueue(position & mask)
  s"(${eventQueue.length}, $queueHead, $queueTail)(${pending.mkString(", ")})"
}
|
|
|
|
|
// Cache for Name; stays null until the name is requested for the first time
private[this] var _Name: String = _

// Debug name of this interpreter: its identity hash code as eight upper-case hex digits
def Name: String = {
  if (_Name eq null) _Name = f"${System.identityHashCode(this)}%08X"
  _Name
}
|
|
|
|
|
|
2015-11-04 15:10:20 +00:00
|
|
|
/**
 * INTERNAL API
 *
 * Returns this interpreter unchanged. Because it is an instance method, invoking it on a null
 * reference fails right at the call site; `GraphInterpreter.currentInterpreter` relies on that.
 */
private[stream] def nonNull: GraphInterpreter = this
|
|
|
|
|
|
2015-10-21 17:52:11 +02:00
|
|
|
/**
 * Dynamic handler changes are communicated from a GraphStageLogic by this method.
 *
 * Replaces the input-side handler stored on the given connection.
 *
 * @param connection the connection whose `inHandler` is replaced
 * @param handler    the new handler for input events
 */
def setHandler(connection: Connection, handler: InHandler): Unit = {
  if (Debug) println(s"$Name SETHANDLER ${inOwnerName(connection)} (in) $handler")
  connection.inHandler = handler
}
|
|
|
|
|
|
|
|
|
|
/**
 * Dynamic handler changes are communicated from a GraphStageLogic by this method.
 *
 * Replaces the output-side handler stored on the given connection.
 *
 * @param connection the connection whose `outHandler` is replaced
 * @param handler    the new handler for output events
 */
def setHandler(connection: Connection, handler: OutHandler): Unit = {
  if (Debug) println(s"$Name SETHANDLER ${outOwnerName(connection)} (out) $handler")
  connection.outHandler = handler
}
|
|
|
|
|
|
2015-08-19 15:22:02 +02:00
|
|
|
/**
 * Returns true if there are pending unprocessed events in the event queue,
 * i.e. the head and the tail of the circular buffer differ.
 */
def isSuspended: Boolean = queueHead != queueTail
|
|
|
|
|
|
|
|
|
|
/**
 * Returns true if there are no more running stages and pending events,
 * i.e. the running stage counter is zero and the event queue is empty.
 */
def isCompleted: Boolean = runningStages == 0 && !isSuspended
|
|
|
|
|
|
|
|
|
|
/**
 * Starts every stage logic, in the order of the `logics` array.
 *
 * Each logic is first bound to this interpreter, then `beforePreStart()` and `preStart()` are
 * invoked on it. A non-fatal exception thrown by these hooks is logged and fails the stage.
 * Afterwards the stage is finalized right away if it has already completed.
 *
 * The passed-in materializer is intended to be a SubFusingActorMaterializer
 * that avoids creating new Actors when stages materialize sub-flows. If no
 * such materializer is available, passing in `null` will reuse the normal
 * materializer for the GraphInterpreter—fusing is only an optimization.
 *
 * @param subMat materializer exposed through `subFusingMaterializer`, or `null` to reuse `materializer`
 */
def init(subMat: Materializer): Unit = {
  _subFusingMaterializer = if (subMat == null) materializer else subMat
  logics.foreach { stageLogic ⇒
    stageLogic.interpreter = this
    try {
      stageLogic.beforePreStart()
      stageLogic.preStart()
    } catch {
      case NonFatal(cause) ⇒
        log.error(cause, "Error during preStart in [{}]: {}", stageLogic.originalStage.getOrElse(stageLogic), cause.getMessage)
        stageLogic.failStage(cause)
    }
    afterStageHasRun(stageLogic)
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Runs the stop hooks of every stage that has not completed yet. Stages that are already
 * completed are skipped.
 */
def finish(): Unit =
  logics.foreach { stageLogic ⇒
    if (!isStageCompleted(stageLogic)) finalizeStage(stageLogic)
  }
|
|
|
|
|
|
|
|
|
|
// Debug name for the input side of a connection: the string form of the stage logic that owns the input port
private def inOwnerName(connection: Connection): String = connection.inOwner.toString
|
2015-08-19 15:22:02 +02:00
|
|
|
|
2016-02-24 22:00:32 -08:00
|
|
|
// Debug name for the output side of a connection: the string form of the stage logic that owns the output port
private def outOwnerName(connection: Connection): String = connection.outOwner.toString
|
2015-08-19 15:22:02 +02:00
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
// Debug name for the input side of a connection, resolved through the `logics` array:
// the string form of the logic stored at the stageId of the input owner
private def inLogicName(connection: Connection): String = logics(connection.inOwner.stageId).toString
|
2015-10-31 14:46:10 +01:00
|
|
|
|
2016-02-24 22:00:32 -08:00
|
|
|
// Debug name for the output side of a connection, resolved through the `logics` array:
// the string form of the logic stored at the stageId of the output owner
private def outLogicName(connection: Connection): String = logics(connection.outOwner.stageId).toString
|
2015-10-31 14:46:10 +01:00
|
|
|
|
2015-12-20 12:54:05 +01:00
|
|
|
// Debug rendering of the shutdown counters, comma separated. Entries at or above KeepGoingFlag are
// shown as their masked value followed by "(KeepGoing)".
private def shutdownCounters: String =
  shutdownCounter.map { counter ⇒
    if (counter < KeepGoingFlag) counter.toString
    else s"${counter & KeepGoingMask}(KeepGoing)"
  }.mkString(",")
|
|
|
|
|
|
2015-08-19 15:22:02 +02:00
|
|
|
/**
 * Executes pending events until the given limit is met. If there were remaining events, isSuspended will return
 * true.
 *
 * While the loop runs, this interpreter is registered as the current interpreter of the calling
 * thread; the previously registered one is restored when the method returns or throws.
 *
 * Only events taken from the event queue are counted against the limit, chased events are not.
 *
 * @param eventLimit the maximum number of events to dequeue in this call
 * @return the part of `eventLimit` that was not used up
 */
def execute(eventLimit: Int): Int = {
  if (Debug) println(s"$Name ---------------- EXECUTE $queueStatus (running=$runningStages, shutdown=$shutdownCounters)")
  // Register this interpreter for the current thread, remembering what to restore in the finally block
  val currentInterpreterHolder = _currentInterpreter.get()
  val previousInterpreter = currentInterpreterHolder(0)
  currentInterpreterHolder(0) = this
  var eventsRemaining = eventLimit
  try {
    while (eventsRemaining > 0 && queueTail != queueHead) {
      val connection = dequeue()
      eventsRemaining -= 1
      // Budget of events that may bypass the queue while handling this dequeued event
      chaseCounter = math.min(ChaseLimit, eventsRemaining)

      // Error handling shared by the normal and the chased dispatch below. Without an active stage
      // the exception is rethrown; otherwise it is logged and the active stage is failed.
      def reportStageError(e: Throwable): Unit = {
        if (activeStage == null) throw e
        else {
          log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage)
          activeStage.failStage(e)

          // Abort chasing: held back events are moved to the queue
          chaseCounter = 0
          if (chasedPush ne NoEvent) {
            enqueue(chasedPush)
            chasedPush = NoEvent
          }
          if (chasedPull ne NoEvent) {
            enqueue(chasedPull)
            chasedPull = NoEvent
          }
        }
      }

      /*
       * This is the "normal" event processing code which dequeues directly from the internal event queue. Since
       * most execution paths tend to produce either a Push that will be propagated along a longer chain we take
       * extra steps below to make this more efficient.
       */
      try processEvent(connection)
      catch {
        case NonFatal(e) ⇒ reportStageError(e)
      }
      afterStageHasRun(activeStage)

      /*
       * "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or
       * Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event
       * dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path
       * instead. Looking at the scenario of a Push, the following events will happen.
       *  - "normal" dispatch executes an onPush event
       *  - stage eventually calls push()
       *  - code inside the push() method checks the validity of the call, and also if it can be safely ignored
       *    (because the target stage already completed we just have not been notified yet)
       *  - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush
       *    variable
       *  - the loop below immediately captures this push and dispatches it
       *
       * What is saved by this optimization:
       *  - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing
       *    pressure on array load-store
       *  - no need to dequeue the Connection from the queue, similar to above
       *  - no need to decode the event, we know it is a Push already
       *  - no need to check for validity of the event because we already checked at the push() call, and there
       *    can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is
       *    called in the target stage just before the onPush() arrives). This avoids unnecessary branching.
       */

      // Chasing PUSH events
      while (chasedPush != NoEvent) {
        val connection = chasedPush
        // Cleared before dispatching so that the handler can register the next chased push
        chasedPush = NoEvent
        try processPush(connection)
        catch {
          case NonFatal(e) ⇒ reportStageError(e)
        }
        afterStageHasRun(activeStage)
      }

      // Chasing PULL events
      while (chasedPull != NoEvent) {
        val connection = chasedPull
        chasedPull = NoEvent
        try processPull(connection)
        catch {
          case NonFatal(e) ⇒ reportStageError(e)
        }
        afterStageHasRun(activeStage)
      }

      // A push that was registered while pulls were being chased goes through the queue
      if (chasedPush != NoEvent) {
        enqueue(chasedPush)
        chasedPush = NoEvent
      }

    }
    // Event *must* be enqueued while not in the execute loop (events enqueued from external, possibly async events)
    chaseCounter = 0
  } finally {
    currentInterpreterHolder(0) = previousInterpreter
  }
  if (Debug) println(s"$Name ---------------- $queueStatus (running=$runningStages, shutdown=$shutdownCounters)")
  // TODO: deadlock detection
  eventsRemaining
}
|
|
|
|
|
|
2015-12-20 12:54:05 +01:00
|
|
|
/**
 * Runs `handler(evt)` on behalf of the given stage, unless that stage has already completed.
 *
 * For the duration of the call this interpreter is registered as the current interpreter of the
 * calling thread and `logic` becomes the active stage. A non-fatal exception thrown by the handler
 * fails the stage. Afterwards the stage is finalized if it has completed.
 *
 * @param logic   the stage the event belongs to
 * @param evt     the event handed to the handler
 * @param handler the callback to run
 */
def runAsyncInput(logic: GraphStageLogic, evt: Any, handler: (Any) ⇒ Unit): Unit =
  if (!isStageCompleted(logic)) {
    if (Debug) println(s"$Name ASYNC $evt ($handler) [$logic]")
    val holder = _currentInterpreter.get()
    val outerInterpreter = holder(0)
    holder(0) = this
    try {
      activeStage = logic
      try handler(evt)
      catch {
        case NonFatal(cause) ⇒ logic.failStage(cause)
      }
      afterStageHasRun(logic)
    } finally holder(0) = outerInterpreter
  }
|
|
|
|
|
|
2015-08-19 15:22:02 +02:00
|
|
|
// Decodes and processes a single event for the given connection.
// The event is derived from the portState bits read at entry:
//  - Pushing set, neither side closed: push
//  - Pulling set, neither side closed: pull
//  - only the input side closed: cancel, delivered to the output handler
//  - only the output side closed: completion or failure, delivered to the input handler, after a
//    still pending push has been delivered
// Any other combination delivers nothing and leaves activeStage null.
private def processEvent(connection: Connection): Unit = {

  // this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage
  // (this can happen if a stage completes voluntarily while connection close events are still queued)
  activeStage = null
  val code = connection.portState

  // Manual fast decoding, fast paths are PUSH and PULL
  if ((code & (Pushing | InClosed | OutClosed)) == Pushing) {
    // PUSH
    processPush(connection)

  } else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) {
    // PULL
    processPull(connection)

  } else if ((code & (OutClosed | InClosed)) == InClosed) {
    // CANCEL
    activeStage = connection.outOwner
    if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
    connection.portState |= OutClosed
    completeConnection(connection.outOwner.stageId)
    connection.outHandler.onDownstreamFinish()
  } else if ((code & (OutClosed | InClosed)) == OutClosed) {
    // COMPLETIONS

    if ((code & Pushing) == 0) {
      // Normal completion (no push pending)
      if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]")
      connection.portState |= InClosed
      activeStage = connection.inOwner
      completeConnection(connection.inOwner.stageId)
      // InFailed tells a failure apart from a regular completion; the exception is kept in the slot
      if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish()
      else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex)
    } else {
      // Push is pending, first process push, then re-enqueue closing event
      processPush(connection)
      enqueue(connection)
    }

  }
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
// Delivers a push: the input owner becomes the active stage, the state machine bits are flipped
// with PushEndFlip (toggles Pushing and InReady) and onPush() is invoked on the input handler.
private def processPush(connection: Connection): Unit = {
  if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connection.slot} (${connection.inHandler}) [${inLogicName(connection)}]")
  activeStage = connection.inOwner
  connection.portState ^= PushEndFlip
  connection.inHandler.onPush()
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
// Delivers a pull: the output owner becomes the active stage, the state machine bits are flipped
// with PullEndFlip (toggles Pulling and OutReady) and onPull() is invoked on the output handler.
private def processPull(connection: Connection): Unit = {
  if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
  activeStage = connection.outOwner
  connection.portState ^= PullEndFlip
  connection.outHandler.onPull()
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
// Removes and returns the connection at the head of the event queue. In fuzzing mode the head is
// first swapped with a randomly picked queued entry (possibly itself), which randomizes the
// processing order. The caller must make sure that the queue is not empty.
private def dequeue(): Connection = {
  val headSlot = queueHead & mask
  if (fuzzingMode) {
    val pendingCount = queueTail - queueHead
    val otherSlot = (queueHead + ThreadLocalRandom.current.nextInt(pendingCount)) & mask
    val swapped = eventQueue(otherSlot)
    eventQueue(otherSlot) = eventQueue(headSlot)
    eventQueue(headSlot) = swapped
  }
  val next = eventQueue(headSlot)
  // Clear the vacated slot so that the buffer does not keep a stale reference
  eventQueue(headSlot) = NoEvent
  queueHead += 1
  next
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
/**
 * Appends the given connection to the tail of the event queue.
 *
 * No capacity check is made outside of debug mode; with `Debug` enabled a stack trace is printed
 * when the queue already holds more entries than `mask`, and the connection is stored regardless.
 */
def enqueue(connection: Connection): Unit = {
  if (Debug) if (queueTail - queueHead > mask) new Exception(s"$Name internal queue full ($queueStatus) + $connection").printStackTrace()
  eventQueue(queueTail & mask) = connection
  queueTail += 1
}
|
|
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
/**
 * To be called after a callback of the given stage has run: if the stage is completed by now,
 * the running stage counter is decremented and the stop hooks of the stage are run.
 * Does nothing for `null` or for a stage that is still active.
 */
def afterStageHasRun(logic: GraphStageLogic): Unit =
  if (isStageCompleted(logic)) {
    runningStages -= 1
    finalizeStage(logic)
  }
|
|
|
|
|
|
2016-02-02 16:39:47 -05:00
|
|
|
// Returns true if the given stage is already completed, i.e. its shutdown counter entry is zero
// (no active connections left and the KeepGoingFlag bit not set). Returns false for null.
def isStageCompleted(stage: GraphStageLogic): Boolean = stage != null && shutdownCounter(stage.stageId) == 0
|
2015-08-19 15:22:02 +02:00
|
|
|
|
|
|
|
|
// Records that one of the connections of the given stage has been completed, which brings the stage
// one step closer to stopping: a positive shutdown counter entry is decremented, a zero entry is left as is.
private def completeConnection(stageId: Int): Unit =
  if (shutdownCounter(stageId) > 0) shutdownCounter(stageId) -= 1
|
|
|
|
|
|
2015-12-20 12:54:05 +01:00
|
|
|
// Sets (enabled = true) or clears (enabled = false) the KeepGoingFlag bit in the shutdown counter
// entry of the given stage. While the bit is set the entry cannot be zero.
private[stream] def setKeepGoing(logic: GraphStageLogic, enabled: Boolean): Unit = {
  val id = logic.stageId
  val current = shutdownCounter(id)
  shutdownCounter(id) = if (enabled) current | KeepGoingFlag else current & KeepGoingMask
}
|
2015-11-11 16:40:03 +01:00
|
|
|
|
2015-09-25 16:36:53 +02:00
|
|
|
// Runs the stop hooks of the given stage: postStop() followed by afterPostStop().
// A non-fatal exception thrown by either hook is logged and not propagated; when postStop() throws,
// afterPostStop() is not invoked.
private def finalizeStage(logic: GraphStageLogic): Unit = {
  try {
    logic.postStop()
    logic.afterPostStop()
  } catch {
    case NonFatal(e) ⇒
      // Plain template string: the "{}" placeholders are filled in by the logging adapter, so no
      // string interpolator is needed (same form as the other log.error calls of this class)
      log.error(e, "Error during postStop in [{}]: {}", logic.originalStage.getOrElse(logic), e.getMessage)
  }
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
// Registers a push event. The connection is held back for chasing when there is chase budget left
// and no other push is being held; otherwise it goes through the event queue.
private[stream] def chasePush(connection: Connection): Unit = {
  val mustQueue = chaseCounter <= 0 || chasedPush != NoEvent
  if (mustQueue) enqueue(connection)
  else {
    chaseCounter -= 1
    chasedPush = connection
  }
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
// Registers a pull event. The connection is held back for chasing when there is chase budget left
// and no other pull is being held; otherwise it goes through the event queue.
private[stream] def chasePull(connection: Connection): Unit = {
  val mustQueue = chaseCounter <= 0 || chasedPull != NoEvent
  if (mustQueue) enqueue(connection)
  else {
    chaseCounter -= 1
    chasedPull = connection
  }
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
// Closes the output side of the given connection (completion requested by the output owner).
//  - OutClosed is always set on the port state
//  - the close event is enqueued when the connection is the push currently held for chasing, or
//    when it was idle: neither side closed and neither a push nor a pull in flight
//  - unless the output side was closed before, one connection is counted off for the output owner
private[stream] def complete(connection: Connection): Unit = {
  val currentState = connection.portState
  if (Debug) println(s"$Name complete($connection) [$currentState]")
  connection.portState = currentState | OutClosed

  // Push-Close needs special treatment, cannot be chased, convert back to ordinary event
  if (chasedPush == connection) {
    chasedPush = NoEvent
    enqueue(connection)
  } else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection)

  if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId)
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
/**
 * Fails the connection with the given exception. The output side is always marked closed; if
 * neither side was closed before, the connection is additionally marked `InFailed`, the failure
 * is stored in its slot (wrapping the previous slot content) and the event is scheduled.
 * Finally the connection is counted down for the stage owning the output side, unless the output
 * side had already been closed.
 */
private[stream] def fail(connection: Connection, ex: Throwable): Unit = {
  val currentState = connection.portState
  if (Debug) println(s"$Name fail($connection, $ex) [$currentState]")
  connection.portState = currentState | OutClosed

  val bothSidesOpen = (currentState & (InClosed | OutClosed)) == 0
  if (bothSidesOpen) {
    connection.portState = currentState | (OutClosed | InFailed)
    connection.slot = Failed(ex, connection.slot)

    val idle = (currentState & (Pulling | Pushing)) == 0
    val chased = !idle && (chasedPush eq connection)
    // Abort chasing so Failure is not lost (chasing does NOT decode the event but assumes it to be a PUSH
    // but we just changed the event!)
    if (chased) chasedPush = NoEvent
    if (idle || chased) enqueue(connection)
  }

  // Count down only on the first transition of the output side to closed
  val outAlreadyClosed = (currentState & OutClosed) != 0
  if (!outAlreadyClosed) completeConnection(connection.outOwner.stageId)
}
|
|
|
|
|
|
2016-07-22 15:39:37 +02:00
|
|
|
/**
 * Cancels the connection from its input side. The input side is always marked closed
 * (`InClosed`); if the output side was not closed yet, the slot is reset to `Empty` and the
 * event is scheduled. Finally the connection is counted down for the stage owning the input
 * side, unless the input side had already been closed.
 */
private[stream] def cancel(connection: Connection): Unit = {
  val currentState = connection.portState
  if (Debug) println(s"$Name cancel($connection) [$currentState]")
  connection.portState = currentState | InClosed

  val outStillOpen = (currentState & OutClosed) == 0
  if (outStillOpen) {
    connection.slot = Empty

    val idle = (currentState & (Pulling | Pushing | InClosed)) == 0
    val chased = !idle && (chasedPull eq connection)
    // Abort chasing so Cancel is not lost (chasing does NOT decode the event but assumes it to be a PULL
    // but we just changed the event!)
    if (chased) chasedPull = NoEvent
    if (idle || chased) enqueue(connection)
  }

  // Count down only on the first transition of the input side to closed
  val inAlreadyClosed = (currentState & InClosed) != 0
  if (!inAlreadyClosed) completeConnection(connection.inOwner.stageId)
}
|
|
|
|
|
|
2015-12-20 12:54:05 +01:00
|
|
|
/**
 * Debug helper for deadlock analysis: prints the "waits-on" relationships between the stages of
 * this interpreter to the console in DOT format (the text produced by `toString`).
 * Render the output with dot/graphviz.
 *
 * Call this only once the interpreter has completely settled; while it is still running the
 * picture may be inaccurate. It is a deliberately simple tool, so make sure you understand what
 * it shows before drawing conclusions from it.
 */
def dumpWaits(): Unit = {
  val dotGraph = this.toString
  println(dotGraph)
}
|
2015-12-20 12:54:05 +01:00
|
|
|
|
2017-03-09 17:04:46 +01:00
|
|
|
/**
 * Renders the "waits-on" relationships of this interpreter as a DOT graph wrapped in a short
 * textual header and footer: one node per stage logic (labelled with its original stage, or the
 * logic itself when none is set) and one edge per connection, annotated according to the
 * connection's current port state. A trailing comment line reports the queue status, the number
 * of running stages and the shutdown counters.
 *
 * If a connection refers to a logic that is not listed in `logics`, a short explanatory message
 * is returned instead of the graph.
 */
override def toString: String = {
  try {
    val builder = new StringBuilder("\ndot format graph for deadlock analysis:\n")
    builder.append("================================================================\n")
    builder.append("digraph waits {\n")

    for (i ← logics.indices) {
      val logic = logics(i)
      // The label is emitted inside a double-quoted DOT string, where `\"` is the only escape
      // sequence: escape embedded quotes so that arbitrary stage names cannot break the output.
      val label = logic.originalStage.getOrElse(logic).toString.replace("\"", "\\\"")
      builder.append(s""" N$i [label="$label"];""").append('\n')
    }

    // Reverse lookup from logic to its node index
    val logicIndexes = logics.zipWithIndex.toMap
    for (connection ← connections) {
      // Map.apply throws NoSuchElementException for an unlisted logic; handled below
      val inName = "N" + logicIndexes(connection.inOwner)
      val outName = "N" + logicIndexes(connection.outOwner)

      builder.append(s" $inName -> $outName ")
      connection.portState match {
        case InReady ⇒
          builder.append("[label=shouldPull; color=blue];")
        case OutReady ⇒
          builder.append("[label=shouldPush; color=red];")
        // No bits set other than InClosed and/or OutClosed
        case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) ⇒
          builder.append("[style=dotted; label=closed dir=both];")
        // Any other state: edge is emitted without attributes
        case _ ⇒
      }
      builder.append("\n")
    }

    builder.append("}\n================================================================\n")
    builder.append(s"// $queueStatus (running=$runningStages, shutdown=${shutdownCounter.mkString(",")})")
    builder.toString()
  } catch {
    case _: NoSuchElementException ⇒ "Not all logics has a stage listed, cannot create graph"
  }
}
|
2015-10-31 14:46:10 +01:00
|
|
|
}
|