From f87f166aacd2d23eb40d56a4d4bcedc2b71ccf6e Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 17 Oct 2013 14:10:00 +0200 Subject: [PATCH] =act #3672 fix garbling of big outgoing Tcp.Write caused by mixup of buffers --- .../scala/akka/io/TcpConnectionSpec.scala | 25 ++++++++++++++++++- .../main/scala/akka/io/TcpConnection.scala | 4 +-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 7e3a62817c..4bdfac145e 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -5,7 +5,7 @@ package akka.io import java.io.{ File, IOException } -import java.net.{ URLClassLoader, ConnectException, InetSocketAddress, SocketException, SocketTimeoutException } +import java.net.{ URLClassLoader, InetSocketAddress } import java.nio.ByteBuffer import java.nio.channels._ import java.nio.channels.spi.SelectorProvider @@ -22,6 +22,7 @@ import akka.actor._ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } import akka.util.{ Helpers, ByteString } import akka.TestUtils._ +import java.util.Random object TcpConnectionSpec { case class Ack(i: Int) extends Event @@ -176,6 +177,28 @@ class TcpConnectionSpec extends AkkaSpec(""" } } + "send big buffers to network correctly" in new EstablishedConnectionTest() { + run { + val bufferSize = 512 * 1024 // 256kB are too few + val random = new Random(0) + val testBytes = new Array[Byte](bufferSize) + random.nextBytes(testBytes) + val testData = ByteString(testBytes) + + val writer = TestProbe() + + val write = Write(testData, Ack) + val buffer = ByteBuffer.allocate(bufferSize) + serverSideChannel.read(buffer) must be(0) + writer.send(connectionActor, write) + pullFromServerSide(remaining = bufferSize, into = buffer) + buffer.flip() + buffer.limit must be(bufferSize) + + ByteString(buffer) must be(testData) + } + } + "write data after not acknowledged data" in new EstablishedConnectionTest() { run { val writer = TestProbe() diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 6f032d96db..a2b7dc8525 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -348,9 +348,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha } else if (data.nonEmpty) { buffer.clear() - val copied = remainingData.copyToBuffer(buffer) + val copied = data.copyToBuffer(buffer) buffer.flip() - writeToChannel(remainingData drop copied) + writeToChannel(data drop copied) } else { if (!ack.isInstanceOf[NoAck]) commander ! ack