diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 8f85966503..c56ea9f829 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -5,7 +5,7 @@ package akka.io import java.io.{ File, IOException } -import java.net.{ URLClassLoader, InetSocketAddress } +import java.net.{ ServerSocket, URLClassLoader, InetSocketAddress } import java.nio.ByteBuffer import java.nio.channels._ import java.nio.channels.spi.SelectorProvider @@ -149,6 +149,7 @@ class TcpConnectionSpec extends AkkaSpec(""" userHandler.send(connectionActor, Register(userHandler.ref)) userHandler.expectMsgType[Received].data.decodeString("ASCII") should be("immediatedata") + ignoreWindowsWorkaroundForTicket15766() interestCallReceiver.expectMsg(OP_READ) } } @@ -449,7 +450,10 @@ class TcpConnectionSpec extends AkkaSpec(""" assertThisConnectionActorTerminated() val buffer = ByteBuffer.allocate(1) - val thrown = evaluating { serverSideChannel.read(buffer) } should produce[IOException] + val thrown = evaluating { + windowsWorkaroundToDetectAbort() + serverSideChannel.read(buffer) + } should produce[IOException] thrown.getMessage should be(ConnectionResetByPeerMessage) } } @@ -599,7 +603,7 @@ class TcpConnectionSpec extends AkkaSpec(""" abortClose(serverSideChannel) writer.send(connectionActor, Write(ByteString("testdata"))) - // bother writer and handler should get the message + // both writer and handler should get the message writer.expectMsgType[ErrorClosed] connectionHandler.expectMsgType[ErrorClosed] @@ -815,6 +819,25 @@ class TcpConnectionSpec extends AkkaSpec(""" writer.expectMsg(works) } } + + "report abort before handler is registered (reproducer from #15033)" in { + // This test needs the OP_CONNECT workaround on Windows, see original report #15033 and parent ticket #15766 + + val bindAddress = new InetSocketAddress(23402) + val serverSocket = new ServerSocket(bindAddress.getPort, 100, bindAddress.getAddress) + val connectionProbe = TestProbe() + + connectionProbe.send(IO(Tcp), Connect(bindAddress)) + + IO(Tcp) ! Connect(bindAddress) + + val socket = serverSocket.accept() + connectionProbe.expectMsgType[Tcp.Connected] + val connectionActor = connectionProbe.sender() + connectionActor ! PoisonPill + verifyActorTermination(connectionActor) + an[IOException] should be thrownBy { socket.getInputStream.read() } + } } /////////////////////////////////////// TEST SUPPORT //////////////////////////////////////////////// @@ -840,6 +863,11 @@ class TcpConnectionSpec extends AkkaSpec(""" var registerCallReceiver = TestProbe() var interestCallReceiver = TestProbe() + def ignoreWindowsWorkaroundForTicket15766(): Unit = { + // Due to the Windows workaround of #15766 we need to set an OP_CONNECT to reliably detect connection resets + if (Helpers.isWindows) interestCallReceiver.expectMsg(OP_CONNECT) + } + def run(body: ⇒ Unit): Unit = { try { setServerSocketOptions() @@ -911,6 +939,19 @@ class TcpConnectionSpec extends AkkaSpec(""" lazy val serverSelectionKey = registerChannel(serverSideChannel, "server") lazy val defaultbuffer = ByteBuffer.allocate(TestSize) + def windowsWorkaroundToDetectAbort(): Unit = { + // Due to a Windows quirk we need to set an OP_CONNECT to reliably detect connection resets, see #1576 + if (Helpers.isWindows) { + serverSelectionKey.interestOps(OP_CONNECT) + nioSelector.select(10) + } + } + + override def ignoreWindowsWorkaroundForTicket15766(): Unit = { + super.ignoreWindowsWorkaroundForTicket15766() + if (Helpers.isWindows) clientSelectionKey.interestOps(OP_CONNECT) + } + override def run(body: ⇒ Unit): Unit = super.run { try { serverSideChannel.configureBlocking(false) @@ -921,6 +962,7 @@ class TcpConnectionSpec extends AkkaSpec(""" userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting)) + ignoreWindowsWorkaroundForTicket15766() if (!pullMode) interestCallReceiver.expectMsg(OP_READ) clientSelectionKey // trigger initialization @@ -1040,6 +1082,7 @@ class TcpConnectionSpec extends AkkaSpec(""" log.debug("setSoLinger(true, 0) failed with {}", e) } channel.close() + if (Helpers.isWindows) nioSelector.select(10) // Windows needs this } } diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index a1b3e9e98b..6ebb0af7be 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -4,7 +4,7 @@ package akka.io -import akka.actor.PoisonPill +import akka.actor.{ ActorRef, PoisonPill } import akka.io.Tcp._ import akka.testkit.{ TestProbe, AkkaSpec } import akka.TestUtils._ @@ -34,7 +34,7 @@ class TcpIntegrationSpec extends AkkaSpec(""" verifyActorTermination(serverConnection) } - "properly handle connection abort from one side" in new TestSetup { + "properly handle connection abort from client side" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() clientHandler.send(clientConnection, Abort) clientHandler.expectMsg(Aborted) @@ -43,6 +43,77 @@ class TcpIntegrationSpec extends AkkaSpec(""" verifyActorTermination(serverConnection) } + "properly handle connection abort from client side after chit-chat" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + chitchat(clientHandler, clientConnection, serverHandler, serverConnection) + + clientHandler.send(clientConnection, Abort) + clientHandler.expectMsg(Aborted) + serverHandler.expectMsgType[ErrorClosed] + verifyActorTermination(clientConnection) + verifyActorTermination(serverConnection) + } + + "properly handle connection abort from server side" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + serverHandler.send(serverConnection, Abort) + serverHandler.expectMsg(Aborted) + clientHandler.expectMsgType[ErrorClosed] + verifyActorTermination(clientConnection) + verifyActorTermination(serverConnection) + } + + "properly handle connection abort from server side after chit-chat" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + chitchat(clientHandler, clientConnection, serverHandler, serverConnection) + + serverHandler.send(serverConnection, Abort) + serverHandler.expectMsg(Aborted) + clientHandler.expectMsgType[ErrorClosed] + verifyActorTermination(clientConnection) + verifyActorTermination(serverConnection) + } + + "properly handle connection abort via PosionPill from client side" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + clientHandler.send(clientConnection, PoisonPill) + verifyActorTermination(clientConnection) + + serverHandler.expectMsgType[ErrorClosed] + verifyActorTermination(serverConnection) + } + + "properly handle connection abort via PosionPill from client side after chit-chat" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + chitchat(clientHandler, clientConnection, serverHandler, serverConnection) + + clientHandler.send(clientConnection, PoisonPill) + verifyActorTermination(clientConnection) + + serverHandler.expectMsgType[ErrorClosed] + verifyActorTermination(serverConnection) + } + + "properly handle connection abort via PosionPill from server side" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + serverHandler.send(serverConnection, PoisonPill) + verifyActorTermination(serverConnection) + + clientHandler.expectMsgType[ErrorClosed] + verifyActorTermination(clientConnection) + } + + "properly handle connection abort via PosionPill from server side after chit-chat" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + chitchat(clientHandler, clientConnection, serverHandler, serverConnection) + + serverHandler.send(serverConnection, PoisonPill) + verifyActorTermination(serverConnection) + + clientHandler.expectMsgType[ErrorClosed] + verifyActorTermination(clientConnection) + } + "properly complete one client/server request/response cycle" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() @@ -108,4 +179,21 @@ class TcpIntegrationSpec extends AkkaSpec(""" verifyActorTermination(connectionActor) } } + + def chitchat( + clientHandler: TestProbe, + clientConnection: ActorRef, + serverHandler: TestProbe, + serverConnection: ActorRef, + rounds: Int = 100) = { + + val testData = ByteString(0) + (1 to rounds) foreach { _ ⇒ + clientHandler.send(clientConnection, Write(testData)) + serverHandler.expectMsg(Received(testData)) + serverHandler.send(serverConnection, Write(testData)) + clientHandler.expectMsg(Received(testData)) + } + } + } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 6878e9bd93..198eaf8846 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -627,6 +627,15 @@ akka { # OP_CONNECT. Retries are needed if the OP_CONNECT notification doesn't imply that # `finishConnect` will succeed, which is the case on Android. finish-connect-retries = 5 + + # On Windows connection aborts are not reliably detected unless an OP_READ is + # registered on the selector _after_ the connection has been reset. This + # workaround enables an OP_CONNECT which forces the abort to be visible on Windows. + # Enabling this setting on other platforms than Windows will cause various failures + # and undefined behavior. + # Possible values of this key are on, off and auto where auto will enable the + # workaround if Windows is detected automatically. + windows-connection-abort-workaround-enabled = auto } udp { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 8dbe38d405..286832ec3b 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -5,13 +5,15 @@ package akka.io import java.net.InetSocketAddress +import java.net.Socket +import akka.ConfigurationException import java.nio.channels.SocketChannel import akka.io.Inet._ import com.typesafe.config.Config import scala.concurrent.duration._ import scala.collection.immutable import scala.collection.JavaConverters._ -import akka.util.ByteString +import akka.util.{ Helpers, ByteString } import akka.util.Helpers.Requiring import akka.actor._ import java.lang.{ Iterable ⇒ JIterable } @@ -544,6 +546,11 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val FinishConnectRetries: Int = getInt("finish-connect-retries") requiring (_ > 0, "finish-connect-retries must be > 0") + val WindowsConnectionAbortWorkaroundEnabled: Boolean = getString("windows-connection-abort-workaround-enabled") match { + case "auto" ⇒ Helpers.isWindows + case _ => getBoolean("windows-connection-abort-workaround-enabled") + } + private[this] def getIntBytes(path: String): Int = { val size = getBytes(path) require(size < Int.MaxValue, s"$path must be < 2 GiB") diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index b37c65dd0c..59b28a85cb 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -55,6 +55,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha if (TraceLogging) log.debug("[{}] registered as connection handler", handler) val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting) + // if we have resumed reading from pullMode while waiting for Register then register OP_READ interest if (pullMode && !readingSuspended) resumeReading(info) doRead(info, None) // immediately try reading, pullMode is handled by readingSuspended @@ -186,6 +187,10 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]) context.setReceiveTimeout(RegisterTimeout) + + // !!WARNING!! The line below is needed to make Windows notify us about aborted connections, see #15766 + if (WindowsConnectionAbortWorkaroundEnabled) registration.enableInterest(OP_CONNECT) + context.become(waitingForRegistration(registration, commander)) }