=str: GraphStage improvements

This commit is contained in:
Endre Sándor Varga 2015-09-17 10:08:12 +02:00
parent 170f24eadc
commit 174589f2dc
6 changed files with 112 additions and 147 deletions

View file

@ -255,6 +255,7 @@ private[stream] final class GraphInterpreter(
def init(): Unit = {
var i = 0
while (i < logics.length) {
logics(i).stageId = i
logics(i).preStart()
i += 1
}
@ -373,7 +374,7 @@ private[stream] final class GraphInterpreter(
def isConnectionCompleted(connection: Int): Boolean = connectionStates(connection).isInstanceOf[CompletedState]
// Returns true if the given stage is alredy completed
private def isStageCompleted(stageId: Int): Boolean = stageId != Boundary && shutdownCounter(stageId) == 0
def isStageCompleted(stageId: Int): Boolean = stageId != Boundary && shutdownCounter(stageId) == 0
private def isPushInFlight(connection: Int): Boolean =
!inAvailable(connection) &&