further improve pullFromServerSide stability

This commit is contained in:
Johannes Rudolph 2013-01-21 17:26:52 +01:00
parent 3d34f57c5b
commit 6d5458dfeb

View file

@ -340,25 +340,41 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
serverSideChannel.register(nioSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE)
val buffer = ByteBuffer.allocate(TestSize)
@tailrec final def pullFromServerSide(remaining: Int): Unit =
if (remaining > 0) {
nioSelector.select(100)
if (clientSelectionKey.isValid && clientSelectionKey.isWritable)
selector.send(connectionActor, ChannelWritable)
if (serverSelectionKey.isValid && serverSelectionKey.isReadable) {
buffer.clear()
val read = serverSideChannel.read(buffer)
if (read == 0)
throw new IllegalStateException("Didn't make any progress")
else if (read == -1)
throw new IllegalStateException("Connection was closed unexpectedly with remaining bytes " + remaining)
/**
* Tries to simultaneously act on client and server side to read from the server
* all pending data from the client.
*/
@tailrec final def pullFromServerSide(remaining: Int, waitingForWrite: Boolean = true): Unit =
if (remaining > 0)
pullFromServerSide(remaining - tryReading(), checkForWriteInterest(waitingForWrite))
pullFromServerSide(remaining - read)
} else
pullFromServerSide(remaining)
private def checkForWriteInterest(waitingForWrite: Boolean): Boolean = {
val waitingForWrite0 =
if (selector.msgAvailable) {
selector.expectMsg(WriteInterest)
true
} else waitingForWrite
}
nioSelector.select(1)
if (waitingForWrite0 && clientSelectionKey.isValid && clientSelectionKey.isWritable) {
selector.send(connectionActor, ChannelWritable)
false
} else waitingForWrite0
}
private def tryReading(): Int =
if (serverSelectionKey.isValid && serverSelectionKey.isReadable) {
buffer.clear()
val read = serverSideChannel.read(buffer)
if (read == 0)
throw new IllegalStateException("Didn't make any progress")
else if (read == -1)
throw new IllegalStateException("Connection was closed unexpectedly with remaining bytes " + remaining)
read
} else
0
@tailrec final def expectReceivedString(data: String): Unit = {
data.length must be > 0