diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index dd982714a1..be96c3dfc2 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -87,6 +87,39 @@ akka { # slightly more bytes than this limit (at most one element more). It can be set to 0 # to disable the usage of the buffer. write-buffer-size = 16 KiB + + # In addition to the buffering described for property write-buffer-size, try to collect + # more consecutive writes from the upstream stream producers. + # + # The rationale is to increase write efficiency by avoiding separate small + # writes to the network which is expensive to do. Merging those writes together + # (up to `write-buffer-size`) improves throughput for small writes. + # + # The idea is that a running stream may produce multiple small writes consecutively + # in one go without waiting for any external input. To probe the stream for + # data, this features delays sending a write immediately by probing the stream + # for more writes. This works by rescheduling the TCP connection stage via the + # actor mailbox of the underlying actor. Thus, before the stage is reactivated + # the upstream gets another opportunity to emit writes. + # + # When the stage is reactivated and if new writes are detected another round-trip + # is scheduled. The loop repeats until either the number of round trips given in this + # setting is reached, the buffer reaches `write-buffer-size`, or no new writes + # were detected during the last round-trip. + # + # This mechanism ensures that a write is guaranteed to be sent when the remaining stream + # becomes idle waiting for external signals. + # + # In most cases, the extra latency this mechanism introduces should be negligible, + # but depending on the stream setup it may introduce a noticeable delay, + # if the upstream continuously produces small amounts of writes in a + # blocking (CPU-bound) way. + # + # In that case, the feature can either be disabled, or the producing CPU-bound + # work can be taken off-stream to avoid excessive delays (e.g. using `mapAsync` instead of `map`). + # + # A value of 0 disables this feature. + coalesce-writes = 10 } # Time to wait for async materializer creation before throwing an exception diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index c145898a99..f5e9705c6f 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -809,19 +809,30 @@ object IOSettings { @nowarn("msg=deprecated") final class IOSettings private ( @deprecated("Use attribute 'TcpAttributes.TcpWriteBufferSize' to read the concrete setting value", "2.6.0") - val tcpWriteBufferSize: Int) { + val tcpWriteBufferSize: Int, + val coalesceWrites: Int) { + + // constructor for binary compatibility with version 2.6.15 and earlier + @deprecated("Use attribute 'TcpAttributes.TcpWriteBufferSize' to read the concrete setting value", "2.6.0") + def this(tcpWriteBufferSize: Int) = this(tcpWriteBufferSize, coalesceWrites = 10) def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value) - private def copy(tcpWriteBufferSize: Int): IOSettings = new IOSettings(tcpWriteBufferSize = tcpWriteBufferSize) + def withCoalesceWrites(value: Int): IOSettings = copy(coalesceWrites = value) + + private def copy(tcpWriteBufferSize: Int = tcpWriteBufferSize, coalesceWrites: Int = coalesceWrites): IOSettings = + new IOSettings(tcpWriteBufferSize, coalesceWrites) override def equals(other: Any): Boolean = other match { - case s: IOSettings => s.tcpWriteBufferSize == tcpWriteBufferSize + case s: IOSettings => s.tcpWriteBufferSize == tcpWriteBufferSize && s.coalesceWrites == coalesceWrites case _ => false } + override def hashCode(): Int = + 31 * tcpWriteBufferSize + coalesceWrites + override def toString = - s"""IoSettings(${tcpWriteBufferSize})""" + s"""IoSettings($tcpWriteBufferSize,$coalesceWrites)""" } object StreamSubscriptionTimeoutSettings { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 3f43d3fdf7..e9370ab89d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -11,7 +11,6 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } import scala.collection.immutable import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.{ Duration, FiniteDuration } - import scala.annotation.nowarn import akka.{ Done, NotUsed } @@ -214,6 +213,9 @@ private[stream] object ConnectionSourceStage { @InternalApi private[stream] object TcpConnectionStage { case object WriteAck extends Tcp.Event + private case object WriteDelayAck extends Tcp.Event + private val WriteDelayMessage = Write(ByteString.empty, WriteDelayAck) + trait TcpRole { def halfClose: Boolean } @@ -253,8 +255,7 @@ private[stream] object ConnectionSourceStage { @nowarn("msg=deprecated") private val writeBufferSize = inheritedAttributes .get[TcpAttributes.TcpWriteBufferSize]( - TcpAttributes.TcpWriteBufferSize( - ActorMaterializerHelper.downcast(eagerMaterializer).settings.ioSettings.tcpWriteBufferSize)) + TcpAttributes.TcpWriteBufferSize(eagerMaterializer.settings.ioSettings.tcpWriteBufferSize)) .size private var writeBuffer = ByteString.empty @@ -264,6 +265,12 @@ private[stream] object ConnectionSourceStage { // upstream already finished but are still writing the last data to the connection private var connectionClosePending = false + @nowarn("msg=deprecated") + private val coalesceWrites = eagerMaterializer.settings.ioSettings.coalesceWrites + private def coalesceWritesDisabled = coalesceWrites == 0 + private var writeDelayCountDown = 0 + private var previousWriteBufferSize = 0 + // No reading until role have been decided setHandler(bytesOut, new OutHandler { override def onPull(): Unit = () @@ -309,6 +316,23 @@ private[stream] object ConnectionSourceStage { } } + private def sendWriteBuffer(): Unit = { + connection ! Write(writeBuffer, WriteAck) + writeInProgress = true + writeBuffer = ByteString.empty + } + + /* + * Coalesce more frames by collecting more frames while waiting for round trip to the + * connection actor. WriteDelayMessage is an empty Write message and WriteDelayAck will + * be sent back as reply. + */ + private def sendWriteDelay(): Unit = { + previousWriteBufferSize = writeBuffer.length + writeInProgress = true + connection ! WriteDelayMessage + } + // Used for both inbound and outbound connections private def connected(evt: (ActorRef, Any)): Unit = { val msg = evt._2 @@ -318,13 +342,24 @@ private[stream] object ConnectionSourceStage { if (isClosed(bytesOut)) connection ! ResumeReading else push(bytesOut, data) + case WriteDelayAck => + // Immediately flush the write buffer if no more frames have been collected during the WriteDelayMessage + // round trip to the connection actor, or if reaching the configured maximum number of round trips, or + // if writeBuffer capacity has been exceeded. + writeDelayCountDown -= 1 + if (writeDelayCountDown == 0 || previousWriteBufferSize == writeBuffer.length || writeBuffer.length >= writeBufferSize) + sendWriteBuffer() + else + sendWriteDelay() + case WriteAck => if (writeBuffer.isEmpty) writeInProgress = false + else if (coalesceWritesDisabled || writeBuffer.length >= writeBufferSize) + sendWriteBuffer() else { - connection ! Write(writeBuffer, WriteAck) - writeInProgress = true - writeBuffer = ByteString.empty + writeDelayCountDown = coalesceWrites + sendWriteDelay() } if (!writeInProgress && connectionClosePending) { @@ -417,12 +452,15 @@ private[stream] object ConnectionSourceStage { ReactiveStreamsCompliance.requireNonNullElement(elem) if (writeInProgress) { writeBuffer = writeBuffer ++ elem + } else if (coalesceWritesDisabled || writeBuffer.length >= writeBufferSize) { + writeBuffer = writeBuffer ++ elem + sendWriteBuffer() } else { - connection ! Write(writeBuffer ++ elem, WriteAck) - writeInProgress = true - writeBuffer = ByteString.empty + writeBuffer = writeBuffer ++ elem + writeDelayCountDown = coalesceWrites + sendWriteDelay() } - if (writeBuffer.size < writeBufferSize) + if (writeBuffer.length < writeBufferSize) pull(bytesIn) }