Event chasing implemented

This commit is contained in:
Endre Sándor Varga 2016-07-20 17:10:50 +02:00
parent b01acfd8d6
commit 3a1a0cc4c4
4 changed files with 117 additions and 34 deletions

View file

@ -47,6 +47,8 @@ object GraphInterpreter {
final val KeepGoingFlag = 0x4000000
final val KeepGoingMask = 0x3ffffff
final val ChaseLimit = 16
/**
* Marker object that indicates that a port holds no element since it was already grabbed. The port is still pullable,
* but there is no more element to grab.
@ -381,7 +383,7 @@ final class GraphInterpreter(
shape.inlets.size + shape.outlets.size
}
private var _subFusingMaterializer: Materializer = _
private[this] var _subFusingMaterializer: Materializer = _
def subFusingMaterializer: Materializer = _subFusingMaterializer
// An event queue implemented as a circular buffer
@ -391,6 +393,10 @@ final class GraphInterpreter(
private[this] var queueHead: Int = 0
private[this] var queueTail: Int = 0
private[this] var chaseCounter = 0 // the first events in preStart blocks should be not chased
private[this] var chasedPush: Int = NoEvent
private[this] var chasedPull: Int = NoEvent
private def queueStatus: String = {
val contents = (queueHead until queueTail).map(idx {
val conn = eventQueue(idx & mask)
@ -539,20 +545,66 @@ final class GraphInterpreter(
try {
while (eventsRemaining > 0 && queueTail != queueHead) {
val connection = dequeue()
eventsRemaining -= 1
chaseCounter = math.min(ChaseLimit, eventsRemaining)
def reportStageError(e: Throwable): Unit = {
if (activeStage == null) throw e
else {
val stage = assembly.stages(activeStage.stageId)
log.error(e, "Error in stage [{}]: {}", stage, e.getMessage)
activeStage.failStage(e)
// Abort chasing
chaseCounter = 0
if (chasedPush != NoEvent) {
enqueue(chasedPush)
chasedPush = NoEvent
}
if (chasedPull != NoEvent) {
enqueue(chasedPull)
chasedPull = NoEvent
}
}
}
try processEvent(connection)
catch {
case NonFatal(e)
if (activeStage == null) throw e
else {
val stage = assembly.stages(activeStage.stageId)
log.error(e, "Error in stage [{}]: {}", stage, e.getMessage)
activeStage.failStage(e)
}
case NonFatal(e) reportStageError(e)
}
afterStageHasRun(activeStage)
eventsRemaining -= 1
// Chasing PUSH events
while (chasedPush != NoEvent) {
val connection = chasedPush
chasedPush = NoEvent
try processPush(connection)
catch {
case NonFatal(e) reportStageError(e)
}
afterStageHasRun(activeStage)
}
// Chasing PULL events
while (chasedPull != NoEvent) {
val connection = chasedPull
chasedPull = NoEvent
try processPull(connection)
catch {
case NonFatal(e) reportStageError(e)
}
afterStageHasRun(activeStage)
}
if (chasedPush != NoEvent) {
enqueue(chasedPush)
chasedPush = NoEvent
}
}
// Event *must* be enqueued while not in the execute loop (events enqueued from external, possibly async events)
chaseCounter = 0
} finally {
currentInterpreterHolder(0) = previousInterpreter
}
@ -577,18 +629,12 @@ final class GraphInterpreter(
} finally currentInterpreterHolder(0) = previousInterpreter
}
private def safeLogics(id: Int) =
if (id == Boundary) null
else logics(id)
// Decodes and processes a single event for the given connection
private def processEvent(connection: Int): Unit = {
def safeLogics(id: Int) =
if (id == Boundary) null
else logics(id)
def processElement(): Unit = {
if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connectionSlots(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]")
activeStage = safeLogics(assembly.inOwners(connection))
portStates(connection) ^= PushEndFlip
inHandlers(connection).onPush()
}
// this must be the state after returning without delivering any signals, to avoid double-finalization of some unlucky stage
// (this can happen if a stage completes voluntarily while connection close events are still queued)
@ -598,14 +644,11 @@ final class GraphInterpreter(
// Manual fast decoding, fast paths are PUSH and PULL
// PUSH
if ((code & (Pushing | InClosed | OutClosed)) == Pushing) {
processElement()
processPush(connection)
// PULL
} else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) {
if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]")
portStates(connection) ^= PullEndFlip
activeStage = safeLogics(assembly.outOwners(connection))
outHandlers(connection).onPull()
processPull(connection)
// CANCEL
} else if ((code & (OutClosed | InClosed)) == InClosed) {
@ -629,13 +672,27 @@ final class GraphInterpreter(
else inHandlers(connection).onUpstreamFailure(connectionSlots(connection).asInstanceOf[Failed].ex)
} else {
// Push is pending, first process push, then re-enqueue closing event
processElement()
processPush(connection)
enqueue(connection)
}
}
}
private def processPush(connection: Int): Unit = {
if (Debug) println(s"$Name PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, ${connectionSlots(connection)} (${inHandlers(connection)}) [${inLogicName(connection)}]")
activeStage = safeLogics(assembly.inOwners(connection))
portStates(connection) ^= PushEndFlip
inHandlers(connection).onPush()
}
private def processPull(connection: Int): Unit = {
if (Debug) println(s"$Name PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)}) [${outLogicName(connection)}]")
activeStage = safeLogics(assembly.outOwners(connection))
portStates(connection) ^= PullEndFlip
outHandlers(connection).onPull()
}
private def dequeue(): Int = {
val idx = queueHead & mask
if (fuzzingMode) {
@ -688,11 +745,31 @@ final class GraphInterpreter(
}
}
private[stream] def chasePush(connection: Int): Unit = {
if (chaseCounter > 0 && chasedPush == NoEvent) {
chaseCounter -= 1
chasedPush = connection
} else enqueue(connection)
}
private[stream] def chasePull(connection: Int): Unit = {
if (chaseCounter > 0 && chasedPull == NoEvent) {
chaseCounter -= 1
chasedPull = connection
} else enqueue(connection)
}
private[stream] def complete(connection: Int): Unit = {
val currentState = portStates(connection)
if (Debug) println(s"$Name complete($connection) [$currentState]")
portStates(connection) = currentState | OutClosed
if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection)
// Push-Close needs special treatment, cannot be chased, convert back to ordinary event
if (chasedPush == connection) {
chasedPush = NoEvent
enqueue(connection)
} else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection)
if ((currentState & OutClosed) == 0) completeConnection(assembly.outOwners(connection))
}