diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 4b6b5ec7c7..55e3675a2b 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -6,7 +6,7 @@ package akka.io import scala.annotation.tailrec -import java.nio.channels.{ SelectionKey, SocketChannel, ServerSocketChannel } +import java.nio.channels.{ Selector, SelectionKey, SocketChannel, ServerSocketChannel } import java.nio.ByteBuffer import java.nio.channels.spi.SelectorProvider import java.io.IOException @@ -49,8 +49,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "send incoming data to the connection handler" in withEstablishedConnection() { setup ⇒ import setup._ serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII"))) - // emulate selector behavior + expectReceivedString("testdata") + // have two packets in flight before the selector notices serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII"))) serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII"))) @@ -60,7 +61,6 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "write data to network (and acknowledge)" in withEstablishedConnection() { setup ⇒ import setup._ - serverSideChannel.configureBlocking(false) object Ack val writer = TestProbe() @@ -331,21 +331,33 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") def connectionActor: TestActorRef[TcpOutgoingConnection] = unregisteredSetup.connectionActor def clientSideChannel: SocketChannel = unregisteredSetup.clientSideChannel + val (nioSelector, clientSelectionKey) = { + val sel = SelectorProvider.provider().openSelector() + val key = clientSideChannel.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE) + (sel, key) + } + val serverSelectionKey = + serverSideChannel.register(nioSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE) + val buffer = ByteBuffer.allocate(TestSize) @tailrec final def pullFromServerSide(remaining: Int): Unit = if (remaining > 0) { - if (selector.msgAvailable) { - selector.expectMsg(WriteInterest) + nioSelector.select(100) + if (clientSelectionKey.isValid && clientSelectionKey.isWritable) selector.send(connectionActor, ChannelWritable) - } - buffer.clear() - val read = serverSideChannel.read(buffer) - if (read == 0) - throw new IllegalStateException("Didn't make any progress") - else if (read == -1) - throw new IllegalStateException("Connection was closed unexpectedly with remaining bytes " + remaining) - pullFromServerSide(remaining - read) + if (serverSelectionKey.isValid && serverSelectionKey.isReadable) { + buffer.clear() + val read = serverSideChannel.read(buffer) + if (read == 0) + throw new IllegalStateException("Didn't make any progress") + else if (read == -1) + throw new IllegalStateException("Connection was closed unexpectedly with remaining bytes " + remaining) + + pullFromServerSide(remaining - read) + } else + pullFromServerSide(remaining) + } @tailrec final def expectReceivedString(data: String): Unit = { @@ -391,6 +403,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") localServer.configureBlocking(true) val serverSideChannel = localServer.accept() + serverSideChannel.configureBlocking(false) serverSideChannel must not be (null) selector.send(connectionActor, ChannelConnectable)