Merge pull request #23938 from akka/wip-23919-optimize-stream-tcp-writes2-patriknw
rename according to previous review, #23919
This commit is contained in:
commit
c4f52a34f4
1 changed files with 8 additions and 8 deletions
|
|
@ -204,7 +204,7 @@ private[stream] object ConnectionSourceStage {
|
|||
|
||||
private val writeBufferSize = role.ioSettings.tcpWriteBufferSize
|
||||
private var writeBuffer = ByteString.empty
|
||||
private var writePending = false
|
||||
private var writeInProgress = false
|
||||
private var connectionClosePending = false
|
||||
|
||||
// No reading until role have been decided
|
||||
|
|
@ -256,14 +256,14 @@ private[stream] object ConnectionSourceStage {
|
|||
|
||||
case WriteAck ⇒
|
||||
if (writeBuffer.isEmpty)
|
||||
writePending = false
|
||||
writeInProgress = false
|
||||
else {
|
||||
connection ! Write(writeBuffer, WriteAck)
|
||||
writePending = true
|
||||
writeInProgress = true
|
||||
writeBuffer = ByteString.empty
|
||||
}
|
||||
|
||||
if (!writePending && connectionClosePending) {
|
||||
if (!writeInProgress && connectionClosePending) {
|
||||
// continue onUpstreamFinish
|
||||
closeConnection()
|
||||
}
|
||||
|
|
@ -287,13 +287,13 @@ private[stream] object ConnectionSourceStage {
|
|||
if (isClosed(bytesOut) || !role.halfClose) {
|
||||
// Reading has stopped before, either because of cancel, or PeerClosed, so just Close now
|
||||
// (or half-close is turned off)
|
||||
if (writePending)
|
||||
if (writeInProgress)
|
||||
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
|
||||
else
|
||||
connection ! Close
|
||||
} else if (connection != null) {
|
||||
// We still read, so we only close the write side
|
||||
if (writePending)
|
||||
if (writeInProgress)
|
||||
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
|
||||
else
|
||||
connection ! ConfirmedClose
|
||||
|
|
@ -318,11 +318,11 @@ private[stream] object ConnectionSourceStage {
|
|||
override def onPush(): Unit = {
|
||||
val elem = grab(bytesIn)
|
||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
||||
if (writePending) {
|
||||
if (writeInProgress) {
|
||||
writeBuffer = writeBuffer ++ elem
|
||||
} else {
|
||||
connection ! Write(writeBuffer ++ elem, WriteAck)
|
||||
writePending = true
|
||||
writeInProgress = true
|
||||
writeBuffer = ByteString.empty
|
||||
}
|
||||
if (writeBuffer.size < writeBufferSize)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue