diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index da05746a19..919b3018cb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -650,7 +650,7 @@ final class GraphInterpreter( elem } - private def enqueue(connection: Int): Unit = { + def enqueue(connection: Int): Unit = { if (Debug) if (queueTail - queueHead > mask) new Exception(s"$Name internal queue full ($queueStatus) + $connection").printStackTrace() eventQueue(queueTail & mask) = connection queueTail += 1 @@ -688,23 +688,6 @@ final class GraphInterpreter( } } - private[stream] def push(connection: Int, elem: Any): Unit = { - val currentState = portStates(connection) - portStates(connection) = currentState ^ PushStartFlip - if ((currentState & InClosed) == 0) { - connectionSlots(connection) = elem - enqueue(connection) - } - } - - private[stream] def pull(connection: Int): Unit = { - val currentState = portStates(connection) - portStates(connection) = currentState ^ PullStartFlip - if ((currentState & OutClosed) == 0) { - enqueue(connection) - } - } - private[stream] def complete(connection: Int): Unit = { val currentState = portStates(connection) if (Debug) println(s"$Name complete($connection) [$currentState]") diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index a66f8fe2d5..4b46f87f11 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -340,12 +340,21 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * query whether pull is allowed to be called or not. This method will also fail if the port is already closed. */ final protected def pull[T](in: Inlet[T]): Unit = { - if ((interpreter.portStates(conn(in)) & (InReady | InClosed)) == InReady) { - interpreter.pull(conn(in)) + val connection = conn(in) + val portState = interpreter.portStates(connection) + val it = interpreter + + if ((portState & (InReady | InClosed | OutClosed)) == InReady) { + it.portStates(connection) = portState ^ PullStartFlip + it.enqueue(connection) } else { // Detailed error information should not add overhead to the hot path require(!isClosed(in), s"Cannot pull closed port ($in)") require(!hasBeenPulled(in), s"Cannot pull port ($in) twice") + + // There were no errors, the pull was simply ignored as the target stage already closed its port. We + // still need to track proper state though. + it.portStates(connection) = portState ^ PullStartFlip } } @@ -371,18 +380,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: */ final protected def grab[T](in: Inlet[T]): T = { val connection = conn(in) + val it = interpreter + val elem = it.connectionSlots(connection) + // Fast path - if ((interpreter.portStates(connection) & (InReady | InFailed)) == InReady && - (interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty)) { - val elem = interpreter.connectionSlots(connection) - interpreter.connectionSlots(connection) = Empty + if ((it.portStates(connection) & (InReady | InFailed)) == InReady && (elem.asInstanceOf[AnyRef] ne Empty)) { + it.connectionSlots(connection) = Empty elem.asInstanceOf[T] } else { // Slow path require(isAvailable(in), s"Cannot get element from already empty input port ($in)") - val failed = interpreter.connectionSlots(connection).asInstanceOf[Failed] + val failed = it.connectionSlots(connection).asInstanceOf[Failed] val elem = failed.previousElem.asInstanceOf[T] - interpreter.connectionSlots(connection) = Failed(failed.ex, Empty) + it.connectionSlots(connection) = Failed(failed.ex, Empty) elem } } @@ -428,13 +438,26 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * used to check if the port is ready to be pushed or not. */ final protected def push[T](out: Outlet[T], elem: T): Unit = { - if ((interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady && (elem != null)) { - interpreter.push(conn(out), elem) + val connection = conn(out) + val portState = interpreter.portStates(connection) + val it = interpreter + + it.portStates(connection) = portState ^ PushStartFlip + + if ((portState & (OutReady | OutClosed | InClosed)) == OutReady && (elem != null)) { + it.connectionSlots(connection) = elem + it.enqueue(connection) } else { + // Restore state for the error case + it.portStates(connection) = portState + // Detailed error information should not add overhead to the hot path ReactiveStreamsCompliance.requireNonNullElement(elem) require(isAvailable(out), s"Cannot push port ($out) twice") require(!isClosed(out), s"Cannot pull closed port ($out)") + + // No error, just InClosed caused the actual pull to be ignored, but the status flag still needs to be flipped + it.portStates(connection) = portState ^ PushStartFlip } }