!str,htc remove FlexiMerge/Route

- replace all occurrences with equivalent GraphStage implementations

This commit introduces a mini-DSL for GraphStage that allows emitting or
reading multiple elements to/from a port with one statement, installing
stateful handlers on the port to make it work. The emitting side allows
stacked continuations, meaning that while an emit() is ongoing (waiting
for demand) another one can be added to the queue; this allows
convenient formualation of merge-type stages.
This commit is contained in:
Roland Kuhn 2015-10-21 17:52:11 +02:00
parent dc07fd250c
commit 02810cfa64
45 changed files with 1001 additions and 3227 deletions

View file

@ -145,12 +145,20 @@ private[stream] object GraphInterpreter {
i = 0
while (i < connectionCount) {
if (ins(i) ne null) {
inHandlers(i) = logics(inOwners(i)).inHandlers(ins(i).id)
logics(inOwners(i)).inToConn(ins(i).id) = i
val l = logics(inOwners(i))
l.inHandlers(ins(i).id) match {
case null throw new IllegalStateException(s"no handler defined in stage $l for port ${ins(i)}")
case h inHandlers(i) = h
}
l.inToConn(ins(i).id) = i
}
if (outs(i) ne null) {
outHandlers(i) = logics(outOwners(i)).outHandlers(outs(i).id)
logics(outOwners(i)).outToConn(outs(i).id) = i
val l = logics(outOwners(i))
l.outHandlers(outs(i).id) match {
case null throw new IllegalStateException(s"no handler defined in stage $l for port ${outs(i)}")
case h outHandlers(i) = h
}
l.outToConn(outs(i).id) = i
}
i += 1
}
@ -301,6 +309,22 @@ private[stream] final class GraphInterpreter(
inHandlers(connection) = logic.inHandlers(0)
}
/**
* Dynamic handler changes are communicated from a GraphStageLogic by this method.
*/
def setHandler(connection: Int, handler: InHandler): Unit = {
if (GraphInterpreter.Debug) println(s"SETHANDLER ${inOwnerName(connection)} (in) $handler")
inHandlers(connection) = handler
}
/**
* Dynamic handler changes are communicated from a GraphStageLogic by this method.
*/
def setHandler(connection: Int, handler: OutHandler): Unit = {
if (GraphInterpreter.Debug) println(s"SETHANDLER ${outOwnerName(connection)} (out) $handler")
outHandlers(connection) = handler
}
/**
* Returns true if there are pending unprocessed events in the event queue.
*/
@ -376,7 +400,7 @@ private[stream] final class GraphInterpreter(
private def processEvent(connection: Int): Unit = {
def processElement(elem: Any): Unit = {
if (GraphInterpreter.Debug) println(s"PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, $elem")
if (GraphInterpreter.Debug) println(s"PUSH ${outOwnerName(connection)} -> ${inOwnerName(connection)}, $elem (${inHandlers(connection)})")
activeStageId = assembly.inOwners(connection)
portStates(connection) ^= PushEndFlip
inHandlers(connection).onPush()
@ -391,7 +415,7 @@ private[stream] final class GraphInterpreter(
// PULL
} else if ((code & (Pulling | OutClosed | InClosed)) == Pulling) {
if (GraphInterpreter.Debug) println(s"PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)}")
if (GraphInterpreter.Debug) println(s"PULL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)})")
portStates(connection) ^= PullEndFlip
activeStageId = assembly.outOwners(connection)
outHandlers(connection).onPull()
@ -399,7 +423,7 @@ private[stream] final class GraphInterpreter(
// CANCEL
} else if ((code & (OutClosed | InClosed)) == InClosed) {
val stageId = assembly.outOwners(connection)
if (GraphInterpreter.Debug) println(s"CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)}")
if (GraphInterpreter.Debug) println(s"CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${outHandlers(connection)})")
portStates(connection) |= OutClosed
activeStageId = assembly.outOwners(connection)
outHandlers(connection).onDownstreamFinish()
@ -411,7 +435,7 @@ private[stream] final class GraphInterpreter(
if ((code & Pushing) == 0) {
// Normal completion (no push pending)
if (GraphInterpreter.Debug) println(s"COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)}")
if (GraphInterpreter.Debug) println(s"COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${inHandlers(connection)})")
portStates(connection) |= InClosed
activeStageId = assembly.inOwners(connection)
if ((portStates(connection) & InFailed) == 0) inHandlers(connection).onUpstreamFinish()