Merge pull request #16267 from drewhk/wip-15991-tcp-pendingwrite-fix-forward-port-drewhk
=act #15991 delay sending of ACK until .. (forward port)
This commit is contained in:
commit
8cd0286774
1 changed files with 10 additions and 7 deletions
|
|
@ -106,8 +106,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
||||||
if (!writePending) // writing is now finished
|
if (!writePending) // writing is now finished
|
||||||
handleClose(info, closeCommander, closedEvent)
|
handleClose(info, closeCommander, closedEvent)
|
||||||
|
|
||||||
case UpdatePendingWrite(remaining) ⇒
|
case UpdatePendingWriteAndThen(remaining, work) ⇒
|
||||||
pendingWrite = remaining
|
pendingWrite = remaining
|
||||||
|
work()
|
||||||
if (writePending) info.registration.enableInterest(OP_WRITE)
|
if (writePending) info.registration.enableInterest(OP_WRITE)
|
||||||
else handleClose(info, closeCommander, closedEvent)
|
else handleClose(info, closeCommander, closedEvent)
|
||||||
|
|
||||||
|
|
@ -166,8 +167,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
||||||
else sender() ! CommandFailed(ResumeWriting)
|
else sender() ! CommandFailed(ResumeWriting)
|
||||||
} else sender() ! WritingResumed
|
} else sender() ! WritingResumed
|
||||||
|
|
||||||
case UpdatePendingWrite(remaining) ⇒
|
case UpdatePendingWriteAndThen(remaining, work) ⇒
|
||||||
pendingWrite = remaining
|
pendingWrite = remaining
|
||||||
|
work()
|
||||||
if (writePending) info.registration.enableInterest(OP_WRITE)
|
if (writePending) info.registration.enableInterest(OP_WRITE)
|
||||||
|
|
||||||
case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task
|
case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task
|
||||||
|
|
@ -430,12 +432,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
||||||
|
|
||||||
if (written < remaining) {
|
if (written < remaining) {
|
||||||
val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail)
|
val updated = new PendingWriteFile(commander, fileChannel, offset + written, remaining - written, ack, tail)
|
||||||
self ! UpdatePendingWrite(updated)
|
self ! UpdatePendingWriteAndThen(updated, TcpConnection.doNothing)
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (!ack.isInstanceOf[NoAck]) commander ! ack
|
|
||||||
release()
|
release()
|
||||||
self ! UpdatePendingWrite(PendingWrite(commander, tail))
|
val andThen = if (!ack.isInstanceOf[NoAck]) () ⇒ commander ! ack else doNothing
|
||||||
|
self ! UpdatePendingWriteAndThen(PendingWrite(commander, tail), andThen)
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case e: IOException ⇒ self ! WriteFileFailed(e)
|
case e: IOException ⇒ self ! WriteFileFailed(e)
|
||||||
|
|
@ -468,7 +469,7 @@ private[io] object TcpConnection {
|
||||||
|
|
||||||
// INTERNAL MESSAGES
|
// INTERNAL MESSAGES
|
||||||
|
|
||||||
final case class UpdatePendingWrite(remainingWrite: PendingWrite) extends NoSerializationVerificationNeeded
|
final case class UpdatePendingWriteAndThen(remainingWrite: PendingWrite, work: () ⇒ Unit) extends NoSerializationVerificationNeeded
|
||||||
final case class WriteFileFailed(e: IOException)
|
final case class WriteFileFailed(e: IOException)
|
||||||
|
|
||||||
sealed abstract class PendingWrite {
|
sealed abstract class PendingWrite {
|
||||||
|
|
@ -482,4 +483,6 @@ private[io] object TcpConnection {
|
||||||
def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException
|
def doWrite(info: ConnectionInfo): PendingWrite = throw new IllegalStateException
|
||||||
def release(): Unit = throw new IllegalStateException
|
def release(): Unit = throw new IllegalStateException
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val doNothing: () ⇒ Unit = () ⇒ ()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue