diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index cca24bb8d5..00a0899095 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -346,17 +346,19 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- val writeButIgnoreRead: Flow[ByteString, ByteString, Unit] = Flow.wrap(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right) - val binding = Tcp() - .bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false) - .toMat(Sink.foreach(_.flow.join(writeButIgnoreRead).run()))(Keep.left).run() + val binding = + Await.result( + Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ + conn.flow.join(writeButIgnoreRead).run() + })(Keep.left).run(), 3.seconds) - val result = Source.repeat(ByteString("client data")) + val result = Source.lazyEmpty[ByteString] .via(Tcp().outgoingConnection(serverAddress.getHostName, serverAddress.getPort)) .runFold(ByteString.empty)(_ ++ _) - val r: ByteString = Await.result(result, 3.seconds) - r should ===(ByteString("Early response")) - binding.foreach(_.unbind()) + Await.result(result, 3.seconds) should ===(ByteString("Early response")) + + binding.unbind() } "Echo should work even if server is in full close mode" in { @@ -364,9 +366,11 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- val serverAddress = temporaryServerAddress() - val binding = Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ - conn.flow.join(Flow[ByteString]).run() - })(Keep.left).run() + val binding = + Await.result( + Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ + conn.flow.join(Flow[ByteString]).run() + })(Keep.left).run(), 3.seconds) val result = Source(immutable.Iterable.fill(10000)(ByteString(0))) .via(Tcp().outgoingConnection(serverAddress, halfClose = true)) @@ -374,7 +378,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- Await.result(result, 3.seconds) should ===(10000) - binding.foreach(_.unbind()) + binding.unbind() } "handle when connection actor terminates unexpectedly" in { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 47df4ecce2..a053f212a1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -75,10 +75,10 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti readPump.pump() } case ConfirmedClosed ⇒ - cancelWithoutTcpClose() + cancel() readPump.pump() case PeerClosed ⇒ - cancelWithoutTcpClose() + cancel() readPump.pump() } @@ -90,19 +90,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti if (!closed) { closed = true pendingElement = null - if (connection ne null) { - if (tcpOutputs.isClosed) - connection ! Abort - else - connection ! ResumeReading - } - } - } - - def cancelWithoutTcpClose(): Unit = { - if (!closed) { - closed = true - pendingElement = null + if (!tcpOutputs.isFlushed && (connection ne null)) connection ! ResumeReading } } @@ -154,17 +142,24 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti override def complete(): Unit = { if (!closed && initialized) { closed = true - if (tcpInputs.isClosed && (halfClose || lastWriteAcked)) { - // We can immediately close if - // - half close mode, and read size already finished - // - full close mode, and last write has been acked - // - // if in full close mode, and has a non-acknowledged write, we will do the closing in handleWrite - // when the Ack arrives - connection ! Close - tryShutdown() - } else - connection ! ConfirmedClose + + if (halfClose) { + if (tcpInputs.isClosed) { + // Reading has stopped, either because of cancel, or PeerClosed, just Close now + connection ! Close + tryShutdown() + } else { + // We still read, so we only close the write side + connection ! ConfirmedClose + } + } else { + if (lastWriteAcked) { + // No pending writes, close now + connection ! Close + tryShutdown() + } + // Else wait for final Ack (see handleWrite) + } } }