diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index fceb271ab4..991a0672ad 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -107,25 +107,26 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") expectReceivedString("testdata2testdata3") } - "bundle incoming Received messages as long as more data is available" in withEstablishedConnection( + "forward incoming data as Received messages instantly as long as more data is available" in withEstablishedConnection( clientSocketOptions = List(Inet.SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through ) { setup ⇒ import setup._ - val DataSize = 1000000 + val bufferSize = Tcp(system).Settings.DirectBufferSize + val DataSize = bufferSize + 1500 val bigData = new Array[Byte](DataSize) val buffer = ByteBuffer.wrap(bigData) serverSideChannel.socket.setSendBufferSize(150000) val wrote = serverSideChannel.write(buffer) - wrote must be > 140000 + wrote must be(DataSize) expectNoMsg(1000.millis) // data should have been transferred fully by now selector.send(connectionActor, ChannelReadable) - // 140000 is more than the direct buffer size - connectionHandler.expectMsgType[Received].data.length must be > 140000 + connectionHandler.expectMsgType[Received].data.length must be(bufferSize) + connectionHandler.expectMsgType[Received].data.length must be(1500) } "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup ⇒ diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index ec866ae75f..521288fae9 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -122,7 +122,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, } def doRead(handler: ActorRef, closeCommander: Option[ActorRef]): Unit = { - @tailrec def innerRead(buffer: ByteBuffer, receivedData: ByteString, remainingLimit: Int): ReadResult = + @tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult = if (remainingLimit > 0) { // never read more than the configured limit buffer.clear() @@ -131,33 +131,22 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, val readBytes = channel.read(buffer) buffer.flip() - val totalData = receivedData ++ ByteString(buffer) + if (TraceLogging) log.debug("Read [{}] bytes.", readBytes) + if (readBytes > 0) handler ! Received(ByteString(buffer)) readBytes match { - case `maxBufferSpace` ⇒ innerRead(buffer, totalData, remainingLimit - maxBufferSpace) - case x if totalData.length > 0 ⇒ GotCompleteData(totalData) - case 0 ⇒ NoData - case -1 ⇒ EndOfStream + case `maxBufferSpace` ⇒ innerRead(buffer, remainingLimit - maxBufferSpace) + case x if x >= 0 ⇒ AllRead + case -1 ⇒ EndOfStream case _ ⇒ throw new IllegalStateException("Unexpected value returned from read: " + readBytes) } - } else MoreDataWaiting(receivedData) + } else MoreDataWaiting val buffer = bufferPool.acquire() - try innerRead(buffer, ByteString.empty, ReceivedMessageSizeLimit) match { - case NoData ⇒ - if (TraceLogging) log.debug("Read nothing.") - selector ! ReadInterest - case GotCompleteData(data) ⇒ - if (TraceLogging) log.debug("Read [{}] bytes.", data.length) - - handler ! Received(data) - selector ! ReadInterest - case MoreDataWaiting(data) ⇒ - if (TraceLogging) log.debug("Read [{}] bytes. More data waiting.", data.length) - - handler ! Received(data) - self ! ChannelReadable + try innerRead(buffer, ReceivedMessageSizeLimit) match { + case AllRead ⇒ selector ! ReadInterest + case MoreDataWaiting ⇒ self ! ChannelReadable case EndOfStream ⇒ if (TraceLogging) log.debug("Read returned end-of-stream") doCloseConnection(handler, closeCommander, closeReason) @@ -307,10 +296,9 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, */ private[io] object TcpConnection { sealed trait ReadResult - object NoData extends ReadResult object EndOfStream extends ReadResult - case class GotCompleteData(data: ByteString) extends ReadResult - case class MoreDataWaiting(data: ByteString) extends ReadResult + object AllRead extends ReadResult + object MoreDataWaiting extends ReadResult /** * Used to transport information to the postStop method to notify