diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 59b28a85cb..e46d3ccc54 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -106,8 +106,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha if (!writePending) // writing is now finished handleClose(info, closeCommander, closedEvent) - case UpdatePendingWrite(remaining) ⇒ + case UpdatePendingWriteAndThen(remaining, work) ⇒ pendingWrite = remaining + work() if (writePending) info.registration.enableInterest(OP_WRITE) else handleClose(info, closeCommander, closedEvent) @@ -166,8 +167,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha else sender() ! CommandFailed(ResumeWriting) } else sender() ! WritingResumed - case UpdatePendingWrite(remaining) ⇒ + case UpdatePendingWriteAndThen(remaining, work) ⇒ pendingWrite = remaining + work() if (writePending) info.registration.enableInterest(OP_WRITE) case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task @@ -430,12 +432,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha if (written < remaining) { val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail) - self ! UpdatePendingWrite(updated) - + self ! UpdatePendingWriteAndThen(updated, TcpConnection.doNothing) } else { - if (!ack.isInstanceOf[NoAck]) commander ! ack release() - self ! UpdatePendingWrite(PendingWrite(commander, tail)) + val andThen = if (!ack.isInstanceOf[NoAck]) () ⇒ commander ! ack else doNothing + self ! UpdatePendingWriteAndThen(PendingWrite(commander, tail), andThen) } } catch { case e: IOException ⇒ self ! WriteFileFailed(e) @@ -468,7 +469,7 @@ private[io] object TcpConnection { // INTERNAL MESSAGES - final case class UpdatePendingWrite(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded + final case class UpdatePendingWriteAndThen(remainingWrite: PendingWrite, work: () ⇒ Unit) extends NoSerializationVerificationNeeded final case class WriteFileFailed(e: IOException) sealed abstract class PendingWrite { @@ -482,4 +483,6 @@ private[io] object TcpConnection { def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException def release(): Unit = throw new IllegalStateException } + + val doNothing: () ⇒ Unit = () ⇒ () }