improve stability of tests, pullFromServerSide now uses real selector to check for conditions

This commit is contained in:
Johannes Rudolph 2013-01-21 17:05:44 +01:00
parent b4baf66442
commit 3d34f57c5b

View file

@ -6,7 +6,7 @@ package akka.io
import scala.annotation.tailrec
import java.nio.channels.{ SelectionKey, SocketChannel, ServerSocketChannel }
import java.nio.channels.{ Selector, SelectionKey, SocketChannel, ServerSocketChannel }
import java.nio.ByteBuffer
import java.nio.channels.spi.SelectorProvider
import java.io.IOException
@ -49,8 +49,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
"send incoming data to the connection handler" in withEstablishedConnection() { setup
import setup._
serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII")))
// emulate selector behavior
expectReceivedString("testdata")
// have two packets in flight before the selector notices
serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII")))
serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII")))
@ -60,7 +61,6 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
"write data to network (and acknowledge)" in withEstablishedConnection() { setup
import setup._
serverSideChannel.configureBlocking(false)
object Ack
val writer = TestProbe()
@ -331,21 +331,33 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
def connectionActor: TestActorRef[TcpOutgoingConnection] = unregisteredSetup.connectionActor
def clientSideChannel: SocketChannel = unregisteredSetup.clientSideChannel
val (nioSelector, clientSelectionKey) = {
val sel = SelectorProvider.provider().openSelector()
val key = clientSideChannel.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
(sel, key)
}
val serverSelectionKey =
serverSideChannel.register(nioSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
val buffer = ByteBuffer.allocate(TestSize)
@tailrec final def pullFromServerSide(remaining: Int): Unit =
if (remaining > 0) {
if (selector.msgAvailable) {
selector.expectMsg(WriteInterest)
nioSelector.select(100)
if (clientSelectionKey.isValid && clientSelectionKey.isWritable)
selector.send(connectionActor, ChannelWritable)
}
buffer.clear()
val read = serverSideChannel.read(buffer)
if (read == 0)
throw new IllegalStateException("Didn't make any progress")
else if (read == -1)
throw new IllegalStateException("Connection was closed unexpectedly with remaining bytes " + remaining)
pullFromServerSide(remaining - read)
if (serverSelectionKey.isValid && serverSelectionKey.isReadable) {
buffer.clear()
val read = serverSideChannel.read(buffer)
if (read == 0)
throw new IllegalStateException("Didn't make any progress")
else if (read == -1)
throw new IllegalStateException("Connection was closed unexpectedly with remaining bytes " + remaining)
pullFromServerSide(remaining - read)
} else
pullFromServerSide(remaining)
}
@tailrec final def expectReceivedString(data: String): Unit = {
@ -391,6 +403,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
localServer.configureBlocking(true)
val serverSideChannel = localServer.accept()
serverSideChannel.configureBlocking(false)
serverSideChannel must not be (null)
selector.send(connectionActor, ChannelConnectable)