* This is an optimization of TcpStreamLogic to accumulating bytes in a buffer while waiting for acknoledgment of pending write. This improves throughput for small messages (frames) without sacrificing latency. While waiting for the ack the stage will eagerly pull from upstream until the buffer limit is exceeded. Accumulated bytes are written immediately when ack is received. * Noticed 20x throughput improvement with Artery MaxThroughputSpec thanks to this buffer when working on the Artery TCP implementation. The small message (100 bytes) benchmark improved from 30k msg/s to 600k msg/s.
This commit is contained in:
parent
d7330c3c72
commit
f017f6a90a
6 changed files with 175 additions and 44 deletions
|
|
@ -131,9 +131,9 @@ class StreamTestKitSpec extends AkkaSpec {
|
||||||
|
|
||||||
"#expectNextWithTimeoutPF should fail after timeout when element delayed" in {
|
"#expectNextWithTimeoutPF should fail after timeout when element delayed" in {
|
||||||
intercept[AssertionError] {
|
intercept[AssertionError] {
|
||||||
val timeout = 100 millis
|
val timeout = 100.millis
|
||||||
val overTimeout = timeout + (10 millis)
|
val overTimeout = timeout + (10.millis)
|
||||||
Source.tick(overTimeout, 1 millis, 1).runWith(TestSink.probe)
|
Source.tick(overTimeout, 1.millis, 1).runWith(TestSink.probe)
|
||||||
.request(1)
|
.request(1)
|
||||||
.expectNextWithTimeoutPF(timeout, {
|
.expectNextWithTimeoutPF(timeout, {
|
||||||
case 1 ⇒
|
case 1 ⇒
|
||||||
|
|
@ -169,9 +169,9 @@ class StreamTestKitSpec extends AkkaSpec {
|
||||||
|
|
||||||
"#expectNextChainingPF should fail after timeout when element delayed" in {
|
"#expectNextChainingPF should fail after timeout when element delayed" in {
|
||||||
intercept[AssertionError] {
|
intercept[AssertionError] {
|
||||||
val timeout = 100 millis
|
val timeout = 100.millis
|
||||||
val overTimeout = timeout + (10 millis)
|
val overTimeout = timeout + (10.millis)
|
||||||
Source.tick(overTimeout, 1 millis, 1).runWith(TestSink.probe)
|
Source.tick(overTimeout, 1.millis, 1).runWith(TestSink.probe)
|
||||||
.request(1)
|
.request(1)
|
||||||
.expectNextChainingPF(timeout, {
|
.expectNextChainingPF(timeout, {
|
||||||
case 1 ⇒
|
case 1 ⇒
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1,6 @@
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.GzipCompressor.this")
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.GzipCompressor.this")
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.DeflateCompressor.this")
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.DeflateCompressor.this")
|
||||||
|
|
||||||
|
# Optimize TCP stream writes
|
||||||
|
ProblemFilters.exclude[Problem]("akka.stream.impl.io.*")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,16 @@ akka {
|
||||||
# of 1 on the corresponding dispatchers.
|
# of 1 on the corresponding dispatchers.
|
||||||
fuzzing-mode = off
|
fuzzing-mode = off
|
||||||
}
|
}
|
||||||
|
|
||||||
|
io.tcp {
|
||||||
|
# The outgoing bytes are accumulated in a buffer while waiting for acknoledgment
|
||||||
|
# of pending write. This improves throughput for small messages (frames) without
|
||||||
|
# sacrificing latency. While waiting for the ack the stage will eagerly pull
|
||||||
|
# from upstream until the buffer exceeds this size. That means that the buffer may hold
|
||||||
|
# slightly more bytes than this limit (at most one element more). It can be set to 0
|
||||||
|
# to disable the usage of the buffer.
|
||||||
|
write-buffer-size = 16 KiB
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Fully qualified config path which holds the dispatcher configuration
|
# Fully qualified config path which holds the dispatcher configuration
|
||||||
|
|
|
||||||
|
|
@ -272,7 +272,8 @@ object ActorMaterializerSettings {
|
||||||
fuzzingMode = config.getBoolean("debug.fuzzing-mode"),
|
fuzzingMode = config.getBoolean("debug.fuzzing-mode"),
|
||||||
autoFusing = config.getBoolean("auto-fusing"),
|
autoFusing = config.getBoolean("auto-fusing"),
|
||||||
maxFixedBufferSize = config.getInt("max-fixed-buffer-size"),
|
maxFixedBufferSize = config.getInt("max-fixed-buffer-size"),
|
||||||
syncProcessingLimit = config.getInt("sync-processing-limit"))
|
syncProcessingLimit = config.getInt("sync-processing-limit"),
|
||||||
|
ioSettings = IOSettings(config.getConfig("io")))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create [[ActorMaterializerSettings]] from individual settings (Java).
|
* Create [[ActorMaterializerSettings]] from individual settings (Java).
|
||||||
|
|
@ -322,7 +323,25 @@ final class ActorMaterializerSettings private (
|
||||||
val fuzzingMode: Boolean,
|
val fuzzingMode: Boolean,
|
||||||
val autoFusing: Boolean,
|
val autoFusing: Boolean,
|
||||||
val maxFixedBufferSize: Int,
|
val maxFixedBufferSize: Int,
|
||||||
val syncProcessingLimit: Int) {
|
val syncProcessingLimit: Int,
|
||||||
|
val ioSettings: IOSettings) {
|
||||||
|
|
||||||
|
// backwards compatibility when added IOSettings, shouldn't be needed since private, but added to satisfy mima
|
||||||
|
def this(
|
||||||
|
initialInputBufferSize: Int,
|
||||||
|
maxInputBufferSize: Int,
|
||||||
|
dispatcher: String,
|
||||||
|
supervisionDecider: Supervision.Decider,
|
||||||
|
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
||||||
|
debugLogging: Boolean,
|
||||||
|
outputBurstLimit: Int,
|
||||||
|
fuzzingMode: Boolean,
|
||||||
|
autoFusing: Boolean,
|
||||||
|
maxFixedBufferSize: Int,
|
||||||
|
syncProcessingLimit: Int) =
|
||||||
|
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
||||||
|
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit,
|
||||||
|
IOSettings(tcpWriteBufferSize = 16 * 1024))
|
||||||
|
|
||||||
def this(
|
def this(
|
||||||
initialInputBufferSize: Int,
|
initialInputBufferSize: Int,
|
||||||
|
|
@ -334,10 +353,9 @@ final class ActorMaterializerSettings private (
|
||||||
outputBurstLimit: Int,
|
outputBurstLimit: Int,
|
||||||
fuzzingMode: Boolean,
|
fuzzingMode: Boolean,
|
||||||
autoFusing: Boolean,
|
autoFusing: Boolean,
|
||||||
maxFixedBufferSize: Int) {
|
maxFixedBufferSize: Int) =
|
||||||
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
||||||
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize)
|
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize)
|
||||||
}
|
|
||||||
|
|
||||||
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
||||||
require(syncProcessingLimit > 0, "syncProcessingLimit must be > 0")
|
require(syncProcessingLimit > 0, "syncProcessingLimit must be > 0")
|
||||||
|
|
@ -356,10 +374,11 @@ final class ActorMaterializerSettings private (
|
||||||
fuzzingMode: Boolean = this.fuzzingMode,
|
fuzzingMode: Boolean = this.fuzzingMode,
|
||||||
autoFusing: Boolean = this.autoFusing,
|
autoFusing: Boolean = this.autoFusing,
|
||||||
maxFixedBufferSize: Int = this.maxFixedBufferSize,
|
maxFixedBufferSize: Int = this.maxFixedBufferSize,
|
||||||
syncProcessingLimit: Int = this.syncProcessingLimit) = {
|
syncProcessingLimit: Int = this.syncProcessingLimit,
|
||||||
|
ioSettings: IOSettings = this.ioSettings) = {
|
||||||
new ActorMaterializerSettings(
|
new ActorMaterializerSettings(
|
||||||
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
||||||
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit)
|
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -465,6 +484,10 @@ final class ActorMaterializerSettings private (
|
||||||
if (settings == this.subscriptionTimeoutSettings) this
|
if (settings == this.subscriptionTimeoutSettings) this
|
||||||
else copy(subscriptionTimeoutSettings = settings)
|
else copy(subscriptionTimeoutSettings = settings)
|
||||||
|
|
||||||
|
def withIOSettings(ioSettings: IOSettings): ActorMaterializerSettings =
|
||||||
|
if (ioSettings == this.ioSettings) this
|
||||||
|
else copy(ioSettings = ioSettings)
|
||||||
|
|
||||||
private def requirePowerOfTwo(n: Integer, name: String): Unit = {
|
private def requirePowerOfTwo(n: Integer, name: String): Unit = {
|
||||||
require(n > 0, s"$name must be > 0")
|
require(n > 0, s"$name must be > 0")
|
||||||
require((n & (n - 1)) == 0, s"$name must be a power of two")
|
require((n & (n - 1)) == 0, s"$name must be a power of two")
|
||||||
|
|
@ -481,11 +504,52 @@ final class ActorMaterializerSettings private (
|
||||||
s.outputBurstLimit == outputBurstLimit &&
|
s.outputBurstLimit == outputBurstLimit &&
|
||||||
s.syncProcessingLimit == syncProcessingLimit &&
|
s.syncProcessingLimit == syncProcessingLimit &&
|
||||||
s.fuzzingMode == fuzzingMode &&
|
s.fuzzingMode == fuzzingMode &&
|
||||||
s.autoFusing == autoFusing
|
s.autoFusing == autoFusing &&
|
||||||
|
s.ioSettings == ioSettings
|
||||||
case _ ⇒ false
|
case _ ⇒ false
|
||||||
}
|
}
|
||||||
|
|
||||||
override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize,$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit,$syncProcessingLimit,$fuzzingMode,$autoFusing)"
|
override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize," +
|
||||||
|
s"$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit," +
|
||||||
|
s"$syncProcessingLimit,$fuzzingMode,$autoFusing,$ioSettings)"
|
||||||
|
}
|
||||||
|
|
||||||
|
object IOSettings {
|
||||||
|
def apply(system: ActorSystem): IOSettings =
|
||||||
|
apply(system.settings.config.getConfig("akka.stream.materializer.io"))
|
||||||
|
|
||||||
|
def apply(config: Config): IOSettings =
|
||||||
|
new IOSettings(
|
||||||
|
tcpWriteBufferSize = math.min(Int.MaxValue, config.getBytes("tcp.write-buffer-size")).toInt)
|
||||||
|
|
||||||
|
def apply(tcpWriteBufferSize: Int): IOSettings =
|
||||||
|
new IOSettings(tcpWriteBufferSize)
|
||||||
|
|
||||||
|
/** Java API */
|
||||||
|
def create(config: Config) = apply(config)
|
||||||
|
|
||||||
|
/** Java API */
|
||||||
|
def create(system: ActorSystem) = apply(system)
|
||||||
|
|
||||||
|
/** Java API */
|
||||||
|
def create(tcpWriteBufferSize: Int): IOSettings =
|
||||||
|
apply(tcpWriteBufferSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
final class IOSettings private (val tcpWriteBufferSize: Int) {
|
||||||
|
|
||||||
|
def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value)
|
||||||
|
|
||||||
|
private def copy(tcpWriteBufferSize: Int = tcpWriteBufferSize): IOSettings = new IOSettings(
|
||||||
|
tcpWriteBufferSize = tcpWriteBufferSize)
|
||||||
|
|
||||||
|
override def equals(other: Any): Boolean = other match {
|
||||||
|
case s: IOSettings ⇒ s.tcpWriteBufferSize == tcpWriteBufferSize
|
||||||
|
case _ ⇒ false
|
||||||
|
}
|
||||||
|
|
||||||
|
override def toString =
|
||||||
|
s"""IoSettings(${tcpWriteBufferSize})"""
|
||||||
}
|
}
|
||||||
|
|
||||||
object StreamSubscriptionTimeoutSettings {
|
object StreamSubscriptionTimeoutSettings {
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,8 @@ import scala.concurrent.{ Future, Promise }
|
||||||
val options: immutable.Traversable[SocketOption],
|
val options: immutable.Traversable[SocketOption],
|
||||||
val halfClose: Boolean,
|
val halfClose: Boolean,
|
||||||
val idleTimeout: Duration,
|
val idleTimeout: Duration,
|
||||||
val bindShutdownTimeout: FiniteDuration)
|
val bindShutdownTimeout: FiniteDuration,
|
||||||
|
val ioSettings: IOSettings)
|
||||||
extends GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection], Future[StreamTcp.ServerBinding]] {
|
extends GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection], Future[StreamTcp.ServerBinding]] {
|
||||||
import ConnectionSourceStage._
|
import ConnectionSourceStage._
|
||||||
|
|
||||||
|
|
@ -114,7 +115,7 @@ import scala.concurrent.{ Future, Promise }
|
||||||
connectionFlowsAwaitingInitialization.incrementAndGet()
|
connectionFlowsAwaitingInitialization.incrementAndGet()
|
||||||
|
|
||||||
val tcpFlow =
|
val tcpFlow =
|
||||||
Flow.fromGraph(new IncomingConnectionStage(connection, connected.remoteAddress, halfClose))
|
Flow.fromGraph(new IncomingConnectionStage(connection, connected.remoteAddress, halfClose, ioSettings))
|
||||||
.via(detacher[ByteString]) // must read ahead for proper completions
|
.via(detacher[ByteString]) // must read ahead for proper completions
|
||||||
.mapMaterializedValue { m ⇒
|
.mapMaterializedValue { m ⇒
|
||||||
connectionFlowsAwaitingInitialization.decrementAndGet()
|
connectionFlowsAwaitingInitialization.decrementAndGet()
|
||||||
|
|
@ -176,13 +177,16 @@ private[stream] object ConnectionSourceStage {
|
||||||
|
|
||||||
trait TcpRole {
|
trait TcpRole {
|
||||||
def halfClose: Boolean
|
def halfClose: Boolean
|
||||||
|
def ioSettings: IOSettings
|
||||||
}
|
}
|
||||||
case class Outbound(
|
case class Outbound(
|
||||||
manager: ActorRef,
|
manager: ActorRef,
|
||||||
connectCmd: Connect,
|
connectCmd: Connect,
|
||||||
localAddressPromise: Promise[InetSocketAddress],
|
localAddressPromise: Promise[InetSocketAddress],
|
||||||
halfClose: Boolean) extends TcpRole
|
halfClose: Boolean,
|
||||||
case class Inbound(connection: ActorRef, halfClose: Boolean) extends TcpRole
|
ioSettings: IOSettings) extends TcpRole
|
||||||
|
|
||||||
|
case class Inbound(connection: ActorRef, halfClose: Boolean, ioSettings: IOSettings) extends TcpRole
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This is a *non-detached* design, i.e. this does not prefetch itself any of the inputs. It relies on downstream
|
* This is a *non-detached* design, i.e. this does not prefetch itself any of the inputs. It relies on downstream
|
||||||
|
|
@ -198,6 +202,11 @@ private[stream] object ConnectionSourceStage {
|
||||||
private def bytesOut = shape.out
|
private def bytesOut = shape.out
|
||||||
private var connection: ActorRef = _
|
private var connection: ActorRef = _
|
||||||
|
|
||||||
|
private val writeBufferSize = role.ioSettings.tcpWriteBufferSize
|
||||||
|
private var writeBuffer = ByteString.empty
|
||||||
|
private var writePending = false
|
||||||
|
private var connectionClosePending = false
|
||||||
|
|
||||||
// No reading until role have been decided
|
// No reading until role have been decided
|
||||||
setHandler(bytesOut, new OutHandler {
|
setHandler(bytesOut, new OutHandler {
|
||||||
override def onPull(): Unit = ()
|
override def onPull(): Unit = ()
|
||||||
|
|
@ -206,13 +215,13 @@ private[stream] object ConnectionSourceStage {
|
||||||
override def preStart(): Unit = {
|
override def preStart(): Unit = {
|
||||||
setKeepGoing(true)
|
setKeepGoing(true)
|
||||||
role match {
|
role match {
|
||||||
case Inbound(conn, _) ⇒
|
case Inbound(conn, _, _) ⇒
|
||||||
setHandler(bytesOut, readHandler)
|
setHandler(bytesOut, readHandler)
|
||||||
connection = conn
|
connection = conn
|
||||||
getStageActor(connected).watch(connection)
|
getStageActor(connected).watch(connection)
|
||||||
connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
|
connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
|
||||||
pull(bytesIn)
|
pull(bytesIn)
|
||||||
case ob @ Outbound(manager, cmd, _, _) ⇒
|
case ob @ Outbound(manager, cmd, _, _, _) ⇒
|
||||||
getStageActor(connecting(ob)).watch(manager)
|
getStageActor(connecting(ob)).watch(manager)
|
||||||
manager ! cmd
|
manager ! cmd
|
||||||
}
|
}
|
||||||
|
|
@ -238,9 +247,30 @@ private[stream] object ConnectionSourceStage {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def connected(evt: (ActorRef, Any)): Unit = {
|
private def connected(evt: (ActorRef, Any)): Unit = {
|
||||||
val sender = evt._1
|
|
||||||
val msg = evt._2
|
val msg = evt._2
|
||||||
msg match {
|
msg match {
|
||||||
|
case Received(data) ⇒
|
||||||
|
// Keep on reading even when closed. There is no "close-read-side" in TCP
|
||||||
|
if (isClosed(bytesOut)) connection ! ResumeReading
|
||||||
|
else push(bytesOut, data)
|
||||||
|
|
||||||
|
case WriteAck ⇒
|
||||||
|
if (writeBuffer.isEmpty)
|
||||||
|
writePending = false
|
||||||
|
else {
|
||||||
|
connection ! Write(writeBuffer, WriteAck)
|
||||||
|
writePending = true
|
||||||
|
writeBuffer = ByteString.empty
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!writePending && connectionClosePending) {
|
||||||
|
// continue onUpstreamFinish
|
||||||
|
closeConnection()
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isClosed(bytesIn) && !hasBeenPulled(bytesIn))
|
||||||
|
pull(bytesIn)
|
||||||
|
|
||||||
case Terminated(_) ⇒ failStage(new StreamTcpException("The connection actor has terminated. Stopping now."))
|
case Terminated(_) ⇒ failStage(new StreamTcpException("The connection actor has terminated. Stopping now."))
|
||||||
case f @ CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}"))
|
case f @ CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}"))
|
||||||
case ErrorClosed(cause) ⇒ failStage(new StreamTcpException(s"The connection closed with error: $cause"))
|
case ErrorClosed(cause) ⇒ failStage(new StreamTcpException(s"The connection closed with error: $cause"))
|
||||||
|
|
@ -249,15 +279,27 @@ private[stream] object ConnectionSourceStage {
|
||||||
case ConfirmedClosed ⇒ completeStage()
|
case ConfirmedClosed ⇒ completeStage()
|
||||||
case PeerClosed ⇒ complete(bytesOut)
|
case PeerClosed ⇒ complete(bytesOut)
|
||||||
|
|
||||||
case Received(data) ⇒
|
|
||||||
// Keep on reading even when closed. There is no "close-read-side" in TCP
|
|
||||||
if (isClosed(bytesOut)) connection ! ResumeReading
|
|
||||||
else push(bytesOut, data)
|
|
||||||
|
|
||||||
case WriteAck ⇒ if (!isClosed(bytesIn)) pull(bytesIn)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def closeConnection(): Unit = {
|
||||||
|
// Note that if there are pending bytes in the writeBuffer those must be written first.
|
||||||
|
if (isClosed(bytesOut) || !role.halfClose) {
|
||||||
|
// Reading has stopped before, either because of cancel, or PeerClosed, so just Close now
|
||||||
|
// (or half-close is turned off)
|
||||||
|
if (writePending)
|
||||||
|
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
|
||||||
|
else
|
||||||
|
connection ! Close
|
||||||
|
} else if (connection != null) {
|
||||||
|
// We still read, so we only close the write side
|
||||||
|
if (writePending)
|
||||||
|
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
|
||||||
|
else
|
||||||
|
connection ! ConfirmedClose
|
||||||
|
} else completeStage()
|
||||||
|
}
|
||||||
|
|
||||||
val readHandler = new OutHandler {
|
val readHandler = new OutHandler {
|
||||||
override def onPull(): Unit = {
|
override def onPull(): Unit = {
|
||||||
connection ! ResumeReading
|
connection ! ResumeReading
|
||||||
|
|
@ -276,17 +318,20 @@ private[stream] object ConnectionSourceStage {
|
||||||
override def onPush(): Unit = {
|
override def onPush(): Unit = {
|
||||||
val elem = grab(bytesIn)
|
val elem = grab(bytesIn)
|
||||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
||||||
connection ! Write(elem.asInstanceOf[ByteString], WriteAck)
|
if (writePending) {
|
||||||
|
writeBuffer = writeBuffer ++ elem
|
||||||
|
} else {
|
||||||
|
connection ! Write(writeBuffer ++ elem, WriteAck)
|
||||||
|
writePending = true
|
||||||
|
writeBuffer = ByteString.empty
|
||||||
|
}
|
||||||
|
if (writeBuffer.size < writeBufferSize)
|
||||||
|
pull(bytesIn)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onUpstreamFinish(): Unit = {
|
override def onUpstreamFinish(): Unit =
|
||||||
// Reading has stopped before, either because of cancel, or PeerClosed, so just Close now
|
closeConnection()
|
||||||
// (or half-close is turned off)
|
|
||||||
if (isClosed(bytesOut) || !role.halfClose) connection ! Close
|
|
||||||
// We still read, so we only close the write side
|
|
||||||
else if (connection != null) connection ! ConfirmedClose
|
|
||||||
else completeStage()
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
|
|
@ -302,18 +347,20 @@ private[stream] object ConnectionSourceStage {
|
||||||
})
|
})
|
||||||
|
|
||||||
override def postStop(): Unit = role match {
|
override def postStop(): Unit = role match {
|
||||||
case Outbound(_, _, localAddressPromise, _) ⇒
|
case Outbound(_, _, localAddressPromise, _, _) ⇒
|
||||||
// Fail if has not been completed with an address earlier
|
// Fail if has not been completed with an address earlier
|
||||||
localAddressPromise.tryFailure(new StreamTcpException("Connection failed."))
|
localAddressPromise.tryFailure(new StreamTcpException("Connection failed."))
|
||||||
case _ ⇒ // do nothing...
|
case _ ⇒ // do nothing...
|
||||||
}
|
}
|
||||||
|
writeBuffer = ByteString.empty
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean)
|
@InternalApi private[akka] class IncomingConnectionStage(
|
||||||
|
connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean, ioSettings: IOSettings)
|
||||||
extends GraphStage[FlowShape[ByteString, ByteString]] {
|
extends GraphStage[FlowShape[ByteString, ByteString]] {
|
||||||
import TcpConnectionStage._
|
import TcpConnectionStage._
|
||||||
|
|
||||||
|
|
@ -328,7 +375,7 @@ private[stream] object ConnectionSourceStage {
|
||||||
if (hasBeenCreated.get) throw new IllegalStateException("Cannot materialize an incoming connection Flow twice.")
|
if (hasBeenCreated.get) throw new IllegalStateException("Cannot materialize an incoming connection Flow twice.")
|
||||||
hasBeenCreated.set(true)
|
hasBeenCreated.set(true)
|
||||||
|
|
||||||
new TcpStreamLogic(shape, Inbound(connection, halfClose), remoteAddress)
|
new TcpStreamLogic(shape, Inbound(connection, halfClose, ioSettings), remoteAddress)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def toString = s"TCP-from($remoteAddress)"
|
override def toString = s"TCP-from($remoteAddress)"
|
||||||
|
|
@ -343,7 +390,8 @@ private[stream] object ConnectionSourceStage {
|
||||||
localAddress: Option[InetSocketAddress] = None,
|
localAddress: Option[InetSocketAddress] = None,
|
||||||
options: immutable.Traversable[SocketOption] = Nil,
|
options: immutable.Traversable[SocketOption] = Nil,
|
||||||
halfClose: Boolean = true,
|
halfClose: Boolean = true,
|
||||||
connectTimeout: Duration = Duration.Inf)
|
connectTimeout: Duration = Duration.Inf,
|
||||||
|
ioSettings: IOSettings)
|
||||||
|
|
||||||
extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[StreamTcp.OutgoingConnection]] {
|
extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[StreamTcp.OutgoingConnection]] {
|
||||||
import TcpConnectionStage._
|
import TcpConnectionStage._
|
||||||
|
|
@ -365,7 +413,8 @@ private[stream] object ConnectionSourceStage {
|
||||||
manager,
|
manager,
|
||||||
Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true),
|
Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true),
|
||||||
localAddressPromise,
|
localAddressPromise,
|
||||||
halfClose),
|
halfClose,
|
||||||
|
ioSettings),
|
||||||
remoteAddress)
|
remoteAddress)
|
||||||
|
|
||||||
(logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.sameThreadExecutionContext))
|
(logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.sameThreadExecutionContext))
|
||||||
|
|
|
||||||
|
|
@ -65,8 +65,10 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
|
||||||
final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
import Tcp._
|
import Tcp._
|
||||||
|
|
||||||
|
private val settings = ActorMaterializerSettings(system)
|
||||||
|
|
||||||
// TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead?
|
// TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead?
|
||||||
val bindShutdownTimeout = ActorMaterializer()(system).settings.subscriptionTimeoutSettings.timeout
|
val bindShutdownTimeout = settings.subscriptionTimeoutSettings.timeout
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
|
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
|
||||||
|
|
@ -103,7 +105,8 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
options,
|
options,
|
||||||
halfClose,
|
halfClose,
|
||||||
idleTimeout,
|
idleTimeout,
|
||||||
bindShutdownTimeout))
|
bindShutdownTimeout,
|
||||||
|
settings.ioSettings))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
|
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
|
||||||
|
|
@ -175,7 +178,8 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||||
localAddress,
|
localAddress,
|
||||||
options,
|
options,
|
||||||
halfClose,
|
halfClose,
|
||||||
connectTimeout)).via(detacher[ByteString]) // must read ahead for proper completions
|
connectTimeout,
|
||||||
|
settings.ioSettings)).via(detacher[ByteString]) // must read ahead for proper completions
|
||||||
|
|
||||||
idleTimeout match {
|
idleTimeout match {
|
||||||
case d: FiniteDuration ⇒ tcpFlow.join(TcpIdleTimeout(d, Some(remoteAddress)))
|
case d: FiniteDuration ⇒ tcpFlow.join(TcpIdleTimeout(d, Some(remoteAddress)))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue