Merge pull request #15573 from spray/w/fix-TCP-pull-mode
=act #15572 don't read recursively in pullMode when a complete buffer was read
This commit is contained in:
commit
b88c964bd4
2 changed files with 39 additions and 10 deletions
|
|
@ -10,6 +10,8 @@ import java.nio.ByteBuffer
|
|||
import java.nio.channels._
|
||||
import java.nio.channels.spi.SelectorProvider
|
||||
import java.nio.channels.SelectionKey._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -33,7 +35,7 @@ object TcpConnectionSpec {
|
|||
class TcpConnectionSpec extends AkkaSpec("""
|
||||
akka.io.tcp.register-timeout = 500ms
|
||||
akka.actor.serialize-creators = on
|
||||
""") {
|
||||
""") { thisSpecs ⇒
|
||||
import TcpConnectionSpec._
|
||||
|
||||
// Helper to avoid Windows localization specific differences
|
||||
|
|
@ -356,26 +358,50 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
}
|
||||
|
||||
"respect pull mode" in new EstablishedConnectionTest(pullMode = true) {
|
||||
run {
|
||||
serverSideChannel.write(ByteBuffer.wrap("testdata".getBytes("ASCII")))
|
||||
// override config to decrease default buffer size
|
||||
val config =
|
||||
ConfigFactory.load(
|
||||
ConfigFactory.parseString("akka.io.tcp.direct-buffer-size = 1k")
|
||||
.withFallback(AkkaSpec.testConf))
|
||||
override implicit def system: ActorSystem = ActorSystem("respectPullModeTest", config)
|
||||
|
||||
try run {
|
||||
val maxBufferSize = 1 * 1024
|
||||
val ts = "t" * maxBufferSize
|
||||
val us = "u" * (maxBufferSize / 2)
|
||||
|
||||
// send a batch that is bigger than the default buffer to make sure we don't recurse and
|
||||
// send more than one Received messages
|
||||
serverSideChannel.write(ByteBuffer.wrap((ts ++ us).getBytes("ASCII")))
|
||||
connectionHandler.expectNoMsg(100.millis)
|
||||
|
||||
connectionActor ! ResumeReading
|
||||
interestCallReceiver.expectMsg(OP_READ)
|
||||
selector.send(connectionActor, ChannelReadable)
|
||||
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be("testdata")
|
||||
|
||||
// have two packets in flight before the selector notices
|
||||
serverSideChannel.write(ByteBuffer.wrap("testdata2".getBytes("ASCII")))
|
||||
serverSideChannel.write(ByteBuffer.wrap("testdata3".getBytes("ASCII")))
|
||||
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be(ts)
|
||||
|
||||
interestCallReceiver.expectNoMsg(100.millis)
|
||||
connectionHandler.expectNoMsg(100.millis)
|
||||
|
||||
connectionActor ! ResumeReading
|
||||
interestCallReceiver.expectMsg(OP_READ)
|
||||
selector.send(connectionActor, ChannelReadable)
|
||||
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be("testdata2testdata3")
|
||||
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be(us)
|
||||
|
||||
// make sure that after reading all pending data we don't yet register for reading more data
|
||||
interestCallReceiver.expectNoMsg(100.millis)
|
||||
connectionHandler.expectNoMsg(100.millis)
|
||||
|
||||
val vs = "v" * (maxBufferSize / 2)
|
||||
serverSideChannel.write(ByteBuffer.wrap(vs.getBytes("ASCII")))
|
||||
|
||||
connectionActor ! ResumeReading
|
||||
interestCallReceiver.expectMsg(OP_READ)
|
||||
selector.send(connectionActor, ChannelReadable)
|
||||
|
||||
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should be(vs)
|
||||
}
|
||||
finally system.shutdown()
|
||||
}
|
||||
|
||||
"close the connection and reply with `Closed` upon reception of a `Close` command" in
|
||||
|
|
@ -803,6 +829,9 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
}
|
||||
|
||||
abstract class LocalServerTest extends ChannelRegistry {
|
||||
/** Allows overriding the system used */
|
||||
implicit def system: ActorSystem = thisSpecs.system
|
||||
|
||||
val serverAddress = temporaryServerAddress()
|
||||
val localServerChannel = ServerSocketChannel.open()
|
||||
val userHandler = TestProbe()
|
||||
|
|
|
|||
|
|
@ -213,7 +213,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
if (readBytes > 0) info.handler ! Received(ByteString(buffer))
|
||||
|
||||
readBytes match {
|
||||
case `maxBufferSpace` ⇒ innerRead(buffer, remainingLimit - maxBufferSpace)
|
||||
case `maxBufferSpace` ⇒ if (pullMode) MoreDataWaiting else innerRead(buffer, remainingLimit - maxBufferSpace)
|
||||
case x if x >= 0 ⇒ AllRead
|
||||
case -1 ⇒ EndOfStream
|
||||
case _ ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue