diff --git a/akka-io/src/main/resources/reference.conf b/akka-io/src/main/resources/reference.conf index 65626a1ea9..54df6977f9 100644 --- a/akka-io/src/main/resources/reference.conf +++ b/akka-io/src/main/resources/reference.conf @@ -52,7 +52,7 @@ akka { # The number of bytes per direct buffer in the pool used to read or write # network data from the kernel. - direct-buffer-size = 131072 + direct-buffer-size = 131072 # 128 KiB # The maximal number of direct buffers kept in the direct buffer pool for # reuse. @@ -62,6 +62,11 @@ akka { # its commander before aborting the connection. register-timeout = 5s + # The maximum number of bytes delivered by a `Received` message. Before + # more data is read from the network the connection actor will try to + # do other work. + received-message-size-limit = unlimited + # Enable fine grained logging of what goes on inside the implementation. # Be aware that this may log more than once per message sent to the actors # of the tcp implementation. diff --git a/akka-io/src/main/scala/akka/io/Tcp.scala b/akka-io/src/main/scala/akka/io/Tcp.scala index 26781718e8..0df8e0e0a6 100644 --- a/akka-io/src/main/scala/akka/io/Tcp.scala +++ b/akka-io/src/main/scala/akka/io/Tcp.scala @@ -199,6 +199,10 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { case "infinite" ⇒ Duration.Undefined case x ⇒ Duration(x) } + val ReceivedMessageSizeLimit = getString("received-message-size-limit") match { + case "unlimited" ⇒ Int.MaxValue + case x ⇒ getInt("received-message-size-limit") + } val SelectorDispatcher = getString("selector-dispatcher") val WorkerDispatcher = getString("worker-dispatcher") val ManagementDispatcher = getString("management-dispatcher") diff --git a/akka-io/src/main/scala/akka/io/TcpConnection.scala b/akka-io/src/main/scala/akka/io/TcpConnection.scala index 25db734d79..00b8ffef24 100644 --- a/akka-io/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpConnection.scala @@ -23,6 +23,7 @@ import TcpSelector._ private[io] abstract class TcpConnection(val channel: SocketChannel, val tcp: TcpExt) extends Actor with ActorLogging with WithBufferPool { import tcp.Settings._ + import TcpConnection._ var pendingWrite: PendingWrite = null // Needed to send the ConnectionClosed message in the postStop handler. @@ -115,32 +116,47 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, } def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = { + @tailrec def innerRead(buffer: ByteBuffer, receivedData: ByteString, remainingLimit: Int): ReadResult = + if (remainingLimit > 0) { + // never read more than the configured limit + buffer.clear() + buffer.limit(math.min(DirectBufferSize, remainingLimit)) + val readBytes = channel.read(buffer) + buffer.flip() + + val totalData = receivedData ++ ByteString(buffer) + + readBytes match { + case DirectBufferSize ⇒ innerRead(buffer, totalData, remainingLimit - DirectBufferSize) + case x if totalData.length > 0 ⇒ GotCompleteData(totalData) + case 0 ⇒ NoData + case -1 ⇒ EndOfStream + case _ ⇒ + throw new IllegalStateException("Unexpected value returned from read: " + readBytes) + } + } else MoreDataWaiting(receivedData) + val buffer = acquireBuffer() - - try { - val readBytes = channel.read(buffer) - buffer.flip() - - if (readBytes > 0) { - if (TraceLogging) log.debug("Read {} bytes", readBytes) - handler ! Received(ByteString(buffer)) - releaseBuffer(buffer) - - if (readBytes == buffer.capacity()) - // directly try reading more because we exhausted our buffer - self ! ChannelReadable - else selector ! ReadInterest - } else if (readBytes == 0) { - if (TraceLogging) log.debug("Read nothing. Registering read interest with selector") + try innerRead(buffer, ByteString.empty, ReceivedMessageSizeLimit) match { + case NoData ⇒ + if (TraceLogging) log.debug("Read nothing.") selector ! ReadInterest - } else if (readBytes == -1) { + case GotCompleteData(data) ⇒ + if (TraceLogging) log.debug("Read {} bytes.", data.length) + + handler ! Received(data) + selector ! ReadInterest + case MoreDataWaiting(data) ⇒ + if (TraceLogging) log.debug("Read {} bytes. More data waiting.", data.length) + + handler ! Received(data) + self ! ChannelReadable + case EndOfStream ⇒ if (TraceLogging) log.debug("Read returned end-of-stream") doCloseConnection(handler, closeCommander, closeReason) - } else throw new IllegalStateException("Unexpected value returned from read: " + readBytes) - } catch { case e: IOException ⇒ handleError(handler, e) - } + } finally releaseBuffer(buffer) } final def doWrite(handler: ActorRef): Unit = { @@ -277,12 +293,20 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, PendingWrite(sender, write.ack, write.data.drop(copied), buffer) } +} + +private[io] object TcpConnection { + sealed trait ReadResult + object NoData extends ReadResult + object EndOfStream extends ReadResult + case class GotCompleteData(data: ByteString) extends ReadResult + case class MoreDataWaiting(data: ByteString) extends ReadResult /** * Used to transport information to the postStop method to notify * interested party about a connection close. */ - private[TcpConnection] case class CloseInformation( + case class CloseInformation( notificationsTo: Set[ActorRef], closedEvent: ConnectionClosed) } diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 3232a68caf..b9b02313af 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -61,6 +61,25 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") expectReceivedString("testdata2testdata3") } + "bundle incoming Received messages as long as more data is available" in withEstablishedConnection( + clientSocketOptions = List(SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through + ) { setup ⇒ + import setup._ + + val DataSize = 1000000 + val bigData = new Array[Byte](DataSize) + val buffer = ByteBuffer.wrap(bigData) + + val wrote = serverSideChannel.write(buffer) + wrote must be > 140000 + + expectNoMsg(1000.millis) // data should have been transferred fully by now + + selector.send(connectionActor, ChannelReadable) + + // 140000 is more than the direct buffer size + connectionHandler.expectMsgType[Received].data.length must be > 140000 + } "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup ⇒ import unregisteredSetup._ @@ -491,7 +510,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") clientSideChannel) } } - def withEstablishedConnection(setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ ())(body: RegisteredSetup ⇒ Any): Unit = withUnacceptedConnection(setServerSocketOptions) { unregisteredSetup ⇒ + def withEstablishedConnection( + setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (), + clientSocketOptions: immutable.Seq[SocketOption] = Nil)(body: RegisteredSetup ⇒ Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup ⇒ import unregisteredSetup._ localServer.configureBlocking(true)