From ebd1883df5b1f44f1e95564020a669f69d919acf Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 5 Sep 2016 15:08:30 +0200 Subject: [PATCH 1/8] remove or reword obsolete fixme --- .../scala/akka/remote/artery/ArteryTransport.scala | 3 ++- .../main/scala/akka/remote/artery/Association.scala | 8 +++----- .../src/main/scala/akka/remote/artery/Codecs.scala | 4 ++-- .../scala/akka/remote/artery/FlightRecorder.scala | 4 ++-- .../main/scala/akka/remote/artery/SendQueue.scala | 1 + .../akka/remote/artery/SystemMessageDelivery.scala | 3 +-- .../remote/artery/compress/InboundCompressions.scala | 12 ++++-------- .../akka/remote/artery/RemoteDeathWatchSpec.scala | 2 -- .../HandshakeShouldDropCompressionTableSpec.scala | 1 - 9 files changed, 15 insertions(+), 23 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 90efe937dd..d5187a167a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -708,7 +708,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R implicit val ec = materializer.executionContext updateStreamCompletion(streamName, streamCompleted.recover { case _ ⇒ Done }) streamCompleted.onFailure { - case ShutdownSignal ⇒ // shutdown as expected + case ShutdownSignal ⇒ // shutdown as expected + case _: AeronTerminated ⇒ // shutdown already in progress case cause if isShutdown ⇒ // don't restart after shutdown, but log some details so we notice log.error(cause, s"{} failed after shutdown. {}", streamName, cause.getMessage) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 55c4d38335..eaa814fe65 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.stream.KillSwitches import scala.util.Failure import scala.util.Success +import akka.remote.artery.ArteryTransport.AeronTerminated /** * INTERNAL API @@ -280,7 +281,6 @@ private[remote] class Association( } // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system - // FIXME where is that ActorSelectionMessage check in old remoting? if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { message match { case _: SystemMessage | ClearSystemMessageDelivery | _: ControlMessage ⇒ @@ -388,7 +388,6 @@ private[remote] class Association( remoteAddress, reason) } case None ⇒ - // FIXME should we do something more, old impl used gating? log.warning("Quarantine of [{}] ignored because unknown UID", remoteAddress) } @@ -573,6 +572,7 @@ private[remote] class Association( updateStreamCompletion(streamName, streamCompleted.recover { case _ ⇒ Done }) streamCompleted.onFailure { case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected + case _: AeronTerminated ⇒ // shutdown already in progress case cause if transport.isShutdown ⇒ // don't restart after shutdown, but log some details so we notice log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage) @@ -593,10 +593,8 @@ private[remote] class Association( log.error(cause, "{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) lazyRestart() } else { - log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. {cause.getMessage}", + log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", streamName, remoteAddress, transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout.toSeconds) - - // FIXME is this the right thing to do for outbound? transport.system.terminate() } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index b9ded54aed..e62d6de02e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -313,7 +313,7 @@ private[remote] class Decoder( val decoded = inEnvelopePool.acquire().init( recipient, - localAddress, // FIXME: Is this needed anymore? What should we do here? + localAddress, // FIXME: this is used for the "non-local recipient" check in MessageDispatcher. Is this needed anymore? sender, originUid, headerBuilder.serializer, @@ -326,7 +326,7 @@ private[remote] class Decoder( // recipient for the first message that is sent to it, best effort retry scheduleOnce(RetryResolveRemoteDeployedRecipient( retryResolveRemoteDeployedRecipientAttempts, - headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) // FIXME IS THIS SAFE? + headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) } else push(out, decoded) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala index f86248cb08..9ace0f9315 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala @@ -329,7 +329,7 @@ private[akka] class FlightRecorder(val fileChannel: FileChannel) extends AtomicB private def prepareRichRecord(recordBuffer: ByteBuffer, code: Int, metadata: Array[Byte]): Unit = { recordBuffer.clear() - // FIXME: This is a bit overkill, needs some smarter scheme later, no need to always store the wallclock + // TODO: This is a bit overkill, needs some smarter scheme later, no need to always store the wallclock recordBuffer.putLong(clock.wallClockPart) recordBuffer.putLong(clock.highSpeedPart) recordBuffer.putInt(code) @@ -342,7 +342,7 @@ private[akka] class FlightRecorder(val fileChannel: FileChannel) extends AtomicB recordBuffer.position(0) } - // FIXME: Try to save as many bytes here as possible! We will see crazy throughput here + // TODO: Try to save as many bytes here as possible! We will see crazy throughput here override def hiFreq(code: Long, param: Long): Unit = { hiFreqBatchedEntries += 1 hiFreqBatchBuffer.putLong(code) diff --git a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala index 6710bb1ff7..73bcd0609b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala @@ -99,6 +99,7 @@ private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue } override def postStop(): Unit = { + // TODO quarantine will currently always be done when control stream is terminated, see issue #21359 if (consumerQueue ne null) consumerQueue.clear() super.postStop() diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index c21f3bb2d9..3eaead00b7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -29,7 +29,6 @@ import akka.actor.ActorRef */ private[akka] object SystemMessageDelivery { // FIXME serialization of these messages - // FIXME ackReplyTo should not be needed final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress) extends ArteryMessage final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply @@ -80,7 +79,6 @@ private[akka] class SystemMessageDelivery( outboundContext.controlSubject.stopped.onComplete { getAsyncCallback[Try[Done]] { - // FIXME quarantine case Success(_) ⇒ completeStage() case Failure(cause) ⇒ failStage(cause) }.invoke @@ -88,6 +86,7 @@ private[akka] class SystemMessageDelivery( } override def postStop(): Unit = { + // TODO quarantine will currently always be done when control stream is terminated, see issue #21359 sendUnacknowledgedToDeadLetters() unacknowledged.clear() outboundContext.controlSubject.detach(this) diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index ddcfd0bda5..b55c051dba 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -42,7 +42,9 @@ private[remote] final class InboundCompressionsImpl( inboundContext: InboundContext, settings: ArterySettings.Compression) extends InboundCompressions { - // FIXME we also must remove the ones that won't be used anymore - when quarantine triggers + // TODO we also must remove the ones that won't be used anymore - when quarantine triggers? + // Why is that important? Won't be naturally be removed in new advertisements since they + // are not used any more? private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { override def apply(originUid: Long): InboundActorRefCompression = { @@ -200,12 +202,6 @@ private[remote] abstract class InboundCompression[T >: Null]( lazy val log = Logging(system, getClass.getSimpleName) - // FIXME NOTE: there exist edge cases around, we advertise table 1, accumulate table 2, the remote system has not used 2 yet, - // yet we technically could already prepare table 3, then it starts using table 1 suddenly. Edge cases like that. - // SOLUTION 1: We don't start building new tables until we've seen the previous one be used (move from new to active) - // This is nice as it practically disables all the "build the table" work when the other side is not interested in using it. - // SOLUTION 2: We end up dropping messages when old table comes in (we do that anyway) - private[this] val state: AtomicReference[InboundCompression.State[T]] = new AtomicReference(InboundCompression.State.empty) // TODO calibrate properly (h/w have direct relation to preciseness and max capacity) @@ -224,7 +220,7 @@ private[remote] abstract class InboundCompression[T >: Null]( * @throws UnknownCompressedIdException if given id is not known, this may indicate a bug – such situation should not happen. */ @tailrec final def decompressInternal(incomingTableVersion: Int, idx: Int, attemptCounter: Int): OptionVal[T] = { - // effectively should never loop more than once, to avoid infinite recursion blow up eagerly + // effectively should never loop more than once, to avoid infinite recursion blow up eagerly if (attemptCounter > 2) throw new IllegalStateException(s"Unable to decompress $idx from table $incomingTableVersion. Internal state: ${state.get}") val current = state.get diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala index 4a7d049b0e..c7da3eab69 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala @@ -26,8 +26,6 @@ object RemoteDeathWatchSpec { } } remote.watch-failure-detector.acceptable-heartbeat-pause = 3s - # FIXME do we need the initial-system-message-delivery-timeout? - remote.initial-system-message-delivery-timeout = 3 s remote.artery.enabled = on remote.artery.hostname = localhost remote.artery.port = 0 diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala index c70438300c..55433ae0f1 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala @@ -59,7 +59,6 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr } "Outgoing compression table" must { - // FIXME this is failing, we must rethink how tables are identified and updated "be dropped on system restart" in { val messagesToExchange = 10 val systemATransport = RARP(system).provider.transport.asInstanceOf[ArteryTransport] From 3c779cebd48f905d6ca096d2a264ae4da3b84c09 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Sep 2016 10:41:36 +0200 Subject: [PATCH 2/8] config of send queues --- .../akka/remote/artery/CodecBenchmark.scala | 2 ++ akka-remote/src/main/resources/reference.conf | 26 +++++++++++++++++-- .../akka/remote/artery/ArterySettings.scala | 6 +++++ .../akka/remote/artery/ArteryTransport.scala | 6 ++++- .../akka/remote/artery/Association.scala | 25 +++++++++--------- .../scala/akka/remote/artery/Control.scala | 2 +- .../akka/remote/artery/TestContext.scala | 7 +++++ 7 files changed, 58 insertions(+), 16 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 76c52c01a5..be9f3efd90 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -75,6 +75,8 @@ class CodecBenchmark { override def sendControl(to: Address, message: ControlMessage): Unit = ??? override def association(remoteAddress: Address): OutboundContext = ??? override def completeHandshake(peer: UniqueAddress): Future[Done] = ??? + override lazy val settings: ArterySettings = + ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery")) } private var materializer: ActorMaterializer = _ diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 2fd385e08c..bd266bc3f8 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -85,7 +85,6 @@ akka { remote { - ### FIXME: Temporary switch for the PoC artery { enabled = off port = 20200 @@ -166,10 +165,33 @@ akka { # different destination actors. The selection of lane is based on consistent # hashing of the recipient ActorRef to preserve message ordering per receiver. inbound-lanes = 1 + + # Size of the send queue for outgoing messages. Messages will be dropped if + # the queue becomes full. This may happen if you send a burst of many messages + # without end-to-end flow control. Note that there is one such queue per + # outbound association. The trade-off of using a larger queue size is that + # it consumes more memory, since the queue is based on preallocated array with + # fixed size. + outbound-message-queue-size = 3072 + + # Size of the send queue for outgoing control messages, such as system messages. + # If this limit is reached the remote system is declared to be dead and its UID + # marked as quarantined. + # The trade-off of using a larger queue size is that it consumes more memory, + # since the queue is based on preallocated array with fixed size. + outbound-control-queue-size = 3072 + + # Size of the send queue for outgoing large messages. Messages will be dropped if + # the queue becomes full. This may happen if you send a burst of many messages + # without end-to-end flow control. Note that there is one such queue per + # outbound association. The trade-off of using a larger queue size is that + # it consumes more memory, since the queue is based on preallocated array with + # fixed size. + outbound-large-message-queue-size = 256 # This setting defines the maximum number of unacknowledged system messages # allowed for a remote system. If this limit is reached the remote system is - # declared to be dead and its UID marked as tainted. + # declared to be dead and its UID marked as quarantined. system-message-buffer-size = 20000 # unacknowledged system messages are re-delivered with this interval diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 365f71daf8..fa68f8c5e5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -57,6 +57,12 @@ private[akka] final class ArterySettings private (config: Config) { n > 0, "inbound-lanes must be greater than zero") val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring( _ > 0, "system-message-buffer-size must be more than zero") + val OutboundMessageQueueSize: Int = getInt("outbound-message-queue-size").requiring( + _ > 0, "outbound-message-queue-size must be more than zero") + val OutboundControlQueueSize: Int = getInt("outbound-control-queue-size").requiring( + _ > 0, "outbound-control-queue-size must be more than zero") + val OutboundLargeMessageQueueSize: Int = getInt("outbound-large-message-queue-size").requiring( + _ > 0, "outbound-large-message-queue-size must be more than zero") val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒ interval > Duration.Zero, "system-message-resend-interval must be more than zero") val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index d5187a167a..d9f6bc05f1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -104,6 +104,8 @@ private[akka] trait InboundContext { def completeHandshake(peer: UniqueAddress): Future[Done] + def settings: ArterySettings + } /** @@ -219,6 +221,8 @@ private[akka] trait OutboundContext { */ def controlSubject: ControlMessageSubject + def settings: ArterySettings + } /** @@ -372,7 +376,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R priorityMessageDestinations, outboundEnvelopePool)) - def settings = provider.remoteSettings.Artery + override def settings = provider.remoteSettings.Artery override def start(): Unit = { startMediaDriver() diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index eaa814fe65..288799e373 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -100,7 +100,10 @@ private[remote] class Association( private val log = Logging(transport.system, getClass.getName) - private val restartCounter = new RestartCounter(transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout) + override def settings = transport.settings + private def advancedSettings = transport.settings.Advanced + + private val restartCounter = new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout) // We start with the raw wrapped queue and then it is replaced with the materialized value of // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to @@ -109,12 +112,10 @@ private[remote] class Association( def createQueue(capacity: Int): Queue[OutboundEnvelope] = new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) - private val outboundLanes = transport.settings.Advanced.OutboundLanes - private val controlQueueSize = transport.settings.Advanced.SysMsgBufferSize - // FIXME config queue size, and it should perhaps also be possible to use some kind of LinkedQueue - // such as agrona.ManyToOneConcurrentLinkedQueue or AbstractNodeQueue for less memory consumption - private val queueSize = 3072 - private val largeQueueSize = 256 + private val outboundLanes = advancedSettings.OutboundLanes + private val controlQueueSize = advancedSettings.OutboundControlQueueSize + private val queueSize = advancedSettings.OutboundMessageQueueSize + private val largeQueueSize = advancedSettings.OutboundLargeMessageQueueSize private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = Array.ofDim(2 + outboundLanes) queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize)) // control stream @@ -429,7 +430,7 @@ private[remote] class Association( .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) - if (transport.settings.Advanced.TestMode) + if (advancedSettings.TestMode) _testStages.add(testMgmt) queueValue.inject(wrapper.queue) @@ -465,7 +466,7 @@ private[remote] class Association( .toMat(transport.outbound(this))(Keep.both) .run()(materializer) - if (transport.settings.Advanced.TestMode) + if (advancedSettings.TestMode) _testStages.add(testMgmt) queueValue.inject(wrapper.queue) @@ -509,7 +510,7 @@ private[remote] class Association( val (queueValues, testMgmtValues) = a.unzip val (changeCompressionValues, laneCompletedValues) = b.unzip - if (transport.settings.Advanced.TestMode) + if (advancedSettings.TestMode) testMgmtValues.foreach(_testStages.add) import transport.system.dispatcher @@ -545,7 +546,7 @@ private[remote] class Association( .toMat(transport.outboundLarge(this))(Keep.both) .run()(materializer) - if (transport.settings.Advanced.TestMode) + if (advancedSettings.TestMode) _testStages.add(testMgmt) queueValue.inject(wrapper.queue) @@ -594,7 +595,7 @@ private[remote] class Association( lazyRestart() } else { log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", - streamName, remoteAddress, transport.settings.Advanced.OutboundMaxRestarts, transport.settings.Advanced.OutboundRestartTimeout.toSeconds) + streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds) transport.system.terminate() } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index 76aad28dc5..66d957244d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -176,7 +176,7 @@ private[akka] class OutboundControlJunction( import OutboundControlJunction._ private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) - private val maxControlMessageBufferSize: Int = 1024 // FIXME config + private val maxControlMessageBufferSize: Int = outboundContext.settings.Advanced.OutboundControlQueueSize private val buffer = new ArrayDeque[OutboundEnvelope] override def preStart(): Unit = { diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 9a350984ef..db5965589d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -19,6 +19,7 @@ import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.util.OptionVal import akka.actor.InternalActorRef import akka.dispatch.ExecutionContexts +import com.typesafe.config.ConfigFactory private[remote] class TestInboundContext( override val localAddress: UniqueAddress, @@ -59,6 +60,9 @@ private[remote] class TestInboundContext( protected def createAssociation(remoteAddress: Address): TestOutboundContext = new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe) + + override lazy val settings: ArterySettings = + ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery")) } private[remote] class TestOutboundContext( @@ -94,6 +98,9 @@ private[remote] class TestOutboundContext( OptionVal.None)) } + override lazy val settings: ArterySettings = + ArterySettings(ConfigFactory.load().getConfig("akka.remote.artery")) + } private[remote] class TestControlMessageSubject extends ControlMessageSubject { From 8756ffd75ce3c2b1707dd6d0c06be2993c79a4b2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Sep 2016 15:38:13 +0200 Subject: [PATCH 3/8] handle Aeron Publication.CLOSED --- .../scala/akka/remote/artery/AeronSink.scala | 54 +++++++++++++------ .../akka/remote/artery/Association.scala | 1 + 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index f93a6ab013..b90d412403 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -30,11 +30,13 @@ object AeronSink { final class GaveUpSendingException(msg: String) extends RuntimeException(msg) with NoStackTrace + final class PublicationClosedException(msg: String) extends RuntimeException(msg) with NoStackTrace + private val TimerCheckPeriod = 1 << 13 // 8192 private val TimerCheckMask = TimerCheckPeriod - 1 private final class OfferTask(pub: Publication, var buffer: UnsafeBuffer, var msgSize: Int, onOfferSuccess: AsyncCallback[Unit], - giveUpAfter: Duration, onGiveUp: AsyncCallback[Unit]) + giveUpAfter: Duration, onGiveUp: AsyncCallback[Unit], onPublicationClosed: AsyncCallback[Unit]) extends (() ⇒ Boolean) { val giveUpAfterNanos = giveUpAfter match { case f: FiniteDuration ⇒ f.toNanos @@ -54,6 +56,9 @@ object AeronSink { n = 0L onOfferSuccess.invoke(()) true + } else if (result == Publication.CLOSED) { + onPublicationClosed.invoke(()) + true } else if (giveUpAfterNanos >= 0 && (n & TimerCheckMask) == 0 && (System.nanoTime() - startTime) > giveUpAfterNanos) { // the task is invoked by the spinning thread, only check nanoTime each 8192th invocation n = 0L @@ -99,7 +104,7 @@ class AeronSink( private var backoffCount = spinning private var lastMsgSize = 0 private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ ⇒ taskOnOfferSuccess()), - giveUpSendAfter, getAsyncCallback(_ ⇒ onGiveUp())) + giveUpSendAfter, getAsyncCallback(_ ⇒ onGiveUp()), getAsyncCallback(_ ⇒ onPublicationClosed())) private val addOfferTask: Add = Add(offerTask) private var offerTaskInProgress = false @@ -135,21 +140,18 @@ class AeronSink( @tailrec private def publish(): Unit = { val result = pub.offer(envelopeInFlight.aeronBuffer, 0, lastMsgSize) - // FIXME handle Publication.CLOSED if (result < 0) { - backoffCount -= 1 - if (backoffCount > 0) { - ThreadHints.onSpinWait() - publish() // recursive - } else { - // delegate backoff to shared TaskRunner - offerTaskInProgress = true - // visibility of these assignments are ensured by adding the task to the command queue - offerTask.buffer = envelopeInFlight.aeronBuffer - offerTask.msgSize = lastMsgSize - delegateTaskStartTime = System.nanoTime() - taskRunner.command(addOfferTask) - flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, countBeforeDelegate) + if (result == Publication.CLOSED) + onPublicationClosed() + else if (result == Publication.NOT_CONNECTED) + delegateBackoff() + else { + backoffCount -= 1 + if (backoffCount > 0) { + ThreadHints.onSpinWait() + publish() // recursive + } else + delegateBackoff() } } else { countBeforeDelegate += 1 @@ -157,6 +159,17 @@ class AeronSink( } } + private def delegateBackoff(): Unit = { + // delegate backoff to shared TaskRunner + offerTaskInProgress = true + // visibility of these assignments are ensured by adding the task to the command queue + offerTask.buffer = envelopeInFlight.aeronBuffer + offerTask.msgSize = lastMsgSize + delegateTaskStartTime = System.nanoTime() + taskRunner.command(addOfferTask) + flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, countBeforeDelegate) + } + private def taskOnOfferSuccess(): Unit = { countBeforeDelegate = 0 flightRecorder.hiFreq(AeronSink_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime) @@ -184,6 +197,15 @@ class AeronSink( failStage(cause) } + private def onPublicationClosed(): Unit = { + offerTaskInProgress = false + val cause = new PublicationClosedException(s"Aeron Publication to [${channel}] was closed.") + // this is not exepected, since we didn't close the publication ourselves + flightRecorder.alert(AeronSink_PublicationClosed, channelMetadata) + completedValue = Failure(cause) + failStage(cause) + } + override def onUpstreamFinish(): Unit = { // flush outstanding offer before completing stage if (!offerTaskInProgress) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 288799e373..17b9d2af7b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -267,6 +267,7 @@ private[remote] class Association( outboundControlIngress.sendControlMessage(message) def send(message: Any, sender: OptionVal[ActorRef], recipient: OptionVal[RemoteActorRef]): Unit = { + def createOutboundEnvelope(): OutboundEnvelope = outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender) From 9a7d79c88276cf98aa494e1ae42e9336ad34c68e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Sep 2016 15:43:08 +0200 Subject: [PATCH 4/8] size of outboundEnvelopePool --- .../akka/remote/artery/ArteryTransport.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index d9f6bc05f1..b257f38da0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -128,9 +128,9 @@ private[akka] object AssociationState { * INTERNAL API */ private[akka] final class AssociationState( - val incarnation: Int, + val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], - val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { + val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { import AssociationState.QuarantinedTimestamp @@ -230,7 +230,7 @@ private[akka] trait OutboundContext { */ private[remote] object FlushOnShutdown { def props(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]): Props = { + inboundContext: InboundContext, associations: Set[Association]): Props = { require(associations.nonEmpty) Props(new FlushOnShutdown(done, timeout, inboundContext, associations)) } @@ -242,7 +242,7 @@ private[remote] object FlushOnShutdown { * INTERNAL API */ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]) extends Actor { + inboundContext: InboundContext, associations: Set[Association]) extends Actor { var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue) @@ -343,9 +343,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val largeEnvelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.MaximumPooledBuffers) private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) - // FIXME capacity of outboundEnvelopePool should probably be derived from the sendQueue capacity - // times a factor (for reasonable number of outbound streams) - private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 3072 * 2) + // The outboundEnvelopePool is shared among all outbound associations + private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = + settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3) val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder() match { case None ⇒ (None, None, None) @@ -853,7 +853,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R createOutboundSink(ordinaryStreamId, outboundContext, envelopeBufferPool) private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, - bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { + bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { outboundLane(outboundContext, bufferPool) .toMat(aeronSink(outboundContext, streamId))(Keep.both) @@ -872,7 +872,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def outboundLane( outboundContext: OutboundContext, - bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = { + bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout, From 3b7a7dfa59eb10351f5fd1cdfeb07486ee1f455c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Sep 2016 16:07:29 +0200 Subject: [PATCH 5/8] add reason param to quarantine method --- .../main/scala/akka/cluster/ClusterRemoteWatcher.scala | 2 +- .../akka/remote/PiercingShouldKeepQuarantineSpec.scala | 2 +- .../scala/akka/remote/RemoteQuarantinePiercingSpec.scala | 2 +- .../akka/remote/RemoteRestartedQuarantinedSpec.scala | 2 +- .../remote/artery/RemoteRestartedQuarantinedSpec.scala | 2 +- .../main/scala/akka/remote/RemoteActorRefProvider.scala | 3 ++- .../src/main/scala/akka/remote/RemoteTransport.scala | 2 +- .../src/main/scala/akka/remote/RemoteWatcher.scala | 8 ++++---- akka-remote/src/main/scala/akka/remote/Remoting.scala | 2 +- .../main/scala/akka/remote/artery/ArteryTransport.scala | 8 ++++---- .../src/main/scala/akka/remote/artery/Control.scala | 4 ++-- .../src/test/scala/akka/remote/ActorsLeakSpec.scala | 2 +- .../src/test/scala/akka/remote/RemoteWatcherSpec.scala | 2 +- akka-remote/src/test/scala/akka/remote/RemotingSpec.scala | 4 ++-- .../test/scala/akka/remote/artery/RemoteWatcherSpec.scala | 2 +- project/MiMa.scala | 4 ++++ 16 files changed, 28 insertions(+), 23 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 8fb729930a..454b9ba81a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -90,7 +90,7 @@ private[cluster] class ClusterRemoteWatcher( if (m.address != selfAddress) { clusterNodes -= m.address if (previousStatus == MemberStatus.Down) { - quarantine(m.address, Some(m.uniqueAddress.uid)) + quarantine(m.address, Some(m.uniqueAddress.uid), "Cluster member removed") } publishAddressTerminated(m.address) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala index 2166bdac0d..7a337fa57c 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala @@ -58,7 +58,7 @@ abstract class PiercingShouldKeepQuarantineSpec(multiNodeConfig: PiercingShouldK enterBarrier("actor-identified") // Manually Quarantine the other system - RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test") // Quarantining is not immediate Thread.sleep(1000) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala index 5fa058abda..ce0a22005b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala @@ -78,7 +78,7 @@ abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePie enterBarrier("actor-identified") // Manually Quarantine the other system - RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst)) + RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst), "test") // Quarantine is up -- Cannot communicate with remote system any more system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify" diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala index 10837756f1..bf66b9795a 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala @@ -80,7 +80,7 @@ abstract class RemoteRestartedQuarantinedSpec val (uid, ref) = identifyWithUid(second, "subject") - RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test") enterBarrier("quarantined") enterBarrier("still-quarantined") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala index 8712d7cb03..1cdcc74fc1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala @@ -73,7 +73,7 @@ abstract class RemoteRestartedQuarantinedSpec val (uid, ref) = identifyWithUid(second, "subject", 5.seconds) enterBarrier("before-quarantined") - RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + RARP(system).provider.transport.quarantine(node(second).address, Some(uid), "test") enterBarrier("quarantined") enterBarrier("still-quarantined") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 46b195cd4e..1aa85416f6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -449,7 +449,8 @@ private[akka] class RemoteActorRefProvider( * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but * the current endpoint writer will be stopped (dropping system messages) and the address will be gated */ - def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid) + def quarantine(address: Address, uid: Option[Int], reason: String): Unit = + transport.quarantine(address, uid, reason) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index c865aa508d..c8abdabcba 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -90,6 +90,6 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but * the current endpoint writer will be stopped (dropping system messages) and the address will be gated */ - def quarantine(address: Address, uid: Option[Int]): Unit + def quarantine(address: Address, uid: Option[Int], reason: String): Unit } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 1da2c8c797..7681eaf8a3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -158,7 +158,7 @@ private[akka] class RemoteWatcher( watchingNodes foreach { a ⇒ if (!unreachable(a) && !failureDetector.isAvailable(a)) { log.warning("Detected unreachable: [{}]", a) - quarantine(a, addressUids.get(a)) + quarantine(a, addressUids.get(a), "Deemed unreachable by remote failure detector") publishAddressTerminated(a) unreachable += a } @@ -167,8 +167,8 @@ private[akka] class RemoteWatcher( def publishAddressTerminated(address: Address): Unit = AddressTerminatedTopic(context.system).publish(AddressTerminated(address)) - def quarantine(address: Address, uid: Option[Int]): Unit = - remoteProvider.quarantine(address, uid) + def quarantine(address: Address, uid: Option[Int], reason: String): Unit = + remoteProvider.quarantine(address, uid, reason) def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = { assert(watcher != self) @@ -282,4 +282,4 @@ private[akka] class RemoteWatcher( log.debug("Re-watch [{} -> {}]", watcher.path, watchee.path) watchee.sendSystemMessage(Watch(watchee, watcher)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ } -} \ No newline at end of file +} diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index bd59bc4f7c..21597bf459 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -225,7 +225,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null) } - override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = endpointManager match { + override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = endpointManager match { case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid) case _ ⇒ throw new RemoteTransportExceptionNoStackTrace( s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index b257f38da0..25aefe20c3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -543,7 +543,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def startAeronErrorLog(): Unit = { val errorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE)) val lastTimestamp = new AtomicLong(0L) - import system.dispatcher // FIXME perhaps use another dispatcher for this + import system.dispatcher aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) { if (!isShutdown) { val newLastTimestamp = errorLog.logErrors(log, lastTimestamp.get) @@ -840,9 +840,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit = eventPublisher.notifyListeners(event) - override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = { - // FIXME change the method signature (old remoting) to include reason and use Long uid? - association(remoteAddress).quarantine(reason = "", uid.map(_.toLong)) + override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = { + // FIXME use Long uid + association(remoteAddress).quarantine(reason, uid.map(_.toLong)) } def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] = diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index 66d957244d..d28df42a05 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -60,13 +60,13 @@ private[akka] object InboundControlJunction { * subject to get notification of incoming control * messages. */ - private[akka] trait ControlMessageSubject { + private[remote] trait ControlMessageSubject { def attach(observer: ControlMessageObserver): Future[Done] def detach(observer: ControlMessageObserver): Unit def stopped: Future[Done] } - private[akka] trait ControlMessageObserver { + private[remote] trait ControlMessageObserver { /** * Notification of incoming control message. The message diff --git a/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala b/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala index 409891d001..7012986e2d 100644 --- a/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/ActorsLeakSpec.scala @@ -119,7 +119,7 @@ class ActorsLeakSpec extends AkkaSpec(ActorsLeakSpec.config) with ImplicitSender val beforeQuarantineActors = targets.flatMap(collectLiveActors).toSet // it must not quarantine the current connection - RARP(system).provider.transport.quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1)) + RARP(system).provider.transport.quarantine(remoteAddress, Some(AddressUidExtension(remoteSystem).addressUid + 1), "test") // the message from local to remote should reuse passive inbound connection system.actorSelection(RootActorPath(remoteAddress) / "user" / "stoppable") ! Identify(1) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 59f23d4398..56ff462e8e 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -53,7 +53,7 @@ object RemoteWatcherSpec { // that doesn't interfere with the real watch that is going on in the background context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) - override def quarantine(address: Address, uid: Option[Int]): Unit = { + override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = { // don't quarantine in remoting, but publish a testable message context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) } diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index e08a1a0267..701cc5c670 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -769,11 +769,11 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D inboundHandleProbe.expectNoMsg(1.second) // Quarantine unrelated connection - RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1)) + RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1), "test") inboundHandleProbe.expectNoMsg(1.second) // Quarantine the connection - RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID)) + RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID), "test") // Even though the connection is stashed it will be disassociated inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated] diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala index ede7c7f62e..3267448214 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala @@ -54,7 +54,7 @@ object RemoteWatcherSpec { // that doesn't interfere with the real watch that is going on in the background context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) - override def quarantine(address: Address, uid: Option[Int]): Unit = { + override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = { // don't quarantine in remoting, but publish a testable message context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) } diff --git a/project/MiMa.scala b/project/MiMa.scala index 13351fc3f3..3c293e9b4f 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -966,6 +966,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElseGraph"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.orElse"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat") + ), + "2.4.10" -> Seq( + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine") ) ) } From 74a8bb3a006e0a88a27ee39dbe179ad0f82f4dbf Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Sep 2016 17:50:17 +0200 Subject: [PATCH 6/8] flight recorder event for send queue overflow --- .../akka/remote/artery/Association.scala | 24 ++++++------ .../remote/artery/FlightRecorderEvents.scala | 37 ++++++++++--------- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 17b9d2af7b..1592a85abc 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -97,8 +97,10 @@ private[remote] class Association( outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope]) extends AbstractAssociation with OutboundContext { import Association._ + import FlightRecorderEvents._ private val log = Logging(transport.system, getClass.getName) + private val flightRecorder = transport.createFlightRecorderEventSink(synchr = true) override def settings = transport.settings private def advancedSettings = transport.settings.Advanced @@ -274,11 +276,11 @@ private[remote] class Association( // volatile read to see latest queue array val unused = queuesVisibility - def dropped(qSize: Int, env: OutboundEnvelope): Unit = { + def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = { log.debug( "Dropping message [{}] from [{}] to [{}] due to overflow of send queue, size [{}]", message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), qSize) - // FIXME AFR + flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex) deadletters ! env } @@ -289,7 +291,7 @@ private[remote] class Association( val outboundEnvelope = createOutboundEnvelope() if (!controlQueue.offer(createOutboundEnvelope())) { quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") - dropped(controlQueueSize, outboundEnvelope) + dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope) } case _: DaemonMsgCreate ⇒ // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because @@ -298,18 +300,19 @@ private[remote] class Association( // destination) before the first ordinary message arrives. val outboundEnvelope1 = createOutboundEnvelope() if (!controlQueue.offer(outboundEnvelope1)) - dropped(controlQueueSize, outboundEnvelope1) + dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope1) (0 until outboundLanes).foreach { i ⇒ val outboundEnvelope2 = createOutboundEnvelope() if (!queues(OrdinaryQueueIndex + i).offer(outboundEnvelope2)) - dropped(queueSize, outboundEnvelope2) + dropped(OrdinaryQueueIndex + i, queueSize, outboundEnvelope2) } case _ ⇒ val outboundEnvelope = createOutboundEnvelope() - val queue = selectQueue(recipient) + val queueIndex = selectQueue(recipient) + val queue = queues(queueIndex) val offerOk = queue.offer(outboundEnvelope) if (!offerOk) - dropped(queueSize, outboundEnvelope) + dropped(queueIndex, queueSize, outboundEnvelope) } } else if (log.isDebugEnabled) @@ -318,10 +321,10 @@ private[remote] class Association( message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), remoteAddress) } - private def selectQueue(recipient: OptionVal[RemoteActorRef]): ProducerApi[OutboundEnvelope] = { + private def selectQueue(recipient: OptionVal[RemoteActorRef]): Int = { recipient match { case OptionVal.Some(r) ⇒ - val queueIndex = r.cachedSendQueueIndex match { + r.cachedSendQueueIndex match { case -1 ⇒ // only happens when messages are sent to new remote destination // and is then cached on the RemoteActorRef @@ -343,10 +346,9 @@ private[remote] class Association( idx case idx ⇒ idx } - queues(queueIndex) case OptionVal.None ⇒ - queues(OrdinaryQueueIndex) + OrdinaryQueueIndex } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala index 4d1497025c..e800e5493f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala @@ -18,29 +18,30 @@ object FlightRecorderEvents { val Transport_AeronErrorLogTaskStopped = 10 val Transport_MediaFileDeleted = 11 val Transport_FlightRecorderClose = 12 + val Transport_SendQueueOverflow = 13 // Aeron Sink events - val AeronSink_Started = 13 - val AeronSink_TaskRunnerRemoved = 14 - val AeronSink_PublicationClosed = 15 - val AeronSink_Stopped = 16 - val AeronSink_EnvelopeGrabbed = 17 - val AeronSink_EnvelopeOffered = 18 - val AeronSink_GaveUpEnvelope = 19 - val AeronSink_DelegateToTaskRunner = 20 - val AeronSink_ReturnFromTaskRunner = 21 + val AeronSink_Started = 50 + val AeronSink_TaskRunnerRemoved = 51 + val AeronSink_PublicationClosed = 52 + val AeronSink_Stopped = 53 + val AeronSink_EnvelopeGrabbed = 54 + val AeronSink_EnvelopeOffered = 55 + val AeronSink_GaveUpEnvelope = 56 + val AeronSink_DelegateToTaskRunner = 57 + val AeronSink_ReturnFromTaskRunner = 58 // Aeron Source events - val AeronSource_Started = 22 - val AeronSource_Stopped = 23 - val AeronSource_Received = 24 - val AeronSource_DelegateToTaskRunner = 25 - val AeronSource_ReturnFromTaskRunner = 26 + val AeronSource_Started = 70 + val AeronSource_Stopped = 71 + val AeronSource_Received = 72 + val AeronSource_DelegateToTaskRunner = 72 + val AeronSource_ReturnFromTaskRunner = 73 // Compression events - val Compression_CompressedActorRef = 25 - val Compression_AllocatedActorRefCompressionId = 26 - val Compression_CompressedManifest = 27 - val Compression_AllocatedManifestCompressionId = 28 + val Compression_CompressedActorRef = 90 + val Compression_AllocatedActorRefCompressionId = 91 + val Compression_CompressedManifest = 91 + val Compression_AllocatedManifestCompressionId = 92 } From c74ece957395d7931a2860d1040adbd81ea7cb8b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 7 Sep 2016 21:19:27 +0200 Subject: [PATCH 7/8] require power of two --- .../scala/akka/remote/artery/ArteryTransport.scala | 12 ++++++------ .../scala/akka/remote/artery/FlightRecorder.scala | 6 ++++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 25aefe20c3..83e7c81ea9 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -128,9 +128,9 @@ private[akka] object AssociationState { * INTERNAL API */ private[akka] final class AssociationState( - val incarnation: Int, + val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], - val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { + val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { import AssociationState.QuarantinedTimestamp @@ -230,7 +230,7 @@ private[akka] trait OutboundContext { */ private[remote] object FlushOnShutdown { def props(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]): Props = { + inboundContext: InboundContext, associations: Set[Association]): Props = { require(associations.nonEmpty) Props(new FlushOnShutdown(done, timeout, inboundContext, associations)) } @@ -242,7 +242,7 @@ private[remote] object FlushOnShutdown { * INTERNAL API */ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, - inboundContext: InboundContext, associations: Set[Association]) extends Actor { + inboundContext: InboundContext, associations: Set[Association]) extends Actor { var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue) @@ -853,7 +853,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R createOutboundSink(ordinaryStreamId, outboundContext, envelopeBufferPool) private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, - bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { + bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (ChangeOutboundCompression, Future[Done])] = { outboundLane(outboundContext, bufferPool) .toMat(aeronSink(outboundContext, streamId))(Keep.both) @@ -872,7 +872,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def outboundLane( outboundContext: OutboundContext, - bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = { + bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout, diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala index 9ace0f9315..4d3353beb0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala @@ -132,7 +132,8 @@ private[remote] class RollingEventLogSection( recordSize: Int) { import RollingEventLogSection._ - // FIXME: check if power of two + require(entryCount > 0, "entryCount must be greater than 0") + require((entryCount & (entryCount - 1)) == 0, "entryCount must be power of two") private[this] val LogMask: Long = entryCount - 1L private[this] val buffers: Array[MappedResizeableBuffer] = Array.tabulate(FlightRecorder.SnapshotCount) { logId ⇒ @@ -237,7 +238,8 @@ private[akka] class FlightRecorder(val fileChannel: FileChannel) extends AtomicB private[this] val globalSection = new MappedResizeableBuffer(fileChannel, 0, GlobalSectionSize) - // FIXME: check if power of two + require(SnapshotCount > 0, "SnapshotCount must be greater than 0") + require((SnapshotCount & (SnapshotCount - 1)) == 0, "SnapshotCount must be power of two") private[this] val SnapshotMask = SnapshotCount - 1 private[this] val alertLogs = new RollingEventLogSection( From fea18a9a1bd43657e5f5101a2e13a264a6a441eb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 8 Sep 2016 07:48:56 +0200 Subject: [PATCH 8/8] mute expected exception in RemoteDeathWatchSpec --- .../test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala index c7da3eab69..b070097412 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala @@ -36,6 +36,9 @@ object RemoteDeathWatchSpec { class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with ImplicitSender with DefaultTimeout with DeathWatchSpec { import RemoteDeathWatchSpec._ + system.eventStream.publish(TestEvent.Mute( + EventFilter[io.aeron.exceptions.RegistrationException]())) + val other = ActorSystem("other", ConfigFactory.parseString(s"akka.remote.artery.port=$otherPort") .withFallback(system.settings.config))