diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala index 773b3e7646..87a7108177 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala @@ -131,9 +131,9 @@ class StreamTestKitSpec extends AkkaSpec { "#expectNextWithTimeoutPF should fail after timeout when element delayed" in { intercept[AssertionError] { - val timeout = 100 millis - val overTimeout = timeout + (10 millis) - Source.tick(overTimeout, 1 millis, 1).runWith(TestSink.probe) + val timeout = 100.millis + val overTimeout = timeout + (10.millis) + Source.tick(overTimeout, 1.millis, 1).runWith(TestSink.probe) .request(1) .expectNextWithTimeoutPF(timeout, { case 1 ⇒ @@ -169,9 +169,9 @@ class StreamTestKitSpec extends AkkaSpec { "#expectNextChainingPF should fail after timeout when element delayed" in { intercept[AssertionError] { - val timeout = 100 millis - val overTimeout = timeout + (10 millis) - Source.tick(overTimeout, 1 millis, 1).runWith(TestSink.probe) + val timeout = 100.millis + val overTimeout = timeout + (10.millis) + Source.tick(overTimeout, 1.millis, 1).runWith(TestSink.probe) .request(1) .expectNextChainingPF(timeout, { case 1 ⇒ diff --git a/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes index 0077a82945..a1fdcf5948 100644 --- a/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes @@ -1,2 +1,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.GzipCompressor.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.DeflateCompressor.this") + +# Optimize TCP stream writes +ProblemFilters.exclude[Problem]("akka.stream.impl.io.*") + diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index efa1753195..747f4120dc 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -72,6 +72,16 @@ akka { # of 1 on the corresponding dispatchers. fuzzing-mode = off } + + io.tcp { + # The outgoing bytes are accumulated in a buffer while waiting for acknoledgment + # of pending write. This improves throughput for small messages (frames) without + # sacrificing latency. While waiting for the ack the stage will eagerly pull + # from upstream until the buffer exceeds this size. That means that the buffer may hold + # slightly more bytes than this limit (at most one element more). It can be set to 0 + # to disable the usage of the buffer. + write-buffer-size = 16 KiB + } } # Fully qualified config path which holds the dispatcher configuration diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index c6a837eafd..dc97cfede8 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -272,7 +272,8 @@ object ActorMaterializerSettings { fuzzingMode = config.getBoolean("debug.fuzzing-mode"), autoFusing = config.getBoolean("auto-fusing"), maxFixedBufferSize = config.getInt("max-fixed-buffer-size"), - syncProcessingLimit = config.getInt("sync-processing-limit")) + syncProcessingLimit = config.getInt("sync-processing-limit"), + ioSettings = IOSettings(config.getConfig("io"))) /** * Create [[ActorMaterializerSettings]] from individual settings (Java). @@ -322,7 +323,25 @@ final class ActorMaterializerSettings private ( val fuzzingMode: Boolean, val autoFusing: Boolean, val maxFixedBufferSize: Int, - val syncProcessingLimit: Int) { + val syncProcessingLimit: Int, + val ioSettings: IOSettings) { + + // backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima + def this( + initialInputBufferSize: Int, + maxInputBufferSize: Int, + dispatcher: String, + supervisionDecider: Supervision.Decider, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, + debugLogging: Boolean, + outputBurstLimit: Int, + fuzzingMode: Boolean, + autoFusing: Boolean, + maxFixedBufferSize: Int, + syncProcessingLimit: Int) = + this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, + IOSettings(tcpWriteBufferSize = 16 * 1024)) def this( initialInputBufferSize: Int, @@ -334,10 +353,9 @@ final class ActorMaterializerSettings private ( outputBurstLimit: Int, fuzzingMode: Boolean, autoFusing: Boolean, - maxFixedBufferSize: Int) { + maxFixedBufferSize: Int) = this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize) - } require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") require(syncProcessingLimit > 0, "syncProcessingLimit must be > 0") @@ -356,10 +374,11 @@ final class ActorMaterializerSettings private ( fuzzingMode: Boolean = this.fuzzingMode, autoFusing: Boolean = this.autoFusing, maxFixedBufferSize: Int = this.maxFixedBufferSize, - syncProcessingLimit: Int = this.syncProcessingLimit) = { + syncProcessingLimit: Int = this.syncProcessingLimit, + ioSettings: IOSettings = this.ioSettings) = { new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit) + outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings) } /** @@ -465,6 +484,10 @@ final class ActorMaterializerSettings private ( if (settings == this.subscriptionTimeoutSettings) this else copy(subscriptionTimeoutSettings = settings) + def withIOSettings(ioSettings: IOSettings): ActorMaterializerSettings = + if (ioSettings == this.ioSettings) this + else copy(ioSettings = ioSettings) + private def requirePowerOfTwo(n: Integer, name: String): Unit = { require(n > 0, s"$name must be > 0") require((n & (n - 1)) == 0, s"$name must be a power of two") @@ -481,11 +504,52 @@ final class ActorMaterializerSettings private ( s.outputBurstLimit == outputBurstLimit && s.syncProcessingLimit == syncProcessingLimit && s.fuzzingMode == fuzzingMode && - s.autoFusing == autoFusing + s.autoFusing == autoFusing && + s.ioSettings == ioSettings case _ ⇒ false } - override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize,$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit,$syncProcessingLimit,$fuzzingMode,$autoFusing)" + override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize," + + s"$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit," + + s"$syncProcessingLimit,$fuzzingMode,$autoFusing,$ioSettings)" +} + +object IOSettings { + def apply(system: ActorSystem): IOSettings = + apply(system.settings.config.getConfig("akka.stream.materializer.io")) + + def apply(config: Config): IOSettings = + new IOSettings( + tcpWriteBufferSize = math.min(Int.MaxValue, config.getBytes("tcp.write-buffer-size")).toInt) + + def apply(tcpWriteBufferSize: Int): IOSettings = + new IOSettings(tcpWriteBufferSize) + + /** Java API */ + def create(config: Config) = apply(config) + + /** Java API */ + def create(system: ActorSystem) = apply(system) + + /** Java API */ + def create(tcpWriteBufferSize: Int): IOSettings = + apply(tcpWriteBufferSize) +} + +final class IOSettings private (val tcpWriteBufferSize: Int) { + + def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value) + + private def copy(tcpWriteBufferSize: Int = tcpWriteBufferSize): IOSettings = new IOSettings( + tcpWriteBufferSize = tcpWriteBufferSize) + + override def equals(other: Any): Boolean = other match { + case s: IOSettings ⇒ s.tcpWriteBufferSize == tcpWriteBufferSize + case _ ⇒ false + } + + override def toString = + s"""IoSettings(${tcpWriteBufferSize})""" } object StreamSubscriptionTimeoutSettings { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index f09d6b3d9c..50d7a4f46f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -36,7 +36,8 @@ import scala.concurrent.{ Future, Promise } val options: immutable.Traversable[SocketOption], val halfClose: Boolean, val idleTimeout: Duration, - val bindShutdownTimeout: FiniteDuration) + val bindShutdownTimeout: FiniteDuration, + val ioSettings: IOSettings) extends GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection], Future[StreamTcp.ServerBinding]] { import ConnectionSourceStage._ @@ -114,7 +115,7 @@ import scala.concurrent.{ Future, Promise } connectionFlowsAwaitingInitialization.incrementAndGet() val tcpFlow = - Flow.fromGraph(new IncomingConnectionStage(connection, connected.remoteAddress, halfClose)) + Flow.fromGraph(new IncomingConnectionStage(connection, connected.remoteAddress, halfClose, ioSettings)) .via(detacher[ByteString]) // must read ahead for proper completions .mapMaterializedValue { m ⇒ connectionFlowsAwaitingInitialization.decrementAndGet() @@ -176,13 +177,16 @@ private[stream] object ConnectionSourceStage { trait TcpRole { def halfClose: Boolean + def ioSettings: IOSettings } case class Outbound( manager: ActorRef, connectCmd: Connect, localAddressPromise: Promise[InetSocketAddress], - halfClose: Boolean) extends TcpRole - case class Inbound(connection: ActorRef, halfClose: Boolean) extends TcpRole + halfClose: Boolean, + ioSettings: IOSettings) extends TcpRole + + case class Inbound(connection: ActorRef, halfClose: Boolean, ioSettings: IOSettings) extends TcpRole /* * This is a *non-detached* design, i.e. this does not prefetch itself any of the inputs. It relies on downstream @@ -198,6 +202,11 @@ private[stream] object ConnectionSourceStage { private def bytesOut = shape.out private var connection: ActorRef = _ + private val writeBufferSize = role.ioSettings.tcpWriteBufferSize + private var writeBuffer = ByteString.empty + private var writePending = false + private var connectionClosePending = false + // No reading until role have been decided setHandler(bytesOut, new OutHandler { override def onPull(): Unit = () @@ -206,13 +215,13 @@ private[stream] object ConnectionSourceStage { override def preStart(): Unit = { setKeepGoing(true) role match { - case Inbound(conn, _) ⇒ + case Inbound(conn, _, _) ⇒ setHandler(bytesOut, readHandler) connection = conn getStageActor(connected).watch(connection) connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) pull(bytesIn) - case ob @ Outbound(manager, cmd, _, _) ⇒ + case ob @ Outbound(manager, cmd, _, _, _) ⇒ getStageActor(connecting(ob)).watch(manager) manager ! cmd } @@ -238,9 +247,30 @@ private[stream] object ConnectionSourceStage { } private def connected(evt: (ActorRef, Any)): Unit = { - val sender = evt._1 val msg = evt._2 msg match { + case Received(data) ⇒ + // Keep on reading even when closed. There is no "close-read-side" in TCP + if (isClosed(bytesOut)) connection ! ResumeReading + else push(bytesOut, data) + + case WriteAck ⇒ + if (writeBuffer.isEmpty) + writePending = false + else { + connection ! Write(writeBuffer, WriteAck) + writePending = true + writeBuffer = ByteString.empty + } + + if (!writePending && connectionClosePending) { + // continue onUpstreamFinish + closeConnection() + } + + if (!isClosed(bytesIn) && !hasBeenPulled(bytesIn)) + pull(bytesIn) + case Terminated(_) ⇒ failStage(new StreamTcpException("The connection actor has terminated. Stopping now.")) case f @ CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}")) case ErrorClosed(cause) ⇒ failStage(new StreamTcpException(s"The connection closed with error: $cause")) @@ -249,15 +279,27 @@ private[stream] object ConnectionSourceStage { case ConfirmedClosed ⇒ completeStage() case PeerClosed ⇒ complete(bytesOut) - case Received(data) ⇒ - // Keep on reading even when closed. There is no "close-read-side" in TCP - if (isClosed(bytesOut)) connection ! ResumeReading - else push(bytesOut, data) - - case WriteAck ⇒ if (!isClosed(bytesIn)) pull(bytesIn) } } + private def closeConnection(): Unit = { + // Note that if there are pending bytes in the writeBuffer those must be written first. + if (isClosed(bytesOut) || !role.halfClose) { + // Reading has stopped before, either because of cancel, or PeerClosed, so just Close now + // (or half-close is turned off) + if (writePending) + connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained + else + connection ! Close + } else if (connection != null) { + // We still read, so we only close the write side + if (writePending) + connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained + else + connection ! ConfirmedClose + } else completeStage() + } + val readHandler = new OutHandler { override def onPull(): Unit = { connection ! ResumeReading @@ -276,17 +318,20 @@ private[stream] object ConnectionSourceStage { override def onPush(): Unit = { val elem = grab(bytesIn) ReactiveStreamsCompliance.requireNonNullElement(elem) - connection ! Write(elem.asInstanceOf[ByteString], WriteAck) + if (writePending) { + writeBuffer = writeBuffer ++ elem + } else { + connection ! Write(writeBuffer ++ elem, WriteAck) + writePending = true + writeBuffer = ByteString.empty + } + if (writeBuffer.size < writeBufferSize) + pull(bytesIn) + } - override def onUpstreamFinish(): Unit = { - // Reading has stopped before, either because of cancel, or PeerClosed, so just Close now - // (or half-close is turned off) - if (isClosed(bytesOut) || !role.halfClose) connection ! Close - // We still read, so we only close the write side - else if (connection != null) connection ! ConfirmedClose - else completeStage() - } + override def onUpstreamFinish(): Unit = + closeConnection() override def onUpstreamFailure(ex: Throwable): Unit = { if (connection != null) { @@ -302,18 +347,20 @@ private[stream] object ConnectionSourceStage { }) override def postStop(): Unit = role match { - case Outbound(_, _, localAddressPromise, _) ⇒ + case Outbound(_, _, localAddressPromise, _, _) ⇒ // Fail if has not been completed with an address earlier localAddressPromise.tryFailure(new StreamTcpException("Connection failed.")) case _ ⇒ // do nothing... } + writeBuffer = ByteString.empty } } /** * INTERNAL API */ -@InternalApi private[akka] class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean) +@InternalApi private[akka] class IncomingConnectionStage( + connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean, ioSettings: IOSettings) extends GraphStage[FlowShape[ByteString, ByteString]] { import TcpConnectionStage._ @@ -328,7 +375,7 @@ private[stream] object ConnectionSourceStage { if (hasBeenCreated.get) throw new IllegalStateException("Cannot materialize an incoming connection Flow twice.") hasBeenCreated.set(true) - new TcpStreamLogic(shape, Inbound(connection, halfClose), remoteAddress) + new TcpStreamLogic(shape, Inbound(connection, halfClose, ioSettings), remoteAddress) } override def toString = s"TCP-from($remoteAddress)" @@ -343,7 +390,8 @@ private[stream] object ConnectionSourceStage { localAddress: Option[InetSocketAddress] = None, options: immutable.Traversable[SocketOption] = Nil, halfClose: Boolean = true, - connectTimeout: Duration = Duration.Inf) + connectTimeout: Duration = Duration.Inf, + ioSettings: IOSettings) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[StreamTcp.OutgoingConnection]] { import TcpConnectionStage._ @@ -365,7 +413,8 @@ private[stream] object ConnectionSourceStage { manager, Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true), localAddressPromise, - halfClose), + halfClose, + ioSettings), remoteAddress) (logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.sameThreadExecutionContext)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index 77ebd031f9..c7770ac05a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -65,8 +65,10 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { import Tcp._ + private val settings = ActorMaterializerSettings(system) + // TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead? - val bindShutdownTimeout = ActorMaterializer()(system).settings.subscriptionTimeoutSettings.timeout + val bindShutdownTimeout = settings.subscriptionTimeoutSettings.timeout /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. @@ -103,7 +105,8 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { options, halfClose, idleTimeout, - bindShutdownTimeout)) + bindShutdownTimeout, + settings.ioSettings)) /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` @@ -175,7 +178,8 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { localAddress, options, halfClose, - connectTimeout)).via(detacher[ByteString]) // must read ahead for proper completions + connectTimeout, + settings.ioSettings)).via(detacher[ByteString]) // must read ahead for proper completions idleTimeout match { case d: FiniteDuration ⇒ tcpFlow.join(TcpIdleTimeout(d, Some(remoteAddress)))