diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 76c52c01a5..be9f3efd90 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -75,6 +75,8 @@ class CodecBenchmark { override def sendControl(to: Address, message: ControlMessage): Unit = ??? override def association(remoteAddress: Address): OutboundContext = ??? override def completeHandshake(peer: UniqueAddress): Future[Done] = ??? + override lazy val settings: ArterySettings = + ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery")) } private var materializer: ActorMaterializer = _ diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 2fd385e08c..bd266bc3f8 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -85,7 +85,6 @@ akka { remote { - ### FIXME: Temporary switch for the PoC artery { enabled = off port = 20200 @@ -166,10 +165,33 @@ akka { # different destination actors. The selection of lane is based on consistent # hashing of the recipient ActorRef to preserve message ordering per receiver. inbound-lanes = 1 + + # Size of the send queue for outgoing messages. Messages will be dropped if + # the queue becomes full. This may happen if you send a burst of many messages + # without end-to-end flow control. Note that there is one such queue per + # outbound association. The trade-off of using a larger queue size is that + # it consumes more memory, since the queue is based on preallocated array with + # fixed size. + outbound-message-queue-size = 3072 + + # Size of the send queue for outgoing control messages, such as system messages. + # If this limit is reached the remote system is declared to be dead and its UID + # marked as quarantined. + # The trade-off of using a larger queue size is that it consumes more memory, + # since the queue is based on preallocated array with fixed size. + outbound-control-queue-size = 3072 + + # Size of the send queue for outgoing large messages. Messages will be dropped if + # the queue becomes full. This may happen if you send a burst of many messages + # without end-to-end flow control. Note that there is one such queue per + # outbound association. The trade-off of using a larger queue size is that + # it consumes more memory, since the queue is based on preallocated array with + # fixed size. + outbound-large-message-queue-size = 256 # This setting defines the maximum number of unacknowledged system messages # allowed for a remote system. If this limit is reached the remote system is - # declared to be dead and its UID marked as tainted. + # declared to be dead and its UID marked as quarantined. system-message-buffer-size = 20000 # unacknowledged system messages are re-delivered with this interval diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 365f71daf8..fa68f8c5e5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -57,6 +57,12 @@ private[akka] final class ArterySettings private (config: Config) { n > 0, "inbound-lanes must be greater than zero") val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring( _ > 0, "system-message-buffer-size must be more than zero") + val OutboundMessageQueueSize: Int = getInt("outbound-message-queue-size").requiring( + _ > 0, "outbound-message-queue-size must be more than zero") + val OutboundControlQueueSize: Int = getInt("outbound-control-queue-size").requiring( + _ > 0, "outbound-control-queue-size must be more than zero") + val OutboundLargeMessageQueueSize: Int = getInt("outbound-large-message-queue-size").requiring( + _ > 0, "outbound-large-message-queue-size must be more than zero") val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒ interval > Duration.Zero, "system-message-resend-interval must be more than zero") val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index d5187a167a..d9f6bc05f1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -104,6 +104,8 @@ private[akka] trait InboundContext { def completeHandshake(peer: UniqueAddress): Future[Done] + def settings: ArterySettings + } /** @@ -219,6 +221,8 @@ private[akka] trait OutboundContext { */ def controlSubject: ControlMessageSubject + def settings: ArterySettings + } /** @@ -372,7 +376,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R priorityMessageDestinations, outboundEnvelopePool)) - def settings = provider.remoteSettings.Artery + override def settings = provider.remoteSettings.Artery override def start(): Unit = { startMediaDriver() diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index eaa814fe65..288799e373 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -100,7 +100,10 @@ private[remote] class Association( private val log = Logging(transport.system, getClass.getName) - private val restartCounter = new RestartCounter(transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout) + override def settings = transport.settings + private def advancedSettings = transport.settings.Advanced + + private val restartCounter = new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout) // We start with the raw wrapped queue and then it is replaced with the materialized value of // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to @@ -109,12 +112,10 @@ private[remote] class Association( def createQueue(capacity: Int): Queue[OutboundEnvelope] = new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) - private val outboundLanes = transport.settings.Advanced.OutboundLanes - private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize - // FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue - // such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption - private val queueSize = 3072 - private val largeQueueSize = 256 + private val outboundLanes = advancedSettings.OutboundLanes + private val controlQueueSize = advancedSettings.OutboundControlQueueSize + private val queueSize = advancedSettings.OutboundMessageQueueSize + private val largeQueueSize = advancedSettings.OutboundLargeMessageQueueSize private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = Array.ofDim(2 + outboundLanes) queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize)) // control stream @@ -429,7 +430,7 @@ private[remote] class Association( .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) - if (transport.settings.Advanced.TestMode) + if (advancedSettings.TestMode) _testStages.add(testMgmt) queueValue.inject(wrapper.queue) @@ -465,7 +466,7 @@ private[remote] class Association( .toMat(transport.outbound(this))(Keep.both) .run()(materializer) - if (transport.settings.Advanced.TestMode) + if (advancedSettings.TestMode) _testStages.add(testMgmt) queueValue.inject(wrapper.queue) @@ -509,7 +510,7 @@ private[remote] class Association( val (queueValues, testMgmtValues) = a.unzip val (changeCompressionValues, laneCompletedValues) = b.unzip - if (transport.settings.Advanced.TestMode) + if (advancedSettings.TestMode) testMgmtValues.foreach(_testStages.add) import transport.system.dispatcher @@ -545,7 +546,7 @@ private[remote] class Association( .toMat(transport.outboundLarge(this))(Keep.both) .run()(materializer) - if (transport.settings.Advanced.TestMode) + if (advancedSettings.TestMode) _testStages.add(testMgmt) queueValue.inject(wrapper.queue) @@ -594,7 +595,7 @@ private[remote] class Association( lazyRestart() } else { log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", - streamName, remoteAddress, transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout.toSeconds) + streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds) transport.system.terminate() } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index 76aad28dc5..66d957244d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -176,7 +176,7 @@ private[akka] class OutboundControlJunction( import OutboundControlJunction._ private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) - private val maxControlMessageBufferSize: Int = 1024 // FIXME config + private val maxControlMessageBufferSize: Int = outboundContext.settings.Advanced.OutboundControlQueueSize private val buffer = new ArrayDeque[OutboundEnvelope] override def preStart(): Unit = { diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 9a350984ef..db5965589d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -19,6 +19,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.util.OptionVal import akka.actor.InternalActorRef import akka.dispatch.ExecutionContexts +import com.typesafe.config.ConfigFactory private[remote] class TestInboundContext( override val localAddress: UniqueAddress, @@ -59,6 +60,9 @@ private[remote] class TestInboundContext( protected def createAssociation(remoteAddress: Address): TestOutboundContext = new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe) + + override lazy val settings: ArterySettings = + ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery")) } private[remote] class TestOutboundContext( @@ -94,6 +98,9 @@ private[remote] class TestOutboundContext( OptionVal.None)) } + override lazy val settings: ArterySettings = + ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery")) + } private[remote] class TestControlMessageSubject extends ControlMessageSubject {