From b4c82b8cdf02a6cb3f89f8194cc1cd86d6df033b Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 30 Sep 2014 10:40:48 +0200 Subject: [PATCH] =act #15991 delay sending of ACK until we have updated pendingWrite Otherwise, the user-level actor may already have sent the next chunk before the pendingWrite has been updated and the TcpConnection will reject it. (cherry picked from commit 96758e6) --- .../src/main/scala/akka/io/TcpConnection.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index b37c65dd0c..9db1fd8ecb 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -105,8 +105,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha if (!writePending) // writing is now finished handleClose(info, closeCommander, closedEvent) - case UpdatePendingWrite(remaining) ⇒ + case UpdatePendingWriteAndThen(remaining, work) ⇒ pendingWrite = remaining + work() if (writePending) info.registration.enableInterest(OP_WRITE) else handleClose(info, closeCommander, closedEvent) @@ -165,8 +166,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha else sender() ! CommandFailed(ResumeWriting) } else sender() ! WritingResumed - case UpdatePendingWrite(remaining) ⇒ + case UpdatePendingWriteAndThen(remaining, work) ⇒ pendingWrite = remaining + work() if (writePending) info.registration.enableInterest(OP_WRITE) case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task @@ -425,12 +427,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha if (written < remaining) { val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail) - self ! UpdatePendingWrite(updated) - + self ! UpdatePendingWriteAndThen(updated, TcpConnection.doNothing) } else { - if (!ack.isInstanceOf[NoAck]) commander ! ack release() - self ! UpdatePendingWrite(PendingWrite(commander, tail)) + val andThen = if (!ack.isInstanceOf[NoAck]) () ⇒ commander ! ack else doNothing + self ! UpdatePendingWriteAndThen(PendingWrite(commander, tail), andThen) } } catch { case e: IOException ⇒ self ! WriteFileFailed(e) @@ -463,7 +464,7 @@ private[io] object TcpConnection { // INTERNAL MESSAGES - final case class UpdatePendingWrite(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded + final case class UpdatePendingWriteAndThen(remainingWrite: PendingWrite, work: () ⇒ Unit) extends NoSerializationVerificationNeeded final case class WriteFileFailed(e: IOException) sealed abstract class PendingWrite { @@ -477,4 +478,6 @@ private[io] object TcpConnection { def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException def release(): Unit = throw new IllegalStateException } + + val doNothing: () ⇒ Unit = () ⇒ () }