diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 188a441d78..431ece8ddb 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -4,11 +4,12 @@ package akka.io -import java.io.{ FileOutputStream, File, IOException } +import java.io.{ File, IOException } import java.net.{ URLClassLoader, ConnectException, InetSocketAddress, SocketException } import java.nio.ByteBuffer import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel } import java.nio.channels.spi.SelectorProvider +import java.nio.channels.SelectionKey._ import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration._ @@ -16,30 +17,25 @@ import scala.util.control.NonFatal import org.scalatest.matchers._ import akka.io.Tcp._ import akka.io.SelectionHandler._ -import akka.TestUtils -import TestUtils._ -import akka.actor.{ ActorRef, PoisonPill, Terminated } +import akka.io.Inet.SocketOption +import akka.actor.{ PoisonPill, Terminated, DeathPactException } import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } import akka.util.{ Helpers, ByteString } -import akka.actor.DeathPactException -import java.nio.channels.SelectionKey._ -import akka.io.Inet.SocketOption +import akka.TestUtils._ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") { val serverAddress = temporaryServerAddress() // Helper to avoid Windows localization specific differences - def ignoreIfWindows(): Unit = { + def ignoreIfWindows(): Unit = if (Helpers.isWindows) { info("Detected Windows: ignoring check") pending } - } lazy val ConnectionResetByPeerMessage: String = { val serverSocket = ServerSocketChannel.open() serverSocket.socket.bind(new InetSocketAddress("127.0.0.1", 0)) - try { val clientSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", serverSocket.socket().getLocalPort)) val clientSocketOnServer = acceptServerSideConnection(serverSocket) @@ -55,7 +51,6 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") lazy val ConnectionRefusedMessagePrefix: String = { val serverSocket = ServerSocketChannel.open() serverSocket.socket.bind(new InetSocketAddress("127.0.0.1", 0)) - try { serverSocket.close() val clientSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", serverSocket.socket().getLocalPort)) @@ -72,147 +67,146 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") info("Connection refused message prefix expected is " + ConnectionRefusedMessagePrefix) // common behavior - "set socket options before connecting" in withLocalServer() { localServer ⇒ - val userHandler = TestProbe() - val selector = TestProbe() - val connectionActor = - createConnectionActor(options = Vector(Inet.SO.ReuseAddress(true)))(selector.ref, userHandler.ref) - val clientChannel = connectionActor.underlyingActor.channel - clientChannel.socket.getReuseAddress must be(true) + "set socket options before connecting" in new LocalServerTest() { + run { + val connectionActor = createConnectionActor(options = Vector(Inet.SO.ReuseAddress(true))) + val clientChannel = connectionActor.underlyingActor.channel + clientChannel.socket.getReuseAddress must be(true) + } } - "set socket options after connecting" ignore withLocalServer() { localServer ⇒ - // Workaround for systems where SO_KEEPALIVE is true by default - val userHandler = TestProbe() - val selector = TestProbe() - val connectionActor = - createConnectionActor(options = Vector(SO.KeepAlive(false)))(selector.ref, userHandler.ref) - val clientChannel = connectionActor.underlyingActor.channel - clientChannel.socket.getKeepAlive must be(true) // only set after connection is established - EventFilter.warning(pattern = "registration timeout", occurrences = 1) intercept { + "set socket options after connecting" ignore new LocalServerTest() { + run { + // Workaround for systems where SO_KEEPALIVE is true by default + val connectionActor = createConnectionActor(options = Vector(SO.KeepAlive(false))) + val clientChannel = connectionActor.underlyingActor.channel + clientChannel.socket.getKeepAlive must be(true) // only set after connection is established + EventFilter.warning(pattern = "registration timeout", occurrences = 1) intercept { + selector.send(connectionActor, ChannelConnectable) + clientChannel.socket.getKeepAlive must be(false) + } + } + } + + "send incoming data to the connection handler" in new EstablishedConnectionTest() { + run { + serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII"))) + + expectReceivedString("testdata") + + // have two packets in flight before the selector notices + serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII"))) + serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII"))) + + expectReceivedString("testdata2testdata3") + } + } + + "forward incoming data as Received messages instantly as long as more data is available" in + new EstablishedConnectionTest() { // to make sure enough data gets through + override lazy val connectionActor = createConnectionActor(options = List(Inet.SO.ReceiveBufferSize(1000000))) + run { + val bufferSize = Tcp(system).Settings.DirectBufferSize + val DataSize = bufferSize + 1500 + val bigData = new Array[Byte](DataSize) + val buffer = ByteBuffer.wrap(bigData) + + serverSideChannel.socket.setSendBufferSize(150000) + val wrote = serverSideChannel.write(buffer) + wrote must be(DataSize) + + expectNoMsg(1000.millis) // data should have been transferred fully by now + + selector.send(connectionActor, ChannelReadable) + + connectionHandler.expectMsgType[Received].data.length must be(bufferSize) + connectionHandler.expectMsgType[Received].data.length must be(1500) + } + } + + "receive data directly when the connection is established" in new UnacceptedConnectionTest { + run { + val serverSideChannel = acceptServerSideConnection(localServerChannel) + + serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII"))) + serverSideChannel.configureBlocking(false) + selector.send(connectionActor, ChannelConnectable) - clientChannel.socket.getKeepAlive must be(false) + userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) + + // we unrealistically register the selector here so that we can observe + // the ordering between Received and ReadInterest + userHandler.send(connectionActor, Register(selector.ref)) + selector.expectMsgType[Received].data.decodeString("ASCII") must be("immediatedata") + selector.expectMsg(ReadInterest) } } - "send incoming data to the connection handler" in withEstablishedConnection() { setup ⇒ - import setup._ - serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII"))) + "write data to network (and acknowledge)" in new EstablishedConnectionTest() { + run { + object Ack + val writer = TestProbe() - expectReceivedString("testdata") + // directly acknowledge an empty write + writer.send(connectionActor, Write(ByteString.empty, Ack)) + writer.expectMsg(Ack) - // have two packets in flight before the selector notices - serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII"))) - serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII"))) + // reply to write commander with Ack + val ackedWrite = Write(ByteString("testdata"), Ack) + val buffer = ByteBuffer.allocate(100) + serverSideChannel.read(buffer) must be(0) + writer.send(connectionActor, ackedWrite) + writer.expectMsg(Ack) + pullFromServerSide(remaining = 8, into = buffer) + buffer.flip() + buffer.limit must be(8) - expectReceivedString("testdata2testdata3") - } - - "forward incoming data as Received messages instantly as long as more data is available" in withEstablishedConnection( - clientSocketOptions = List(Inet.SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through - ) { setup ⇒ - import setup._ - - val bufferSize = Tcp(system).Settings.DirectBufferSize - val DataSize = bufferSize + 1500 - val bigData = new Array[Byte](DataSize) - val buffer = ByteBuffer.wrap(bigData) - - serverSideChannel.socket.setSendBufferSize(150000) - val wrote = serverSideChannel.write(buffer) - wrote must be(DataSize) - - expectNoMsg(1000.millis) // data should have been transferred fully by now - - selector.send(connectionActor, ChannelReadable) - - connectionHandler.expectMsgType[Received].data.length must be(bufferSize) - connectionHandler.expectMsgType[Received].data.length must be(1500) + // not reply to write commander for writes without Ack + val unackedWrite = Write(ByteString("morestuff!")) + buffer.clear() + serverSideChannel.read(buffer) must be(0) + writer.send(connectionActor, unackedWrite) + writer.expectNoMsg(500.millis) + pullFromServerSide(remaining = 10, into = buffer) + buffer.flip() + buffer.limit must be(10) + ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!") } - - "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup ⇒ - import unregisteredSetup._ - - val serverSideChannel = acceptServerSideConnection(localServer) - - serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII"))) - serverSideChannel.configureBlocking(false) - - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) - - // we unrealistically register the selector here so that we can observe - // the ordering between Received and ReadInterest - userHandler.send(connectionActor, Register(selector.ref)) - selector.expectMsgType[Received].data.decodeString("ASCII") must be("immediatedata") - selector.expectMsg(ReadInterest) } - "write data to network (and acknowledge)" in withEstablishedConnection() { setup ⇒ - import setup._ + "write data after not acknowledged data" in new EstablishedConnectionTest() { + run { + object Ack + val writer = TestProbe() + writer.send(connectionActor, Write(ByteString(42.toByte))) + writer.expectNoMsg(500.millis) - object Ack - val writer = TestProbe() - - // directly acknowledge an empty write - writer.send(connectionActor, Write(ByteString.empty, Ack)) - writer.expectMsg(Ack) - - // reply to write commander with Ack - val ackedWrite = Write(ByteString("testdata"), Ack) - val buffer = ByteBuffer.allocate(100) - serverSideChannel.read(buffer) must be(0) - writer.send(connectionActor, ackedWrite) - writer.expectMsg(Ack) - pullFromServerSide(remaining = 8, into = buffer) - buffer.flip() - buffer.limit must be(8) - - // not reply to write commander for writes without Ack - val unackedWrite = Write(ByteString("morestuff!")) - buffer.clear() - serverSideChannel.read(buffer) must be(0) - writer.send(connectionActor, unackedWrite) - writer.expectNoMsg(500.millis) - pullFromServerSide(remaining = 10, into = buffer) - buffer.flip() - buffer.limit must be(10) - ByteString(buffer).take(10).decodeString("ASCII") must be("morestuff!") + writer.send(connectionActor, Write(ByteString.empty, Ack)) + writer.expectMsg(Ack) + } } - "write data after not acknowledged data" in withEstablishedConnection() { setup ⇒ - import setup._ + "write file to network" in new EstablishedConnectionTest() { + run { + // hacky: we need a file for testing purposes, so try to get the biggest one from our own classpath + val testFile = + classOf[TcpConnectionSpec].getClassLoader.asInstanceOf[URLClassLoader] + .getURLs + .filter(_.getProtocol == "file") + .map(url ⇒ new File(url.toURI)) + .filter(_.exists) + .sortBy(-_.length) + .head - object Ack - val writer = TestProbe() - writer.send(connectionActor, Write(ByteString(42.toByte))) - writer.expectNoMsg(500.millis) + // maximum of 100 MB + val size = math.min(testFile.length(), 100000000).toInt - writer.send(connectionActor, Write(ByteString.empty, Ack)) - writer.expectMsg(Ack) - } - - "write file to network" in withEstablishedConnection() { setup ⇒ - import setup._ - - // hacky: we need a file for testing purposes, so try to get the biggest one from our own classpath - val testFile = - classOf[TcpConnectionSpec].getClassLoader.asInstanceOf[URLClassLoader] - .getURLs - .filter(_.getProtocol == "file") - .map(url ⇒ new File(url.toURI)) - .filter(_.exists) - .sortBy(-_.length) - .head - - // maximum of 100 MB - val size = math.min(testFile.length(), 100000000).toInt - - object Ack - val writer = TestProbe() - writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack)) - pullFromServerSide(size, 1000000) - writer.expectMsg(Ack) + object Ack + val writer = TestProbe() + writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack)) + pullFromServerSide(size, 1000000) + writer.expectMsg(Ack) + } } /* @@ -228,109 +222,114 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") * SO_SNDBUF to 0." */ "stop writing in cases of backpressure and resume afterwards" in - withEstablishedConnection(clientSocketOptions = List(SO.ReceiveBufferSize(1000000))) { setup ⇒ - info("Currently ignored as SO_SNDBUF is usually a lower bound on the send buffer so the test fails as no real " + - "backpressure present.") - pending - ignoreIfWindows() - import setup._ - object Ack1 - object Ack2 + new EstablishedConnectionTest() { + override lazy val connectionActor = createConnectionActor(options = List(Inet.SO.ReceiveBufferSize(1000000))) + run { + info("Currently ignored as SO_SNDBUF is usually a lower bound on the send buffer so the test fails as no real " + + "backpressure present.") + pending + ignoreIfWindows() + object Ack1 + object Ack2 - clientSideChannel.socket.setSendBufferSize(1024) + clientSideChannel.socket.setSendBufferSize(1024) - awaitCond(clientSideChannel.socket.getSendBufferSize == 1024) + awaitCond(clientSideChannel.socket.getSendBufferSize == 1024) - val writer = TestProbe() + val writer = TestProbe() - // producing backpressure by sending much more than currently fits into - // our send buffer - val firstWrite = writeCmd(Ack1) + // producing backpressure by sending much more than currently fits into + // our send buffer + val firstWrite = writeCmd(Ack1) - // try to write the buffer but since the SO_SNDBUF is too small - // it will have to keep the rest of the piece and send it - // when possible - writer.send(connectionActor, firstWrite) - selector.expectMsg(WriteInterest) + // try to write the buffer but since the SO_SNDBUF is too small + // it will have to keep the rest of the piece and send it + // when possible + writer.send(connectionActor, firstWrite) + selector.expectMsg(WriteInterest) - // send another write which should fail immediately - // because we don't store more than one piece in flight - val secondWrite = writeCmd(Ack2) - writer.send(connectionActor, secondWrite) - writer.expectMsg(CommandFailed(secondWrite)) + // send another write which should fail immediately + // because we don't store more than one piece in flight + val secondWrite = writeCmd(Ack2) + writer.send(connectionActor, secondWrite) + writer.expectMsg(CommandFailed(secondWrite)) - // reject even empty writes - writer.send(connectionActor, Write.empty) - writer.expectMsg(CommandFailed(Write.empty)) + // reject even empty writes + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) - // there will be immediately more space in the send buffer because - // some data will have been sent by now, so we assume we can write - // again, but still it can't write everything - selector.send(connectionActor, ChannelWritable) + // there will be immediately more space in the send buffer because + // some data will have been sent by now, so we assume we can write + // again, but still it can't write everything + selector.send(connectionActor, ChannelWritable) - // both buffers should now be filled so no more writing - // is possible - pullFromServerSide(TestSize) - writer.expectMsg(Ack1) + // both buffers should now be filled so no more writing + // is possible + pullFromServerSide(TestSize) + writer.expectMsg(Ack1) + } } - "respect StopReading and ResumeReading" in withEstablishedConnection() { setup ⇒ - import setup._ - connectionHandler.send(connectionActor, SuspendReading) + "respect StopReading and ResumeReading" in new EstablishedConnectionTest() { + run { + connectionHandler.send(connectionActor, SuspendReading) - // the selector interprets StopReading to deregister interest - // for reading - selector.expectMsg(DisableReadInterest) - connectionHandler.send(connectionActor, ResumeReading) - selector.expectMsg(ReadInterest) + // the selector interprets StopReading to deregister interest for reading + selector.expectMsg(DisableReadInterest) + connectionHandler.send(connectionActor, ResumeReading) + selector.expectMsg(ReadInterest) + } } - "close the connection and reply with `Closed` upon reception of a `Close` command" in withEstablishedConnection(setSmallRcvBuffer) { setup ⇒ - import setup._ + "close the connection and reply with `Closed` upon reception of a `Close` command" in + new EstablishedConnectionTest() with SmallRcvBuffer { + run { + // we should test here that a pending write command is properly finished first + object Ack + // set an artificially small send buffer size so that the write is queued + // inside the connection actor + clientSideChannel.socket.setSendBufferSize(1024) - // we should test here that a pending write command is properly finished first - object Ack - // set an artificially small send buffer size so that the write is queued - // inside the connection actor - clientSideChannel.socket.setSendBufferSize(1024) + // we send a write and a close command directly afterwards + connectionHandler.send(connectionActor, writeCmd(Ack)) + val closeCommander = TestProbe() + closeCommander.send(connectionActor, Close) - // we send a write and a close command directly afterwards - connectionHandler.send(connectionActor, writeCmd(Ack)) - val closeCommander = TestProbe() - closeCommander.send(connectionActor, Close) + pullFromServerSide(TestSize) + connectionHandler.expectMsg(Ack) + connectionHandler.expectMsg(Closed) + closeCommander.expectMsg(Closed) + assertThisConnectionActorTerminated() - pullFromServerSide(TestSize) - connectionHandler.expectMsg(Ack) - connectionHandler.expectMsg(Closed) - closeCommander.expectMsg(Closed) - assertThisConnectionActorTerminated() + serverSelectionKey must be(selectedAs(OP_READ, 2.seconds)) - serverSelectionKey must be(selectedAs(SelectionKey.OP_READ, 2.seconds)) + val buffer = ByteBuffer.allocate(1) + serverSideChannel.read(buffer) must be(-1) + } + } - val buffer = ByteBuffer.allocate(1) - serverSideChannel.read(buffer) must be(-1) - } + "send only one `Closed` event to the handler, if the handler commanded the Close" in + new EstablishedConnectionTest() { + run { + connectionHandler.send(connectionActor, Close) + connectionHandler.expectMsg(Closed) + connectionHandler.expectNoMsg(500.millis) + } + } - "send only one `Closed` event to the handler, if the handler commanded the Close" in withEstablishedConnection() { setup ⇒ - import setup._ + "abort the connection and reply with `Aborted` upon reception of an `Abort` command" in + new EstablishedConnectionTest() { + run { + connectionHandler.send(connectionActor, Abort) + connectionHandler.expectMsg(Aborted) - connectionHandler.send(connectionActor, Close) - connectionHandler.expectMsg(Closed) - connectionHandler.expectNoMsg(500.millis) - } + assertThisConnectionActorTerminated() - "abort the connection and reply with `Aborted` upon reception of an `Abort` command" in withEstablishedConnection() { setup ⇒ - import setup._ - - connectionHandler.send(connectionActor, Abort) - connectionHandler.expectMsg(Aborted) - - assertThisConnectionActorTerminated() - - val buffer = ByteBuffer.allocate(1) - val thrown = evaluating { serverSideChannel.read(buffer) } must produce[IOException] - thrown.getMessage must be(ConnectionResetByPeerMessage) - } + val buffer = ByteBuffer.allocate(1) + val thrown = evaluating { serverSideChannel.read(buffer) } must produce[IOException] + thrown.getMessage must be(ConnectionResetByPeerMessage) + } + } /* * Partly disabled on Windows: http://support.microsoft.com/kb/214397 @@ -344,348 +343,360 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") * has hit the network medium. The only exception is when you disable the Winsock buffering by setting * SO_SNDBUF to 0." */ - "close the connection and reply with `ConfirmedClosed` upon reception of an `ConfirmedClose` command (simplified)" in withEstablishedConnection(setSmallRcvBuffer) { setup ⇒ - import setup._ + "close the connection and reply with `ConfirmedClosed` upon reception of an `ConfirmedClose` command (simplified)" in + new EstablishedConnectionTest() with SmallRcvBuffer { + run { + // we should test here that a pending write command is properly finished first + object Ack + // set an artificially small send buffer size so that the write is queued + // inside the connection actor + clientSideChannel.socket.setSendBufferSize(1024) - // we should test here that a pending write command is properly finished first - object Ack - // set an artificially small send buffer size so that the write is queued - // inside the connection actor - clientSideChannel.socket.setSendBufferSize(1024) + // we send a write and a close command directly afterwards + connectionHandler.send(connectionActor, writeCmd(Ack)) + connectionHandler.send(connectionActor, ConfirmedClose) - // we send a write and a close command directly afterwards - connectionHandler.send(connectionActor, writeCmd(Ack)) - connectionHandler.send(connectionActor, ConfirmedClose) + pullFromServerSide(TestSize) + connectionHandler.expectMsg(Ack) - pullFromServerSide(TestSize) - connectionHandler.expectMsg(Ack) + selector.send(connectionActor, ChannelReadable) - selector.send(connectionActor, ChannelReadable) + val buffer = ByteBuffer.allocate(1) + serverSelectionKey must be(selectedAs(OP_READ, 2.seconds)) + serverSideChannel.read(buffer) must be(-1) - val buffer = ByteBuffer.allocate(1) - serverSelectionKey must be(selectedAs(SelectionKey.OP_READ, 2.seconds)) - serverSideChannel.read(buffer) must be(-1) + closeServerSideAndWaitForClientReadable() - closeServerSideAndWaitForClientReadable() + selector.send(connectionActor, ChannelReadable) + connectionHandler.expectMsg(ConfirmedClosed) - selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsg(ConfirmedClosed) + assertThisConnectionActorTerminated() + } + } - assertThisConnectionActorTerminated() - } + "close the connection and reply with `ConfirmedClosed` upon reception of an `ConfirmedClose` command" in + new EstablishedConnectionTest() with SmallRcvBuffer { + run { + ignoreIfWindows() - "close the connection and reply with `ConfirmedClosed` upon reception of an `ConfirmedClose` command" in withEstablishedConnection(setSmallRcvBuffer) { setup ⇒ - ignoreIfWindows() - import setup._ + // we should test here that a pending write command is properly finished first + object Ack + // set an artificially small send buffer size so that the write is queued + // inside the connection actor + clientSideChannel.socket.setSendBufferSize(1024) - // we should test here that a pending write command is properly finished first - object Ack - // set an artificially small send buffer size so that the write is queued - // inside the connection actor - clientSideChannel.socket.setSendBufferSize(1024) + // we send a write and a close command directly afterwards + connectionHandler.send(connectionActor, writeCmd(Ack)) + connectionHandler.send(connectionActor, ConfirmedClose) - // we send a write and a close command directly afterwards - connectionHandler.send(connectionActor, writeCmd(Ack)) - connectionHandler.send(connectionActor, ConfirmedClose) + connectionHandler.expectNoMsg(100.millis) + pullFromServerSide(TestSize) + connectionHandler.expectMsg(Ack) - connectionHandler.expectNoMsg(100.millis) - pullFromServerSide(TestSize) - connectionHandler.expectMsg(Ack) + selector.send(connectionActor, ChannelReadable) + connectionHandler.expectNoMsg(100.millis) // not yet - selector.send(connectionActor, ChannelReadable) - connectionHandler.expectNoMsg(100.millis) // not yet + val buffer = ByteBuffer.allocate(1) + serverSelectionKey must be(selectedAs(SelectionKey.OP_READ, 2.seconds)) + serverSideChannel.read(buffer) must be(-1) - val buffer = ByteBuffer.allocate(1) - serverSelectionKey must be(selectedAs(SelectionKey.OP_READ, 2.seconds)) - serverSideChannel.read(buffer) must be(-1) + closeServerSideAndWaitForClientReadable() - closeServerSideAndWaitForClientReadable() + selector.send(connectionActor, ChannelReadable) + connectionHandler.expectMsg(ConfirmedClosed) - selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsg(ConfirmedClosed) + assertThisConnectionActorTerminated() + } + } - assertThisConnectionActorTerminated() - } + "report when peer closed the connection" in new EstablishedConnectionTest() { + run { + closeServerSideAndWaitForClientReadable() - "report when peer closed the connection" in withEstablishedConnection() { setup ⇒ - import setup._ - - closeServerSideAndWaitForClientReadable() - - selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsg(PeerClosed) - - assertThisConnectionActorTerminated() - } - "report when peer closed the connection but allow further writes and acknowledge normal close" in withEstablishedConnection(keepOpenOnPeerClosed = true) { setup ⇒ - import setup._ - - closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side - - selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsg(PeerClosed) - object Ack - connectionHandler.send(connectionActor, writeCmd(Ack)) - pullFromServerSide(TestSize) - connectionHandler.expectMsg(Ack) - connectionHandler.send(connectionActor, Close) - connectionHandler.expectMsg(Closed) - - assertThisConnectionActorTerminated() - } - "report when peer closed the connection but allow further writes and acknowledge confirmed close" in withEstablishedConnection(keepOpenOnPeerClosed = true) { setup ⇒ - import setup._ - - closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side - - selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsg(PeerClosed) - object Ack - connectionHandler.send(connectionActor, writeCmd(Ack)) - pullFromServerSide(TestSize) - connectionHandler.expectMsg(Ack) - connectionHandler.send(connectionActor, ConfirmedClose) - connectionHandler.expectMsg(ConfirmedClosed) - - assertThisConnectionActorTerminated() - } - - "report when peer aborted the connection" in withEstablishedConnection() { setup ⇒ - import setup._ - - EventFilter[IOException](occurrences = 1) intercept { - abortClose(serverSideChannel) selector.send(connectionActor, ChannelReadable) - val err = connectionHandler.expectMsgType[ErrorClosed] - err.cause must be(ConnectionResetByPeerMessage) - } - // wait a while - connectionHandler.expectNoMsg(200.millis) + connectionHandler.expectMsg(PeerClosed) - assertThisConnectionActorTerminated() + assertThisConnectionActorTerminated() + } } - "report when peer closed the connection when trying to write" in withEstablishedConnection() { setup ⇒ - import setup._ + "report when peer closed the connection but allow further writes and acknowledge normal close" in + new EstablishedConnectionTest(keepOpenOnPeerClosed = true) { + run { + closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side - val writer = TestProbe() + selector.send(connectionActor, ChannelReadable) + connectionHandler.expectMsg(PeerClosed) + object Ack + connectionHandler.send(connectionActor, writeCmd(Ack)) + pullFromServerSide(TestSize) + connectionHandler.expectMsg(Ack) + connectionHandler.send(connectionActor, Close) + connectionHandler.expectMsg(Closed) - abortClose(serverSideChannel) - EventFilter[IOException](occurrences = 1) intercept { - writer.send(connectionActor, Write(ByteString("testdata"))) - // bother writer and handler should get the message - writer.expectMsgType[ErrorClosed] + assertThisConnectionActorTerminated() + } } - connectionHandler.expectMsgType[ErrorClosed] - assertThisConnectionActorTerminated() + "report when peer closed the connection but allow further writes and acknowledge confirmed close" in + new EstablishedConnectionTest(keepOpenOnPeerClosed = true) { + run { + closeServerSideAndWaitForClientReadable(fullClose = false) // send EOF (fin) from the server side + + selector.send(connectionActor, ChannelReadable) + connectionHandler.expectMsg(PeerClosed) + object Ack + connectionHandler.send(connectionActor, writeCmd(Ack)) + pullFromServerSide(TestSize) + connectionHandler.expectMsg(Ack) + connectionHandler.send(connectionActor, ConfirmedClose) + connectionHandler.expectMsg(ConfirmedClosed) + + assertThisConnectionActorTerminated() + } + } + + "report when peer aborted the connection" in new EstablishedConnectionTest() { + run { + EventFilter[IOException](occurrences = 1) intercept { + abortClose(serverSideChannel) + selector.send(connectionActor, ChannelReadable) + val err = connectionHandler.expectMsgType[ErrorClosed] + err.cause must be(ConnectionResetByPeerMessage) + } + // wait a while + connectionHandler.expectNoMsg(200.millis) + + assertThisConnectionActorTerminated() + } + } + + "report when peer closed the connection when trying to write" in new EstablishedConnectionTest() { + run { + val writer = TestProbe() + + abortClose(serverSideChannel) + EventFilter[IOException](occurrences = 1) intercept { + writer.send(connectionActor, Write(ByteString("testdata"))) + // bother writer and handler should get the message + writer.expectMsgType[ErrorClosed] + } + connectionHandler.expectMsgType[ErrorClosed] + + assertThisConnectionActorTerminated() + } } // This test is disabled on windows, as the assumption that not calling accept on a server socket means that // no TCP level connection has been established with the client does not hold. // RK: I think Windows is no different than any other OS in this regard, there was just a sleep() missing. - "report failed connection attempt while not accepted" in withUnacceptedConnection() { setup ⇒ - import setup._ - ignoreIfWindows + "report failed connection attempt while not accepted" in new UnacceptedConnectionTest { + run { + ignoreIfWindows() - // close instead of accept - localServer.close() + // close instead of accept + localServerChannel.close() - // must give the OS some time to send RST from server to client - Thread.sleep(100) + // must give the OS some time to send RST from server to client + Thread.sleep(100) - EventFilter[SocketException](occurrences = 1) intercept { - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(CommandFailed(Connect(serverAddress))) + EventFilter[SocketException](occurrences = 1) intercept { + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsg(CommandFailed(Connect(serverAddress))) + } + + verifyActorTermination(connectionActor) } - - verifyActorTermination(connectionActor) } val UnboundAddress = temporaryServerAddress() "report failed connection attempt when target is unreachable" in - withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnboundAddress)) { setup ⇒ - import setup._ + new UnacceptedConnectionTest() { + override lazy val connectionActor = createConnectionActor(serverAddress = UnboundAddress) + run { + val sel = SelectorProvider.provider().openSelector() + val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ) + // This timeout should be large enough to work on Windows + sel.select(3000) - val sel = SelectorProvider.provider().openSelector() - val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ) - // This timeout should be large enough to work on Windows - sel.select(3000) + key.isConnectable must be(true) + EventFilter[ConnectException](occurrences = 1) intercept { + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsg(CommandFailed(Connect(UnboundAddress))) + } - key.isConnectable must be(true) - EventFilter[ConnectException](occurrences = 1) intercept { + verifyActorTermination(connectionActor) + } + } + + "time out when Connected isn't answered with Register" in new UnacceptedConnectionTest { + run { + localServerChannel.accept() + + EventFilter.warning(pattern = "registration timeout", occurrences = 1) intercept { selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(CommandFailed(Connect(UnboundAddress))) + userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) + + verifyActorTermination(connectionActor) + } + } + } + + "close the connection when user handler dies while connecting" in new UnacceptedConnectionTest { + run { + EventFilter[DeathPactException](occurrences = 1) intercept { + userHandler.ref ! PoisonPill + + verifyActorTermination(connectionActor) + } + } + } + + "close the connection when connection handler dies while connected" in new EstablishedConnectionTest() { + run { + watch(connectionHandler.ref) + watch(connectionActor) + EventFilter[DeathPactException](occurrences = 1) intercept { + system.stop(connectionHandler.ref) + val deaths = Set(expectMsgType[Terminated].actor, expectMsgType[Terminated].actor) + deaths must be(Set(connectionHandler.ref, connectionActor)) + } + } + } + + "support ResumeWriting (backed up)" in new EstablishedConnectionTest() { + run { + val writer = TestProbe() + val write = writeCmd(NoAck) + + // fill up the write buffer until NACK + var written = 0 + while (!writer.msgAvailable) { + writer.send(connectionActor, write) + written += 1 + } + // dump the NACKs + writer.receiveWhile(1.second) { + case CommandFailed(write) ⇒ written -= 1 + } + writer.msgAvailable must be(false) + + // writes must fail now + writer.send(connectionActor, write) + writer.expectMsg(CommandFailed(write)) + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) + + // resuming must not immediately work (queue still full) + writer.send(connectionActor, ResumeWriting) + writer.expectNoMsg(1.second) + + // so drain the queue until it works again + while (!writer.msgAvailable) pullFromServerSide(TestSize) + writer.expectMsg(Duration.Zero, WritingResumed) + + // now write should work again + writer.send(connectionActor, writeCmd("works")) + writer.expectMsg("works") + } + } + + "support ResumeWriting (queue flushed)" in new EstablishedConnectionTest() { + run { + val writer = TestProbe() + val write = writeCmd(NoAck) + + // fill up the write buffer until NACK + var written = 0 + while (!writer.msgAvailable) { + writer.send(connectionActor, write) + written += 1 + } + // dump the NACKs + writer.receiveWhile(1.second) { + case CommandFailed(write) ⇒ written -= 1 } - verifyActorTermination(connectionActor) - } + // drain the queue until it works again + pullFromServerSide(TestSize * written) - "time out when Connected isn't answered with Register" in withUnacceptedConnection() { setup ⇒ - import setup._ - - localServer.accept() - - EventFilter.warning(pattern = "registration timeout", occurrences = 1) intercept { - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) - - verifyActorTermination(connectionActor) - } - } - - "close the connection when user handler dies while connecting" in withUnacceptedConnection() { setup ⇒ - import setup._ - - EventFilter[DeathPactException](occurrences = 1) intercept { - userHandler.ref ! PoisonPill - - verifyActorTermination(connectionActor) - } - } - - "close the connection when connection handler dies while connected" in withEstablishedConnection() { setup ⇒ - import setup._ - watch(connectionHandler.ref) - watch(connectionActor) - EventFilter[DeathPactException](occurrences = 1) intercept { - system.stop(connectionHandler.ref) - val deaths = Set(expectMsgType[Terminated].actor, expectMsgType[Terminated].actor) - deaths must be(Set(connectionHandler.ref, connectionActor)) - } - } - - "support ResumeWriting (backed up)" in withEstablishedConnection() { setup ⇒ - import setup._ - - val writer = TestProbe() - val write = writeCmd(NoAck) - - // fill up the write buffer until NACK - var written = 0 - while (!writer.msgAvailable) { + // writes must still fail writer.send(connectionActor, write) - written += 1 + writer.expectMsg(CommandFailed(write)) + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) + + // resuming must work immediately + writer.send(connectionActor, ResumeWriting) + writer.expectMsg(1.second, WritingResumed) + + // now write should work again + writer.send(connectionActor, writeCmd("works")) + writer.expectMsg("works") } - // dump the NACKs - writer.receiveWhile(1.second) { - case CommandFailed(write) ⇒ written -= 1 - } - writer.msgAvailable must be(false) - - // writes must fail now - writer.send(connectionActor, write) - writer.expectMsg(CommandFailed(write)) - writer.send(connectionActor, Write.empty) - writer.expectMsg(CommandFailed(Write.empty)) - - // resuming must not immediately work (queue still full) - writer.send(connectionActor, ResumeWriting) - writer.expectNoMsg(1.second) - - // so drain the queue until it works again - while (!writer.msgAvailable) pullFromServerSide(TestSize) - writer.expectMsg(Duration.Zero, WritingResumed) - - // now write should work again - writer.send(connectionActor, writeCmd("works")) - writer.expectMsg("works") } - "support ResumeWriting (queue flushed)" in withEstablishedConnection() { setup ⇒ - import setup._ + "support useResumeWriting==false (backed up)" in new EstablishedConnectionTest(useResumeWriting = false) { + run { + val writer = TestProbe() + val write = writeCmd(NoAck) - val writer = TestProbe() - val write = writeCmd(NoAck) + // fill up the write buffer until NACK + var written = 0 + while (!writer.msgAvailable) { + writer.send(connectionActor, write) + written += 1 + } + // dump the NACKs + writer.receiveWhile(1.second) { + case CommandFailed(write) ⇒ written -= 1 + } + writer.msgAvailable must be(false) - // fill up the write buffer until NACK - var written = 0 - while (!writer.msgAvailable) { + // writes must fail now writer.send(connectionActor, write) - written += 1 + writer.expectMsg(CommandFailed(write)) + writer.send(connectionActor, Write.empty) + writer.expectMsg(CommandFailed(Write.empty)) + + // so drain the queue until it works again + pullFromServerSide(TestSize * written) + + // now write should work again + writer.send(connectionActor, writeCmd("works")) + writer.expectMsg("works") } - // dump the NACKs - writer.receiveWhile(1.second) { - case CommandFailed(write) ⇒ written -= 1 - } - - // drain the queue until it works again - pullFromServerSide(TestSize * written) - - // writes must still fail - writer.send(connectionActor, write) - writer.expectMsg(CommandFailed(write)) - writer.send(connectionActor, Write.empty) - writer.expectMsg(CommandFailed(Write.empty)) - - // resuming must work immediately - writer.send(connectionActor, ResumeWriting) - writer.expectMsg(1.second, WritingResumed) - - // now write should work again - writer.send(connectionActor, writeCmd("works")) - writer.expectMsg("works") } - "support useResumeWriting==false (backed up)" in withEstablishedConnection(useResumeWriting = false) { setup ⇒ - import setup._ + "support useResumeWriting==false (queue flushed)" in new EstablishedConnectionTest(useResumeWriting = false) { + run { + val writer = TestProbe() + val write = writeCmd(NoAck) - val writer = TestProbe() - val write = writeCmd(NoAck) + // fill up the write buffer until NACK + var written = 0 + while (!writer.msgAvailable) { + writer.send(connectionActor, write) + written += 1 + } + // dump the NACKs + writer.receiveWhile(1.second) { + case CommandFailed(write) ⇒ written -= 1 + } - // fill up the write buffer until NACK - var written = 0 - while (!writer.msgAvailable) { - writer.send(connectionActor, write) - written += 1 + // drain the queue until it works again + pullFromServerSide(TestSize * written) + + // now write should work again + writer.send(connectionActor, writeCmd("works")) + writer.expectMsg("works") } - // dump the NACKs - writer.receiveWhile(1.second) { - case CommandFailed(write) ⇒ written -= 1 - } - writer.msgAvailable must be(false) - - // writes must fail now - writer.send(connectionActor, write) - writer.expectMsg(CommandFailed(write)) - writer.send(connectionActor, Write.empty) - writer.expectMsg(CommandFailed(Write.empty)) - - // so drain the queue until it works again - pullFromServerSide(TestSize * written) - - // now write should work again - writer.send(connectionActor, writeCmd("works")) - writer.expectMsg("works") - } - - "support useResumeWriting==false (queue flushed)" in withEstablishedConnection(useResumeWriting = false) { setup ⇒ - import setup._ - - val writer = TestProbe() - val write = writeCmd(NoAck) - - // fill up the write buffer until NACK - var written = 0 - while (!writer.msgAvailable) { - writer.send(connectionActor, write) - written += 1 - } - // dump the NACKs - writer.receiveWhile(1.second) { - case CommandFailed(write) ⇒ written -= 1 - } - - // drain the queue until it works again - pullFromServerSide(TestSize * written) - - // now write should work again - writer.send(connectionActor, writeCmd("works")) - writer.expectMsg("works") } } + /////////////////////////////////////// TEST SUPPORT //////////////////////////////////////////////// + def acceptServerSideConnection(localServer: ServerSocketChannel): SocketChannel = { @volatile var serverSideChannel: SocketChannel = null awaitCond { @@ -695,40 +706,89 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") serverSideChannel } - def withLocalServer(setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ ())(body: ServerSocketChannel ⇒ Any): Unit = { - val localServer = ServerSocketChannel.open() - try { - setServerSocketOptions(localServer) - localServer.socket.bind(serverAddress) - localServer.configureBlocking(false) - body(localServer) - } finally localServer.close() + abstract class LocalServerTest { + val localServerChannel = ServerSocketChannel.open() + val userHandler = TestProbe() + val selector = TestProbe() + + def run(body: ⇒ Unit): Unit = { + try { + setServerSocketOptions() + localServerChannel.socket.bind(serverAddress) + localServerChannel.configureBlocking(false) + body + } finally localServerChannel.close() + } + + def setServerSocketOptions() = () + + def createConnectionActor(serverAddress: InetSocketAddress = serverAddress, + options: immutable.Seq[SocketOption] = Nil): TestActorRef[TcpOutgoingConnection] = { + val ref = TestActorRef( + new TcpOutgoingConnection(Tcp(system), userHandler.ref, Connect(serverAddress, options = options)) { + override def postRestart(reason: Throwable): Unit = context.stop(self) // ensure we never restart + override def selector = LocalServerTest.this.selector.ref + }) + ref ! ChannelRegistered + ref + } } - case class UnacceptedSetup( - localServer: ServerSocketChannel, - userHandler: TestProbe, - selector: TestProbe, - connectionActor: TestActorRef[TcpOutgoingConnection], - clientSideChannel: SocketChannel) + trait SmallRcvBuffer { _: LocalServerTest ⇒ + override def setServerSocketOptions(): Unit = localServerChannel.socket.setReceiveBufferSize(1024) + } - case class RegisteredSetup( - unregisteredSetup: UnacceptedSetup, - connectionHandler: TestProbe, - serverSideChannel: SocketChannel) { - def userHandler: TestProbe = unregisteredSetup.userHandler - def selector: TestProbe = unregisteredSetup.selector - def connectionActor: TestActorRef[TcpOutgoingConnection] = unregisteredSetup.connectionActor - def clientSideChannel: SocketChannel = unregisteredSetup.clientSideChannel + abstract class UnacceptedConnectionTest extends LocalServerTest { + // lazy init since potential exceptions must not be triggered in the constructor but during execution of `run` + private[io] lazy val connectionActor = createConnectionActor(serverAddress) + // calling .underlyingActor ensures that the actor is actually created at this point + lazy val clientSideChannel = connectionActor.underlyingActor.channel - val nioSelector = SelectorProvider.provider().openSelector() + override def run(body: ⇒ Unit): Unit = super.run { + selector.expectMsg(RegisterChannel(clientSideChannel, OP_CONNECT)) + body + } + } - val clientSelectionKey = registerChannel(clientSideChannel, "client") - val serverSelectionKey = registerChannel(serverSideChannel, "server") + abstract class EstablishedConnectionTest(keepOpenOnPeerClosed: Boolean = false, useResumeWriting: Boolean = true) + extends UnacceptedConnectionTest { + + // lazy init since potential exceptions must not be triggered in the constructor but during execution of `run` + lazy val serverSideChannel = acceptServerSideConnection(localServerChannel) + lazy val connectionHandler = TestProbe() + lazy val nioSelector = SelectorProvider.provider().openSelector() + lazy val clientSelectionKey = registerChannel(clientSideChannel, "client") + lazy val serverSelectionKey = registerChannel(serverSideChannel, "server") + lazy val defaultbuffer = ByteBuffer.allocate(TestSize) + + override def run(body: ⇒ Unit): Unit = super.run { + try { + serverSideChannel.configureBlocking(false) + serverSideChannel must not be (null) + + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) + + userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting)) + selector.expectMsg(ReadInterest) + + clientSelectionKey // trigger initialization + serverSelectionKey // trigger initialization + body + } finally { + serverSideChannel.close() + nioSelector.close() + } + } + + val TestSize = 10000 + + def writeCmd(ack: AnyRef) = + Write(ByteString(Array.fill[Byte](TestSize)(0)), ack) def closeServerSideAndWaitForClientReadable(fullClose: Boolean = true): Unit = { if (fullClose) serverSideChannel.close() else serverSideChannel.socket.shutdownOutput() - checkFor(clientSelectionKey, SelectionKey.OP_READ, 3.seconds.toMillis.toInt) must be(true) + checkFor(clientSelectionKey, OP_READ, 3.seconds.toMillis.toInt) must be(true) } def registerChannel(channel: SocketChannel, name: String): SelectionKey = { @@ -754,13 +814,11 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") (sel, key) } - val defaultbuffer = ByteBuffer.allocate(TestSize) - /** - * Tries to simultaneously act on client and server side to read from the server - * all pending data from the client. + * Tries to simultaneously act on client and server side to read from the server all pending data from the client. */ - @tailrec final def pullFromServerSide(remaining: Int, remainingTries: Int = 1000, into: ByteBuffer = defaultbuffer): Unit = + @tailrec final def pullFromServerSide(remaining: Int, remainingTries: Int = 1000, + into: ByteBuffer = defaultbuffer): Unit = if (remainingTries <= 0) throw new AssertionError("Pulling took too many loops, remaining data: " + remaining) else if (remaining > 0) { @@ -817,106 +875,20 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "%s key was selected for %s after %s" format (key.attachment(), interestsDesc(interest), duration)) } - val interestsNames = Seq( - SelectionKey.OP_ACCEPT -> "accepting", - SelectionKey.OP_CONNECT -> "connecting", - SelectionKey.OP_READ -> "reading", - SelectionKey.OP_WRITE -> "writing") + val interestsNames = + Seq(OP_ACCEPT -> "accepting", OP_CONNECT -> "connecting", OP_READ -> "reading", OP_WRITE -> "writing") def interestsDesc(interests: Int): String = interestsNames.filter(i ⇒ (i._1 & interests) != 0).map(_._2).mkString(", ") - } - private[io] def withUnacceptedConnection( - setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (), - connectionActorCons: (ActorRef, ActorRef) ⇒ TestActorRef[TcpOutgoingConnection] = createConnectionActor())(body: UnacceptedSetup ⇒ Any): Unit = - - withLocalServer(setServerSocketOptions) { localServer ⇒ - val userHandler = TestProbe() - val selector = TestProbe() - val connectionActor = connectionActorCons(selector.ref, userHandler.ref) - // calling .underlyingActor ensures that the actor is actually created at this point - val clientSideChannel = connectionActor.underlyingActor.channel - - selector.expectMsg(RegisterChannel(clientSideChannel, OP_CONNECT)) - - body { - UnacceptedSetup( - localServer, - userHandler, - selector, - connectionActor, - clientSideChannel) + def abortClose(channel: SocketChannel): Unit = { + try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST + catch { + case NonFatal(e) ⇒ + // setSoLinger can fail due to http://bugs.sun.com/view_bug.do?bug_id=6799574 + // (also affected: OS/X Java 1.6.0_37) + log.debug("setSoLinger(true, 0) failed with {}", e) } + channel.close() } - - def withEstablishedConnection( - setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (), - clientSocketOptions: immutable.Seq[SocketOption] = Nil, - keepOpenOnPeerClosed: Boolean = false, - useResumeWriting: Boolean = true)(body: RegisteredSetup ⇒ Any): Unit = - withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup ⇒ - import unregisteredSetup._ - - val serverSideChannel = acceptServerSideConnection(localServer) - serverSideChannel.configureBlocking(false) - - serverSideChannel must not be (null) - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) - - val connectionHandler = TestProbe() - userHandler.send(connectionActor, Register(connectionHandler.ref, keepOpenOnPeerClosed, useResumeWriting)) - selector.expectMsg(ReadInterest) - - body { - RegisteredSetup( - unregisteredSetup, - connectionHandler, - serverSideChannel) - } - } - - val TestSize = 10000 - - def writeCmd(ack: AnyRef) = - Write(ByteString(Array.fill[Byte](TestSize)(0)), ack) - - def setSmallRcvBuffer(channel: ServerSocketChannel): Unit = - channel.socket.setReceiveBufferSize(1024) - - def createConnectionActor( - serverAddress: InetSocketAddress = serverAddress, - localAddress: Option[InetSocketAddress] = None, - options: immutable.Seq[SocketOption] = Nil)( - _selector: ActorRef, - commander: ActorRef): TestActorRef[TcpOutgoingConnection] = { - - val ref = TestActorRef( - new TcpOutgoingConnection(Tcp(system), commander, Connect(serverAddress, localAddress, options)) { - override def postRestart(reason: Throwable) { - // ensure we never restart - context.stop(self) - } - override def selector = _selector - }) - - ref ! ChannelRegistered - ref - } - - def abortClose(channel: SocketChannel): Unit = { - try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST - catch { - case NonFatal(e) ⇒ - // setSoLinger can fail due to http://bugs.sun.com/view_bug.do?bug_id=6799574 - // (also affected: OS/X Java 1.6.0_37) - log.debug("setSoLinger(true, 0) failed with {}", e) - } - channel.close() - } - - def abort(channel: SocketChannel) { - channel.socket.setSoLinger(true, 0) - channel.close() } }