From 97d915069248ead8f42f4abfbeacff1ea3559c13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 3 Jul 2015 13:42:44 +0200 Subject: [PATCH 1/2] =str #17854: fullClose should not Abort --- .../stream/impl/io/TcpConnectionStream.scala | 48 +++++++++---------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 47df4ecce2..b826e8d4a9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -75,10 +75,10 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti readPump.pump() } case ConfirmedClosed ⇒ - cancelWithoutTcpClose() + cancel() readPump.pump() case PeerClosed ⇒ - cancelWithoutTcpClose() + cancel() readPump.pump() } @@ -90,19 +90,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti if (!closed) { closed = true pendingElement = null - if (connection ne null) { - if (tcpOutputs.isClosed) - connection ! Abort - else - connection ! ResumeReading - } - } - } - - def cancelWithoutTcpClose(): Unit = { - if (!closed) { - closed = true - pendingElement = null + if (!tcpOutputs.isFlushed && (connection ne null)) connection ! ResumeReading } } @@ -154,17 +142,25 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti override def complete(): Unit = { if (!closed && initialized) { closed = true - if (tcpInputs.isClosed && (halfClose || lastWriteAcked)) { - // We can immediately close if - // - half close mode, and read size already finished - // - full close mode, and last write has been acked - // - // if in full close mode, and has a non-acknowledged write, we will do the closing in handleWrite - // when the Ack arrives - connection ! Close - tryShutdown() - } else - connection ! ConfirmedClose + + if (halfClose) { + if (tcpInputs.isClosed) { + // Reading has stopped, either because of cancel, or PeerClosed, just Close now + connection ! Close + tryShutdown() + } + else { + // We still read, so we only close the write side + connection ! ConfirmedClose + } + } else { + if (lastWriteAcked) { + // No pending writes, close now + connection ! Close + tryShutdown() + } + // Else wait for final Ack (see handleWrite) + } } } From afa4c5c5636a16ed11702d07a8da65a0b547d18b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 7 Jul 2015 13:51:48 +0200 Subject: [PATCH 2/2] Fixed wrong test-case: - there is no "full-close" if client is still writing, it will always RST (now using lazyEmpty) - must Await for bind, otherwise there is a race between connect and bind --- .../test/scala/akka/stream/io/TcpSpec.scala | 26 +++++++++++-------- .../stream/impl/io/TcpConnectionStream.scala | 3 +-- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index cca24bb8d5..00a0899095 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -346,17 +346,19 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- val writeButIgnoreRead: Flow[ByteString, ByteString, Unit] = Flow.wrap(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right) - val binding = Tcp() - .bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false) - .toMat(Sink.foreach(_.flow.join(writeButIgnoreRead).run()))(Keep.left).run() + val binding = + Await.result( + Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ + conn.flow.join(writeButIgnoreRead).run() + })(Keep.left).run(), 3.seconds) - val result = Source.repeat(ByteString("client data")) + val result = Source.lazyEmpty[ByteString] .via(Tcp().outgoingConnection(serverAddress.getHostName, serverAddress.getPort)) .runFold(ByteString.empty)(_ ++ _) - val r: ByteString = Await.result(result, 3.seconds) - r should ===(ByteString("Early response")) - binding.foreach(_.unbind()) + Await.result(result, 3.seconds) should ===(ByteString("Early response")) + + binding.unbind() } "Echo should work even if server is in full close mode" in { @@ -364,9 +366,11 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- val serverAddress = temporaryServerAddress() - val binding = Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ - conn.flow.join(Flow[ByteString]).run() - })(Keep.left).run() + val binding = + Await.result( + Tcp().bind(serverAddress.getHostName, serverAddress.getPort, halfClose = false).toMat(Sink.foreach { conn ⇒ + conn.flow.join(Flow[ByteString]).run() + })(Keep.left).run(), 3.seconds) val result = Source(immutable.Iterable.fill(10000)(ByteString(0))) .via(Tcp().outgoingConnection(serverAddress, halfClose = true)) @@ -374,7 +378,7 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- Await.result(result, 3.seconds) should ===(10000) - binding.foreach(_.unbind()) + binding.unbind() } "handle when connection actor terminates unexpectedly" in { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index b826e8d4a9..a053f212a1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -148,8 +148,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti // Reading has stopped, either because of cancel, or PeerClosed, just Close now connection ! Close tryShutdown() - } - else { + } else { // We still read, so we only close the write side connection ! ConfirmedClose }