=act #15991 delay sending of ACK until we have updated pendingWrite

Otherwise, the user-level actor may already have sent the next chunk before
the pendingWrite has been updated and the TcpConnection will reject it.
(cherry picked from commit 96758e6)
This commit is contained in:
Johannes Rudolph 2014-09-30 10:40:48 +02:00 committed by Endre Sándor Varga
parent fe1718c88f
commit b4c82b8cdf

View file

@ -105,8 +105,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
if (!writePending) // writing is now finished if (!writePending) // writing is now finished
handleClose(info, closeCommander, closedEvent) handleClose(info, closeCommander, closedEvent)
case UpdatePendingWrite(remaining) case UpdatePendingWriteAndThen(remaining, work)
pendingWrite = remaining pendingWrite = remaining
work()
if (writePending) info.registration.enableInterest(OP_WRITE) if (writePending) info.registration.enableInterest(OP_WRITE)
else handleClose(info, closeCommander, closedEvent) else handleClose(info, closeCommander, closedEvent)
@ -165,8 +166,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
else sender() ! CommandFailed(ResumeWriting) else sender() ! CommandFailed(ResumeWriting)
} else sender() ! WritingResumed } else sender() ! WritingResumed
case UpdatePendingWrite(remaining) case UpdatePendingWriteAndThen(remaining, work)
pendingWrite = remaining pendingWrite = remaining
work()
if (writePending) info.registration.enableInterest(OP_WRITE) if (writePending) info.registration.enableInterest(OP_WRITE)
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
@ -425,12 +427,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
if (written < remaining) { if (written < remaining) {
val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail) val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail)
self ! UpdatePendingWrite(updated) self ! UpdatePendingWriteAndThen(updated, TcpConnection.doNothing)
} else { } else {
if (!ack.isInstanceOf[NoAck]) commander ! ack
release() release()
self ! UpdatePendingWrite(PendingWrite(commander, tail)) val andThen = if (!ack.isInstanceOf[NoAck]) () commander ! ack else doNothing
self ! UpdatePendingWriteAndThen(PendingWrite(commander, tail), andThen)
} }
} catch { } catch {
case e: IOException self ! WriteFileFailed(e) case e: IOException self ! WriteFileFailed(e)
@ -463,7 +464,7 @@ private[io] object TcpConnection {
// INTERNAL MESSAGES // INTERNAL MESSAGES
final case class UpdatePendingWrite(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded final case class UpdatePendingWriteAndThen(remainingWrite: PendingWrite, work: () Unit) extends NoSerializationVerificationNeeded
final case class WriteFileFailed(e: IOException) final case class WriteFileFailed(e: IOException)
sealed abstract class PendingWrite { sealed abstract class PendingWrite {
@ -477,4 +478,6 @@ private[io] object TcpConnection {
def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException
def release(): Unit = throw new IllegalStateException def release(): Unit = throw new IllegalStateException
} }
val doNothing: () Unit = () ()
} }