diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index e17da07184..70fc0c7c8e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -668,10 +668,15 @@ import akka.stream.snapshot._ logicSnapshots(logicIndexes(connection.inOwner)), logicSnapshots(logicIndexes(connection.outOwner)), connection.portState match { - case InReady => ConnectionSnapshot.ShouldPull - case OutReady => ConnectionSnapshot.ShouldPush - case x if (x | InClosed | OutClosed) == (InClosed | OutClosed) => + case InReady => ConnectionSnapshot.ShouldPull + case OutReady => ConnectionSnapshot.ShouldPush + case x if (x & (InClosed | OutClosed)) == (InClosed | OutClosed) => + // At least one side of the connection is closed: we show it as closed ConnectionSnapshot.Closed + case _ => + // This should not be possible: connection alive and both push and pull enqueued but not received + throw new IllegalStateException(s"Unexpected connection state for $connection: ${connection.portState}") + }) }