reducing array load pressure by making connections an explicit object
This commit is contained in:
parent
3a1a0cc4c4
commit
b8ceb863c9
8 changed files with 220 additions and 153 deletions
|
|
@ -27,7 +27,7 @@ object GraphInterpreter {
|
|||
*/
|
||||
final val Debug = false
|
||||
|
||||
final val NoEvent = -1
|
||||
final val NoEvent = null
|
||||
final val Boundary = -1
|
||||
|
||||
final val InReady = 1
|
||||
|
|
@ -66,6 +66,37 @@ object GraphInterpreter {
|
|||
|
||||
val singleNoAttribute: Array[Attributes] = Array(Attributes.none)
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
*
|
||||
* Contains all the necessary information for the GraphInterpreter to be able to implement a connection
|
||||
* between an output and input ports.
|
||||
*
|
||||
* @param id Identifier of the connection. Corresponds to the array slot in the [[GraphAssembly]]
|
||||
* @param inOwnerId Identifier of the owner of the input side of the connection. Corresponds to the array slot in
|
||||
* the [[GraphAssembly]]
|
||||
* @param inOwner The stage logic that corresponds to the input side of the connection.
|
||||
* @param outOwnerId Identifier of the owner of the output side of the connection. Corresponds to the array slot
|
||||
* in the [[GraphAssembly]]
|
||||
* @param outOwner The stage logic that corresponds to the output side of the connection.
|
||||
* @param inHandler The handler that contains the callback for input events.
|
||||
* @param outHandler The handler that contains the callback for output events.
|
||||
*/
|
||||
final class Connection(
|
||||
val id: Int,
|
||||
val inOwnerId: Int,
|
||||
val inOwner: GraphStageLogic,
|
||||
val outOwnerId: Int,
|
||||
val outOwner: GraphStageLogic,
|
||||
var inHandler: InHandler,
|
||||
var outHandler: OutHandler
|
||||
) {
|
||||
var portState: Int = InReady
|
||||
var slot: Any = Empty
|
||||
|
||||
override def toString = s"Connection($id, $portState, $slot, $inHandler, $outHandler)"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
|
|
@ -117,16 +148,14 @@ object GraphInterpreter {
|
|||
* handlers and the stage logic instances.
|
||||
*
|
||||
* Returns a tuple of
|
||||
* - lookup table for InHandlers
|
||||
* - lookup table for OutHandlers
|
||||
* - lookup table for Connections
|
||||
* - array of the logics
|
||||
* - materialized value
|
||||
*/
|
||||
def materialize(
|
||||
inheritedAttributes: Attributes,
|
||||
copiedModules: Array[Module],
|
||||
matVal: ju.Map[Module, Any],
|
||||
register: MaterializedValueSource[Any] ⇒ Unit): (Array[InHandler], Array[OutHandler], Array[GraphStageLogic]) = {
|
||||
register: MaterializedValueSource[Any] ⇒ Unit): (Array[Connection], Array[GraphStageLogic]) = {
|
||||
val logics = Array.ofDim[GraphStageLogic](stages.length)
|
||||
|
||||
var i = 0
|
||||
|
|
@ -167,32 +196,43 @@ object GraphInterpreter {
|
|||
i += 1
|
||||
}
|
||||
|
||||
val inHandlers = Array.ofDim[InHandler](connectionCount)
|
||||
val outHandlers = Array.ofDim[OutHandler](connectionCount)
|
||||
val connections = Array.ofDim[Connection](connectionCount)
|
||||
|
||||
i = 0
|
||||
while (i < connectionCount) {
|
||||
connections(i) = new Connection(
|
||||
id = i,
|
||||
inOwner = if (inOwners(i) == Boundary) null else logics(inOwners(i)),
|
||||
inOwnerId = inOwners(i),
|
||||
outOwner = if (outOwners(i) == Boundary) null else logics(outOwners(i)),
|
||||
outOwnerId = outOwners(i),
|
||||
inHandler = null,
|
||||
outHandler = null
|
||||
)
|
||||
|
||||
if (ins(i) ne null) {
|
||||
val logic = logics(inOwners(i))
|
||||
logic.handlers(ins(i).id) match {
|
||||
case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${ins(i)}")
|
||||
case h: InHandler ⇒ inHandlers(i) = h
|
||||
case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${ins(i)}")
|
||||
case h: InHandler ⇒
|
||||
connections(i).inHandler = h
|
||||
}
|
||||
logics(inOwners(i)).portToConn(ins(i).id) = i
|
||||
logics(inOwners(i)).portToConn(ins(i).id) = connections(i)
|
||||
}
|
||||
if (outs(i) ne null) {
|
||||
val logic = logics(outOwners(i))
|
||||
val inCount = logic.inCount
|
||||
logic.handlers(outs(i).id + inCount) match {
|
||||
case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${outs(i)}")
|
||||
case h: OutHandler ⇒ outHandlers(i) = h
|
||||
case null ⇒ throw new IllegalStateException(s"no handler defined in stage $logic for port ${outs(i)}")
|
||||
case h: OutHandler ⇒
|
||||
connections(i).outHandler = h
|
||||
}
|
||||
logic.portToConn(outs(i).id + inCount) = i
|
||||
logic.portToConn(outs(i).id + inCount) = connections(i)
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
|
||||
(inHandlers, outHandlers, logics)
|
||||
(connections, logics)
|
||||
}
|
||||
|
||||
override def toString: String = {
|
||||
|
|
@ -293,25 +333,27 @@ object GraphInterpreter {
|
|||
* The internal architecture of the interpreter is based on the usage of arrays and optimized for reducing allocations
|
||||
* on the hot paths.
|
||||
*
|
||||
* One of the basic abstractions inside the interpreter is the notion of *connection*. In the abstract sense a
|
||||
* connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair),
|
||||
* while in the practical sense a connection is a number which represents slots in certain arrays.
|
||||
* One of the basic abstractions inside the interpreter is the [[akka.stream.impl.fusing.GraphInterpreter.Connection]].
|
||||
* A connection represents an output-input port pair (an analogue for a connected RS Publisher-Subscriber pair).
|
||||
* The Connection object contains all the necessary data for the interpreter to pass elements, demand, completion
|
||||
* or errors across the Connection.
|
||||
* In particular
|
||||
* - portStates contains a bitfield that tracks the states of the ports (output-input) corresponding to this
|
||||
* connection. This bitfield is used to decode the event that is in-flight.
|
||||
* - connectionSlots is a mapping from a connection id to a potential element or exception that accompanies the
|
||||
* - connectionSlot contains a potential element or exception that accompanies the
|
||||
* event encoded in the portStates bitfield
|
||||
* - inHandlers is a mapping from a connection id to the [[InHandler]] instance that handles the events corresponding
|
||||
* - inHandler contains the [[InHandler]] instance that handles the events corresponding
|
||||
* to the input port of the connection
|
||||
* - outHandlers is a mapping from a connection id to the [[OutHandler]] instance that handles the events corresponding
|
||||
* - outHandler contains the [[OutHandler]] instance that handles the events corresponding
|
||||
* to the output port of the connection
|
||||
*
|
||||
* On top of these lookup tables there is an eventQueue, represented as a circular buffer of integers. The integers
|
||||
* it contains represents connections that have pending events to be processed. The pending event itself is encoded
|
||||
* in the portStates bitfield. This implies that there can be only one event in flight for a given connection, which
|
||||
* is true in almost all cases, except a complete-after-push or fail-after-push.
|
||||
* On top of the Connection table there is an eventQueue, represented as a circular buffer of Connections. The queue
|
||||
* contains the Connections that have pending events to be processed. The pending event itself is encoded
|
||||
* in the portState bitfield of the Connection. This implies that there can be only one event in flight for a given
|
||||
* Connection, which is true in almost all cases, except a complete-after-push or fail-after-push which has to
|
||||
* be decoded accordingly.
|
||||
*
|
||||
* The layout of the portStates bitfield is the following:
|
||||
* The layout of the portState bitfield is the following:
|
||||
*
|
||||
* |- state machn.-| Only one bit is hot among these bits
|
||||
* 64 32 16 | 8 4 2 1 |
|
||||
|
|
@ -335,10 +377,10 @@ object GraphInterpreter {
|
|||
* Sending an event is usually the following sequence:
|
||||
* - An action is requested by a stage logic (push, pull, complete, etc.)
|
||||
* - the state machine in portStates is transitioned from a ready state to a pending event
|
||||
* - the id of the affected connection is enqueued
|
||||
* - the affected Connection is enqueued
|
||||
*
|
||||
* Receiving an event is usually the following sequence:
|
||||
* - id of connection to be processed is dequeued
|
||||
* - the Connection to be processed is dequeued
|
||||
* - the type of the event is determined from the bits set on portStates
|
||||
* - the state machine in portStates is transitioned to a ready state
|
||||
* - using the inHandlers/outHandlers table the corresponding callback is called on the stage logic.
|
||||
|
|
@ -352,9 +394,8 @@ final class GraphInterpreter(
|
|||
private val assembly: GraphInterpreter.GraphAssembly,
|
||||
val materializer: Materializer,
|
||||
val log: LoggingAdapter,
|
||||
val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection
|
||||
val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection
|
||||
val logics: Array[GraphStageLogic], // Array of stage logics
|
||||
val connections: Array[GraphInterpreter.Connection],
|
||||
val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit,
|
||||
val fuzzingMode: Boolean,
|
||||
val context: ActorRef) {
|
||||
|
|
@ -362,11 +403,11 @@ final class GraphInterpreter(
|
|||
|
||||
// Maintains additional information for events, basically elements in-flight, or failure.
|
||||
// Other events are encoded in the portStates bitfield.
|
||||
val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty)
|
||||
//val connectionSlots = Array.fill[Any](assembly.connectionCount)(Empty)
|
||||
|
||||
// Bitfield encoding pending events and various states for efficient querying and updates. See the documentation
|
||||
// of the class for a full description.
|
||||
val portStates = Array.fill[Int](assembly.connectionCount)(InReady)
|
||||
//val portStates = Array.fill[Int](assembly.connectionCount)(InReady)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -388,19 +429,19 @@ final class GraphInterpreter(
|
|||
|
||||
// An event queue implemented as a circular buffer
|
||||
// FIXME: This calculates the maximum size ever needed, but most assemblies can run on a smaller queue
|
||||
private[this] val eventQueue = Array.ofDim[Int](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1)))
|
||||
private[this] val eventQueue = Array.ofDim[Connection](1 << (32 - Integer.numberOfLeadingZeros(assembly.connectionCount - 1)))
|
||||
private[this] val mask = eventQueue.length - 1
|
||||
private[this] var queueHead: Int = 0
|
||||
private[this] var queueTail: Int = 0
|
||||
|
||||
private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased
|
||||
private[this] var chasedPush: Int = NoEvent
|
||||
private[this] var chasedPull: Int = NoEvent
|
||||
private[this] var chasedPush: Connection = NoEvent
|
||||
private[this] var chasedPull: Connection = NoEvent
|
||||
|
||||
private def queueStatus: String = {
|
||||
val contents = (queueHead until queueTail).map(idx ⇒ {
|
||||
val conn = eventQueue(idx & mask)
|
||||
(conn, portStates(conn), connectionSlots(conn))
|
||||
conn
|
||||
})
|
||||
s"(${eventQueue.length}, $queueHead, $queueTail)(${contents.mkString(", ")})"
|
||||
}
|
||||
|
|
@ -420,36 +461,42 @@ final class GraphInterpreter(
|
|||
* Assign the boundary logic to a given connection. This will serve as the interface to the external world
|
||||
* (outside the interpreter) to process and inject events.
|
||||
*/
|
||||
def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit = {
|
||||
def attachUpstreamBoundary(connection: Connection, logic: UpstreamBoundaryStageLogic[_]): Unit = {
|
||||
logic.portToConn(logic.out.id + logic.inCount) = connection
|
||||
logic.interpreter = this
|
||||
outHandlers(connection) = logic.handlers(0).asInstanceOf[OutHandler]
|
||||
connection.outHandler = logic.handlers(0).asInstanceOf[OutHandler]
|
||||
}
|
||||
|
||||
def attachUpstreamBoundary(connection: Int, logic: UpstreamBoundaryStageLogic[_]): Unit =
|
||||
attachUpstreamBoundary(connections(connection), logic)
|
||||
|
||||
/**
|
||||
* Assign the boundary logic to a given connection. This will serve as the interface to the external world
|
||||
* (outside the interpreter) to process and inject events.
|
||||
*/
|
||||
def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit = {
|
||||
def attachDownstreamBoundary(connection: Connection, logic: DownstreamBoundaryStageLogic[_]): Unit = {
|
||||
logic.portToConn(logic.in.id) = connection
|
||||
logic.interpreter = this
|
||||
inHandlers(connection) = logic.handlers(0).asInstanceOf[InHandler]
|
||||
connection.inHandler = logic.handlers(0).asInstanceOf[InHandler]
|
||||
}
|
||||
|
||||
def attachDownstreamBoundary(connection: Int, logic: DownstreamBoundaryStageLogic[_]): Unit =
|
||||
attachDownstreamBoundary(connections(connection), logic)
|
||||
|
||||
/**
|
||||
* Dynamic handler changes are communicated from a GraphStageLogic by this method.
|
||||
*/
|
||||
def setHandler(connection: Int, handler: InHandler): Unit = {
|
||||
def setHandler(connection: Connection, handler: InHandler): Unit = {
|
||||
if (Debug) println(s"$Name SETHANDLER ${inOwnerName(connection)} (in) $handler")
|
||||
inHandlers(connection) = handler
|
||||
connection.inHandler = handler
|
||||
}
|
||||
|
||||
/**
|
||||
* Dynamic handler changes are communicated from a GraphStageLogic by this method.
|
||||
*/
|
||||
def setHandler(connection: Int, handler: OutHandler): Unit = {
|
||||
def setHandler(connection: Connection, handler: OutHandler): Unit = {
|
||||
if (Debug) println(s"$Name SETHANDLER ${outOwnerName(connection)} (out) $handler")
|
||||
outHandlers(connection) = handler
|
||||
connection.outHandler = handler
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -502,29 +549,29 @@ final class GraphInterpreter(
|
|||
}
|
||||
|
||||
// Debug name for a connections input part
|
||||
private def inOwnerName(connection: Int): String =
|
||||
assembly.inOwners(connection) match {
|
||||
private def inOwnerName(connection: Connection): String =
|
||||
assembly.inOwners(connection.id) match {
|
||||
case Boundary ⇒ "DownstreamBoundary"
|
||||
case owner ⇒ assembly.stages(owner).toString
|
||||
}
|
||||
|
||||
// Debug name for a connections output part
|
||||
private def outOwnerName(connection: Int): String =
|
||||
assembly.outOwners(connection) match {
|
||||
private def outOwnerName(connection: Connection): String =
|
||||
assembly.outOwners(connection.id) match {
|
||||
case Boundary ⇒ "UpstreamBoundary"
|
||||
case owner ⇒ assembly.stages(owner).toString
|
||||
}
|
||||
|
||||
// Debug name for a connections input part
|
||||
private def inLogicName(connection: Int): String =
|
||||
assembly.inOwners(connection) match {
|
||||
private def inLogicName(connection: Connection): String =
|
||||
assembly.inOwners(connection.id) match {
|
||||
case Boundary ⇒ "DownstreamBoundary"
|
||||
case owner ⇒ logics(owner).toString
|
||||
}
|
||||
|
||||
// Debug name for a connections output part
|
||||
private def outLogicName(connection: Int): String =
|
||||
assembly.outOwners(connection) match {
|
||||
private def outLogicName(connection: Connection): String =
|
||||
assembly.outOwners(connection.id) match {
|
||||
case Boundary ⇒ "UpstreamBoundary"
|
||||
case owner ⇒ logics(owner).toString
|
||||
}
|
||||
|
|
@ -558,23 +605,51 @@ final class GraphInterpreter(
|
|||
|
||||
// Abort chasing
|
||||
chaseCounter = 0
|
||||
if (chasedPush != NoEvent) {
|
||||
if (chasedPush ne NoEvent) {
|
||||
enqueue(chasedPush)
|
||||
chasedPush = NoEvent
|
||||
}
|
||||
if (chasedPull != NoEvent) {
|
||||
if (chasedPull ne NoEvent) {
|
||||
enqueue(chasedPull)
|
||||
chasedPull = NoEvent
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This is the "normal" event processing code which dequeues directly from the internal event queue. Since
|
||||
* most execution paths tend to produce either a Push that will be propagated along a longer chain we take
|
||||
* extra steps below to make this more efficient.
|
||||
*/
|
||||
try processEvent(connection)
|
||||
catch {
|
||||
case NonFatal(e) ⇒ reportStageError(e)
|
||||
}
|
||||
afterStageHasRun(activeStage)
|
||||
|
||||
/*
|
||||
* "Event chasing" optimization follows from here. This optimization works under the assumption that a Push or
|
||||
* Pull is very likely immediately followed by another Push/Pull. The difference from the "normal" event
|
||||
* dispatch is that chased events are never touching the event queue, they use a "streamlined" execution path
|
||||
* instead. Looking at the scenario of a Push, the following events will happen.
|
||||
* - "normal" dispatch executes an onPush event
|
||||
* - stage eventually calls push()
|
||||
* - code inside the push() method checks the validity of the call, and also if it can be safely ignored
|
||||
* (because the target stage already completed we just have not been notified yet)
|
||||
* - if the upper limit of ChaseLimit has not been reached, then the Connection is put into the chasedPush
|
||||
* variable
|
||||
* - the loop below immediately captures this push and dispatches it
|
||||
*
|
||||
* What is saved by this optimization is three steps:
|
||||
* - no need to enqueue the Connection in the queue (array), it ends up in a simple variable, reducing
|
||||
* pressure on array load-store
|
||||
* - no need to dequeue the Connection from the queue, similar to above
|
||||
* - no need to decode the event, we know it is a Push already
|
||||
* - no need to check for validity of the event because we already checked at the push() call, and there
|
||||
* can be no concurrent events interleaved unlike with the normal dispatch (think about a cancel() that is
|
||||
* called in the target stage just before the onPush() arrives). This avoids unnecessary branching.
|
||||
*/
|
||||
|
||||
// Chasing PUSH events
|
||||
while (chasedPush != NoEvent) {
|
||||
val connection = chasedPush
|
||||
|
|
@ -629,17 +704,13 @@ final class GraphInterpreter(
|
|||
} finally currentInterpreterHolder(0) = previousInterpreter
|
||||
}
|
||||
|
||||
private def safeLogics(id: Int) =
|
||||
if (id == Boundary) null
|
||||
else logics(id)
|
||||
|
||||
// Decodes and processes a single event for the given connection
|
||||
private def processEvent(connection: Int): Unit = {
|
||||
private def processEvent(connection: Connection): Unit = {
|
||||
|
||||
// this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage
|
||||
// (this can happen if a stage completes voluntarily while connection close events are still queued)
|
||||
activeStage = null
|
||||
val code = portStates(connection)
|
||||
val code = connection.portState
|
||||
|
||||
// Manual fast decoding, fast paths are PUSH and PULL
|
||||
// PUSH
|
||||
|
|
@ -652,24 +723,22 @@ final class GraphInterpreter(
|
|||
|
||||
// CANCEL
|
||||
} else if ((code & (OutClosed | InClosed)) == InClosed) {
|
||||
val stageId = assembly.outOwners(connection)
|
||||
activeStage = safeLogics(stageId)
|
||||
if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]")
|
||||
portStates(connection) |= OutClosed
|
||||
completeConnection(stageId)
|
||||
outHandlers(connection).onDownstreamFinish()
|
||||
activeStage = connection.outOwner
|
||||
if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
|
||||
connection.portState |= OutClosed
|
||||
completeConnection(connection.outOwnerId)
|
||||
connection.outHandler.onDownstreamFinish()
|
||||
} else if ((code & (OutClosed | InClosed)) == OutClosed) {
|
||||
// COMPLETIONS
|
||||
|
||||
if ((code & Pushing) == 0) {
|
||||
// Normal completion (no push pending)
|
||||
if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]")
|
||||
portStates(connection) |= InClosed
|
||||
val stageId = assembly.inOwners(connection)
|
||||
activeStage = safeLogics(stageId)
|
||||
completeConnection(stageId)
|
||||
if ((portStates(connection) & InFailed) == 0) inHandlers(connection).onUpstreamFinish()
|
||||
else inHandlers(connection).onUpstreamFailure(connectionSlots(connection).asInstanceOf[Failed].ex)
|
||||
if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]")
|
||||
connection.portState |= InClosed
|
||||
activeStage = connection.inOwner
|
||||
completeConnection(connection.inOwnerId)
|
||||
if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish()
|
||||
else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex)
|
||||
} else {
|
||||
// Push is pending, first process push, then re-enqueue closing event
|
||||
processPush(connection)
|
||||
|
|
@ -679,21 +748,21 @@ final class GraphInterpreter(
|
|||
}
|
||||
}
|
||||
|
||||
private def processPush(connection: Int): Unit = {
|
||||
if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connectionSlots(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]")
|
||||
activeStage = safeLogics(assembly.inOwners(connection))
|
||||
portStates(connection) ^= PushEndFlip
|
||||
inHandlers(connection).onPush()
|
||||
private def processPush(connection: Connection): Unit = {
|
||||
if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connection.slot} (${connection.inHandler}) [${inLogicName(connection)}]")
|
||||
activeStage = connection.inOwner
|
||||
connection.portState ^= PushEndFlip
|
||||
connection.inHandler.onPush()
|
||||
}
|
||||
|
||||
private def processPull(connection: Int): Unit = {
|
||||
if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]")
|
||||
activeStage = safeLogics(assembly.outOwners(connection))
|
||||
portStates(connection) ^= PullEndFlip
|
||||
outHandlers(connection).onPull()
|
||||
private def processPull(connection: Connection): Unit = {
|
||||
if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
|
||||
activeStage = connection.outOwner
|
||||
connection.portState ^= PullEndFlip
|
||||
connection.outHandler.onPull()
|
||||
}
|
||||
|
||||
private def dequeue(): Int = {
|
||||
private def dequeue(): Connection = {
|
||||
val idx = queueHead & mask
|
||||
if (fuzzingMode) {
|
||||
val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask
|
||||
|
|
@ -707,7 +776,7 @@ final class GraphInterpreter(
|
|||
elem
|
||||
}
|
||||
|
||||
def enqueue(connection: Int): Unit = {
|
||||
def enqueue(connection: Connection): Unit = {
|
||||
if (Debug) if (queueTail - queueHead > mask) new Exception(s"$Name internal queue full ($queueStatus) + $connection").printStackTrace()
|
||||
eventQueue(queueTail & mask) = connection
|
||||
queueTail += 1
|
||||
|
|
@ -745,24 +814,24 @@ final class GraphInterpreter(
|
|||
}
|
||||
}
|
||||
|
||||
private[stream] def chasePush(connection: Int): Unit = {
|
||||
private[stream] def chasePush(connection: Connection): Unit = {
|
||||
if (chaseCounter > 0 && chasedPush == NoEvent) {
|
||||
chaseCounter -= 1
|
||||
chasedPush = connection
|
||||
} else enqueue(connection)
|
||||
}
|
||||
|
||||
private[stream] def chasePull(connection: Int): Unit = {
|
||||
private[stream] def chasePull(connection: Connection): Unit = {
|
||||
if (chaseCounter > 0 && chasedPull == NoEvent) {
|
||||
chaseCounter -= 1
|
||||
chasedPull = connection
|
||||
} else enqueue(connection)
|
||||
}
|
||||
|
||||
private[stream] def complete(connection: Int): Unit = {
|
||||
val currentState = portStates(connection)
|
||||
private[stream] def complete(connection: Connection): Unit = {
|
||||
val currentState = connection.portState
|
||||
if (Debug) println(s"$Name complete($connection) [$currentState]")
|
||||
portStates(connection) = currentState | OutClosed
|
||||
connection.portState = currentState | OutClosed
|
||||
|
||||
// Push-Close needs special treatment, cannot be chased, convert back to ordinary event
|
||||
if (chasedPush == connection) {
|
||||
|
|
@ -770,30 +839,30 @@ final class GraphInterpreter(
|
|||
enqueue(connection)
|
||||
} else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection)
|
||||
|
||||
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
|
||||
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId)
|
||||
}
|
||||
|
||||
private[stream] def fail(connection: Int, ex: Throwable): Unit = {
|
||||
val currentState = portStates(connection)
|
||||
private[stream] def fail(connection: Connection, ex: Throwable): Unit = {
|
||||
val currentState = connection.portState
|
||||
if (Debug) println(s"$Name fail($connection, $ex) [$currentState]")
|
||||
portStates(connection) = currentState | OutClosed
|
||||
connection.portState = currentState | OutClosed
|
||||
if ((currentState & (InClosed | OutClosed)) == 0) {
|
||||
portStates(connection) = currentState | (OutClosed | InFailed)
|
||||
connectionSlots(connection) = Failed(ex, connectionSlots(connection))
|
||||
connection.portState = currentState | (OutClosed | InFailed)
|
||||
connection.slot = Failed(ex, connection.slot)
|
||||
if ((currentState & (Pulling | Pushing)) == 0) enqueue(connection)
|
||||
}
|
||||
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
|
||||
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId)
|
||||
}
|
||||
|
||||
private[stream] def cancel(connection: Int): Unit = {
|
||||
val currentState = portStates(connection)
|
||||
private[stream] def cancel(connection: Connection): Unit = {
|
||||
val currentState = connection.portState
|
||||
if (Debug) println(s"$Name cancel($connection) [$currentState]")
|
||||
portStates(connection) = currentState | InClosed
|
||||
connection.portState = currentState | InClosed
|
||||
if ((currentState & OutClosed) == 0) {
|
||||
connectionSlots(connection) = Empty
|
||||
connection.slot = Empty
|
||||
if ((currentState & (Pulling | Pushing | InClosed)) == 0) enqueue(connection)
|
||||
}
|
||||
if ((currentState & InClosed) == 0) completeConnection(assembly.inOwners(connection))
|
||||
if ((currentState & InClosed) == 0) completeConnection(connection.inOwnerId)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -822,8 +891,8 @@ final class GraphInterpreter(
|
|||
else "N" + owner
|
||||
}
|
||||
|
||||
for (i ← portStates.indices) {
|
||||
portStates(i) match {
|
||||
for (i ← connections.indices) {
|
||||
connections(i).portState match {
|
||||
case InReady ⇒
|
||||
builder.append(s""" ${nameIn(i)} -> ${nameOut(i)} [label=shouldPull; color=blue]""")
|
||||
case OutReady ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue