From 6d5458dfebde084e52b371cc1c77844e4cd6269e Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 21 Jan 2013 17:26:52 +0100 Subject: [PATCH] further improve pullFromServerSide stability --- .../scala/akka/io/TcpConnectionSpec.scala | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 55e3675a2b..01f04fb147 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -340,25 +340,41 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") serverSideChannel.register(nioSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE) val buffer = ByteBuffer.allocate(TestSize) - @tailrec final def pullFromServerSide(remaining: Int): Unit = - if (remaining > 0) { - nioSelector.select(100) - if (clientSelectionKey.isValid && clientSelectionKey.isWritable) - selector.send(connectionActor, ChannelWritable) - if (serverSelectionKey.isValid && serverSelectionKey.isReadable) { - buffer.clear() - val read = serverSideChannel.read(buffer) - if (read == 0) - throw new IllegalStateException("Didn't make any progress") - else if (read == -1) - throw new IllegalStateException("Connection was closed unexpectedly with remaining bytes " + remaining) + /** + * Tries to simultaneously act on client and server side to read from the server + * all pending data from the client. + */ + @tailrec final def pullFromServerSide(remaining: Int, waitingForWrite: Boolean = true): Unit = + if (remaining > 0) + pullFromServerSide(remaining - tryReading(), checkForWriteInterest(waitingForWrite)) - pullFromServerSide(remaining - read) - } else - pullFromServerSide(remaining) + private def checkForWriteInterest(waitingForWrite: Boolean): Boolean = { + val waitingForWrite0 = + if (selector.msgAvailable) { + selector.expectMsg(WriteInterest) + true + } else waitingForWrite - } + nioSelector.select(1) + + if (waitingForWrite0 && clientSelectionKey.isValid && clientSelectionKey.isWritable) { + selector.send(connectionActor, ChannelWritable) + false + } else waitingForWrite0 + } + private def tryReading(): Int = + if (serverSelectionKey.isValid && serverSelectionKey.isReadable) { + buffer.clear() + val read = serverSideChannel.read(buffer) + if (read == 0) + throw new IllegalStateException("Didn't make any progress") + else if (read == -1) + throw new IllegalStateException("Connection was closed unexpectedly with remaining bytes " + remaining) + + read + } else + 0 @tailrec final def expectReceivedString(data: String): Unit = { data.length must be > 0