aggregate received data as long as there's data in kernel buffers or 'received-message-size-limit' is reached, see #2886

This commit is contained in:
Johannes Rudolph 2013-01-24 15:08:42 +01:00
parent 16945b743c
commit f6fb147afc
4 changed files with 77 additions and 23 deletions

View file

@ -52,7 +52,7 @@ akka {
# The number of bytes per direct buffer in the pool used to read or write # The number of bytes per direct buffer in the pool used to read or write
# network data from the kernel. # network data from the kernel.
direct-buffer-size = 131072 direct-buffer-size = 131072 # 128 KiB
# The maximal number of direct buffers kept in the direct buffer pool for # The maximal number of direct buffers kept in the direct buffer pool for
# reuse. # reuse.
@ -62,6 +62,11 @@ akka {
# its commander before aborting the connection. # its commander before aborting the connection.
register-timeout = 5s register-timeout = 5s
# The maximum number of bytes delivered by a `Received` message. Before
# more data is read from the network the connection actor will try to
# do other work.
received-message-size-limit = unlimited
# Enable fine grained logging of what goes on inside the implementation. # Enable fine grained logging of what goes on inside the implementation.
# Be aware that this may log more than once per message sent to the actors # Be aware that this may log more than once per message sent to the actors
# of the tcp implementation. # of the tcp implementation.

View file

@ -199,6 +199,10 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
case "infinite" Duration.Undefined case "infinite" Duration.Undefined
case x Duration(x) case x Duration(x)
} }
val ReceivedMessageSizeLimit = getString("received-message-size-limit") match {
case "unlimited" Int.MaxValue
case x getInt("received-message-size-limit")
}
val SelectorDispatcher = getString("selector-dispatcher") val SelectorDispatcher = getString("selector-dispatcher")
val WorkerDispatcher = getString("worker-dispatcher") val WorkerDispatcher = getString("worker-dispatcher")
val ManagementDispatcher = getString("management-dispatcher") val ManagementDispatcher = getString("management-dispatcher")

View file

@ -23,6 +23,7 @@ import TcpSelector._
private[io] abstract class TcpConnection(val channel: SocketChannel, private[io] abstract class TcpConnection(val channel: SocketChannel,
val tcp: TcpExt) extends Actor with ActorLogging with WithBufferPool { val tcp: TcpExt) extends Actor with ActorLogging with WithBufferPool {
import tcp.Settings._ import tcp.Settings._
import TcpConnection._
var pendingWrite: PendingWrite = null var pendingWrite: PendingWrite = null
// Needed to send the ConnectionClosed message in the postStop handler. // Needed to send the ConnectionClosed message in the postStop handler.
@ -115,32 +116,47 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
} }
def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = { def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = {
@tailrec def innerRead(buffer: ByteBuffer, receivedData: ByteString, remainingLimit: Int): ReadResult =
if (remainingLimit > 0) {
// never read more than the configured limit
buffer.clear()
buffer.limit(math.min(DirectBufferSize, remainingLimit))
val readBytes = channel.read(buffer)
buffer.flip()
val totalData = receivedData ++ ByteString(buffer)
readBytes match {
case DirectBufferSize innerRead(buffer, totalData, remainingLimit - DirectBufferSize)
case x if totalData.length > 0 GotCompleteData(totalData)
case 0 NoData
case -1 EndOfStream
case _
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
}
} else MoreDataWaiting(receivedData)
val buffer = acquireBuffer() val buffer = acquireBuffer()
try innerRead(buffer, ByteString.empty, ReceivedMessageSizeLimit) match {
try { case NoData
val readBytes = channel.read(buffer) if (TraceLogging) log.debug("Read nothing.")
buffer.flip()
if (readBytes > 0) {
if (TraceLogging) log.debug("Read {} bytes", readBytes)
handler ! Received(ByteString(buffer))
releaseBuffer(buffer)
if (readBytes == buffer.capacity())
// directly try reading more because we exhausted our buffer
self ! ChannelReadable
else selector ! ReadInterest
} else if (readBytes == 0) {
if (TraceLogging) log.debug("Read nothing. Registering read interest with selector")
selector ! ReadInterest selector ! ReadInterest
} else if (readBytes == -1) { case GotCompleteData(data)
if (TraceLogging) log.debug("Read {} bytes.", data.length)
handler ! Received(data)
selector ! ReadInterest
case MoreDataWaiting(data)
if (TraceLogging) log.debug("Read {} bytes. More data waiting.", data.length)
handler ! Received(data)
self ! ChannelReadable
case EndOfStream
if (TraceLogging) log.debug("Read returned end-of-stream") if (TraceLogging) log.debug("Read returned end-of-stream")
doCloseConnection(handler, closeCommander, closeReason) doCloseConnection(handler, closeCommander, closeReason)
} else throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
} catch { } catch {
case e: IOException handleError(handler, e) case e: IOException handleError(handler, e)
} } finally releaseBuffer(buffer)
} }
final def doWrite(handler: ActorRef): Unit = { final def doWrite(handler: ActorRef): Unit = {
@ -277,12 +293,20 @@ private[io] abstract class TcpConnection(val channel: SocketChannel,
PendingWrite(sender, write.ack, write.data.drop(copied), buffer) PendingWrite(sender, write.ack, write.data.drop(copied), buffer)
} }
}
private[io] object TcpConnection {
sealed trait ReadResult
object NoData extends ReadResult
object EndOfStream extends ReadResult
case class GotCompleteData(data: ByteString) extends ReadResult
case class MoreDataWaiting(data: ByteString) extends ReadResult
/** /**
* Used to transport information to the postStop method to notify * Used to transport information to the postStop method to notify
* interested party about a connection close. * interested party about a connection close.
*/ */
private[TcpConnection] case class CloseInformation( case class CloseInformation(
notificationsTo: Set[ActorRef], notificationsTo: Set[ActorRef],
closedEvent: ConnectionClosed) closedEvent: ConnectionClosed)
} }

View file

@ -61,6 +61,25 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
expectReceivedString("testdata2testdata3") expectReceivedString("testdata2testdata3")
} }
"bundle incoming Received messages as long as more data is available" in withEstablishedConnection(
clientSocketOptions = List(SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through
) { setup
import setup._
val DataSize = 1000000
val bigData = new Array[Byte](DataSize)
val buffer = ByteBuffer.wrap(bigData)
val wrote = serverSideChannel.write(buffer)
wrote must be > 140000
expectNoMsg(1000.millis) // data should have been transferred fully by now
selector.send(connectionActor, ChannelReadable)
// 140000 is more than the direct buffer size
connectionHandler.expectMsgType[Received].data.length must be > 140000
}
"receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup
import unregisteredSetup._ import unregisteredSetup._
@ -491,7 +510,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
clientSideChannel) clientSideChannel)
} }
} }
def withEstablishedConnection(setServerSocketOptions: ServerSocketChannel Unit = _ ())(body: RegisteredSetup Any): Unit = withUnacceptedConnection(setServerSocketOptions) { unregisteredSetup def withEstablishedConnection(
setServerSocketOptions: ServerSocketChannel Unit = _ (),
clientSocketOptions: immutable.Seq[SocketOption] = Nil)(body: RegisteredSetup Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup
import unregisteredSetup._ import unregisteredSetup._
localServer.configureBlocking(true) localServer.configureBlocking(true)