diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala index 5af52d0ea1..3d00630cb9 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala @@ -115,7 +115,7 @@ class SendQueueBenchmark { val burstSize = 1000 val queue = new ManyToOneConcurrentArrayQueue[Int](1024) - val source = Source.fromGraph(new SendQueue[Int](system.deadLetters)) + val source = Source.fromGraph(new SendQueue[Int](_ ⇒ ())) val (sendQueue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both) .toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeMessageClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeMessageClusterSpec.scala index 21695812ad..14d9fe24ec 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeMessageClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeMessageClusterSpec.scala @@ -7,9 +7,12 @@ import scala.concurrent.duration._ import akka.actor.ActorIdentity import akka.actor.ActorRef +import akka.actor.ExtendedActorSystem import akka.actor.Identify import akka.actor.PoisonPill import akka.cluster.ClusterEvent.UnreachableMember +import akka.remote.RARP +import akka.remote.artery.ArterySettings import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -26,14 +29,14 @@ object LargeMessageClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString( """ akka { - #loglevel = DEBUG + loglevel = DEBUG cluster.debug.verbose-heartbeat-logging = on loggers = ["akka.testkit.TestEventListener"] actor.provider = cluster testconductor.barrier-timeout = 3 minutes - cluster.failure-detector.acceptable-heartbeat-pause = 3 s + cluster.failure-detector.acceptable-heartbeat-pause = 5 s remote.artery { enabled = on @@ -128,21 +131,19 @@ abstract class LargeMessageClusterSpec extends MultiNodeSpec(LargeMessageCluster "not disturb cluster heartbeat messages when saturated" taggedAs LongRunningTest in { + // FIXME only enabled for Aeron transport until #24576 is fixed + val arterySettings = ArterySettings(system.settings.config.getConfig("akka.remote.artery")) + if (!arterySettings.Enabled || arterySettings.Transport != ArterySettings.AeronUpd) + pending + runOn(second) { - val echo2 = identify(second, "echo") - val echo3 = identify(third, "echo") val largeEcho2 = identify(second, "largeEcho") val largeEcho3 = identify(third, "largeEcho") - val ordinaryMsgSize = 10 * 1024 - val ordinaryMsg = ("0" * ordinaryMsgSize).getBytes("utf-8") - (1 to 5).foreach { _ ⇒ - echo2.tell(ordinaryMsg, echo3) - } - - val largeMsgSize = 2 * 1000 * 1000 + val largeMsgSize = 1 * 1000 * 1000 val largeMsg = ("0" * largeMsgSize).getBytes("utf-8") - (1 to 5).foreach { _ ⇒ + (1 to 3).foreach { _ ⇒ + // this will ping-pong between second and third largeEcho2.tell(largeMsg, largeEcho3) } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala index 7c7e045b3c..4194a48ac4 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala @@ -168,6 +168,9 @@ abstract class AeronStreamLatencySpec stats.print(System.out) } + def sendToDeadLetters[T](pending: Vector[T]): Unit = + pending.foreach(system.deadLetters ! _) + val scenarios = List( TestSettings( testName = "rate-100-size-100", @@ -259,7 +262,7 @@ abstract class AeronStreamLatencySpec envelope } - val queueValue = Source.fromGraph(new SendQueue[Unit](system.deadLetters)) + val queueValue = Source.fromGraph(new SendQueue[Unit](sendToDeadLetters)) .via(sendFlow) .to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) .run() diff --git a/akka-remote/src/main/mima-filters/2.5.9.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.9.backwards.excludes index 2e3a0d951b..00108ce0c3 100644 --- a/akka-remote/src/main/mima-filters/2.5.9.backwards.excludes +++ b/akka-remote/src/main/mima-filters/2.5.9.backwards.excludes @@ -26,6 +26,20 @@ ProblemFilters.exclude[Problem]("akka.remote.artery.AeronSource*") ProblemFilters.exclude[Problem]("akka.remote.artery.TaskRunner*") ProblemFilters.exclude[Problem]("akka.remote.artery.AeronErrorLog*") +# #23967 Stop unused Artery outbound streams +ProblemFilters.exclude[Problem]("akka.remote.artery.InboundControlJunction*") +ProblemFilters.exclude[Problem]("akka.remote.artery.Association*") +ProblemFilters.exclude[Problem]("akka.remote.artery.OutboundContext*") +ProblemFilters.exclude[Problem]("akka.remote.artery.Decoder*") +ProblemFilters.exclude[Problem]("akka.remote.artery.AssociationState*") +ProblemFilters.exclude[Problem]("akka.remote.artery.compress.InboundCompressions*") + + + + + + + diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 67a51111a3..bb8430c0ce 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -972,6 +972,21 @@ akka { # of a network partition that you need to survive. give-up-system-message-after = 6 hours + # After catastrophic communication failures that could result in the loss of system + # messages or after the remote DeathWatch triggers the remote system gets + # quarantined to prevent inconsistent behavior. + # This setting controls how long the quarantined association will be kept around + # before being removed to avoid long-term memory leaks. It must be quarantined + # and also unused for this duration before it's removed. When removed the historical + # information about which UIDs that were quarantined for that hostname:port is + # gone which could result in communication with a previously quarantined node + # if it wakes up again. Therfore this shouldn't be set too low. + remove-quarantined-association-after = 1 h + + # Outbound streams are stopped when they haven't been used for this duration. + # They are started again when new messages are sent. + stop-idle-outbound-after = 5.minutes + # during ActorSystem termination the remoting will wait this long for # an acknowledgment by the destination system that flushing of outstanding # remote messages has been completed diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 46f1810d8d..d8063fdd5a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -143,6 +143,11 @@ private[akka] final class ArterySettings private (config: Config) { val GiveUpSystemMessageAfter: FiniteDuration = config.getMillisDuration("give-up-system-message-after").requiring(interval ⇒ interval > Duration.Zero, "give-up-system-message-after must be more than zero") + val RemoveQuarantinedAssociationAfter: FiniteDuration = + config.getMillisDuration("remove-quarantined-association-after").requiring(interval ⇒ + interval > Duration.Zero, "remove-quarantined-association-after must be more than zero") + val StopIdleOutboundAfter: FiniteDuration = config.getMillisDuration("stop-idle-outbound-after").requiring(interval ⇒ + interval > Duration.Zero, "stop-idle-outbound-after must be more than zero") val ShutdownFlushTimeout: FiniteDuration = config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒ interval > Duration.Zero, "shutdown-flush-timeout must be more than zero") diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 78f8dade93..bfd0720e82 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -10,6 +10,7 @@ import java.nio.channels.ServerSocketChannel import java.nio.file.Path import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec @@ -36,7 +37,6 @@ import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport import akka.remote.ThisActorSystemQuarantinedEvent import akka.remote.UniqueAddress -import akka.remote.artery.ArteryTransport.ShuttingDown import akka.remote.artery.Decoder.InboundCompressionAccess import akka.remote.artery.Encoder.OutboundCompressionAccess import akka.remote.artery.InboundControlJunction.ControlMessageObserver @@ -102,6 +102,8 @@ private[remote] object AssociationState { new AssociationState( incarnation = 1, uniqueRemoteAddressPromise = Promise(), + lastUsedTimestamp = new AtomicLong(System.nanoTime()), + controlIdleKillSwitch = OptionVal.None, quarantined = ImmutableLongMap.empty[QuarantinedTimestamp]) final case class QuarantinedTimestamp(nanoTime: Long) { @@ -116,6 +118,8 @@ private[remote] object AssociationState { private[remote] final class AssociationState( val incarnation: Int, val uniqueRemoteAddressPromise: Promise[UniqueAddress], + val lastUsedTimestamp: AtomicLong, // System.nanoTime timestamp + val controlIdleKillSwitch: OptionVal[SharedKillSwitch], val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) { import AssociationState.QuarantinedTimestamp @@ -143,7 +147,8 @@ private[remote] final class AssociationState( } def newIncarnation(remoteAddressPromise: Promise[UniqueAddress]): AssociationState = - new AssociationState(incarnation + 1, remoteAddressPromise, quarantined) + new AssociationState(incarnation + 1, remoteAddressPromise, + lastUsedTimestamp = new AtomicLong(System.nanoTime()), controlIdleKillSwitch, quarantined) def newQuarantined(): AssociationState = uniqueRemoteAddressPromise.future.value match { @@ -151,6 +156,8 @@ private[remote] final class AssociationState( new AssociationState( incarnation, uniqueRemoteAddressPromise, + lastUsedTimestamp = new AtomicLong(System.nanoTime()), + controlIdleKillSwitch, quarantined = quarantined.updated(a.uid, QuarantinedTimestamp(System.nanoTime()))) case _ ⇒ this } @@ -164,6 +171,10 @@ private[remote] final class AssociationState( def isQuarantined(uid: Long): Boolean = quarantined.contains(uid) + def withControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): AssociationState = + new AssociationState(incarnation, uniqueRemoteAddressPromise, lastUsedTimestamp, + controlIdleKillSwitch = killSwitch, quarantined) + override def toString(): String = { val a = uniqueRemoteAddressPromise.future.value match { case Some(Success(a)) ⇒ a @@ -201,6 +212,11 @@ private[remote] trait OutboundContext { */ def sendControl(message: ControlMessage): Unit + /** + * @return `true` if any of the streams are active (not stopped due to idle) + */ + def isOrdinaryMessageStreamActive(): Boolean + /** * An outbound stage can listen to control messages * via this observer subject. @@ -364,7 +380,10 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3) - protected val topLevelFREvents = + /** + * Thread-safe flight recorder for top level events. + */ + val topLevelFlightRecorder: EventSink = createFlightRecorderEventSink(synchr = true) def createFlightRecorderEventSink(synchr: Boolean = false): EventSink = { @@ -389,6 +408,8 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr priorityMessageDestinations, outboundEnvelopePool)) + def remoteAddresses: Set[Address] = associationRegistry.allAssociations.map(_.remoteAddress) + override def settings: ArterySettings = provider.remoteSettings.Artery override def start(): Unit = { @@ -396,7 +417,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr Runtime.getRuntime.addShutdownHook(shutdownHook) startTransport() - topLevelFREvents.loFreq(Transport_Started, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_Started, NoMetaData) val udp = settings.Transport == ArterySettings.AeronUpd val port = @@ -420,7 +441,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr AddressUidExtension(system).longAddressUid) // TODO: This probably needs to be a global value instead of an event as events might rotate out of the log - topLevelFREvents.loFreq(Transport_UniqueAddressSet, _localAddress.toString()) + topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString()) materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system) controlMaterializer = ActorMaterializer.systemMaterializer( @@ -428,10 +449,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr "remoteControl", system) messageDispatcher = new MessageDispatcher(system, provider) - topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData) runInboundStreams() - topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_StartupFinished, NoMetaData) + + startRemoveQuarantinedAssociationTask() log.info( "Remoting started with transport [Artery {}]; listening on address [{}] with UID [{}]", @@ -442,6 +465,15 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr protected def runInboundStreams(): Unit + private def startRemoveQuarantinedAssociationTask(): Unit = { + val removeAfter = settings.Advanced.RemoveQuarantinedAssociationAfter + val interval = removeAfter / 2 + system.scheduler.schedule(removeAfter, interval) { + if (!isShutdown) + associationRegistry.removeUnusedQuarantined(removeAfter) + }(system.dispatcher) + } + // Select inbound lane based on destination to preserve message order, // Also include the uid of the sending system in the hash to spread // "hot" destinations, e.g. ActorSelection anchor. @@ -552,6 +584,8 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr case ShuttingDown ⇒ // silence it } } + + override def controlSubjectCompleted(signal: Try[Done]): Unit = () }) } @@ -568,6 +602,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr case cause ⇒ if (restartCounter.restart()) { log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage) + topLevelFlightRecorder.loFreq(Transport_RestartInbound, s"$localAddress - $streamName") restart() } else { log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system. {}", @@ -602,7 +637,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr import system.dispatcher killSwitch.abort(ShutdownSignal) - topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_KillSwitchPulled, NoMetaData) for { _ ← streamsCompleted.recover { case _ ⇒ Done } _ ← shutdownTransport().recover { case _ ⇒ Done } @@ -610,7 +645,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr // no need to explicitly shut down the contained access since it's lifecycle is bound to the Decoder _inboundCompressionAccess = OptionVal.None - topLevelFREvents.loFreq(Transport_FlightRecorderClose, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_FlightRecorderClose, NoMetaData) flightRecorder.foreach(_.close()) afrFileChannel.foreach(_.force(true)) afrFileChannel.foreach(_.close()) @@ -692,8 +727,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr override def completeHandshake(peer: UniqueAddress): Future[Done] = { try { - val a = associationRegistry.setUID(peer) - a.completeHandshake(peer) + associationRegistry.setUID(peer).completeHandshake(peer) } catch { case ShuttingDown ⇒ Future.successful(Done) // silence it } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 366addb2aa..5ecf337684 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -5,6 +5,7 @@ package akka.remote.artery import java.util.Queue import java.util.concurrent.CountDownLatch +import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference @@ -13,26 +14,23 @@ import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ -import scala.concurrent.duration.FiniteDuration + import akka.{ Done, NotUsed } import akka.actor.ActorRef import akka.actor.ActorSelectionMessage import akka.actor.Address import akka.dispatch.sysmsg.SystemMessage import akka.event.Logging -import akka.pattern.after import akka.remote._ import akka.remote.DaemonMsgCreate import akka.remote.QuarantinedEvent import akka.remote.artery.aeron.AeronSink.GaveUpMessageException import akka.remote.artery.ArteryTransport.{ AeronTerminated, ShuttingDown } import akka.remote.artery.Encoder.OutboundCompressionAccess -import akka.remote.artery.Encoder.AccessOutboundCompressionFailed import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery -import akka.remote.artery.compress.CompressionProtocol._ import akka.remote.artery.compress.CompressionTable import akka.stream.AbruptTerminationException import akka.stream.KillSwitches @@ -44,9 +42,10 @@ import akka.util.{ Unsafe, WildcardIndex } import akka.util.OptionVal import org.agrona.concurrent.ManyToOneConcurrentArrayQueue import akka.stream.SharedKillSwitch - import scala.util.control.NoStackTrace + import akka.actor.Cancellable +import akka.stream.StreamTcpException /** * INTERNAL API @@ -72,6 +71,15 @@ private[remote] object Association { override def isEnabled: Boolean = false } + object RemovedQueueWrapper extends QueueWrapper { + override def queue: java.util.Queue[OutboundEnvelope] = + throw new UnsupportedOperationException("The Queue is removed") + + override def offer(message: OutboundEnvelope): Boolean = false + + override def isEnabled: Boolean = false + } + final case class LazyQueueWrapper(queue: Queue[OutboundEnvelope], materialize: () ⇒ Unit) extends QueueWrapper { private val onlyOnce = new AtomicBoolean @@ -92,9 +100,14 @@ private[remote] object Association { final val LargeQueueIndex = 1 final val OrdinaryQueueIndex = 2 - private object OutboundStreamStopSignal extends RuntimeException with NoStackTrace + sealed trait StopSignal + case object OutboundStreamStopIdleSignal extends RuntimeException("") with StopSignal with NoStackTrace + case object OutboundStreamStopQuarantinedSignal extends RuntimeException("") with StopSignal with NoStackTrace - final case class OutboundStreamMatValues(streamKillSwitch: SharedKillSwitch, completed: Future[Done]) + final case class OutboundStreamMatValues( + streamKillSwitch: OptionVal[SharedKillSwitch], + completed: Future[Done], + stopping: OptionVal[StopSignal]) } /** @@ -117,7 +130,7 @@ private[remote] class Association( import FlightRecorderEvents._ private val log = Logging(transport.system, getClass.getName) - private val flightRecorder = transport.createFlightRecorderEventSink(synchr = true) + private def flightRecorder = transport.topLevelFlightRecorder override def settings = transport.settings private def advancedSettings = transport.settings.Advanced @@ -128,8 +141,13 @@ private[remote] class Association( // the `SendQueue` after materialization. Using same underlying queue. This makes it possible to // start sending (enqueuing) to the Association immediate after construction. - def createQueue(capacity: Int): Queue[OutboundEnvelope] = - new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) + def createQueue(capacity: Int, queueIndex: Int): Queue[OutboundEnvelope] = { + val linked = queueIndex == ControlQueueIndex || queueIndex == LargeQueueIndex + if (linked) + new LinkedBlockingQueue[OutboundEnvelope](capacity) // less memory than ManyToOneConcurrentArrayQueue + else + new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity) + } private val outboundLanes = advancedSettings.OutboundLanes private val controlQueueSize = advancedSettings.OutboundControlQueueSize @@ -137,15 +155,15 @@ private[remote] class Association( private val largeQueueSize = advancedSettings.OutboundLargeMessageQueueSize private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = new Array(2 + outboundLanes) - queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize)) // control stream + queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize, ControlQueueIndex)) // control stream queues(LargeQueueIndex) = if (transport.largeMessageChannelEnabled) // large messages stream - QueueWrapperImpl(createQueue(largeQueueSize)) + QueueWrapperImpl(createQueue(largeQueueSize, LargeQueueIndex)) else DisabledQueueWrapper (0 until outboundLanes).foreach { i ⇒ - queues(OrdinaryQueueIndex + i) = QueueWrapperImpl(createQueue(queueSize)) // ordinary messages stream + queues(OrdinaryQueueIndex + i) = QueueWrapperImpl(createQueue(queueSize, OrdinaryQueueIndex + i)) // ordinary messages stream } @volatile private[this] var queuesVisibility = false @@ -158,37 +176,27 @@ private[remote] class Association( // in case there is a restart at the same time as a compression table update private val changeCompressionTimeout = 5.seconds - private[remote] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = { - import transport.system.dispatcher - val c = outboundCompressionAccess - val result = - if (c.isEmpty) Future.successful(Done) - else if (c.size == 1) c.head.changeActorRefCompression(table) - else Future.sequence(c.map(_.changeActorRefCompression(table))).map(_ ⇒ Done) - timeoutAfter(result, changeCompressionTimeout, new AccessOutboundCompressionFailed) - } // keyed by stream queue index private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues]) - private[this] val idle = new AtomicReference[Option[Cancellable]](None) - private[remote] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = { + private[this] val idleTask = new AtomicReference[Option[Cancellable]](None) + private[this] val quarantinedIdleTask = new AtomicReference[Option[Cancellable]](None) + + private[remote] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = + updateOutboundCompression(c ⇒ c.changeActorRefCompression(table)) + + private[remote] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = + updateOutboundCompression(c ⇒ c.changeClassManifestCompression(table)) + + private def clearOutboundCompression(): Future[Done] = + updateOutboundCompression(c ⇒ c.clearCompression()) + + private def updateOutboundCompression(action: OutboundCompressionAccess ⇒ Future[Done]): Future[Done] = { import transport.system.dispatcher val c = outboundCompressionAccess - val result = - if (c.isEmpty) Future.successful(Done) - else if (c.size == 1) c.head.changeClassManifestCompression(table) - else Future.sequence(c.map(_.changeClassManifestCompression(table))).map(_ ⇒ Done) - timeoutAfter(result, changeCompressionTimeout, new AccessOutboundCompressionFailed) - } - - private def clearOutboundCompression(): Future[Done] = { - import transport.system.dispatcher - val c = outboundCompressionAccess - val result = - if (c.isEmpty) Future.successful(Done) - else if (c.size == 1) c.head.clearCompression() - else Future.sequence(c.map(_.clearCompression())).map(_ ⇒ Done) - timeoutAfter(result, changeCompressionTimeout, new AccessOutboundCompressionFailed) + if (c.isEmpty) Future.successful(Done) + else if (c.size == 1) action(c.head) + else Future.sequence(c.map(action(_))).map(_ ⇒ Done) } private def clearInboundCompression(originUid: Long): Unit = @@ -197,12 +205,6 @@ private[remote] class Association( case _ ⇒ // do nothing } - private def timeoutAfter[T](f: Future[T], timeout: FiniteDuration, e: ⇒ Throwable): Future[T] = { - import transport.system.dispatcher - val f2 = after(timeout, transport.system.scheduler)(Future.failed(e)) - Future.firstCompletedOf(List(f, f2)) - } - private def deadletters = transport.system.deadLetters def outboundControlIngress: OutboundControlIngress = { @@ -241,7 +243,7 @@ private[remote] class Association( * @return Whether the previous state matched correctly */ @inline - private[this] def swapState(oldState: AssociationState, newState: AssociationState): Boolean = + private[artery] def swapState(oldState: AssociationState, newState: AssociationState): Boolean = Unsafe.instance.compareAndSwapObject(this, AbstractAssociation.sharedStateOffset, oldState, newState) /** @@ -250,6 +252,13 @@ private[remote] class Association( def associationState: AssociationState = Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState] + def setControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): Unit = { + val current = associationState + swapState(current, current.withControlIdleKillSwitch(killSwitch)) + if (killSwitch.isDefined) + startIdleTimer() + } + def completeHandshake(peer: UniqueAddress): Future[Done] = { require( remoteAddress == peer.address, @@ -275,7 +284,7 @@ private[remote] class Association( if (swapState(current, newState)) { current.uniqueRemoteAddressValue() match { case Some(old) ⇒ - cancelIdleTimer() + cancelQuarantinedIdleTimer() log.debug( "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", newState.incarnation, peer.address, peer.uid, old.uid) @@ -294,11 +303,11 @@ private[remote] class Association( // OutboundContext override def sendControl(message: ControlMessage): Unit = { try { - if (!transport.isShutdown) { + if (!transport.isShutdown && !isRemovedAfterQuarantined()) { if (associationState.isQuarantined()) { log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message), remoteAddress) - startIdleTimer() + startQuarantinedIdleTimer() } outboundControlIngress.sendControlMessage(message) } @@ -316,20 +325,31 @@ private[remote] class Association( val unused = queuesVisibility def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = { - log.debug( - "Dropping message [{}] from [{}] to [{}] due to overflow of send queue, size [{}]", - Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(recipient), qSize) + val removed = isRemovedAfterQuarantined() + if (removed) recipient match { + case OptionVal.Some(ref) ⇒ ref.cachedAssociation = null // don't use this Association instance any more + case OptionVal.None ⇒ + } + if (log.isDebugEnabled) { + val reason = + if (removed) "removed unused quarantined association" + else s"overflow of send queue, size [$queueSize]" + log.debug( + "Dropping message [{}] from [{}] to [{}] due to {}", + Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(recipient), reason) + } flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex) deadletters ! env } - val quarantined = associationState.isQuarantined() + val state = associationState + val quarantined = state.isQuarantined() // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || message == ClearSystemMessageDelivery) { if (quarantined && message != ClearSystemMessageDelivery) { log.debug("Quarantine piercing attempt with message [{}] to [{}]", Logging.messageClassName(message), recipient.getOrElse("")) - startIdleTimer() + startQuarantinedIdleTimer() } try { val outboundEnvelope = createOutboundEnvelope() @@ -398,11 +418,23 @@ private[remote] class Association( } } + override def isOrdinaryMessageStreamActive(): Boolean = + isStreamActive(OrdinaryQueueIndex) + + def isStreamActive(queueIndex: Int): Boolean = { + queues(queueIndex) match { + case _: LazyQueueWrapper ⇒ false + case DisabledQueueWrapper ⇒ false + case RemovedQueueWrapper ⇒ false + case _ ⇒ true + } + } + def sendTerminationHint(replyTo: ActorRef): Int = { if (!associationState.isQuarantined()) { val msg = ActorSystemTerminating(localAddress) var sent = 0 - queues.iterator.filter(_.isEnabled).foreach { queue ⇒ + queues.iterator.filter(q ⇒ q.isEnabled && !q.isInstanceOf[LazyQueueWrapper]).foreach { queue ⇒ try { val envelope = outboundEnvelopePool.acquire() .init(OptionVal.None, msg, OptionVal.Some(replyTo)) @@ -439,13 +471,14 @@ private[remote] class Association( "Remote actorsystem must be restarted to recover from this situation. {}", remoteAddress, u, reason) transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u)) + flightRecorder.loFreq(Transport_Quarantined, s"$remoteAddress - $u") clearOutboundCompression() clearInboundCompression(u) // end delivery of system messages to that incarnation after this point send(ClearSystemMessageDelivery, OptionVal.None, OptionVal.None) // try to tell the other system that we have quarantined it sendControl(Quarantined(localAddress, peer)) - startIdleTimer() + startQuarantinedIdleTimer() } else quarantine(reason, uid) // recursive } @@ -464,20 +497,123 @@ private[remote] class Association( } - private def cancelIdleTimer(): Unit = { - val current = idle.get + /** + * After calling this no messages can be sent with this Association instance + */ + def removedAfterQuarantined(): Unit = { + if (!isRemovedAfterQuarantined()) { + flightRecorder.loFreq(Transport_RemovedQuarantined, remoteAddress.toString) + queues(ControlQueueIndex) = RemovedQueueWrapper + + if (transport.largeMessageChannelEnabled) + queues(LargeQueueIndex) = RemovedQueueWrapper + + (0 until outboundLanes).foreach { i ⇒ + queues(OrdinaryQueueIndex + i) = RemovedQueueWrapper + } + queuesVisibility = true // volatile write for visibility of the queues array + + // cleanup + _outboundControlIngress = OptionVal.None + outboundCompressionAccess = Vector.empty + cancelIdleTimer() + cancelQuarantinedIdleTimer() + abortQuarantined() + + log.info("Unused association to [{}] removed after quarantine", remoteAddress) + } + } + + def isRemovedAfterQuarantined(): Boolean = + queues(ControlQueueIndex) == RemovedQueueWrapper + + private def cancelQuarantinedIdleTimer(): Unit = { + val current = quarantinedIdleTask.get current.foreach(_.cancel()) - idle.compareAndSet(current, None) + quarantinedIdleTask.compareAndSet(current, None) + } + + private def startQuarantinedIdleTimer(): Unit = { + cancelQuarantinedIdleTimer() + quarantinedIdleTask.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) { + if (associationState.isQuarantined()) + abortQuarantined() + }(transport.system.dispatcher))) + } + + private def abortQuarantined(): Unit = { + streamMatValues.get.foreach { + case (queueIndex, OutboundStreamMatValues(killSwitch, _, _)) ⇒ + killSwitch match { + case OptionVal.Some(k) ⇒ + setStopReason(queueIndex, OutboundStreamStopQuarantinedSignal) + clearStreamKillSwitch(queueIndex, k) + k.abort(OutboundStreamStopQuarantinedSignal) + case OptionVal.None ⇒ // already aborted + } + } + } + + private def cancelIdleTimer(): Unit = { + val current = idleTask.get + current.foreach(_.cancel()) + idleTask.compareAndSet(current, None) } private def startIdleTimer(): Unit = { cancelIdleTimer() - idle.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) { - if (associationState.isQuarantined()) - streamMatValues.get.valuesIterator.foreach { - case OutboundStreamMatValues(killSwitch, _) ⇒ killSwitch.abort(OutboundStreamStopSignal) + val StopIdleOutboundAfter = settings.Advanced.StopIdleOutboundAfter + val interval = StopIdleOutboundAfter / 2 + val stopIdleOutboundAfterNanos = StopIdleOutboundAfter.toNanos + val initialDelay = settings.Advanced.ConnectionTimeout.max(StopIdleOutboundAfter) + 1.second + val task: Cancellable = transport.system.scheduler.schedule(initialDelay, interval) { + if (System.nanoTime() - associationState.lastUsedTimestamp.get >= stopIdleOutboundAfterNanos) { + streamMatValues.get.foreach { + case (queueIndex, OutboundStreamMatValues(streamKillSwitch, _, stopping)) ⇒ + if (isStreamActive(queueIndex) && stopping.isEmpty) { + if (queueIndex != ControlQueueIndex) { + streamKillSwitch match { + case OptionVal.Some(k) ⇒ + // for non-control streams we can stop the entire stream + log.info("Stopping idle outbound stream [{}] to [{}]", queueIndex, remoteAddress) + flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex") + setStopReason(queueIndex, OutboundStreamStopIdleSignal) + clearStreamKillSwitch(queueIndex, k) + k.abort(OutboundStreamStopIdleSignal) + case OptionVal.None ⇒ // already aborted + } + + } else { + // only stop the transport parts of the stream because SystemMessageDelivery stage has + // state (seqno) and system messages might be sent at the same time + associationState.controlIdleKillSwitch match { + case OptionVal.Some(killSwitch) ⇒ + log.info("Stopping idle outbound control stream to [{}]", remoteAddress) + flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex") + setControlIdleKillSwitch(OptionVal.None) + killSwitch.abort(OutboundStreamStopIdleSignal) + case OptionVal.None ⇒ + log.debug( + "Couldn't stop idle outbound control stream to [{}] due to missing KillSwitch.", + remoteAddress) + } + } + } } - }(transport.system.dispatcher))) + + cancelIdleTimer() + } + }(transport.system.dispatcher) + + if (!idleTask.compareAndSet(None, Some(task))) { + // another thread did same thing and won + task.cancel() + } + + } + + private def sendToDeadLetters[T](pending: Vector[OutboundEnvelope]): Unit = { + pending.foreach(transport.system.deadLetters ! _) } /** @@ -492,6 +628,7 @@ private[remote] class Association( if (!controlQueue.isInstanceOf[QueueWrapper]) throw new IllegalStateException("associate() must only be called once") runOutboundStreams() + startIdleTimer() } private def runOutboundStreams(): Unit = { @@ -515,8 +652,15 @@ private[remote] class Association( val streamKillSwitch = KillSwitches.shared("outboundControlStreamKillSwitch") + def sendQueuePostStop[T](pending: Vector[OutboundEnvelope]): Unit = { + sendToDeadLetters(pending) + val systemMessagesCount = pending.count(env ⇒ env.message.isInstanceOf[SystemMessage]) + if (systemMessagesCount > 0) + quarantine(s"SendQueue stopped with [$systemMessagesCount] pending system messages.") + } + val (queueValue, (control, completed)) = - Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters)) + Source.fromGraph(new SendQueue[OutboundEnvelope](sendQueuePostStop)) .via(streamKillSwitch.flow) .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) @@ -539,12 +683,15 @@ private[remote] class Association( case existing: QueueWrapper ⇒ existing case _ ⇒ // use new queue for restarts - QueueWrapperImpl(createQueue(capacity)) + QueueWrapperImpl(createQueue(capacity, queueIndex)) } } private def runOutboundOrdinaryMessagesStream(): Unit = { if (transport.isShutdown) throw ShuttingDown + + val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch") + if (outboundLanes == 1) { log.debug("Starting outbound message stream to [{}]", remoteAddress) val queueIndex = OrdinaryQueueIndex @@ -552,10 +699,8 @@ private[remote] class Association( queues(queueIndex) = wrapper // use new underlying queue immediately for restarts queuesVisibility = true // volatile write for visibility of the queues array - val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch") - val (queueValue, testMgmt, changeCompression, completed) = - Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters)) + Source.fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters)) .via(streamKillSwitch.flow) .viaMat(transport.outboundTestFlow(this))(Keep.both) .toMat(transport.outbound(this))({ case ((a, b), (c, d)) ⇒ (a, b, c, d) }) // "keep all, exploded" @@ -580,9 +725,7 @@ private[remote] class Association( wrapper }.toVector - val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch") - - val lane = Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters)) + val lane = Source.fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters)) .via(streamKillSwitch.flow) .via(transport.outboundTestFlow(this)) .viaMat(transport.outboundLane(this))(Keep.both) @@ -637,7 +780,7 @@ private[remote] class Association( val streamKillSwitch = KillSwitches.shared("outboundLargeMessagesKillSwitch") - val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters)) + val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters)) .via(streamKillSwitch.flow) .via(transport.outboundTestFlow(this)) .toMat(transport.outboundLarge(this))(Keep.both) @@ -657,13 +800,21 @@ private[remote] class Association( streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { def lazyRestart(): Unit = { + flightRecorder.loFreq(Transport_RestartOutbound, s"$remoteAddress - $streamName") outboundCompressionAccess = Vector.empty if (queueIndex == ControlQueueIndex) { materializing = new CountDownLatch(1) _outboundControlIngress = OptionVal.None } // LazyQueueWrapper will invoke the `restart` function when first message is offered - queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity), restart) + val restartAndStartIdleTimer: () ⇒ Unit = () ⇒ { + restart() + startIdleTimer() + } + + if (!isRemovedAfterQuarantined()) + queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), restartAndStartIdleTimer) + queuesVisibility = true // volatile write for visibility of the queues array } @@ -676,47 +827,73 @@ private[remote] class Association( streamCompleted.failed.foreach { case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected + cancelIdleTimer() // countDown the latch in case threads are waiting on the latch in outboundControlIngress method materializing.countDown() - case _: AeronTerminated ⇒ // shutdown already in progress - case cause if transport.isShutdown ⇒ + case cause if transport.isShutdown || isRemovedAfterQuarantined() ⇒ // don't restart after shutdown, but log some details so we notice - log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage) + // for the TCP transport the ShutdownSignal is "converted" to StreamTcpException + if (!cause.isInstanceOf[StreamTcpException]) + log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage) + cancelIdleTimer() // countDown the latch in case threads are waiting on the latch in outboundControlIngress method materializing.countDown() - case _: AbruptTerminationException ⇒ // ActorSystem shutdown - case OutboundStreamStopSignal ⇒ - // stop as expected due to quarantine - log.debug("{} to [{}] stopped. It will be restarted if used again.", streamName, remoteAddress) - lazyRestart() - case cause: GaveUpMessageException ⇒ - log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) - // restart unconditionally, without counting restarts - lazyRestart() + case _: AeronTerminated ⇒ + // shutdown already in progress + cancelIdleTimer() + case _: AbruptTerminationException ⇒ + // ActorSystem shutdown + cancelIdleTimer() case cause ⇒ - if (queueIndex == ControlQueueIndex) { + + // it might have been stopped as expected due to idle or quarantine + // for the TCP transport the exception is "converted" to StreamTcpException + val stoppedIdle = cause == OutboundStreamStopIdleSignal || + getStopReason(queueIndex).contains(OutboundStreamStopIdleSignal) + val stoppedQuarantined = cause == OutboundStreamStopQuarantinedSignal || + getStopReason(queueIndex).contains(OutboundStreamStopQuarantinedSignal) + + // for some cases restart unconditionally, without counting restarts + val bypassRestartCounter = cause match { + case _: GaveUpMessageException ⇒ true + case _ ⇒ stoppedIdle || stoppedQuarantined + } + + if (queueIndex == ControlQueueIndex && !stoppedQuarantined) { cause match { case _: HandshakeTimeoutException ⇒ // ok, quarantine not possible without UID case _ ⇒ - // FIXME can we avoid quarantine if all system messages have been delivered? - quarantine("Outbound control stream restarted") + // Must quarantine in case all system messages haven't been delivered. + // See also comment in the stoppedIdle case below + quarantine(s"Outbound control stream restarted. $cause") } } - if (restartCounter.restart()) { + if (stoppedIdle) { + log.debug("{} to [{}] was idle and stopped. It will be restarted if used again.", streamName, remoteAddress) + lazyRestart() + } else if (stoppedQuarantined) { + log.debug("{} to [{}] was quarantined and stopped. It will be restarted if used again.", streamName, remoteAddress) + lazyRestart() + } else if (bypassRestartCounter || restartCounter.restart()) { log.error(cause, "{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) lazyRestart() } else { log.error(cause, s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds) + cancelIdleTimer() transport.system.terminate() } } } - private def updateStreamMatValues(streamId: Int, streamKillSwitch: SharedKillSwitch, completed: Future[Done]): Unit = { + private def updateStreamMatValues(streamId: Int, streamKillSwitch: SharedKillSwitch, + completed: Future[Done]): Unit = { implicit val ec = materializer.executionContext - updateStreamMatValues(streamId, OutboundStreamMatValues(streamKillSwitch, completed.recover { case _ ⇒ Done })) + updateStreamMatValues( + streamId, + OutboundStreamMatValues(OptionVal.Some(streamKillSwitch), completed.recover { case _ ⇒ Done }, + stopping = OptionVal.None)) } @tailrec private def updateStreamMatValues(streamId: Int, values: OutboundStreamMatValues): Unit = { @@ -726,6 +903,37 @@ private[remote] class Association( } } + @tailrec private def setStopReason(streamId: Int, stopSignal: StopSignal): Unit = { + val prev = streamMatValues.get() + prev.get(streamId) match { + case Some(v) ⇒ + if (!streamMatValues.compareAndSet(prev, prev.updated(streamId, v.copy(stopping = OptionVal.Some(stopSignal))))) + setStopReason(streamId, stopSignal) + case None ⇒ throw new IllegalStateException(s"Expected streamMatValues for [$streamId]") + } + } + + private def getStopReason(streamId: Int): OptionVal[StopSignal] = { + streamMatValues.get().get(streamId) match { + case Some(OutboundStreamMatValues(_, _, stopping)) ⇒ stopping + case None ⇒ OptionVal.None + } + } + + // after it has been used we remove the kill switch to cleanup some memory, + // not a "leak" but a KillSwitch is rather heavy + @tailrec private def clearStreamKillSwitch(streamId: Int, old: SharedKillSwitch): Unit = { + val prev = streamMatValues.get() + prev.get(streamId) match { + case Some(v) ⇒ + if (v.streamKillSwitch.isDefined && (v.streamKillSwitch.get eq old)) { + if (!streamMatValues.compareAndSet(prev, prev.updated(streamId, v.copy(streamKillSwitch = OptionVal.None)))) + clearStreamKillSwitch(streamId, old) + } + case None ⇒ throw new IllegalStateException(s"Expected streamMatValues for [$streamId]") + } + } + /** * Exposed for orderly shutdown purposes, can not be trusted except for during shutdown as streams may restart. * Will complete successfully even if one of the stream completion futures failed @@ -733,7 +941,7 @@ private[remote] class Association( def streamsCompleted: Future[Done] = { implicit val ec = materializer.executionContext Future.sequence(streamMatValues.get().values.map { - case OutboundStreamMatValues(_, done) ⇒ done + case OutboundStreamMatValues(_, done, _) ⇒ done }).map(_ ⇒ Done) } @@ -774,9 +982,10 @@ private[remote] class AssociationRegistry(createAssociation: Address ⇒ Associa * @throws ShuttingDown if called while the transport is shutting down */ @tailrec final def setUID(peer: UniqueAddress): Association = { - val currentMap = associationsByUid.get + // Don't create a new association via this method. It's supposed to exist unless it was removed after quarantined. val a = association(peer.address) + val currentMap = associationsByUid.get currentMap.get(peer.uid) match { case OptionVal.Some(previous) ⇒ if (previous eq a) @@ -797,4 +1006,50 @@ private[remote] class AssociationRegistry(createAssociation: Address ⇒ Associa def allAssociations: Set[Association] = associationsByAddress.get.values.toSet + + def removeUnusedQuarantined(after: FiniteDuration): Unit = { + removeUnusedQuarantinedByAddress(after) + removeUnusedQuarantinedByUid(after) + } + + @tailrec private def removeUnusedQuarantinedByAddress(after: FiniteDuration): Unit = { + val now = System.nanoTime() + val afterNanos = after.toNanos + val currentMap = associationsByAddress.get + val remove = currentMap.foldLeft(Map.empty[Address, Association]) { + case (acc, (address, association)) ⇒ + val state = association.associationState + if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos)) + acc.updated(address, association) + else + acc + } + if (remove.nonEmpty) { + val newMap = currentMap -- remove.keysIterator + if (associationsByAddress.compareAndSet(currentMap, newMap)) + remove.valuesIterator.foreach(_.removedAfterQuarantined()) + else + removeUnusedQuarantinedByAddress(after) // CAS fail, recursive + } + } + + @tailrec private def removeUnusedQuarantinedByUid(after: FiniteDuration): Unit = { + val now = System.nanoTime() + val afterNanos = after.toNanos + val currentMap = associationsByUid.get + var remove = Map.empty[Long, Association] + currentMap.keysIterator.foreach { uid ⇒ + val association = currentMap.get(uid).get + val state = association.associationState + if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos)) + remove = remove.updated(uid, association) + } + if (remove.nonEmpty) { + val newMap = remove.keysIterator.foldLeft(currentMap)((acc, uid) ⇒ acc.remove(uid)) + if (associationsByUid.compareAndSet(currentMap, newMap)) + remove.valuesIterator.foreach(_.removedAfterQuarantined()) + else + removeUnusedQuarantinedByUid(after) // CAS fail, recursive + } + } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 2029ba46e7..cb52065d81 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -35,10 +35,6 @@ private[remote] object Encoder { def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] def clearCompression(): Future[Done] } - - private[remote] class AccessOutboundCompressionFailed - extends RuntimeException("Change of outbound compression table failed (will be retried), because materialization did not complete yet") - } /** @@ -71,22 +67,17 @@ private[remote] class Encoder( private val instruments: RemoteInstruments = RemoteInstruments(system) - private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] { - case (table, done) ⇒ - headerBuilder.setOutboundActorRefCompression(table) - done.success(Done) + private val changeActorRefCompressionCb = getAsyncCallback[CompressionTable[ActorRef]] { table ⇒ + headerBuilder.setOutboundActorRefCompression(table) } - private val changeClassManifsetCompressionCb = getAsyncCallback[(CompressionTable[String], Promise[Done])] { - case (table, done) ⇒ - headerBuilder.setOutboundClassManifestCompression(table) - done.success(Done) + private val changeClassManifsetCompressionCb = getAsyncCallback[CompressionTable[String]] { table ⇒ + headerBuilder.setOutboundClassManifestCompression(table) } - private val clearCompressionCb = getAsyncCallback[Promise[Done]] { done ⇒ + private val clearCompressionCb = getAsyncCallback[Unit] { _ ⇒ headerBuilder.setOutboundActorRefCompression(CompressionTable.empty[ActorRef]) headerBuilder.setOutboundClassManifestCompression(CompressionTable.empty[String]) - done.success(Done) } override protected def logSource = classOf[Encoder] @@ -177,40 +168,20 @@ private[remote] class Encoder( /** * External call from ChangeOutboundCompression materialized value */ - override def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = { - val done = Promise[Done]() - try changeActorRefCompressionCb.invoke((table, done)) catch { - // This is a harmless failure, it will be retried on next advertisement or handshake attempt. - // It will only occur when callback is invoked before preStart. That is highly unlikely to - // happen since advertisement is not done immediately and handshake involves network roundtrip. - case NonFatal(_) ⇒ done.tryFailure(new AccessOutboundCompressionFailed) - } - done.future - } + override def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = + changeActorRefCompressionCb.invokeWithFeedback(table) /** * External call from ChangeOutboundCompression materialized value */ - override def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = { - val done = Promise[Done]() - try changeClassManifsetCompressionCb.invoke((table, done)) catch { - // in case materialization not completed yet - case NonFatal(_) ⇒ done.tryFailure(new AccessOutboundCompressionFailed) - } - done.future - } + override def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = + changeClassManifsetCompressionCb.invokeWithFeedback(table) /** * External call from ChangeOutboundCompression materialized value */ - override def clearCompression(): Future[Done] = { - val done = Promise[Done]() - try clearCompressionCb.invoke(done) catch { - // in case materialization not completed yet - case NonFatal(_) ⇒ done.tryFailure(new AccessOutboundCompressionFailed) - } - done.future - } + override def clearCompression(): Future[Done] = + clearCompressionCb.invokeWithFeedback(()) setHandlers(in, out, this) } @@ -240,6 +211,9 @@ private[remote] object Decoder { def runNextActorRefAdvertisement(): Unit /** For testing purposes, usually triggered by timer from within Decoder stage. */ def runNextClassManifestAdvertisement(): Unit + /** For testing purposes */ + def currentCompressionOriginUids: Future[Set[Long]] + } private[remote] trait InboundCompressionAccessImpl extends InboundCompressionAccess { @@ -247,20 +221,16 @@ private[remote] object Decoder { def compressions: InboundCompressions - private val closeCompressionForCb = getAsyncCallback[(Long, Promise[Done])] { - case (uid, done) ⇒ - compressions.close(uid) - done.success(Done) + private val closeCompressionForCb = getAsyncCallback[Long] { uid ⇒ + compressions.close(uid) } - private val confirmActorRefCompressionAdvertisementCb = getAsyncCallback[(ActorRefCompressionAdvertisementAck, Promise[Done])] { - case (ActorRefCompressionAdvertisementAck(from, tableVersion), done) ⇒ + private val confirmActorRefCompressionAdvertisementCb = getAsyncCallback[ActorRefCompressionAdvertisementAck] { + case ActorRefCompressionAdvertisementAck(from, tableVersion) ⇒ compressions.confirmActorRefCompressionAdvertisement(from.uid, tableVersion) - done.success(Done) } - private val confirmClassManifestCompressionAdvertisementCb = getAsyncCallback[(ClassManifestCompressionAdvertisementAck, Promise[Done])] { - case (ClassManifestCompressionAdvertisementAck(from, tableVersion), done) ⇒ + private val confirmClassManifestCompressionAdvertisementCb = getAsyncCallback[ClassManifestCompressionAdvertisementAck] { + case ClassManifestCompressionAdvertisementAck(from, tableVersion) ⇒ compressions.confirmClassManifestCompressionAdvertisement(from.uid, tableVersion) - done.success(Done) } private val runNextActorRefAdvertisementCb = getAsyncCallback[Unit] { _ ⇒ compressions.runNextActorRefAdvertisement() @@ -268,55 +238,49 @@ private[remote] object Decoder { private val runNextClassManifestAdvertisementCb = getAsyncCallback[Unit] { _ ⇒ compressions.runNextClassManifestAdvertisement() } - - // TODO in practice though all those CB's will always succeed, no need for the futures etc IMO + private val currentCompressionOriginUidsCb = getAsyncCallback[Promise[Set[Long]]] { p ⇒ + p.success(compressions.currentOriginUids) + } /** * External call from ChangeInboundCompression materialized value */ - override def closeCompressionFor(originUid: Long): Future[Done] = { - val done = Promise[Done]() - try closeCompressionForCb.invoke((originUid, done)) catch { - // in case materialization not completed yet - case NonFatal(_) ⇒ done.tryFailure(new AccessInboundCompressionFailed) - } - done.future - } + override def closeCompressionFor(originUid: Long): Future[Done] = + closeCompressionForCb.invokeWithFeedback(originUid) + /** * External call from ChangeInboundCompression materialized value */ - override def confirmActorRefCompressionAdvertisementAck(ack: ActorRefCompressionAdvertisementAck): Future[Done] = { - val done = Promise[Done]() - try confirmActorRefCompressionAdvertisementCb.invoke((ack, done)) catch { - // in case materialization not completed yet - case NonFatal(_) ⇒ done.tryFailure(new AccessInboundCompressionFailed) - } - done.future - } + override def confirmActorRefCompressionAdvertisementAck(ack: ActorRefCompressionAdvertisementAck): Future[Done] = + confirmActorRefCompressionAdvertisementCb.invokeWithFeedback(ack) + /** * External call from ChangeInboundCompression materialized value */ - override def confirmClassManifestCompressionAdvertisementAck(ack: ClassManifestCompressionAdvertisementAck): Future[Done] = { - val done = Promise[Done]() - try confirmClassManifestCompressionAdvertisementCb.invoke((ack, done)) catch { - case NonFatal(_) ⇒ done.tryFailure(new AccessInboundCompressionFailed) - } - done.future - } + override def confirmClassManifestCompressionAdvertisementAck(ack: ClassManifestCompressionAdvertisementAck): Future[Done] = + confirmClassManifestCompressionAdvertisementCb.invokeWithFeedback(ack) + /** * External call from ChangeInboundCompression materialized value */ override def runNextActorRefAdvertisement(): Unit = runNextActorRefAdvertisementCb.invoke(()) + /** * External call from ChangeInboundCompression materialized value */ override def runNextClassManifestAdvertisement(): Unit = runNextClassManifestAdvertisementCb.invoke(()) - } - private[remote] class AccessInboundCompressionFailed - extends RuntimeException("Change of inbound compression table failed (will be retried), because materialization did not complete yet") + /** + * External call from ChangeInboundCompression materialized value + */ + override def currentCompressionOriginUids: Future[Set[Long]] = { + val p = Promise[Set[Long]] + currentCompressionOriginUidsCb.invoke(p) + p.future + } + } // timer keys private case object AdvertiseActorRefsCompressionTable diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index ead5d99372..e5f06e2f73 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -7,6 +7,8 @@ import java.util.ArrayDeque import scala.concurrent.Future import scala.concurrent.Promise +import scala.util.Try + import akka.Done import akka.stream.Attributes import akka.stream.FlowShape @@ -61,7 +63,6 @@ private[remote] object InboundControlJunction { private[remote] trait ControlMessageSubject { def attach(observer: ControlMessageObserver): Future[Done] def detach(observer: ControlMessageObserver): Unit - def stopped: Future[Done] } private[remote] trait ControlMessageObserver { @@ -71,6 +72,8 @@ private[remote] object InboundControlJunction { * of the envelope is always a `ControlMessage`. */ def notify(inboundEnvelope: InboundEnvelope): Unit + + def controlSubjectCompleted(signal: Try[Done]): Unit } // messages for the stream callback @@ -92,7 +95,6 @@ private[remote] class InboundControlJunction override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val stoppedPromise = Promise[Done]() val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ControlMessageSubject { private var observers: Vector[ControlMessageObserver] = Vector.empty @@ -105,7 +107,10 @@ private[remote] class InboundControlJunction observers = observers.filterNot(_ == observer) } - override def postStop(): Unit = stoppedPromise.success(Done) + override def postStop(): Unit = { + observers.foreach(_.controlSubjectCompleted(Try(Done))) + observers = Vector.empty + } // InHandler override def onPush(): Unit = { @@ -133,8 +138,6 @@ private[remote] class InboundControlJunction override def detach(observer: ControlMessageObserver): Unit = callback.invoke(Dettach(observer)) - override def stopped: Future[Done] = - stoppedPromise.future } (logic, logic) diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala index a32953a432..f831afa71b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala @@ -24,6 +24,11 @@ private[remote] object FlightRecorderEvents { val Transport_MediaFileDeleted = 11 val Transport_FlightRecorderClose = 12 val Transport_SendQueueOverflow = 13 + val Transport_StopIdleOutbound = 14 + val Transport_Quarantined = 15 + val Transport_RemovedQuarantined = 16 + val Transport_RestartOutbound = 17 + val Transport_RestartInbound = 18 // Aeron Sink events val AeronSink_Started = 50 @@ -75,6 +80,11 @@ private[remote] object FlightRecorderEvents { Transport_MediaFileDeleted → "Transport: Media file deleted", Transport_FlightRecorderClose → "Transport: Flight recorder closed", Transport_SendQueueOverflow → "Transport: Send queue overflow", + Transport_StopIdleOutbound -> "Transport: Remove idle outbound", + Transport_Quarantined -> "Transport: Quarantined association", + Transport_RemovedQuarantined -> "Transport: Removed idle quarantined association", + Transport_RestartOutbound -> "Transport: Restart outbound", + Transport_RestartInbound -> "Transport: Restart outbound", // Aeron Sink events AeronSink_Started → "AeronSink: Started", diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 94f9687375..33eb86cf0a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -132,6 +132,7 @@ private[remote] class OutboundHandshake( scheduleOnce(InjectHandshakeTick, injectHandshakeInterval) val env: OutboundEnvelope = outboundEnvelopePool.acquire().init( recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress), sender = OptionVal.None) + outboundContext.associationState.lastUsedTimestamp.set(System.nanoTime()) push(out, env) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala index 6ffe3472a5..c37d6d4e59 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala @@ -47,7 +47,8 @@ private[remote] object SendQueue { /** * INTERNAL API */ -private[remote] final class SendQueue[T](deadLetters: ActorRef) extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] { +private[remote] final class SendQueue[T](postStopAction: Vector[T] ⇒ Unit) + extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] { import SendQueue._ val out: Outlet[T] = Outlet("SendQueue.out") @@ -105,15 +106,17 @@ private[remote] final class SendQueue[T](deadLetters: ActorRef) extends GraphSta } override def postStop(): Unit = { - // TODO quarantine will currently always be done when control stream is terminated, see issue #21359 + var pending = Vector.newBuilder[T] if (consumerQueue ne null) { var msg = consumerQueue.poll() while (msg != null) { - deadLetters ! msg + pending += msg msg = consumerQueue.poll() } consumerQueue.clear() } + postStopAction(pending.result()) + super.postStop() } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index d87b24db63..8b617d92cb 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -31,7 +31,6 @@ import scala.util.control.NoStackTrace * INTERNAL API */ private[remote] object SystemMessageDelivery { - // FIXME serialization of these messages final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress) extends ArteryMessage final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply @@ -89,19 +88,14 @@ private[remote] class SystemMessageDelivery( pull(in) // onPull from downstream already called }.invoke } - - outboundContext.controlSubject.stopped.onComplete { - getAsyncCallback[Try[Done]] { - case Success(_) ⇒ completeStage() - case Failure(cause) ⇒ failStage(cause) - }.invoke - } } override def postStop(): Unit = { - // TODO quarantine will currently always be done when control stream is terminated, see issue #21359 + val pendingCount = unacknowledged.size sendUnacknowledgedToDeadLetters() unacknowledged.clear() + if (pendingCount > 0) + outboundContext.quarantine(s"SystemMessageDelivery stopped with [$pendingCount] pending system messages.") outboundContext.controlSubject.detach(this) } @@ -133,6 +127,14 @@ private[remote] class SystemMessageDelivery( } } + // ControlMessageObserver, external call + override def controlSubjectCompleted(signal: Try[Done]): Unit = { + getAsyncCallback[Try[Done]] { + case Success(_) ⇒ completeStage() + case Failure(cause) ⇒ failStage(cause) + }.invoke(signal) + } + private val ackCallback = getAsyncCallback[Ack] { reply ⇒ ack(reply.seqNo) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala index 0621229f6d..c70900ebb9 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala @@ -24,6 +24,7 @@ import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransportException import akka.remote.artery.compress._ import akka.stream.KillSwitches +import akka.stream.SharedKillSwitch import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink @@ -72,12 +73,12 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro startMediaDriver() startAeron() startAeronErrorLog() - topLevelFREvents.loFreq(Transport_AeronErrorLogStarted, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_AeronErrorLogStarted, NoMetaData) if (settings.LogAeronCounters) { startAeronCounterLog() } taskRunner.start() - topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_TaskRunnerStarted, NoMetaData) } private def startMediaDriver(): Unit = { @@ -119,7 +120,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro val driver = MediaDriver.launchEmbedded(driverContext) log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) - topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName()) + topLevelFlightRecorder.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName()) if (!mediaDriver.compareAndSet(None, Some(driver))) { throw new IllegalStateException("media driver started more than once") } @@ -145,7 +146,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro try { if (settings.Advanced.DeleteAeronDirectory) { IoUtil.delete(new File(driver.aeronDirectoryName), false) - topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_MediaFileDeleted, NoMetaData) } } catch { case NonFatal(e) ⇒ @@ -285,11 +286,17 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro } } - override protected def outboundTransportSink(outboundContext: OutboundContext, streamId: Int, - bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = { + override protected def outboundTransportSink( + outboundContext: OutboundContext, + streamId: Int, + bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = { val giveUpAfter = if (streamId == ControlStreamId) settings.Advanced.GiveUpSystemMessageAfter else settings.Advanced.GiveUpMessageAfter + // TODO: Note that the AssociationState.controlStreamIdleKillSwitch in control stream is not used for the + // Aeron transport. Would be difficult to handle the Future[Done] materialized value. + // If we want to stop for Aeron also it is probably easier to stop the publication inside the + // AeronSink, i.e. not using a KillSwitch. Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, bufferPool, giveUpAfter, createFlightRecorderEventSink())) } @@ -395,10 +402,10 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro override protected def shutdownTransport(): Future[Done] = { import system.dispatcher taskRunner.stop().map { _ ⇒ - topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData) if (aeronErrorLogTask != null) { aeronErrorLogTask.cancel() - topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) } if (aeron != null) aeron.close() if (aeronErrorLog != null) aeronErrorLog.close() diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 461f6c54e1..5aa2a3fc0b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -4,19 +4,19 @@ package akka.remote.artery.compress -import java.util.concurrent.atomic.AtomicReference -import java.util.function.{ Consumer, LongFunction } +import java.util.function.LongFunction -import akka.actor.{ ActorRef, ActorSystem, Address } -import akka.event.{ Logging, LoggingAdapter } +import scala.annotation.tailrec + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.event.Logging +import akka.event.LoggingAdapter import akka.remote.artery._ import akka.util.OptionVal import org.agrona.collections.Long2ObjectHashMap -import scala.annotation.tailrec -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger - /** * INTERNAL API * Decompress and cause compression advertisements. @@ -36,6 +36,8 @@ private[remote] trait InboundCompressions { /** Triggers compression advertisement via control message. */ def runNextClassManifestAdvertisement(): Unit + def currentOriginUids: Set[Long] + /** * Remove compression and cancel advertisement scheduling for a specific origin */ @@ -47,6 +49,7 @@ private[remote] trait InboundCompressions { * INTERNAL API * * One per incoming Aeron stream, actual compression tables are kept per-originUid and created on demand. + * All access is via the Decoder stage. */ private[remote] final class InboundCompressionsImpl( system: ActorSystem, @@ -54,101 +57,104 @@ private[remote] final class InboundCompressionsImpl( settings: ArterySettings.Compression, eventSink: EventSink = IgnoreEventSink) extends InboundCompressions { - // None is used as tombstone value after closed - // TODO would be nice if we can cleanup the tombstones - // FIXME we should be able to remove the tombstones easily now - private[this] val _actorRefsIns = new Long2ObjectHashMap[Option[InboundActorRefCompression]]() + private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() private[this] val _inboundActorRefsLog = Logging(system, classOf[InboundActorRefCompression]) - private val createInboundActorRefsForOrigin = new LongFunction[Option[InboundActorRefCompression]] { - override def apply(originUid: Long): Option[InboundActorRefCompression] = { + private val createInboundActorRefsForOrigin = new LongFunction[InboundActorRefCompression] { + override def apply(originUid: Long): InboundActorRefCompression = { val actorRefHitters = new TopHeavyHitters[ActorRef](settings.ActorRefs.Max) - Some(new InboundActorRefCompression(_inboundActorRefsLog, settings, originUid, inboundContext, actorRefHitters)) + new InboundActorRefCompression(_inboundActorRefsLog, settings, originUid, inboundContext, actorRefHitters) } } - private def actorRefsIn(originUid: Long): Option[InboundActorRefCompression] = + private def actorRefsIn(originUid: Long): InboundActorRefCompression = _actorRefsIns.computeIfAbsent(originUid, createInboundActorRefsForOrigin) - // None is used as tombstone value after closed - private[this] val _classManifestsIns = new Long2ObjectHashMap[Option[InboundManifestCompression]]() + private[this] val _classManifestsIns = new Long2ObjectHashMap[InboundManifestCompression]() private[this] val _inboundManifestLog = Logging(system, classOf[InboundManifestCompression]) - private val createInboundManifestsForOrigin = new LongFunction[Option[InboundManifestCompression]] { - override def apply(originUid: Long): Option[InboundManifestCompression] = { + private val createInboundManifestsForOrigin = new LongFunction[InboundManifestCompression] { + override def apply(originUid: Long): InboundManifestCompression = { val manifestHitters = new TopHeavyHitters[String](settings.Manifests.Max) - Some(new InboundManifestCompression(_inboundManifestLog, settings, originUid, inboundContext, manifestHitters)) + new InboundManifestCompression(_inboundManifestLog, settings, originUid, inboundContext, manifestHitters) } } - private def classManifestsIn(originUid: Long): Option[InboundManifestCompression] = + private def classManifestsIn(originUid: Long): InboundManifestCompression = _classManifestsIns.computeIfAbsent(originUid, createInboundManifestsForOrigin) // actor ref compression --- override def decompressActorRef(originUid: Long, tableVersion: Byte, idx: Int): OptionVal[ActorRef] = - actorRefsIn(originUid) match { - case Some(a) ⇒ a.decompress(tableVersion, idx) - case None ⇒ OptionVal.None - } + actorRefsIn(originUid).decompress(tableVersion, idx) + override def hitActorRef(originUid: Long, address: Address, ref: ActorRef, n: Int): Unit = { if (ArterySettings.Compression.Debug) println(s"[compress] hitActorRef($originUid, $address, $ref, $n)") - actorRefsIn(originUid) match { - case Some(a) ⇒ a.increment(address, ref, n) - case None ⇒ // closed - } + actorRefsIn(originUid).increment(address, ref, n) } + override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Byte): Unit = { _actorRefsIns.get(originUid) match { - case null ⇒ // ignore - case Some(a) ⇒ a.confirmAdvertisement(tableVersion) - case None ⇒ // closed + case null ⇒ // ignore + case a ⇒ a.confirmAdvertisement(tableVersion) } } /** Send compression table advertisement over control stream. Should be called from Decoder. */ override def runNextActorRefAdvertisement(): Unit = { val vs = _actorRefsIns.values.iterator() - while (vs.hasNext) vs.next() match { - case Some(inbound) ⇒ - eventSink.hiFreq(FlightRecorderEvents.Compression_Inbound_RunActorRefAdvertisement, 1) - inbound.runNextTableAdvertisement() - case None ⇒ // do nothing... + var remove = Vector.empty[Long] + while (vs.hasNext) { + val inbound = vs.next() + inboundContext.association(inbound.originUid) match { + case OptionVal.Some(a) if !a.associationState.isQuarantined(inbound.originUid) ⇒ + eventSink.hiFreq(FlightRecorderEvents.Compression_Inbound_RunActorRefAdvertisement, inbound.originUid) + inbound.runNextTableAdvertisement() + case _ ⇒ remove :+= inbound.originUid + } } + if (remove.nonEmpty) remove.foreach(close) } // class manifest compression --- override def decompressClassManifest(originUid: Long, tableVersion: Byte, idx: Int): OptionVal[String] = - classManifestsIn(originUid) match { - case Some(a) ⇒ a.decompress(tableVersion, idx) - case None ⇒ OptionVal.None - } + classManifestsIn(originUid).decompress(tableVersion, idx) override def hitClassManifest(originUid: Long, address: Address, manifest: String, n: Int): Unit = { if (ArterySettings.Compression.Debug) println(s"[compress] hitClassManifest($originUid, $address, $manifest, $n)") - classManifestsIn(originUid) match { - case Some(a) ⇒ a.increment(address, manifest, n) - case None ⇒ // closed - } + classManifestsIn(originUid).increment(address, manifest, n) } override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Byte): Unit = { _classManifestsIns.get(originUid) match { - case null ⇒ // ignore - case Some(a) ⇒ a.confirmAdvertisement(tableVersion) - case None ⇒ // closed + case null ⇒ // ignore + case a ⇒ a.confirmAdvertisement(tableVersion) } } /** Send compression table advertisement over control stream. Should be called from Decoder. */ override def runNextClassManifestAdvertisement(): Unit = { val vs = _classManifestsIns.values.iterator() - while (vs.hasNext) vs.next() match { - case Some(inbound) ⇒ - eventSink.hiFreq(FlightRecorderEvents.Compression_Inbound_RunClassManifestAdvertisement, 1) - inbound.runNextTableAdvertisement() - case None ⇒ // do nothing... + var remove = Vector.empty[Long] + while (vs.hasNext) { + val inbound = vs.next() + inboundContext.association(inbound.originUid) match { + case OptionVal.Some(a) if !a.associationState.isQuarantined(inbound.originUid) ⇒ + eventSink.hiFreq(FlightRecorderEvents.Compression_Inbound_RunClassManifestAdvertisement, inbound.originUid) + inbound.runNextTableAdvertisement() + case _ ⇒ remove :+= inbound.originUid + } } + if (remove.nonEmpty) remove.foreach(close) + } + + override def currentOriginUids: Set[Long] = { + import scala.collection.JavaConverters._ + // can't use union because of java.lang.Long and Scala Long mismatch, + // only used for testing so doesn't matter + val result = Set.empty[java.lang.Long] ++ _actorRefsIns.keySet.asScala.iterator ++ + _classManifestsIns.keySet.asScala.iterator + result.map(_.longValue) } override def close(originUid: Long): Unit = { - _actorRefsIns.putIfAbsent(originUid, None) - _classManifestsIns.putIfAbsent(originUid, None) + _actorRefsIns.remove(originUid) + _classManifestsIns.remove(originUid) } } @@ -281,7 +287,7 @@ private[remote] object InboundCompression { private[remote] abstract class InboundCompression[T >: Null]( val log: LoggingAdapter, val settings: ArterySettings.Compression, - originUid: Long, + val originUid: Long, inboundContext: InboundContext, val heavyHitters: TopHeavyHitters[T]) { @@ -396,10 +402,7 @@ private[remote] abstract class InboundCompression[T >: Null]( case None ⇒ inboundContext.association(originUid) match { case OptionVal.Some(association) ⇒ - if (association.associationState.isQuarantined(originUid)) { - // FIXME cleanup compresssion for quarantined associations, see #23967 - log.debug("Ignoring {} for quarantined originUid [{}].", Logging.simpleName(tables.activeTable), originUid) - } else if (alive) { + if (alive && association.isOrdinaryMessageStreamActive()) { val table = prepareCompressionAdvertisement(tables.nextTable.version) // TODO expensive, check if building the other way wouldn't be faster? val nextState = tables.copy(nextTable = table.invert, advertisementInProgress = Some(table)) @@ -424,16 +427,10 @@ private[remote] abstract class InboundCompression[T >: Null]( inboundContext.association(originUid) match { case OptionVal.Some(association) ⇒ - if (association.associationState.isQuarantined(originUid)) { - // give up - log.debug("Skipping advertisement in progress for quarantined originUid [{}].", originUid) - confirmAdvertisement(inProgress.version) - } else { - log.debug( - "Advertisement in progress for originUid [{}] version {}, resending", - originUid, inProgress.version) - advertiseCompressionTable(association, inProgress) // resend - } + log.debug( + "Advertisement in progress for originUid [{}] version {}, resending", + originUid, inProgress.version) + advertiseCompressionTable(association, inProgress) // resend case OptionVal.None ⇒ } } else { @@ -497,5 +494,7 @@ private[remote] case object NoInboundCompressions extends InboundCompressions { override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Byte): Unit = () override def runNextClassManifestAdvertisement(): Unit = () + override def currentOriginUids: Set[Long] = Set.empty + override def close(originUid: Long): Unit = () } diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 125421b32c..266d4359f4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -128,13 +128,30 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider TcpOutbound_Connected, s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " + s"/ ${streamName(streamId)}") - Flow[ByteString] - .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) - .via(connectionFlow) - .mapMaterializedValue(_ ⇒ NotUsed) - .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal ⇒ Source.empty }) - .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream") - .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = Logging.WarningLevel)) + + // FIXME use the Flow.lazyInit from https://github.com/akka/akka/pull/24527 + + val flow = + Flow[ByteString] + .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) + .via(connectionFlow) + .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal ⇒ Source.empty }) + .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream") + .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = Logging.WarningLevel)) + + if (streamId == ControlStreamId) { + // must replace the KillSwitch when restarted + val controlIdleKillSwitch = KillSwitches.shared("outboundControlStreamIdleKillSwitch") + Flow[ByteString] + .via(controlIdleKillSwitch.flow) + .via(flow) + .mapMaterializedValue { _ ⇒ + outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(OptionVal.Some(controlIdleKillSwitch)) + NotUsed + } + } else { + flow + } } if (streamId == ControlStreamId) { @@ -145,7 +162,6 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider settings.Advanced.GiveUpSystemMessageAfter, 0.1)(flowFactory) } else { // Best effort retry a few times - // FIXME only restart on failures?, but missing in RestartFlow, see https://github.com/akka/akka/pull/23911 RestartFlow.withBackoff[ByteString, ByteString]( settings.Advanced.OutboundRestartBackoff, settings.Advanced.OutboundRestartBackoff * 5, 0.1, maxRestarts = 3)(flowFactory) @@ -397,7 +413,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider implicit val ec: ExecutionContext = materializer.executionContext inboundKillSwitch.shutdown() unbind().map { _ ⇒ - topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) + topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData) Done } } @@ -410,7 +426,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider b ← binding _ ← b.unbind() } yield { - topLevelFREvents.loFreq(TcpInbound_Bound, s"${localAddress.address.host.get}:${localAddress.address.port}") + topLevelFlightRecorder.loFreq(TcpInbound_Bound, s"${localAddress.address.host.get}:${localAddress.address.port}") Done } case None ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index f9da3146f6..30f425ad9f 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -49,6 +49,7 @@ class EnvelopeBufferSpec extends AkkaSpec { override def runNextActorRefAdvertisement(): Unit = ??? override def runNextClassManifestAdvertisement(): Unit = ??? + override def currentOriginUids: Set[Long] = ??? } val version = ArteryTransport.HighestVersion diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala index e3232c61ad..31b9deec8d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -3,6 +3,9 @@ */ package akka.remote.artery +import scala.util.Try + +import akka.Done import akka.actor.Address import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageObserver @@ -56,6 +59,7 @@ class InboundControlJunctionSpec override def notify(env: InboundEnvelope) = { observerProbe.ref ! env.message } + override def controlSubjectCompleted(signal: Try[Done]): Unit = () }) downstream.request(10) diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala new file mode 100644 index 0000000000..256cc2ed30 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala @@ -0,0 +1,209 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.RootActorPath +import akka.remote.RARP +import akka.testkit.ImplicitSender +import akka.testkit.TestActors +import akka.testkit.TestProbe +import org.scalatest.concurrent.Eventually +import org.scalatest.time.Span + +class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" + akka.loglevel=INFO + akka.remote.artery.advanced.stop-idle-outbound-after = 1 s + akka.remote.artery.advanced.connection-timeout = 2 s + akka.remote.artery.advanced.remove-quarantined-association-after = 1 s + akka.remote.artery.advanced.compression { + actor-refs.advertisement-interval = 5 seconds + } + """) with ImplicitSender with Eventually { + + override implicit val patience: PatienceConfig = PatienceConfig( + testKitSettings.DefaultTimeout.duration * 2, + Span(200, org.scalatest.time.Millis)) + + private def isArteryTcp: Boolean = + RARP(system).provider.transport.asInstanceOf[ArteryTransport].settings.Transport == ArterySettings.Tcp + + private def assertStreamActive(association: Association, queueIndex: Int, expected: Boolean): Unit = { + if (queueIndex == Association.ControlQueueIndex) { + // the control stream is not stopped, but for TCP the connection is closed + if (expected) + association.isStreamActive(queueIndex) shouldBe expected + else if (isArteryTcp && !association.isRemovedAfterQuarantined()) { + association.associationState.controlIdleKillSwitch.isDefined shouldBe expected + } + } else { + association.isStreamActive(queueIndex) shouldBe expected + } + + } + + "Outbound streams" should { + + "be stopped when they are idle" in withAssociation { + (_, remoteAddress, remoteEcho, localArtery, localProbe) ⇒ + + val association = localArtery.association(remoteAddress) + withClue("When initiating a connection, both the control and ordinary streams are opened") { + assertStreamActive(association, Association.ControlQueueIndex, expected = true) + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = true) + } + + eventually { + assertStreamActive(association, Association.ControlQueueIndex, expected = false) + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) + } + } + + "still be resumable after they have been stopped" in withAssociation { + (_, remoteAddress, remoteEcho, localArtery, localProbe) ⇒ + val firstAssociation = localArtery.association(remoteAddress) + + eventually { + assertStreamActive(firstAssociation, Association.ControlQueueIndex, expected = false) + assertStreamActive(firstAssociation, Association.OrdinaryQueueIndex, expected = false) + } + + withClue("re-initiating the connection should be the same as starting it the first time") { + + eventually { + remoteEcho.tell("ping", localProbe.ref) + localProbe.expectMsg("ping") + val secondAssociation = localArtery.association(remoteAddress) + assertStreamActive(secondAssociation, Association.ControlQueueIndex, expected = true) + assertStreamActive(secondAssociation, Association.OrdinaryQueueIndex, expected = true) + } + + } + } + + "eliminate quarantined association when not used" in withAssociation { + (_, remoteAddress, remoteEcho, localArtery, localProbe) ⇒ + + val association = localArtery.association(remoteAddress) + withClue("When initiating a connection, both the control and ordinary streams are opened") { + assertStreamActive(association, Association.ControlQueueIndex, expected = true) + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = true) + } + + val remoteUid = association.associationState.uniqueRemoteAddress.futureValue.uid + + localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + + eventually { + assertStreamActive(association, Association.ControlQueueIndex, expected = false) + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) + } + + Thread.sleep(2000) + // localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + + // the outbound streams are inactive and association quarantined, then it's completely removed + eventually { + localArtery.remoteAddresses should not contain remoteAddress + } + } + + "remove inbound compression after quarantine" in withAssociation { + (_, remoteAddress, remoteEcho, localArtery, localProbe) ⇒ + + val association = localArtery.association(remoteAddress) + val remoteUid = association.associationState.uniqueRemoteAddress.futureValue.uid + + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should contain(remoteUid) + + eventually { + assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) + } + // compression still exists when idle + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should contain(remoteUid) + + localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") + // after quarantine it should be removed + eventually { + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should not contain remoteUid + } + } + + "remove inbound compression after restart with same host:port" in withAssociation { + (remoteSystem, remoteAddress, remoteEcho, localArtery, localProbe) ⇒ + + val association = localArtery.association(remoteAddress) + val remoteUid = association.associationState.uniqueRemoteAddress.futureValue.uid + + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should contain(remoteUid) + + shutdown(remoteSystem, verifySystemShutdown = true) + + val remoteSystem2 = newRemoteSystem(Some(s""" + akka.remote.artery.canonical.hostname = ${remoteAddress.host.get} + akka.remote.artery.canonical.port = ${remoteAddress.port.get} + """), name = Some(remoteAddress.system)) + try { + + remoteSystem2.actorOf(TestActors.echoActorProps, "echo2") + + def remoteEcho = system.actorSelection(RootActorPath(remoteAddress) / "user" / "echo2") + + val echoRef = eventually { + remoteEcho.resolveOne(1.seconds).futureValue + } + + echoRef.tell("ping2", localProbe.ref) + localProbe.expectMsg("ping2") + + val association2 = localArtery.association(remoteAddress) + val remoteUid2 = association2.associationState.uniqueRemoteAddress.futureValue.uid + + remoteUid2 should !==(remoteUid) + + eventually { + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should contain(remoteUid2) + } + eventually { + localArtery.inboundCompressionAccess.get.currentCompressionOriginUids.futureValue should not contain remoteUid + } + } finally { + shutdown(remoteSystem2) + } + } + + /** + * Test setup fixture: + * 1. A 'remote' ActorSystem is created to spawn an Echo actor, + * 2. A TestProbe is spawned locally to initiate communication with the Echo actor + * 3. Details (remoteAddress, remoteEcho, localArtery, localProbe) are supplied to the test + */ + def withAssociation(test: (ActorSystem, Address, ActorRef, ArteryTransport, TestProbe) ⇒ Any): Unit = { + val remoteSystem = newRemoteSystem() + try { + remoteSystem.actorOf(TestActors.echoActorProps, "echo") + val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress + + def remoteEcho = system.actorSelection(RootActorPath(remoteAddress) / "user" / "echo") + + val echoRef = remoteEcho.resolveOne(3.seconds).futureValue + val localProbe = new TestProbe(localSystem) + + echoRef.tell("ping", localProbe.ref) + localProbe.expectMsg("ping") + + val artery = RARP(system).provider.transport.asInstanceOf[ArteryTransport] + + test(remoteSystem, remoteAddress, echoRef, artery, localProbe) + + } finally { + shutdown(remoteSystem) + } + } + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala index 2780edf691..b0e7a2f004 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala @@ -55,11 +55,19 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with val matSettings = ActorMaterializerSettings(system).withFuzzing(true) implicit val mat = ActorMaterializer(matSettings)(system) + def sendToDeadLetters[T](pending: Vector[T]): Unit = + pending.foreach(system.deadLetters ! _) + + def createQueue[E](capacity: Int): Queue[E] = { + // new java.util.concurrent.LinkedBlockingQueue[E](capacity) + new ManyToOneConcurrentArrayQueue[E](capacity) + } + "SendQueue" must { "deliver all messages" in { - val queue = new ManyToOneConcurrentArrayQueue[String](128) - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters)) + val queue = createQueue[String](128) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](sendToDeadLetters)) .toMat(TestSink.probe)(Keep.both).run() downstream.request(10) @@ -74,11 +82,11 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with } "deliver messages enqueued before materialization" in { - val queue = new ManyToOneConcurrentArrayQueue[String](128) + val queue = createQueue[String](128) queue.offer("a") queue.offer("b") - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters)) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](sendToDeadLetters)) .toMat(TestSink.probe)(Keep.both).run() downstream.request(10) @@ -94,9 +102,9 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with "deliver bursts of messages" in { // this test verifies that the wakeup signal is triggered correctly - val queue = new ManyToOneConcurrentArrayQueue[Int](128) + val queue = createQueue[Int](128) val burstSize = 100 - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Int](system.deadLetters)) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Int](sendToDeadLetters)) .grouped(burstSize) .async .toMat(TestSink.probe)(Keep.both).run() @@ -118,13 +126,13 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with "support multiple producers" in { val numberOfProducers = 5 - val queue = new ManyToOneConcurrentArrayQueue[Msg](numberOfProducers * 512) + val queue = createQueue[Msg](numberOfProducers * 512) val producers = Vector.tabulate(numberOfProducers)(i ⇒ system.actorOf(producerProps(s"producer-$i"))) // send 100 per producer before materializing producers.foreach(_ ! ProduceToQueue(0, 100, queue)) - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Msg](system.deadLetters)) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Msg](sendToDeadLetters)) .toMat(TestSink.probe)(Keep.both).run() sendQueue.inject(queue) @@ -150,11 +158,11 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with "deliver first message" in { - def test(f: (ManyToOneConcurrentArrayQueue[String], SendQueue.QueueValue[String], TestSubscriber.Probe[String]) ⇒ Unit): Unit = { + def test(f: (Queue[String], SendQueue.QueueValue[String], TestSubscriber.Probe[String]) ⇒ Unit): Unit = { (1 to 100).foreach { n ⇒ - val queue = new ManyToOneConcurrentArrayQueue[String](16) - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters)) + val queue = createQueue[String](16) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](sendToDeadLetters)) .toMat(TestSink.probe)(Keep.both).run() f(queue, sendQueue, downstream) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 27ac2f5335..1d263fcc2a 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -25,6 +25,7 @@ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.ImplicitSender import akka.testkit.TestActors import akka.testkit.TestProbe +import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.util.OptionVal @@ -33,8 +34,13 @@ object SystemMessageDeliverySpec { case class TestSysMsg(s: String) extends SystemMessageDelivery.AckedDeliveryMessage val config = ConfigFactory.parseString( - """ - akka.loglevel = DEBUG + s""" + akka.loglevel = INFO + akka.remote.artery.advanced.stop-idle-outbound-after = 1000 ms + akka.remote.artery.advanced.inject-handshake-interval = 500 ms + akka.remote.watch-failure-detector.heartbeat-interval = 2 s + akka.remote.artery.log-received-messages = on + akka.remote.artery.log-sent-messages = on """.stripMargin).withFallback(ArterySpecSupport.defaultConfig) } @@ -103,7 +109,7 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver "System messages" must { "be delivered with real actors" in { - systemB.actorOf(TestActors.echoActorProps, "echo") + val systemBRef = systemB.actorOf(TestActors.echoActorProps, "echo") val remoteRef = { system.actorSelection(rootB / "user" / "echo") ! Identify(None) @@ -111,10 +117,39 @@ class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(SystemMessageDeliver } watch(remoteRef) - remoteRef ! PoisonPill + systemB.stop(systemBRef) expectTerminated(remoteRef) } + "be delivered when concurrent idle stopping" in { + // it's configured with short stop-idle-outbound-after to stress exercise stopping of idle outbound streams + // at the same time as system messages are sent + + val systemBRef = systemB.actorOf(TestActors.echoActorProps, "echo2") + + val remoteRef = { + system.actorSelection(rootB / "user" / "echo2") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + val idleTimeout = RARP(system).provider.transport.asInstanceOf[ArteryTransport].settings.Advanced.StopIdleOutboundAfter + val rnd = ThreadLocalRandom.current() + + (1 to 5).foreach { _ ⇒ + (1 to 1).foreach { _ ⇒ + watch(remoteRef) + unwatch(remoteRef) + } + Thread.sleep((idleTimeout - 10.millis).toMillis + rnd.nextInt(20)) + } + + watch(remoteRef) + remoteRef ! "ping2" + expectMsg("ping2") + systemB.stop(systemBRef) + expectTerminated(remoteRef, 5.seconds) + } + "be flushed on shutdown" in { val systemC = ActorSystem("systemC", system.settings.config) try { diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index d6ce809ab1..8a0069f0bd 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -90,6 +90,8 @@ private[remote] class TestOutboundContext( _associationState = _associationState.newQuarantined() } + override def isOrdinaryMessageStreamActive(): Boolean = true + override def sendControl(message: ControlMessage) = { controlProbe.foreach(_ ! message) controlSubject.sendControl(InboundEnvelope(OptionVal.None, message, OptionVal.None, localAddress.uid, @@ -114,8 +116,6 @@ private[remote] class TestControlMessageSubject extends ControlMessageSubject { observers.remove(observer) } - override def stopped: Future[Done] = Promise[Done]().future - def sendControl(env: InboundEnvelope): Unit = { val iter = observers.iterator() while (iter.hasNext())