From 5ea2e0536c8fb5f3e5b3105bb4249bf5920020c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Fri, 10 Mar 2017 19:08:08 +0100 Subject: [PATCH] Remove superflous ownerId fields in interpreter Connection #22447 --- .../impl/fusing/GraphInterpreterSpecKit.scala | 4 ---- .../impl/PhasedFusingActorMaterializer.scala | 9 ++------ .../stream/impl/fusing/GraphInterpreter.scala | 21 +++++++------------ 3 files changed, 10 insertions(+), 24 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index 4d046faecf..0db17fc20a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -110,11 +110,9 @@ object GraphInterpreterSpecKit { val connection = new Connection( id = idx, - outOwnerId = outOwner.stageId, outOwner = outOwner, outHandler = outOwner.outHandler(0), inOwner = inOwner, - inOwnerId = inOwner.stageId, inHandler = inOwner.inHandler(0) ) @@ -143,10 +141,8 @@ object GraphInterpreterSpecKit { val connection = new Connection( id = idx, - outOwnerId = outOwner.stageId, outOwner = outOwner, outHandler = outOwner.outHandler(outlet.id), - inOwnerId = inOwner.stageId, inOwner = inOwner, inHandler = inOwner.inHandler(inlet.id) ) diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 472b27bd4a..282b14c179 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -616,14 +616,14 @@ final class GraphStageIsland( val c = connections(slot) if (c ne null) c else { - val c2 = new Connection(0, 0, null, 0, null, null, null) + val c2 = new Connection(0, null, null, null, null) connections(slot) = c2 c2 } } def outConn(): Connection = { - val connection = new Connection(0, 0, null, 0, null, null, null) + val connection = new Connection(0, null, null, null, null) outConnections ::= connection connection } @@ -632,7 +632,6 @@ final class GraphStageIsland( val connection = conn(slot) connection.inOwner = logic connection.id = slot - connection.inOwnerId = logic.stageId connection.inHandler = logic.handlers(in.id).asInstanceOf[InHandler] logic.portToConn(in.id) = connection } @@ -641,7 +640,6 @@ final class GraphStageIsland( val connection = conn(slot) connection.outOwner = logic connection.id = slot - connection.outOwnerId = logic.stageId connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler] logic.portToConn(logic.inCount + out.id) = connection } @@ -655,11 +653,9 @@ final class GraphStageIsland( boundary.portToConn(boundary.in.id) = connection connection.inHandler = boundary.handlers(0).asInstanceOf[InHandler] connection.inOwner = boundary - connection.inOwnerId = boundary.stageId connection.outOwner = logic connection.id = -1 // Will be filled later - connection.outOwnerId = logic.stageId connection.outHandler = logic.handlers(logic.inCount + out.id).asInstanceOf[OutHandler] logic.portToConn(logic.inCount + out.id) = connection @@ -678,7 +674,6 @@ final class GraphStageIsland( boundary.portToConn(boundary.out.id + boundary.inCount) = connection connection.outHandler = boundary.handlers(0).asInstanceOf[OutHandler] connection.outOwner = boundary - connection.outOwnerId = boundary.stageId } override def onIslandReady(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index c1625bc533..978a728692 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -65,18 +65,14 @@ object GraphInterpreter { * between an output and input ports. * * @param id Identifier of the connection. - * @param inOwnerId Identifier of the owner of the input side of the connection. * @param inOwner The stage logic that corresponds to the input side of the connection. - * @param outOwnerId Identifier of the owner of the output side of the connection. * @param outOwner The stage logic that corresponds to the output side of the connection. * @param inHandler The handler that contains the callback for input events. * @param outHandler The handler that contains the callback for output events. */ final class Connection( var id: Int, - var inOwnerId: Int, var inOwner: GraphStageLogic, - var outOwnerId: Int, var outOwner: GraphStageLogic, var inHandler: InHandler, var outHandler: OutHandler) { @@ -84,7 +80,7 @@ object GraphInterpreter { var slot: Any = Empty override def toString = - if (GraphInterpreter.Debug) s"Connection($id, $inOwnerId, $inOwner, $outOwnerId, $outOwner, $inHandler, $outHandler, $portState, $slot)" + if (GraphInterpreter.Debug) s"Connection($id, $inOwner, $outOwner, $inHandler, $outHandler, $portState, $slot)" else s"Connection($id, $portState, $slot, $inHandler, $outHandler)" } @@ -121,7 +117,6 @@ object GraphInterpreter { * From an external viewpoint, the GraphInterpreter takes an assembly of graph processing stages encoded as a * [[GraphInterpreter#GraphAssembly]] object and provides facilities to execute and interact with this assembly. * The lifecycle of the Interpreter is roughly the following: - * - Boundary logics are attached via [[attachDownstreamBoundary()]] and [[attachUpstreamBoundary()]] * - [[init()]] is called * - [[execute()]] is called whenever there is need for execution, providing an upper limit on the processed events * - [[finish()]] is called before the interpreter is disposed, preferably after [[isCompleted]] returned true, although @@ -323,10 +318,10 @@ final class GraphInterpreter( private def outOwnerName(connection: Connection): String = connection.outOwner.toString // Debug name for a connections input part - private def inLogicName(connection: Connection): String = logics(connection.inOwnerId).toString + private def inLogicName(connection: Connection): String = logics(connection.inOwner.stageId).toString // Debug name for a connections output part - private def outLogicName(connection: Connection): String = logics(connection.outOwnerId).toString + private def outLogicName(connection: Connection): String = logics(connection.outOwner.stageId).toString private def shutdownCounters: String = shutdownCounter.map(x ⇒ if (x >= KeepGoingFlag) s"${x & KeepGoingMask}(KeepGoing)" else x.toString).mkString(",") @@ -476,7 +471,7 @@ final class GraphInterpreter( activeStage = connection.outOwner if (Debug) println(s"$Name CANCEL ${inOwnerName(connection)} -> ${outOwnerName(connection)} (${connection.outHandler}) [${outLogicName(connection)}]") connection.portState |= OutClosed - completeConnection(connection.outOwnerId) + completeConnection(connection.outOwner.stageId) connection.outHandler.onDownstreamFinish() } else if ((code & (OutClosed | InClosed)) == OutClosed) { // COMPLETIONS @@ -486,7 +481,7 @@ final class GraphInterpreter( if (Debug) println(s"$Name COMPLETE ${outOwnerName(connection)} -> ${inOwnerName(connection)} (${connection.inHandler}) [${inLogicName(connection)}]") connection.portState |= InClosed activeStage = connection.inOwner - completeConnection(connection.inOwnerId) + completeConnection(connection.inOwner.stageId) if ((connection.portState & InFailed) == 0) connection.inHandler.onUpstreamFinish() else connection.inHandler.onUpstreamFailure(connection.slot.asInstanceOf[Failed].ex) } else { @@ -587,7 +582,7 @@ final class GraphInterpreter( enqueue(connection) } else if ((currentState & (InClosed | Pushing | Pulling | OutClosed)) == 0) enqueue(connection) - if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId) + if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId) } private[stream] def fail(connection: Connection, ex: Throwable): Unit = { @@ -605,7 +600,7 @@ final class GraphInterpreter( enqueue(connection) } } - if ((currentState & OutClosed) == 0) completeConnection(connection.outOwnerId) + if ((currentState & OutClosed) == 0) completeConnection(connection.outOwner.stageId) } private[stream] def cancel(connection: Connection): Unit = { @@ -622,7 +617,7 @@ final class GraphInterpreter( enqueue(connection) } } - if ((currentState & InClosed) == 0) completeConnection(connection.inOwnerId) + if ((currentState & InClosed) == 0) completeConnection(connection.inOwner.stageId) } /**