=act #3672 fix garbling of big outgoing Tcp.Write caused by mixup of buffers
This commit is contained in:
parent
f194860dfb
commit
f87f166aac
2 changed files with 26 additions and 3 deletions
|
|
@ -5,7 +5,7 @@
|
||||||
package akka.io
|
package akka.io
|
||||||
|
|
||||||
import java.io.{ File, IOException }
|
import java.io.{ File, IOException }
|
||||||
import java.net.{ URLClassLoader, ConnectException, InetSocketAddress, SocketException, SocketTimeoutException }
|
import java.net.{ URLClassLoader, InetSocketAddress }
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.nio.channels._
|
import java.nio.channels._
|
||||||
import java.nio.channels.spi.SelectorProvider
|
import java.nio.channels.spi.SelectorProvider
|
||||||
|
|
@ -22,6 +22,7 @@ import akka.actor._
|
||||||
import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
|
import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
|
||||||
import akka.util.{ Helpers, ByteString }
|
import akka.util.{ Helpers, ByteString }
|
||||||
import akka.TestUtils._
|
import akka.TestUtils._
|
||||||
|
import java.util.Random
|
||||||
|
|
||||||
object TcpConnectionSpec {
|
object TcpConnectionSpec {
|
||||||
case class Ack(i: Int) extends Event
|
case class Ack(i: Int) extends Event
|
||||||
|
|
@ -176,6 +177,28 @@ class TcpConnectionSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"send big buffers to network correctly" in new EstablishedConnectionTest() {
|
||||||
|
run {
|
||||||
|
val bufferSize = 512 * 1024 // 256kB are too few
|
||||||
|
val random = new Random(0)
|
||||||
|
val testBytes = new Array[Byte](bufferSize)
|
||||||
|
random.nextBytes(testBytes)
|
||||||
|
val testData = ByteString(testBytes)
|
||||||
|
|
||||||
|
val writer = TestProbe()
|
||||||
|
|
||||||
|
val write = Write(testData, Ack)
|
||||||
|
val buffer = ByteBuffer.allocate(bufferSize)
|
||||||
|
serverSideChannel.read(buffer) must be(0)
|
||||||
|
writer.send(connectionActor, write)
|
||||||
|
pullFromServerSide(remaining = bufferSize, into = buffer)
|
||||||
|
buffer.flip()
|
||||||
|
buffer.limit must be(bufferSize)
|
||||||
|
|
||||||
|
ByteString(buffer) must be(testData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
"write data after not acknowledged data" in new EstablishedConnectionTest() {
|
"write data after not acknowledged data" in new EstablishedConnectionTest() {
|
||||||
run {
|
run {
|
||||||
val writer = TestProbe()
|
val writer = TestProbe()
|
||||||
|
|
|
||||||
|
|
@ -348,9 +348,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
||||||
|
|
||||||
} else if (data.nonEmpty) {
|
} else if (data.nonEmpty) {
|
||||||
buffer.clear()
|
buffer.clear()
|
||||||
val copied = remainingData.copyToBuffer(buffer)
|
val copied = data.copyToBuffer(buffer)
|
||||||
buffer.flip()
|
buffer.flip()
|
||||||
writeToChannel(remainingData drop copied)
|
writeToChannel(data drop copied)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (!ack.isInstanceOf[NoAck]) commander ! ack
|
if (!ack.isInstanceOf[NoAck]) commander ! ack
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue