=tcp #15766: Workaround for Windows to detect TCP abort

(cherry picked from commit 9340215)
This commit is contained in:
Endre Sándor Varga 2014-09-15 11:19:07 +02:00
parent fe1718c88f
commit 3a2f4e14d4
5 changed files with 158 additions and 6 deletions

View file

@ -5,7 +5,7 @@
package akka.io package akka.io
import java.io.{ File, IOException } import java.io.{ File, IOException }
import java.net.{ URLClassLoader, InetSocketAddress } import java.net.{ ServerSocket, URLClassLoader, InetSocketAddress }
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels._ import java.nio.channels._
import java.nio.channels.spi.SelectorProvider import java.nio.channels.spi.SelectorProvider
@ -149,6 +149,7 @@ class TcpConnectionSpec extends AkkaSpec("""
userHandler.send(connectionActor, Register(userHandler.ref)) userHandler.send(connectionActor, Register(userHandler.ref))
userHandler.expectMsgType[Received].data.decodeString("ASCII") should be("immediatedata") userHandler.expectMsgType[Received].data.decodeString("ASCII") should be("immediatedata")
ignoreWindowsWorkaroundForTicket15766()
interestCallReceiver.expectMsg(OP_READ) interestCallReceiver.expectMsg(OP_READ)
} }
} }
@ -449,7 +450,10 @@ class TcpConnectionSpec extends AkkaSpec("""
assertThisConnectionActorTerminated() assertThisConnectionActorTerminated()
val buffer = ByteBuffer.allocate(1) val buffer = ByteBuffer.allocate(1)
val thrown = evaluating { serverSideChannel.read(buffer) } should produce[IOException] val thrown = evaluating {
windowsWorkaroundToDetectAbort()
serverSideChannel.read(buffer)
} should produce[IOException]
thrown.getMessage should be(ConnectionResetByPeerMessage) thrown.getMessage should be(ConnectionResetByPeerMessage)
} }
} }
@ -599,7 +603,7 @@ class TcpConnectionSpec extends AkkaSpec("""
abortClose(serverSideChannel) abortClose(serverSideChannel)
writer.send(connectionActor, Write(ByteString("testdata"))) writer.send(connectionActor, Write(ByteString("testdata")))
// bother writer and handler should get the message // both writer and handler should get the message
writer.expectMsgType[ErrorClosed] writer.expectMsgType[ErrorClosed]
connectionHandler.expectMsgType[ErrorClosed] connectionHandler.expectMsgType[ErrorClosed]
@ -815,6 +819,25 @@ class TcpConnectionSpec extends AkkaSpec("""
writer.expectMsg(works) writer.expectMsg(works)
} }
} }
"report abort before handler is registered (reproducer from #15033)" in {
// This test needs the OP_CONNECT workaround on Windows, see original report #15033 and parent ticket #15766
val bindAddress = new InetSocketAddress(23402)
val serverSocket = new ServerSocket(bindAddress.getPort, 100, bindAddress.getAddress)
val connectionProbe = TestProbe()
connectionProbe.send(IO(Tcp), Connect(bindAddress))
IO(Tcp) ! Connect(bindAddress)
val socket = serverSocket.accept()
connectionProbe.expectMsgType[Tcp.Connected]
val connectionActor = connectionProbe.sender()
connectionActor ! PoisonPill
verifyActorTermination(connectionActor)
an[IOException] should be thrownBy { socket.getInputStream.read() }
}
} }
/////////////////////////////////////// TEST SUPPORT //////////////////////////////////////////////// /////////////////////////////////////// TEST SUPPORT ////////////////////////////////////////////////
@ -840,6 +863,11 @@ class TcpConnectionSpec extends AkkaSpec("""
var registerCallReceiver = TestProbe() var registerCallReceiver = TestProbe()
var interestCallReceiver = TestProbe() var interestCallReceiver = TestProbe()
def ignoreWindowsWorkaroundForTicket15766(): Unit = {
// Due to the Windows workaround of #15766 we need to set an OP_CONNECT to reliably detect connection resets
if (Helpers.isWindows) interestCallReceiver.expectMsg(OP_CONNECT)
}
def run(body: Unit): Unit = { def run(body: Unit): Unit = {
try { try {
setServerSocketOptions() setServerSocketOptions()
@ -911,6 +939,19 @@ class TcpConnectionSpec extends AkkaSpec("""
lazy val serverSelectionKey = registerChannel(serverSideChannel, "server") lazy val serverSelectionKey = registerChannel(serverSideChannel, "server")
lazy val defaultbuffer = ByteBuffer.allocate(TestSize) lazy val defaultbuffer = ByteBuffer.allocate(TestSize)
def windowsWorkaroundToDetectAbort(): Unit = {
// Due to a Windows quirk we need to set an OP_CONNECT to reliably detect connection resets, see #1576
if (Helpers.isWindows) {
serverSelectionKey.interestOps(OP_CONNECT)
nioSelector.select(10)
}
}
override def ignoreWindowsWorkaroundForTicket15766(): Unit = {
super.ignoreWindowsWorkaroundForTicket15766()
if (Helpers.isWindows) clientSelectionKey.interestOps(OP_CONNECT)
}
override def run(body: Unit): Unit = super.run { override def run(body: Unit): Unit = super.run {
try { try {
serverSideChannel.configureBlocking(false) serverSideChannel.configureBlocking(false)
@ -921,6 +962,7 @@ class TcpConnectionSpec extends AkkaSpec("""
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting)) userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting))
ignoreWindowsWorkaroundForTicket15766()
if (!pullMode) interestCallReceiver.expectMsg(OP_READ) if (!pullMode) interestCallReceiver.expectMsg(OP_READ)
clientSelectionKey // trigger initialization clientSelectionKey // trigger initialization
@ -1040,6 +1082,7 @@ class TcpConnectionSpec extends AkkaSpec("""
log.debug("setSoLinger(true, 0) failed with {}", e) log.debug("setSoLinger(true, 0) failed with {}", e)
} }
channel.close() channel.close()
if (Helpers.isWindows) nioSelector.select(10) // Windows needs this
} }
} }

View file

@ -4,7 +4,7 @@
package akka.io package akka.io
import akka.actor.PoisonPill import akka.actor.{ ActorRef, PoisonPill }
import akka.io.Tcp._ import akka.io.Tcp._
import akka.testkit.{ TestProbe, AkkaSpec } import akka.testkit.{ TestProbe, AkkaSpec }
import akka.TestUtils._ import akka.TestUtils._
@ -34,7 +34,7 @@ class TcpIntegrationSpec extends AkkaSpec("""
verifyActorTermination(serverConnection) verifyActorTermination(serverConnection)
} }
"properly handle connection abort from one side" in new TestSetup { "properly handle connection abort from client side" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
clientHandler.send(clientConnection, Abort) clientHandler.send(clientConnection, Abort)
clientHandler.expectMsg(Aborted) clientHandler.expectMsg(Aborted)
@ -43,6 +43,77 @@ class TcpIntegrationSpec extends AkkaSpec("""
verifyActorTermination(serverConnection) verifyActorTermination(serverConnection)
} }
"properly handle connection abort from client side after chit-chat" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
chitchat(clientHandler, clientConnection, serverHandler, serverConnection)
clientHandler.send(clientConnection, Abort)
clientHandler.expectMsg(Aborted)
serverHandler.expectMsgType[ErrorClosed]
verifyActorTermination(clientConnection)
verifyActorTermination(serverConnection)
}
"properly handle connection abort from server side" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
serverHandler.send(serverConnection, Abort)
serverHandler.expectMsg(Aborted)
clientHandler.expectMsgType[ErrorClosed]
verifyActorTermination(clientConnection)
verifyActorTermination(serverConnection)
}
"properly handle connection abort from server side after chit-chat" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
chitchat(clientHandler, clientConnection, serverHandler, serverConnection)
serverHandler.send(serverConnection, Abort)
serverHandler.expectMsg(Aborted)
clientHandler.expectMsgType[ErrorClosed]
verifyActorTermination(clientConnection)
verifyActorTermination(serverConnection)
}
"properly handle connection abort via PosionPill from client side" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
clientHandler.send(clientConnection, PoisonPill)
verifyActorTermination(clientConnection)
serverHandler.expectMsgType[ErrorClosed]
verifyActorTermination(serverConnection)
}
"properly handle connection abort via PosionPill from client side after chit-chat" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
chitchat(clientHandler, clientConnection, serverHandler, serverConnection)
clientHandler.send(clientConnection, PoisonPill)
verifyActorTermination(clientConnection)
serverHandler.expectMsgType[ErrorClosed]
verifyActorTermination(serverConnection)
}
"properly handle connection abort via PosionPill from server side" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
serverHandler.send(serverConnection, PoisonPill)
verifyActorTermination(serverConnection)
clientHandler.expectMsgType[ErrorClosed]
verifyActorTermination(clientConnection)
}
"properly handle connection abort via PosionPill from server side after chit-chat" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
chitchat(clientHandler, clientConnection, serverHandler, serverConnection)
serverHandler.send(serverConnection, PoisonPill)
verifyActorTermination(serverConnection)
clientHandler.expectMsgType[ErrorClosed]
verifyActorTermination(clientConnection)
}
"properly complete one client/server request/response cycle" in new TestSetup { "properly complete one client/server request/response cycle" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
@ -108,4 +179,21 @@ class TcpIntegrationSpec extends AkkaSpec("""
verifyActorTermination(connectionActor) verifyActorTermination(connectionActor)
} }
} }
def chitchat(
clientHandler: TestProbe,
clientConnection: ActorRef,
serverHandler: TestProbe,
serverConnection: ActorRef,
rounds: Int = 100) = {
val testData = ByteString(0)
(1 to rounds) foreach { _
clientHandler.send(clientConnection, Write(testData))
serverHandler.expectMsg(Received(testData))
serverHandler.send(serverConnection, Write(testData))
clientHandler.expectMsg(Received(testData))
}
}
} }

View file

@ -627,6 +627,15 @@ akka {
# OP_CONNECT. Retries are needed if the OP_CONNECT notification doesn't imply that # OP_CONNECT. Retries are needed if the OP_CONNECT notification doesn't imply that
# `finishConnect` will succeed, which is the case on Android. # `finishConnect` will succeed, which is the case on Android.
finish-connect-retries = 5 finish-connect-retries = 5
# On Windows connection aborts are not reliably detected unless an OP_READ is
# registered on the selector _after_ the connection has been reset. This
# workaround enables an OP_CONNECT which forces the abort to be visible on Windows.
# Enabling this setting on other platforms than Windows will cause various failures
# and undefined behavior.
# Possible values of this key are on, off and auto where auto will enable the
# workaround if Windows is detected automatically.
windows-connection-abort-workaround-enabled = auto
} }
udp { udp {

View file

@ -5,13 +5,15 @@
package akka.io package akka.io
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.net.Socket
import akka.ConfigurationException
import java.nio.channels.SocketChannel import java.nio.channels.SocketChannel
import akka.io.Inet._ import akka.io.Inet._
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.collection.immutable import scala.collection.immutable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.util.ByteString import akka.util.{ Helpers, ByteString }
import akka.util.Helpers.Requiring import akka.util.Helpers.Requiring
import akka.actor._ import akka.actor._
import java.lang.{ Iterable JIterable } import java.lang.{ Iterable JIterable }
@ -544,6 +546,11 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
val FinishConnectRetries: Int = getInt("finish-connect-retries") requiring (_ > 0, val FinishConnectRetries: Int = getInt("finish-connect-retries") requiring (_ > 0,
"finish-connect-retries must be > 0") "finish-connect-retries must be > 0")
val WindowsConnectionAbortWorkaroundEnabled: Boolean = getString("windows-connection-abort-workaround-enabled") match {
case "auto" Helpers.isWindows
case _ => getBoolean("windows-connection-abort-workaround-enabled")
}
private[this] def getIntBytes(path: String): Int = { private[this] def getIntBytes(path: String): Int = {
val size = getBytes(path) val size = getBytes(path)
require(size < Int.MaxValue, s"$path must be < 2 GiB") require(size < Int.MaxValue, s"$path must be < 2 GiB")

View file

@ -55,6 +55,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
if (TraceLogging) log.debug("[{}] registered as connection handler", handler) if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting) val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting)
// if we have resumed reading from pullMode while waiting for Register then register OP_READ interest // if we have resumed reading from pullMode while waiting for Register then register OP_READ interest
if (pullMode && !readingSuspended) resumeReading(info) if (pullMode && !readingSuspended) resumeReading(info)
doRead(info, None) // immediately try reading, pullMode is handled by readingSuspended doRead(info, None) // immediately try reading, pullMode is handled by readingSuspended
@ -186,6 +187,10 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]) channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])
context.setReceiveTimeout(RegisterTimeout) context.setReceiveTimeout(RegisterTimeout)
// !!WARNING!! The line below is needed to make Windows notify us about aborted connections, see #15766
if (WindowsConnectionAbortWorkaroundEnabled) registration.enableInterest(OP_CONNECT)
context.become(waitingForRegistration(registration, commander)) context.become(waitingForRegistration(registration, commander))
} }