+str #18835: Allow stages to stay alive even after all ports closed

This commit is contained in:
Endre Sándor Varga 2015-11-11 16:40:03 +01:00
parent 4e127e6206
commit 3822f6cb1c
3 changed files with 220 additions and 3 deletions

View file

@ -357,7 +357,8 @@ private[stream] final class GraphInterpreter(
// Counts how many active connections a stage has. Once it reaches zero, the stage is automatically stopped.
private[this] val shutdownCounter = Array.tabulate(assembly.stages.length) { i
val shape = assembly.stages(i).shape
shape.inlets.size + shape.outlets.size
val keepGoing = if (logics(i).keepGoingAfterAllPortsClosed) 1 else 0
shape.inlets.size + shape.outlets.size + keepGoing
}
// An event queue implemented as a circular buffer
@ -618,6 +619,12 @@ private[stream] final class GraphInterpreter(
}
}
// Call only for keep-alive stages
def closeKeptAliveStageIfNeeded(stageId: Int): Unit =
if (stageId != Boundary && shutdownCounter(stageId) == 1) {
shutdownCounter(stageId) = 0
}
private def finalizeStage(logic: GraphStageLogic): Unit = {
try {
logic.postStop()