config of send queues

This commit is contained in:
Patrik Nordwall 2016-09-07 10:41:36 +02:00
parent ebd1883df5
commit 3c779cebd4
7 changed files with 58 additions and 16 deletions

View file

@ -75,6 +75,8 @@ class CodecBenchmark {
override def sendControl(to: Address, message: ControlMessage): Unit = ???
override def association(remoteAddress: Address): OutboundContext = ???
override def completeHandshake(peer: UniqueAddress): Future[Done] = ???
override lazy val settings: ArterySettings =
ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery"))
}
private var materializer: ActorMaterializer = _

View file

@ -85,7 +85,6 @@ akka {
remote {
### FIXME: Temporary switch for the PoC
artery {
enabled = off
port = 20200
@ -166,10 +165,33 @@ akka {
# different destination actors. The selection of lane is based on consistent
# hashing of the recipient ActorRef to preserve message ordering per receiver.
inbound-lanes = 1
# Size of the send queue for outgoing messages. Messages will be dropped if
# the queue becomes full. This may happen if you send a burst of many messages
# without end-to-end flow control. Note that there is one such queue per
# outbound association. The trade-off of using a larger queue size is that
# it consumes more memory, since the queue is based on preallocated array with
# fixed size.
outbound-message-queue-size = 3072
# Size of the send queue for outgoing control messages, such as system messages.
# If this limit is reached the remote system is declared to be dead and its UID
# marked as quarantined.
# The trade-off of using a larger queue size is that it consumes more memory,
# since the queue is based on preallocated array with fixed size.
outbound-control-queue-size = 3072
# Size of the send queue for outgoing large messages. Messages will be dropped if
# the queue becomes full. This may happen if you send a burst of many messages
# without end-to-end flow control. Note that there is one such queue per
# outbound association. The trade-off of using a larger queue size is that
# it consumes more memory, since the queue is based on preallocated array with
# fixed size.
outbound-large-message-queue-size = 256
# This setting defines the maximum number of unacknowledged system messages
# allowed for a remote system. If this limit is reached the remote system is
# declared to be dead and its UID marked as tainted.
# declared to be dead and its UID marked as quarantined.
system-message-buffer-size = 20000
# unacknowledged system messages are re-delivered with this interval

View file

@ -57,6 +57,12 @@ private[akka] final class ArterySettings private (config: Config) {
n > 0, "inbound-lanes must be greater than zero")
val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring(
_ > 0, "system-message-buffer-size must be more than zero")
val OutboundMessageQueueSize: Int = getInt("outbound-message-queue-size").requiring(
_ > 0, "outbound-message-queue-size must be more than zero")
val OutboundControlQueueSize: Int = getInt("outbound-control-queue-size").requiring(
_ > 0, "outbound-control-queue-size must be more than zero")
val OutboundLargeMessageQueueSize: Int = getInt("outbound-large-message-queue-size").requiring(
_ > 0, "outbound-large-message-queue-size must be more than zero")
val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval
interval > Duration.Zero, "system-message-resend-interval must be more than zero")
val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval

View file

@ -104,6 +104,8 @@ private[akka] trait InboundContext {
def completeHandshake(peer: UniqueAddress): Future[Done]
def settings: ArterySettings
}
/**
@ -219,6 +221,8 @@ private[akka] trait OutboundContext {
*/
def controlSubject: ControlMessageSubject
def settings: ArterySettings
}
/**
@ -372,7 +376,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
priorityMessageDestinations,
outboundEnvelopePool))
def settings = provider.remoteSettings.Artery
override def settings = provider.remoteSettings.Artery
override def start(): Unit = {
startMediaDriver()

View file

@ -100,7 +100,10 @@ private[remote] class Association(
private val log = Logging(transport.system, getClass.getName)
private val restartCounter = new RestartCounter(transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout)
override def settings = transport.settings
private def advancedSettings = transport.settings.Advanced
private val restartCounter = new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout)
// We start with the raw wrapped queue and then it is replaced with the materialized value of
// the `SendQueue` after materialization. Using same underlying queue. This makes it possible to
@ -109,12 +112,10 @@ private[remote] class Association(
def createQueue(capacity: Int): Queue[OutboundEnvelope] =
new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity)
private val outboundLanes = transport.settings.Advanced.OutboundLanes
private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize
// FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue
// such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption
private val queueSize = 3072
private val largeQueueSize = 256
private val outboundLanes = advancedSettings.OutboundLanes
private val controlQueueSize = advancedSettings.OutboundControlQueueSize
private val queueSize = advancedSettings.OutboundMessageQueueSize
private val largeQueueSize = advancedSettings.OutboundLargeMessageQueueSize
private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = Array.ofDim(2 + outboundLanes)
queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize)) // control stream
@ -429,7 +430,7 @@ private[remote] class Association(
.toMat(transport.outboundControl(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
if (advancedSettings.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
@ -465,7 +466,7 @@ private[remote] class Association(
.toMat(transport.outbound(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
if (advancedSettings.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
@ -509,7 +510,7 @@ private[remote] class Association(
val (queueValues, testMgmtValues) = a.unzip
val (changeCompressionValues, laneCompletedValues) = b.unzip
if (transport.settings.Advanced.TestMode)
if (advancedSettings.TestMode)
testMgmtValues.foreach(_testStages.add)
import transport.system.dispatcher
@ -545,7 +546,7 @@ private[remote] class Association(
.toMat(transport.outboundLarge(this))(Keep.both)
.run()(materializer)
if (transport.settings.Advanced.TestMode)
if (advancedSettings.TestMode)
_testStages.add(testMgmt)
queueValue.inject(wrapper.queue)
@ -594,7 +595,7 @@ private[remote] class Association(
lazyRestart()
} else {
log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
streamName, remoteAddress, transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout.toSeconds)
streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds)
transport.system.terminate()
}
}

View file

@ -176,7 +176,7 @@ private[akka] class OutboundControlJunction(
import OutboundControlJunction._
private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage)
private val maxControlMessageBufferSize: Int = 1024 // FIXME config
private val maxControlMessageBufferSize: Int = outboundContext.settings.Advanced.OutboundControlQueueSize
private val buffer = new ArrayDeque[OutboundEnvelope]
override def preStart(): Unit = {

View file

@ -19,6 +19,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.util.OptionVal
import akka.actor.InternalActorRef
import akka.dispatch.ExecutionContexts
import com.typesafe.config.ConfigFactory
private[remote] class TestInboundContext(
override val localAddress: UniqueAddress,
@ -59,6 +60,9 @@ private[remote] class TestInboundContext(
protected def createAssociation(remoteAddress: Address): TestOutboundContext =
new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe)
override lazy val settings: ArterySettings =
ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery"))
}
private[remote] class TestOutboundContext(
@ -94,6 +98,9 @@ private[remote] class TestOutboundContext(
OptionVal.None))
}
override lazy val settings: ArterySettings =
ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery"))
}
private[remote] class TestControlMessageSubject extends ControlMessageSubject {