Remove superflous ownerId fields in interpreter Connection #22447
This commit is contained in:
parent
205a538df3
commit
5ea2e0536c
3 changed files with 10 additions and 24 deletions
|
|
@ -110,11 +110,9 @@ object GraphInterpreterSpecKit {
|
||||||
|
|
||||||
val connection = new Connection(
|
val connection = new Connection(
|
||||||
id = idx,
|
id = idx,
|
||||||
outOwnerId = outOwner.stageId,
|
|
||||||
outOwner = outOwner,
|
outOwner = outOwner,
|
||||||
outHandler = outOwner.outHandler(0),
|
outHandler = outOwner.outHandler(0),
|
||||||
inOwner = inOwner,
|
inOwner = inOwner,
|
||||||
inOwnerId = inOwner.stageId,
|
|
||||||
inHandler = inOwner.inHandler(0)
|
inHandler = inOwner.inHandler(0)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -143,10 +141,8 @@ object GraphInterpreterSpecKit {
|
||||||
|
|
||||||
val connection = new Connection(
|
val connection = new Connection(
|
||||||
id = idx,
|
id = idx,
|
||||||
outOwnerId = outOwner.stageId,
|
|
||||||
outOwner = outOwner,
|
outOwner = outOwner,
|
||||||
outHandler = outOwner.outHandler(outlet.id),
|
outHandler = outOwner.outHandler(outlet.id),
|
||||||
inOwnerId = inOwner.stageId,
|
|
||||||
inOwner = inOwner,
|
inOwner = inOwner,
|
||||||
inHandler = inOwner.inHandler(inlet.id)
|
inHandler = inOwner.inHandler(inlet.id)
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -616,14 +616,14 @@ final class GraphStageIsland(
|
||||||
val c = connections(slot)
|
val c = connections(slot)
|
||||||
if (c ne null) c
|
if (c ne null) c
|
||||||
else {
|
else {
|
||||||
val c2 = new Connection(0, 0, null, 0, null, null, null)
|
val c2 = new Connection(0, null, null, null, null)
|
||||||
connections(slot) = c2
|
connections(slot) = c2
|
||||||
c2
|
c2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def outConn(): Connection = {
|
def outConn(): Connection = {
|
||||||
val connection = new Connection(0, 0, null, 0, null, null, null)
|
val connection = new Connection(0, null, null, null, null)
|
||||||
outConnections ::= connection
|
outConnections ::= connection
|
||||||
connection
|
connection
|
||||||
}
|
}
|
||||||
|
|
@ -632,7 +632,6 @@ final class GraphStageIsland(
|
||||||
val connection = conn(slot)
|
val connection = conn(slot)
|
||||||
connection.inOwner = logic
|
connection.inOwner = logic
|
||||||
connection.id = slot
|
connection.id = slot
|
||||||
connection.inOwnerId = logic.stageId
|
|
||||||
connection.inHandler = logic.handlers(in.id).asInstanceOf[InHandler]
|
connection.inHandler = logic.handlers(in.id).asInstanceOf[InHandler]
|
||||||
logic.portToConn(in.id) = connection
|
logic.portToConn(in.id) = connection
|
||||||
}
|
}
|
||||||
|
|
@ -641,7 +640,6 @@ final class GraphStageIsland(
|
||||||
val connection = conn(slot)
|
val connection = conn(slot)
|
||||||
connection.outOwner = logic
|
connection.outOwner = logic
|
||||||
connection.id = slot
|
connection.id = slot
|
||||||
connection.outOwnerId = logic.stageId
|
|
||||||
connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler]
|
connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler]
|
||||||
logic.portToConn(logic.inCount + out.id) = connection
|
logic.portToConn(logic.inCount + out.id) = connection
|
||||||
}
|
}
|
||||||
|
|
@ -655,11 +653,9 @@ final class GraphStageIsland(
|
||||||
boundary.portToConn(boundary.in.id) = connection
|
boundary.portToConn(boundary.in.id) = connection
|
||||||
connection.inHandler = boundary.handlers(0).asInstanceOf[InHandler]
|
connection.inHandler = boundary.handlers(0).asInstanceOf[InHandler]
|
||||||
connection.inOwner = boundary
|
connection.inOwner = boundary
|
||||||
connection.inOwnerId = boundary.stageId
|
|
||||||
|
|
||||||
connection.outOwner = logic
|
connection.outOwner = logic
|
||||||
connection.id = -1 // Will be filled later
|
connection.id = -1 // Will be filled later
|
||||||
connection.outOwnerId = logic.stageId
|
|
||||||
connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler]
|
connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler]
|
||||||
logic.portToConn(logic.inCount + out.id) = connection
|
logic.portToConn(logic.inCount + out.id) = connection
|
||||||
|
|
||||||
|
|
@ -678,7 +674,6 @@ final class GraphStageIsland(
|
||||||
boundary.portToConn(boundary.out.id + boundary.inCount) = connection
|
boundary.portToConn(boundary.out.id + boundary.inCount) = connection
|
||||||
connection.outHandler = boundary.handlers(0).asInstanceOf[OutHandler]
|
connection.outHandler = boundary.handlers(0).asInstanceOf[OutHandler]
|
||||||
connection.outOwner = boundary
|
connection.outOwner = boundary
|
||||||
connection.outOwnerId = boundary.stageId
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onIslandReady(): Unit = {
|
override def onIslandReady(): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -65,18 +65,14 @@ object GraphInterpreter {
|
||||||
* between an output and input ports.
|
* between an output and input ports.
|
||||||
*
|
*
|
||||||
* @param id Identifier of the connection.
|
* @param id Identifier of the connection.
|
||||||
* @param inOwnerId Identifier of the owner of the input side of the connection.
|
|
||||||
* @param inOwner The stage logic that corresponds to the input side of the connection.
|
* @param inOwner The stage logic that corresponds to the input side of the connection.
|
||||||
* @param outOwnerId Identifier of the owner of the output side of the connection.
|
|
||||||
* @param outOwner The stage logic that corresponds to the output side of the connection.
|
* @param outOwner The stage logic that corresponds to the output side of the connection.
|
||||||
* @param inHandler The handler that contains the callback for input events.
|
* @param inHandler The handler that contains the callback for input events.
|
||||||
* @param outHandler The handler that contains the callback for output events.
|
* @param outHandler The handler that contains the callback for output events.
|
||||||
*/
|
*/
|
||||||
final class Connection(
|
final class Connection(
|
||||||
var id: Int,
|
var id: Int,
|
||||||
var inOwnerId: Int,
|
|
||||||
var inOwner: GraphStageLogic,
|
var inOwner: GraphStageLogic,
|
||||||
var outOwnerId: Int,
|
|
||||||
var outOwner: GraphStageLogic,
|
var outOwner: GraphStageLogic,
|
||||||
var inHandler: InHandler,
|
var inHandler: InHandler,
|
||||||
var outHandler: OutHandler) {
|
var outHandler: OutHandler) {
|
||||||
|
|
@ -84,7 +80,7 @@ object GraphInterpreter {
|
||||||
var slot: Any = Empty
|
var slot: Any = Empty
|
||||||
|
|
||||||
override def toString =
|
override def toString =
|
||||||
if (GraphInterpreter.Debug) s"Connection($id, $inOwnerId, $inOwner, $outOwnerId, $outOwner, $inHandler, $outHandler, $portState, $slot)"
|
if (GraphInterpreter.Debug) s"Connection($id, $inOwner, $outOwner, $inHandler, $outHandler, $portState, $slot)"
|
||||||
else s"Connection($id, $portState, $slot, $inHandler, $outHandler)"
|
else s"Connection($id, $portState, $slot, $inHandler, $outHandler)"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -121,7 +117,6 @@ object GraphInterpreter {
|
||||||
* From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a
|
* From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a
|
||||||
* [[GraphInterpreter#GraphAssembly]] object and provides facilities to execute and interact with this assembly.
|
* [[GraphInterpreter#GraphAssembly]] object and provides facilities to execute and interact with this assembly.
|
||||||
* The lifecycle of the Interpreter is roughly the following:
|
* The lifecycle of the Interpreter is roughly the following:
|
||||||
* - Boundary logics are attached via [[attachDownstreamBoundary()]] and [[attachUpstreamBoundary()]]
|
|
||||||
* - [[init()]] is called
|
* - [[init()]] is called
|
||||||
* - [[execute()]] is called whenever there is need for execution, providing an upper limit on the processed events
|
* - [[execute()]] is called whenever there is need for execution, providing an upper limit on the processed events
|
||||||
* - [[finish()]] is called before the interpreter is disposed, preferably after [[isCompleted]] returned true, although
|
* - [[finish()]] is called before the interpreter is disposed, preferably after [[isCompleted]] returned true, although
|
||||||
|
|
@ -323,10 +318,10 @@ final class GraphInterpreter(
|
||||||
private def outOwnerName(connection: Connection): String = connection.outOwner.toString
|
private def outOwnerName(connection: Connection): String = connection.outOwner.toString
|
||||||
|
|
||||||
// Debug name for a connections input part
|
// Debug name for a connections input part
|
||||||
private def inLogicName(connection: Connection): String = logics(connection.inOwnerId).toString
|
private def inLogicName(connection: Connection): String = logics(connection.inOwner.stageId).toString
|
||||||
|
|
||||||
// Debug name for a connections output part
|
// Debug name for a connections output part
|
||||||
private def outLogicName(connection: Connection): String = logics(connection.outOwnerId).toString
|
private def outLogicName(connection: Connection): String = logics(connection.outOwner.stageId).toString
|
||||||
|
|
||||||
private def shutdownCounters: String =
|
private def shutdownCounters: String =
|
||||||
shutdownCounter.map(x ⇒ if (x >= KeepGoingFlag) s"${x & KeepGoingMask}(KeepGoing)" else x.toString).mkString(",")
|
shutdownCounter.map(x ⇒ if (x >= KeepGoingFlag) s"${x & KeepGoingMask}(KeepGoing)" else x.toString).mkString(",")
|
||||||
|
|
@ -476,7 +471,7 @@ final class GraphInterpreter(
|
||||||
activeStage = connection.outOwner
|
activeStage = connection.outOwner
|
||||||
if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
|
if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]")
|
||||||
connection.portState |= OutClosed
|
connection.portState |= OutClosed
|
||||||
completeConnection(connection.outOwnerId)
|
completeConnection(connection.outOwner.stageId)
|
||||||
connection.outHandler.onDownstreamFinish()
|
connection.outHandler.onDownstreamFinish()
|
||||||
} else if ((code & (OutClosed | InClosed)) == OutClosed) {
|
} else if ((code & (OutClosed | InClosed)) == OutClosed) {
|
||||||
// COMPLETIONS
|
// COMPLETIONS
|
||||||
|
|
@ -486,7 +481,7 @@ final class GraphInterpreter(
|
||||||
if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]")
|
if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]")
|
||||||
connection.portState |= InClosed
|
connection.portState |= InClosed
|
||||||
activeStage = connection.inOwner
|
activeStage = connection.inOwner
|
||||||
completeConnection(connection.inOwnerId)
|
completeConnection(connection.inOwner.stageId)
|
||||||
if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish()
|
if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish()
|
||||||
else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex)
|
else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex)
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -587,7 +582,7 @@ final class GraphInterpreter(
|
||||||
enqueue(connection)
|
enqueue(connection)
|
||||||
} else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection)
|
} else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection)
|
||||||
|
|
||||||
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId)
|
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[stream] def fail(connection: Connection, ex: Throwable): Unit = {
|
private[stream] def fail(connection: Connection, ex: Throwable): Unit = {
|
||||||
|
|
@ -605,7 +600,7 @@ final class GraphInterpreter(
|
||||||
enqueue(connection)
|
enqueue(connection)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId)
|
if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[stream] def cancel(connection: Connection): Unit = {
|
private[stream] def cancel(connection: Connection): Unit = {
|
||||||
|
|
@ -622,7 +617,7 @@ final class GraphInterpreter(
|
||||||
enqueue(connection)
|
enqueue(connection)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ((currentState & InClosed) == 0) completeConnection(connection.inOwnerId)
|
if ((currentState & InClosed) == 0) completeConnection(connection.inOwner.stageId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue