TcpConnection: forward received data to handler immediately without concatenating buffers
For these reasons: - pipeline effect will allow to start processing on the first part of the data immediately in parallel - data in `Received` messages is now always a simple ByteStrings which will improve iteration speed in the next layer - code becomes simpler
This commit is contained in:
parent
88f7e28c6b
commit
fc6b7830a9
2 changed files with 18 additions and 29 deletions
|
|
@ -107,25 +107,26 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
expectReceivedString("testdata2testdata3")
|
expectReceivedString("testdata2testdata3")
|
||||||
}
|
}
|
||||||
|
|
||||||
"bundle incoming Received messages as long as more data is available" in withEstablishedConnection(
|
"forward incoming data as Received messages instantly as long as more data is available" in withEstablishedConnection(
|
||||||
clientSocketOptions = List(Inet.SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through
|
clientSocketOptions = List(Inet.SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through
|
||||||
) { setup ⇒
|
) { setup ⇒
|
||||||
import setup._
|
import setup._
|
||||||
|
|
||||||
val DataSize = 1000000
|
val bufferSize = Tcp(system).Settings.DirectBufferSize
|
||||||
|
val DataSize = bufferSize + 1500
|
||||||
val bigData = new Array[Byte](DataSize)
|
val bigData = new Array[Byte](DataSize)
|
||||||
val buffer = ByteBuffer.wrap(bigData)
|
val buffer = ByteBuffer.wrap(bigData)
|
||||||
|
|
||||||
serverSideChannel.socket.setSendBufferSize(150000)
|
serverSideChannel.socket.setSendBufferSize(150000)
|
||||||
val wrote = serverSideChannel.write(buffer)
|
val wrote = serverSideChannel.write(buffer)
|
||||||
wrote must be > 140000
|
wrote must be(DataSize)
|
||||||
|
|
||||||
expectNoMsg(1000.millis) // data should have been transferred fully by now
|
expectNoMsg(1000.millis) // data should have been transferred fully by now
|
||||||
|
|
||||||
selector.send(connectionActor, ChannelReadable)
|
selector.send(connectionActor, ChannelReadable)
|
||||||
|
|
||||||
// 140000 is more than the direct buffer size
|
connectionHandler.expectMsgType[Received].data.length must be(bufferSize)
|
||||||
connectionHandler.expectMsgType[Received].data.length must be > 140000
|
connectionHandler.expectMsgType[Received].data.length must be(1500)
|
||||||
}
|
}
|
||||||
|
|
||||||
"receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup ⇒
|
"receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup ⇒
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
|
||||||
}
|
}
|
||||||
|
|
||||||
def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = {
|
def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = {
|
||||||
@tailrec def innerRead(buffer: ByteBuffer, receivedData: ByteString, remainingLimit: Int): ReadResult =
|
@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
|
||||||
if (remainingLimit > 0) {
|
if (remainingLimit > 0) {
|
||||||
// never read more than the configured limit
|
// never read more than the configured limit
|
||||||
buffer.clear()
|
buffer.clear()
|
||||||
|
|
@ -131,33 +131,22 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
|
||||||
val readBytes = channel.read(buffer)
|
val readBytes = channel.read(buffer)
|
||||||
buffer.flip()
|
buffer.flip()
|
||||||
|
|
||||||
val totalData = receivedData ++ ByteString(buffer)
|
if (TraceLogging) log.debug("Read [{}] bytes.", readBytes)
|
||||||
|
if (readBytes > 0) handler ! Received(ByteString(buffer))
|
||||||
|
|
||||||
readBytes match {
|
readBytes match {
|
||||||
case `maxBufferSpace` ⇒ innerRead(buffer, totalData, remainingLimit - maxBufferSpace)
|
case `maxBufferSpace` ⇒ innerRead(buffer, remainingLimit - maxBufferSpace)
|
||||||
case x if totalData.length > 0 ⇒ GotCompleteData(totalData)
|
case x if x >= 0 ⇒ AllRead
|
||||||
case 0 ⇒ NoData
|
case -1 ⇒ EndOfStream
|
||||||
case -1 ⇒ EndOfStream
|
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
|
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
|
||||||
}
|
}
|
||||||
} else MoreDataWaiting(receivedData)
|
} else MoreDataWaiting
|
||||||
|
|
||||||
val buffer = bufferPool.acquire()
|
val buffer = bufferPool.acquire()
|
||||||
try innerRead(buffer, ByteString.empty, ReceivedMessageSizeLimit) match {
|
try innerRead(buffer, ReceivedMessageSizeLimit) match {
|
||||||
case NoData ⇒
|
case AllRead ⇒ selector ! ReadInterest
|
||||||
if (TraceLogging) log.debug("Read nothing.")
|
case MoreDataWaiting ⇒ self ! ChannelReadable
|
||||||
selector ! ReadInterest
|
|
||||||
case GotCompleteData(data) ⇒
|
|
||||||
if (TraceLogging) log.debug("Read [{}] bytes.", data.length)
|
|
||||||
|
|
||||||
handler ! Received(data)
|
|
||||||
selector ! ReadInterest
|
|
||||||
case MoreDataWaiting(data) ⇒
|
|
||||||
if (TraceLogging) log.debug("Read [{}] bytes. More data waiting.", data.length)
|
|
||||||
|
|
||||||
handler ! Received(data)
|
|
||||||
self ! ChannelReadable
|
|
||||||
case EndOfStream ⇒
|
case EndOfStream ⇒
|
||||||
if (TraceLogging) log.debug("Read returned end-of-stream")
|
if (TraceLogging) log.debug("Read returned end-of-stream")
|
||||||
doCloseConnection(handler, closeCommander, closeReason)
|
doCloseConnection(handler, closeCommander, closeReason)
|
||||||
|
|
@ -307,10 +296,9 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
|
||||||
*/
|
*/
|
||||||
private[io] object TcpConnection {
|
private[io] object TcpConnection {
|
||||||
sealed trait ReadResult
|
sealed trait ReadResult
|
||||||
object NoData extends ReadResult
|
|
||||||
object EndOfStream extends ReadResult
|
object EndOfStream extends ReadResult
|
||||||
case class GotCompleteData(data: ByteString) extends ReadResult
|
object AllRead extends ReadResult
|
||||||
case class MoreDataWaiting(data: ByteString) extends ReadResult
|
object MoreDataWaiting extends ReadResult
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to transport information to the postStop method to notify
|
* Used to transport information to the postStop method to notify
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue