Merge pull request #1294 from spray/tcp-connection-immediate-Received-message

TcpConnection: forward received data to handler immediately without concatenating buffers
This commit is contained in:
Roland Kuhn 2013-04-08 11:00:39 -07:00
commit 7a8dbda95d
2 changed files with 18 additions and 29 deletions

View file

@ -108,25 +108,26 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
expectReceivedString("testdata2testdata3") expectReceivedString("testdata2testdata3")
} }
"bundle incoming Received messages as long as more data is available" in withEstablishedConnection( "forward incoming data as Received messages instantly as long as more data is available" in withEstablishedConnection(
clientSocketOptions = List(Inet.SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through clientSocketOptions = List(Inet.SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through
) { setup ) { setup
import setup._ import setup._
val DataSize = 1000000 val bufferSize = Tcp(system).Settings.DirectBufferSize
val DataSize = bufferSize + 1500
val bigData = new Array[Byte](DataSize) val bigData = new Array[Byte](DataSize)
val buffer = ByteBuffer.wrap(bigData) val buffer = ByteBuffer.wrap(bigData)
serverSideChannel.socket.setSendBufferSize(150000) serverSideChannel.socket.setSendBufferSize(150000)
val wrote = serverSideChannel.write(buffer) val wrote = serverSideChannel.write(buffer)
wrote must be > 140000 wrote must be(DataSize)
expectNoMsg(1000.millis) // data should have been transferred fully by now expectNoMsg(1000.millis) // data should have been transferred fully by now
selector.send(connectionActor, ChannelReadable) selector.send(connectionActor, ChannelReadable)
// 140000 is more than the direct buffer size connectionHandler.expectMsgType[Received].data.length must be(bufferSize)
connectionHandler.expectMsgType[Received].data.length must be > 140000 connectionHandler.expectMsgType[Received].data.length must be(1500)
} }
"receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup

View file

@ -122,7 +122,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
} }
def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = { def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = {
@tailrec def innerRead(buffer: ByteBuffer, receivedData: ByteString, remainingLimit: Int): ReadResult = @tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
if (remainingLimit > 0) { if (remainingLimit > 0) {
// never read more than the configured limit // never read more than the configured limit
buffer.clear() buffer.clear()
@ -131,33 +131,22 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
val readBytes = channel.read(buffer) val readBytes = channel.read(buffer)
buffer.flip() buffer.flip()
val totalData = receivedData ++ ByteString(buffer) if (TraceLogging) log.debug("Read [{}] bytes.", readBytes)
if (readBytes > 0) handler ! Received(ByteString(buffer))
readBytes match { readBytes match {
case `maxBufferSpace` innerRead(buffer, totalData, remainingLimit - maxBufferSpace) case `maxBufferSpace` innerRead(buffer, remainingLimit - maxBufferSpace)
case x if totalData.length > 0 GotCompleteData(totalData) case x if x >= 0 AllRead
case 0 NoData case -1 EndOfStream
case -1 EndOfStream
case _ case _
throw new IllegalStateException("Unexpected value returned from read: " + readBytes) throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
} }
} else MoreDataWaiting(receivedData) } else MoreDataWaiting
val buffer = bufferPool.acquire() val buffer = bufferPool.acquire()
try innerRead(buffer, ByteString.empty, ReceivedMessageSizeLimit) match { try innerRead(buffer, ReceivedMessageSizeLimit) match {
case NoData case AllRead selector ! ReadInterest
if (TraceLogging) log.debug("Read nothing.") case MoreDataWaiting self ! ChannelReadable
selector ! ReadInterest
case GotCompleteData(data)
if (TraceLogging) log.debug("Read [{}] bytes.", data.length)
handler ! Received(data)
selector ! ReadInterest
case MoreDataWaiting(data)
if (TraceLogging) log.debug("Read [{}] bytes. More data waiting.", data.length)
handler ! Received(data)
self ! ChannelReadable
case EndOfStream case EndOfStream
if (TraceLogging) log.debug("Read returned end-of-stream") if (TraceLogging) log.debug("Read returned end-of-stream")
doCloseConnection(handler, closeCommander, closeReason) doCloseConnection(handler, closeCommander, closeReason)
@ -300,10 +289,9 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
*/ */
private[io] object TcpConnection { private[io] object TcpConnection {
sealed trait ReadResult sealed trait ReadResult
object NoData extends ReadResult
object EndOfStream extends ReadResult object EndOfStream extends ReadResult
case class GotCompleteData(data: ByteString) extends ReadResult object AllRead extends ReadResult
case class MoreDataWaiting(data: ByteString) extends ReadResult object MoreDataWaiting extends ReadResult
/** /**
* Used to transport information to the postStop method to notify * Used to transport information to the postStop method to notify