stream: smart batching of writes in TcpStages (#30334)

Co-authored-by: Johannes Rudolph <johannes.rudolph@gmail.com>
This commit is contained in:
Patrik Nordwall 2021-07-01 09:19:36 +02:00 committed by GitHub
parent 8573c70883
commit 53727df35a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 14 deletions

View file

@ -87,6 +87,39 @@ akka {
# slightly more bytes than this limit (at most one element more). It can be set to 0
# to disable the usage of the buffer.
write-buffer-size = 16 KiB
# In addition to the buffering described for property write-buffer-size, try to collect
# more consecutive writes from the upstream stream producers.
#
# The rationale is to increase write efficiency by avoiding separate small
# writes to the network which is expensive to do. Merging those writes together
# (up to `write-buffer-size`) improves throughput for small writes.
#
# The idea is that a running stream may produce multiple small writes consecutively
# in one go without waiting for any external input. To probe the stream for
# data, this features delays sending a write immediately by probing the stream
# for more writes. This works by rescheduling the TCP connection stage via the
# actor mailbox of the underlying actor. Thus, before the stage is reactivated
# the upstream gets another opportunity to emit writes.
#
# When the stage is reactivated and if new writes are detected another round-trip
# is scheduled. The loop repeats until either the number of round trips given in this
# setting is reached, the buffer reaches `write-buffer-size`, or no new writes
# were detected during the last round-trip.
#
# This mechanism ensures that a write is guaranteed to be sent when the remaining stream
# becomes idle waiting for external signals.
#
# In most cases, the extra latency this mechanism introduces should be negligible,
# but depending on the stream setup it may introduce a noticeable delay,
# if the upstream continuously produces small amounts of writes in a
# blocking (CPU-bound) way.
#
# In that case, the feature can either be disabled, or the producing CPU-bound
# work can be taken off-stream to avoid excessive delays (e.g. using `mapAsync` instead of `map`).
#
# A value of 0 disables this feature.
coalesce-writes = 10
}
# Time to wait for async materializer creation before throwing an exception

View file

@ -809,19 +809,30 @@ object IOSettings {
@nowarn("msg=deprecated")
final class IOSettings private (
@deprecated("Use attribute 'TcpAttributes.TcpWriteBufferSize' to read the concrete setting value", "2.6.0")
val tcpWriteBufferSize: Int) {
val tcpWriteBufferSize: Int,
val coalesceWrites: Int) {
// constructor for binary compatibility with version 2.6.15 and earlier
@deprecated("Use attribute 'TcpAttributes.TcpWriteBufferSize' to read the concrete setting value", "2.6.0")
def this(tcpWriteBufferSize: Int) = this(tcpWriteBufferSize, coalesceWrites = 10)
def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value)
private def copy(tcpWriteBufferSize: Int): IOSettings = new IOSettings(tcpWriteBufferSize = tcpWriteBufferSize)
def withCoalesceWrites(value: Int): IOSettings = copy(coalesceWrites = value)
private def copy(tcpWriteBufferSize: Int = tcpWriteBufferSize, coalesceWrites: Int = coalesceWrites): IOSettings =
new IOSettings(tcpWriteBufferSize, coalesceWrites)
override def equals(other: Any): Boolean = other match {
case s: IOSettings => s.tcpWriteBufferSize == tcpWriteBufferSize
case s: IOSettings => s.tcpWriteBufferSize == tcpWriteBufferSize && s.coalesceWrites == coalesceWrites
case _ => false
}
override def hashCode(): Int =
31 * tcpWriteBufferSize + coalesceWrites
override def toString =
s"""IoSettings(${tcpWriteBufferSize})"""
s"""IoSettings($tcpWriteBufferSize,$coalesceWrites)"""
}
object StreamSubscriptionTimeoutSettings {

View file

@ -11,7 +11,6 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.annotation.nowarn
import akka.{ Done, NotUsed }
@ -214,6 +213,9 @@ private[stream] object ConnectionSourceStage {
@InternalApi private[stream] object TcpConnectionStage {
case object WriteAck extends Tcp.Event
private case object WriteDelayAck extends Tcp.Event
private val WriteDelayMessage = Write(ByteString.empty, WriteDelayAck)
trait TcpRole {
def halfClose: Boolean
}
@ -253,8 +255,7 @@ private[stream] object ConnectionSourceStage {
@nowarn("msg=deprecated")
private val writeBufferSize = inheritedAttributes
.get[TcpAttributes.TcpWriteBufferSize](
TcpAttributes.TcpWriteBufferSize(
ActorMaterializerHelper.downcast(eagerMaterializer).settings.ioSettings.tcpWriteBufferSize))
TcpAttributes.TcpWriteBufferSize(eagerMaterializer.settings.ioSettings.tcpWriteBufferSize))
.size
private var writeBuffer = ByteString.empty
@ -264,6 +265,12 @@ private[stream] object ConnectionSourceStage {
// upstream already finished but are still writing the last data to the connection
private var connectionClosePending = false
@nowarn("msg=deprecated")
private val coalesceWrites = eagerMaterializer.settings.ioSettings.coalesceWrites
private def coalesceWritesDisabled = coalesceWrites == 0
private var writeDelayCountDown = 0
private var previousWriteBufferSize = 0
// No reading until role have been decided
setHandler(bytesOut, new OutHandler {
override def onPull(): Unit = ()
@ -309,6 +316,23 @@ private[stream] object ConnectionSourceStage {
}
}
private def sendWriteBuffer(): Unit = {
connection ! Write(writeBuffer, WriteAck)
writeInProgress = true
writeBuffer = ByteString.empty
}
/*
* Coalesce more frames by collecting more frames while waiting for round trip to the
* connection actor. WriteDelayMessage is an empty Write message and WriteDelayAck will
* be sent back as reply.
*/
private def sendWriteDelay(): Unit = {
previousWriteBufferSize = writeBuffer.length
writeInProgress = true
connection ! WriteDelayMessage
}
// Used for both inbound and outbound connections
private def connected(evt: (ActorRef, Any)): Unit = {
val msg = evt._2
@ -318,13 +342,24 @@ private[stream] object ConnectionSourceStage {
if (isClosed(bytesOut)) connection ! ResumeReading
else push(bytesOut, data)
case WriteDelayAck =>
// Immediately flush the write buffer if no more frames have been collected during the WriteDelayMessage
// round trip to the connection actor, or if reaching the configured maximum number of round trips, or
// if writeBuffer capacity has been exceeded.
writeDelayCountDown -= 1
if (writeDelayCountDown == 0 || previousWriteBufferSize == writeBuffer.length || writeBuffer.length >= writeBufferSize)
sendWriteBuffer()
else
sendWriteDelay()
case WriteAck =>
if (writeBuffer.isEmpty)
writeInProgress = false
else if (coalesceWritesDisabled || writeBuffer.length >= writeBufferSize)
sendWriteBuffer()
else {
connection ! Write(writeBuffer, WriteAck)
writeInProgress = true
writeBuffer = ByteString.empty
writeDelayCountDown = coalesceWrites
sendWriteDelay()
}
if (!writeInProgress && connectionClosePending) {
@ -417,12 +452,15 @@ private[stream] object ConnectionSourceStage {
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (writeInProgress) {
writeBuffer = writeBuffer ++ elem
} else if (coalesceWritesDisabled || writeBuffer.length >= writeBufferSize) {
writeBuffer = writeBuffer ++ elem
sendWriteBuffer()
} else {
connection ! Write(writeBuffer ++ elem, WriteAck)
writeInProgress = true
writeBuffer = ByteString.empty
writeBuffer = writeBuffer ++ elem
writeDelayCountDown = coalesceWrites
sendWriteDelay()
}
if (writeBuffer.size < writeBufferSize)
if (writeBuffer.length < writeBufferSize)
pull(bytesIn)
}