diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 47df4ecce2..b826e8d4a9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -75,10 +75,10 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti readPump.pump() } case ConfirmedClosed ⇒ - cancelWithoutTcpClose() + cancel() readPump.pump() case PeerClosed ⇒ - cancelWithoutTcpClose() + cancel() readPump.pump() } @@ -90,19 +90,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti if (!closed) { closed = true pendingElement = null - if (connection ne null) { - if (tcpOutputs.isClosed) - connection ! Abort - else - connection ! ResumeReading - } - } - } - - def cancelWithoutTcpClose(): Unit = { - if (!closed) { - closed = true - pendingElement = null + if (!tcpOutputs.isFlushed && (connection ne null)) connection ! ResumeReading } } @@ -154,17 +142,25 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti override def complete(): Unit = { if (!closed && initialized) { closed = true - if (tcpInputs.isClosed && (halfClose || lastWriteAcked)) { - // We can immediately close if - // - half close mode, and read size already finished - // - full close mode, and last write has been acked - // - // if in full close mode, and has a non-acknowledged write, we will do the closing in handleWrite - // when the Ack arrives - connection ! Close - tryShutdown() - } else - connection ! ConfirmedClose + + if (halfClose) { + if (tcpInputs.isClosed) { + // Reading has stopped, either because of cancel, or PeerClosed, just Close now + connection ! Close + tryShutdown() + } + else { + // We still read, so we only close the write side + connection ! ConfirmedClose + } + } else { + if (lastWriteAcked) { + // No pending writes, close now + connection ! Close + tryShutdown() + } + // Else wait for final Ack (see handleWrite) + } } }