diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index b130ead288..8f85966503 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -10,6 +10,8 @@ import java.nio.ByteBuffer import java.nio.channels._ import java.nio.channels.spi.SelectorProvider import java.nio.channels.SelectionKey._ +import com.typesafe.config.ConfigFactory + import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration._ @@ -33,7 +35,7 @@ object TcpConnectionSpec { class TcpConnectionSpec extends AkkaSpec(""" akka.io.tcp.register-timeout = 500ms akka.actor.serialize-creators = on - """) { + """) { thisSpecs ⇒ import TcpConnectionSpec._ // Helper to avoid Windows localization specific differences @@ -356,26 +358,50 @@ class TcpConnectionSpec extends AkkaSpec(""" } "respect pull mode" in new EstablishedConnectionTest(pullMode = true) { - run { - serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII"))) + // override config to decrease default buffer size + val config = + ConfigFactory.load( + ConfigFactory.parseString("akka.io.tcp.direct-buffer-size = 1k") + .withFallback(AkkaSpec.testConf)) + override implicit def system: ActorSystem = ActorSystem("respectPullModeTest", config) + + try run { + val maxBufferSize = 1 * 1024 + val ts = "t" * maxBufferSize + val us = "u" * (maxBufferSize / 2) + + // send a batch that is bigger than the default buffer to make sure we don't recurse and + // send more than one Received messages + serverSideChannel.write(ByteBuffer.wrap((ts ++ us).getBytes("ASCII"))) connectionHandler.expectNoMsg(100.millis) connectionActor ! ResumeReading interestCallReceiver.expectMsg(OP_READ) selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be("testdata") - - // have two packets in flight before the selector notices - serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII"))) - serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII"))) + connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be(ts) + interestCallReceiver.expectNoMsg(100.millis) connectionHandler.expectNoMsg(100.millis) connectionActor ! ResumeReading interestCallReceiver.expectMsg(OP_READ) selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be("testdata2testdata3") + connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be(us) + + // make sure that after reading all pending data we don't yet register for reading more data + interestCallReceiver.expectNoMsg(100.millis) + connectionHandler.expectNoMsg(100.millis) + + val vs = "v" * (maxBufferSize / 2) + serverSideChannel.write(ByteBuffer.wrap(vs.getBytes("ASCII"))) + + connectionActor ! ResumeReading + interestCallReceiver.expectMsg(OP_READ) + selector.send(connectionActor, ChannelReadable) + + connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be(vs) } + finally system.shutdown() } "close the connection and reply with `Closed` upon reception of a `Close` command" in @@ -803,6 +829,9 @@ class TcpConnectionSpec extends AkkaSpec(""" } abstract class LocalServerTest extends ChannelRegistry { + /** Allows overriding the system used */ + implicit def system: ActorSystem = thisSpecs.system + val serverAddress = temporaryServerAddress() val localServerChannel = ServerSocketChannel.open() val userHandler = TestProbe() diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 084f6d7de9..b37c65dd0c 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -213,7 +213,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha if (readBytes > 0) info.handler ! Received(ByteString(buffer)) readBytes match { - case `maxBufferSpace` ⇒ innerRead(buffer, remainingLimit - maxBufferSpace) + case `maxBufferSpace` ⇒ if (pullMode) MoreDataWaiting else innerRead(buffer, remainingLimit - maxBufferSpace) case x if x >= 0 ⇒ AllRead case -1 ⇒ EndOfStream case _ ⇒