diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala
index b2dbae6ea3..8e527bd1e6 100644
--- a/akka-actor/src/main/scala/akka/event/Logging.scala
+++ b/akka-actor/src/main/scala/akka/event/Logging.scala
@@ -396,6 +396,17 @@ object Logging {
     n.substring(i + 1)
   }
 
+  /**
+   * Class name representation of a message.
+   * The representation of an `ActorSelectionMessage` includes the
+   * class name of the wrapped message.
+   */
+  def messageClassName(message: Any): String = message match {
+    case null                           ⇒ "null"
+    case ActorSelectionMessage(m, _, _) ⇒ s"ActorSelectionMessage(${m.getClass.getName})"
+    case m                              ⇒ m.getClass.getName
+  }
+
   /**
    * INTERNAL API
    */
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
index 6279e44928..160e5615f9 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -8,7 +8,7 @@ import scala.collection.immutable
 import akka.actor.{ ActorLogging, ActorSelection, Address, Actor, RootActorPath }
 import akka.cluster.ClusterEvent._
 import akka.remote.FailureDetectorRegistry
-import akka.remote.PriorityMessage
+import akka.remote.HeartbeatMessage
 import akka.actor.DeadLetterSuppression
 
 /**
@@ -36,12 +36,12 @@ private[cluster] object ClusterHeartbeatSender {
   /**
    * Sent at regular intervals for failure detection.
    */
-  final case class Heartbeat(from: Address) extends ClusterMessage with PriorityMessage with DeadLetterSuppression
+  final case class Heartbeat(from: Address) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression
 
   /**
    * Sent as reply to [[Heartbeat]] messages.
    */
-  final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with PriorityMessage with DeadLetterSuppression
+  final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression
 
   // sent to self only
   case object HeartbeatTick
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
index 454b9ba81a..397f3a6da3 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala
@@ -12,6 +12,7 @@ import akka.cluster.ClusterEvent.MemberRemoved
 import akka.cluster.ClusterEvent.MemberWeaklyUp
 import akka.remote.FailureDetectorRegistry
 import akka.remote.RemoteWatcher
+import akka.remote.RARP
 
 /**
  * INTERNAL API
@@ -51,6 +52,7 @@ private[cluster] class ClusterRemoteWatcher(
   unreachableReaperInterval,
   heartbeatExpectedResponseAfter) {
 
+  private val arteryEnabled = RARP(context.system).provider.remoteSettings.Artery.Enabled
   val cluster = Cluster(context.system)
   import cluster.selfAddress
 
@@ -89,8 +91,10 @@ private[cluster] class ClusterRemoteWatcher(
   def memberRemoved(m: Member, previousStatus: MemberStatus): Unit =
     if (m.address != selfAddress) {
       clusterNodes -= m.address
-      if (previousStatus == MemberStatus.Down) {
-        quarantine(m.address, Some(m.uniqueAddress.uid), "Cluster member removed")
+      // TODO We should probably always quarantine when a member is removed,
+      // but we keep the old behavior for the old remoting for now
+      if (arteryEnabled || previousStatus == MemberStatus.Down) {
+        quarantine(m.address, Some(m.uniqueAddress.uid), s"Cluster member removed, previous status [$previousStatus]")
      }
       publishAddressTerminated(m.address)
     }
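`Logging.messageClassName` exists because a plain `getClass.getName` on an `ActorSelectionMessage` hides the type of the wrapped message in log output. A standalone sketch of the behavior; `Wrapped` here stands in for the private `akka.actor.ActorSelectionMessage` and is purely illustrative:

```scala
// Illustrative stand-in for the pattern used by Logging.messageClassName;
// akka.actor.ActorSelectionMessage itself is private[akka], so a
// hypothetical Wrapped(msg) envelope is used instead.
final case class Wrapped(msg: Any)

def messageClassName(message: Any): String = message match {
  case null       => "null"
  case Wrapped(m) => s"Wrapped(${m.getClass.getName})"
  case m          => m.getClass.getName
}

// messageClassName(null)          // "null"
// messageClassName("hello")       // "java.lang.String"
// messageClassName(Wrapped("hi")) // "Wrapped(java.lang.String)"
```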
diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf
index 3d6a596822..f79a1be708 100644
--- a/akka-remote/src/main/resources/reference.conf
+++ b/akka-remote/src/main/resources/reference.conf
@@ -147,14 +147,6 @@ akka {
       # but must be resolved to ActorRefs first.
       large-message-destinations = []
 
-      # Sets the log granularity level at which Akka logs artery events. This setting
-      # can take the values OFF, ERROR, WARNING, INFO or DEBUG. Please note that the effective
-      # logging level is still determined by the global logging level of the actor system:
-      # for example debug level artery events will be only logged if the system
-      # is running with debug level logging.
-      # Failures to deserialize received messages also fall under this flag.
-      log-lifecycle-events = DEBUG
-
      # If set to a nonempty string artery will use the given dispatcher for
      # its internal actors otherwise the default dispatcher is used.
      use-dispatcher = "akka.remote.default-remote-dispatcher"
@@ -252,23 +244,29 @@ akka {
        # dropped and will trigger quarantine. The value should be longer than the length
        # of a network partition that you need to survive.
        give-up-system-message-after = 6 hours
-        
+
        # during ActorSystem termination the remoting will wait this long for
        # an acknowledgment by the destination system that flushing of outstanding
        # remote messages has been completed
        shutdown-flush-timeout = 1 second
 
-        # Timeout after which the inbound stream is going to be restarted.
+        # See 'inbound-max-restarts'
        inbound-restart-timeout = 5 seconds
 
-        # Max number of restarts for the inbound stream.
+        # Max number of restarts within 'inbound-restart-timeout' for the inbound streams.
+        # If more restarts occur, the ActorSystem will be terminated.
        inbound-max-restarts = 5
 
-        # Timeout after which outbout stream is going to be restarted for every association.
+        # See 'outbound-max-restarts'
        outbound-restart-timeout = 5 seconds
 
-        # Max number of restars of the outbound stream for every association.
+        # Max number of restarts within 'outbound-restart-timeout' for the outbound streams.
+        # If more restarts occur, the ActorSystem will be terminated.
        outbound-max-restarts = 5
+
+        # Stop the outbound stream of a quarantined association after this idle timeout,
+        # i.e. when it is not used any more.
+        stop-quarantined-after-idle = 3 seconds
 
        # Timeout after which aeron driver has not had keepalive messages
        # from a client before it considers the client dead.
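These advanced artery settings can be overridden per application. A minimal sketch, assuming artery is enabled and that the values shown are illustrative rather than recommendations:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Override the restart/idle knobs described above; everything else
// falls back to the shipped reference.conf defaults.
val config = ConfigFactory.parseString("""
  akka.remote.artery.advanced {
    outbound-restart-timeout = 10 seconds
    outbound-max-restarts = 3
    stop-quarantined-after-idle = 5 seconds
  }
""").withFallback(ConfigFactory.load())

val system = ActorSystem("tuned", config)
```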
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
index 7681eaf8a3..33c8730f40 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
@@ -31,8 +31,8 @@ private[akka] object RemoteWatcher {
   final case class WatchRemote(watchee: InternalActorRef, watcher: InternalActorRef)
   final case class UnwatchRemote(watchee: InternalActorRef, watcher: InternalActorRef)
 
-  @SerialVersionUID(1L) case object Heartbeat extends PriorityMessage
-  @SerialVersionUID(1L) final case class HeartbeatRsp(addressUid: Int) extends PriorityMessage
+  @SerialVersionUID(1L) case object Heartbeat extends HeartbeatMessage
+  @SerialVersionUID(1L) final case class HeartbeatRsp(addressUid: Int) extends HeartbeatMessage
 
   // sent to self only
   case object HeartbeatTick
diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala
index 21597bf459..415925a42f 100644
--- a/akka-remote/src/main/scala/akka/remote/Remoting.scala
+++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala
@@ -59,6 +59,11 @@ private[akka] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {
  */
 private[akka] trait PriorityMessage
 
+/**
+ * Failure detector heartbeat messages are marked with this trait.
+ */
+private[akka] trait HeartbeatMessage extends PriorityMessage
+
 /**
  * INTERNAL API
  */
diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala
index eb14e7b506..630eb9585f 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala
@@ -53,10 +53,6 @@ private[akka] final class ArterySettings private (config: Config) {
     val segments = entry.split('/').tail
     tree.insert(segments, NotUsed)
   }
-  val LifecycleEventsLogLevel: LogLevel = Logging.levelFor(toRootLowerCase(getString("log-lifecycle-events"))) match {
-    case Some(level) ⇒ level
-    case None        ⇒ throw new ConfigurationException("Logging level must be one of (off, debug, info, warning, error)")
-  }
   val Dispatcher = getString("use-dispatcher")
 
   object Advanced {
@@ -103,6 +99,8 @@ private[akka] final class ArterySettings private (config: Config) {
     val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒
       interval > Duration.Zero, "outbound-restart-timeout must be more than zero")
     val OutboundMaxRestarts = getInt("outbound-max-restarts")
+    val StopQuarantinedAfterIdle = config.getMillisDuration("stop-quarantined-after-idle").requiring(interval ⇒
+      interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero")
     val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout").requiring(interval ⇒
       interval > Duration.Zero, "client-liveness-timeout must be more than zero")
     val ImageLivenessTimeoutNs = config.getMillisDuration("image-liveness-timeout").requiring(interval ⇒
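The `requiring` style above validates each setting as it is read, so a misconfigured system fails fast at startup. A standalone sketch of the same idea, assuming plain Typesafe Config rather than Akka's internal `getMillisDuration`/`Requiring` helpers (the helper name is illustrative):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import com.typesafe.config.{ Config, ConfigFactory }

// Reads a duration setting and fails fast with a descriptive message,
// mirroring the requiring(...) validation style of ArterySettings.
def validatedMillisDuration(config: Config, path: String): FiniteDuration = {
  val d = config.getDuration(path, TimeUnit.MILLISECONDS).millis
  require(d > Duration.Zero, s"$path must be more than zero")
  d
}

val c = ConfigFactory.parseString("stop-quarantined-after-idle = 3 seconds")
val idleTimeout = validatedMillisDuration(c, "stop-quarantined-after-idle") // 3000 millis
```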
diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
index 14f4d9ba27..2e17a80c19 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
@@ -29,7 +29,6 @@ import akka.actor.Props
 import akka.event.Logging
 import akka.event.LoggingAdapter
 import akka.remote.AddressUidExtension
-import akka.remote.EventPublisher
 import akka.remote.RemoteActorRef
 import akka.remote.RemoteActorRefProvider
 import akka.remote.RemoteTransport
@@ -303,7 +302,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
   override def addresses: Set[Address] = _addresses
   override def localAddressForRemote(remote: Address): Address = defaultAddress
   override val log: LoggingAdapter = Logging(system, getClass.getName)
-  val eventPublisher = new EventPublisher(system, log, settings.LifecycleEventsLogLevel)
 
   private val codec: AkkaPduCodec = AkkaPduProtobufCodec
   private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
@@ -621,7 +619,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
             "prepared for another incarnation with uid [{}] than current uid [{}], table: [{}]",
             from, table.originUid, localAddress.uid, table)
       case ActorRefCompressionAdvertisementAck(from, tableVersion) ⇒
-        inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
+        _inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
       case ClassManifestCompressionAdvertisement(from, table) ⇒
         if (table.originUid == localAddress.uid) {
           log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table)
@@ -649,7 +647,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
           // Instead, the downing strategy should act on ThisActorSystemQuarantinedEvent, e.g.
           // use it as a STONITH signal.
           val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address)
-          publishLifecycleEvent(lifecycleEvent)
+          system.eventStream.publish(lifecycleEvent)
 
       case _: ActorSystemTerminating ⇒
         inboundEnvelope.sender match {
@@ -760,6 +758,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
   override def shutdown(): Future[Done] = {
     if (hasBeenShutdown.compareAndSet(false, true)) {
+      log.debug("Shutting down [{}]", localAddress)
       val allAssociations = associationRegistry.allAssociations
       val flushing: Future[Done] =
         if (allAssociations.isEmpty) Future.successful(Done)
@@ -886,9 +885,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
     }
   }
 
-  private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit =
-    eventPublisher.notifyListeners(event)
-
   override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = {
     try {
       // FIXME use Long uid
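With the `EventPublisher` indirection gone, remoting lifecycle events go straight to the system event stream, so interested parties subscribe to them like any other event. A minimal sketch, using `akka.remote.QuarantinedEvent` (published by `Association.quarantine` later in this patch) as the concrete event; the listener actor itself is illustrative:

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.remote.QuarantinedEvent

// Reacts to quarantine events published on the event stream,
// e.g. to trigger alerting or a controlled node restart.
class QuarantineListener extends Actor {
  def receive = {
    case QuarantinedEvent(address, uid) =>
      println(s"Association to [$address] with UID [$uid] was quarantined")
  }
}

val system = ActorSystem("listener-example")
val listener = system.actorOf(Props[QuarantineListener], "quarantineListener")
system.eventStream.subscribe(listener, classOf[QuarantinedEvent])
```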
@@ -1026,7 +1022,7 @@
   /** INTERNAL API: for testing only. */
   private[remote] def triggerCompressionAdvertisements(actorRef: Boolean, manifest: Boolean) = {
-    inboundCompressions.foreach {
+    _inboundCompressions.foreach {
       case c: InboundCompressionsImpl if actorRef || manifest ⇒
         log.info("Triggering compression table advertisement for {}", c)
         if (actorRef) c.runNextActorRefAdvertisement()
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala
index bdbabfc6f0..106d7ddc9d 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala
@@ -43,6 +43,9 @@ import akka.stream.scaladsl.Source
 import akka.util.{ Unsafe, WildcardIndex }
 import akka.util.OptionVal
 import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
+import akka.stream.SharedKillSwitch
+import scala.util.control.NoStackTrace
+import akka.actor.Cancellable
 
 /**
  * INTERNAL API
@@ -73,6 +76,8 @@
   final val ControlQueueIndex = 0
   final val LargeQueueIndex = 1
   final val OrdinaryQueueIndex = 2
+
+  private object OutboundStreamStopSignal extends RuntimeException with NoStackTrace
 }
 
 /**
@@ -139,7 +144,8 @@
     else Future.sequence(c.map(_.changeActorRefCompression(table))).map(_ ⇒ Done)
     timeoutAfter(result, changeCompressionTimeout, new ChangeOutboundCompressionFailed)
   }
-  private[this] val streamCompletions = new AtomicReference(Map.empty[String, Future[Done]])
+  private[this] val streamCompletions = new AtomicReference(Map.empty[String, (SharedKillSwitch, Future[Done])])
+  private[this] val idle = new AtomicReference[Option[Cancellable]](None)
 
   private[artery] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = {
     import transport.system.dispatcher
@@ -180,7 +186,8 @@
       case w: LazyQueueWrapper ⇒ w.runMaterialize()
       case _                   ⇒
     }
-    // materialization not completed yet
+    // the outboundControlIngress may be accessed before the stream is materialized
+    // using CountDownLatch to make sure that materialization is completed
     materializing.await(10, TimeUnit.SECONDS)
     _outboundControlIngress match {
       case OptionVal.Some(o) ⇒ o
@@ -239,6 +246,7 @@
       if (swapState(current, newState)) {
         current.uniqueRemoteAddressValue() match {
           case Some(old) ⇒
+            cancelIdleTimer()
             log.debug(
               "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
               newState.incarnation, peer.address, peer.uid, old.uid)
@@ -258,9 +266,14 @@
   override def sendControl(message: ControlMessage): Unit = {
     try {
       if (!transport.isShutdown)
-        outboundControlIngress.sendControlMessage(message)
+        if (associationState.isQuarantined()) {
+          log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message),
+            remoteAddress)
+          startIdleTimer()
+        }
+        outboundControlIngress.sendControlMessage(message)
     } catch {
-      case ShuttingDown => // silence it
+      case ShuttingDown ⇒ // silence it
     }
   }
 
@@ -275,13 +288,19 @@
     def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = {
       log.debug(
         "Dropping message [{}] from [{}] to [{}] due to overflow of send queue, size [{}]",
-        message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), qSize)
+        Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(recipient), qSize)
       flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex)
       deadletters ! env
     }
+    val quarantined = associationState.isQuarantined()
+
     // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
-    if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
+    if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || message == ClearSystemMessageDelivery) {
+      if (quarantined && message != ClearSystemMessageDelivery) {
+        log.debug("Quarantine piercing attempt with message [{}] to [{}]", Logging.messageClassName(message), recipient.getOrElse(""))
+        startIdleTimer()
+      }
       try {
         message match {
           case _: SystemMessage ⇒
@@ -318,12 +337,12 @@
           dropped(queueIndex, queueSize, outboundEnvelope)
         }
       } catch {
-        case ShuttingDown => // silence it
+        case ShuttingDown ⇒ // silence it
       }
     } else if (log.isDebugEnabled)
       log.debug(
         "Dropping message [{}] from [{}] to [{}] due to quarantined system [{}]",
-        message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), remoteAddress)
+        Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(recipient), remoteAddress)
   }
 
   private def selectQueue(recipient: OptionVal[RemoteActorRef]): Int = {
@@ -374,16 +393,19 @@
       if (swapState(current, newState)) {
         // quarantine state change was performed
         log.warning(
-          "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}",
+          "Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " +
+            "messages to this UID will be delivered to dead letters. " +
+            "Remote actorsystem must be restarted to recover from this situation. {}",
           remoteAddress, u, reason)
+        // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644
+        transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u.toInt))
         clearOutboundCompression()
         clearInboundCompression(u)
-        // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644
-        transport.eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, u.toInt))
         // end delivery of system messages to that incarnation after this point
         send(ClearSystemMessageDelivery, OptionVal.None, OptionVal.None)
         // try to tell the other system that we have quarantined it
         sendControl(Quarantined(localAddress, peer))
+        startIdleTimer()
       } else
         quarantine(reason, uid) // recursive
   }
@@ -402,6 +424,22 @@
 
   }
 
+  private def cancelIdleTimer(): Unit = {
+    val current = idle.get
+    current.foreach(_.cancel())
+    idle.compareAndSet(current, None)
+  }
+
+  private def startIdleTimer(): Unit = {
+    cancelIdleTimer()
+    idle.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) {
+      if (associationState.isQuarantined())
+        streamCompletions.get.valuesIterator.foreach {
+          case (killSwitch, _) ⇒ killSwitch.abort(OutboundStreamStopSignal)
+        }
+    }(transport.system.dispatcher)))
+  }
+
   /**
    * Called once after construction when the `Association` instance
    * wins the CAS in the `AssociationRegistry`. It will materialize
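The idle timer keeps at most one pending task by funnelling every (re)start through a single `AtomicReference[Option[Cancellable]]`. A standalone sketch of the same pattern, with illustrative names rather than the `Association` internals:

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._
import akka.actor.{ ActorSystem, Cancellable }

val system = ActorSystem("idle-timer-sketch")
import system.dispatcher

val idle = new AtomicReference[Option[Cancellable]](None)

// Cancels the currently scheduled task, if any.
def cancelIdleTimer(): Unit = {
  val current = idle.get
  current.foreach(_.cancel())
  idle.compareAndSet(current, None)
}

// (Re)starts the timer; the previous task is cancelled first so that
// at most one task is pending at any point in time.
def startIdleTimer(timeout: FiniteDuration)(action: => Unit): Unit = {
  cancelIdleTimer()
  idle.set(Some(system.scheduler.scheduleOnce(timeout)(action)))
}

// Example: stop a quarantined stream unless traffic restarts the timer.
startIdleTimer(3.seconds) { println("idle timeout hit") }
```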
@@ -429,16 +467,17 @@
   private def runOutboundControlStream(): Unit = {
     if (transport.isShutdown) throw ShuttingDown
-    // stage in the control stream may access the outboundControlIngress before returned here
-    // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress
-    materializing = new CountDownLatch(1)
+    log.debug("Starting outbound control stream to [{}]", remoteAddress)
 
     val wrapper = getOrCreateQueueWrapper(ControlQueueIndex, queueSize)
     queues(ControlQueueIndex) = wrapper // use new underlying queue immediately for restarts
     queuesVisibility = true // volatile write for visibility of the queues array
 
+    val streamKillSwitch = KillSwitches.shared("outboundControlStreamKillSwitch")
+
     val (queueValue, (control, completed)) =
       Source.fromGraph(new SendQueue[OutboundEnvelope])
+        .via(streamKillSwitch.flow)
         .toMat(transport.outboundControl(this))(Keep.both)
         .run()(materializer)
 
@@ -449,7 +488,7 @@
     _outboundControlIngress = OptionVal.Some(control)
     materializing.countDown()
     attachStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize,
-      completed, () ⇒ runOutboundControlStream())
+      streamKillSwitch, completed, () ⇒ runOutboundControlStream())
   }
 
   private def getOrCreateQueueWrapper(queueIndex: Int, capacity: Int): QueueWrapper = {
@@ -465,13 +504,17 @@
   private def runOutboundOrdinaryMessagesStream(): Unit = {
     if (transport.isShutdown) throw ShuttingDown
     if (outboundLanes == 1) {
+      log.debug("Starting outbound message stream to [{}]", remoteAddress)
       val queueIndex = OrdinaryQueueIndex
       val wrapper = getOrCreateQueueWrapper(queueIndex, queueSize)
       queues(queueIndex) = wrapper // use new underlying queue immediately for restarts
       queuesVisibility = true // volatile write for visibility of the queues array
 
+      val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")
+
       val ((queueValue, testMgmt), (changeCompression, completed)) =
         Source.fromGraph(new SendQueue[OutboundEnvelope])
+          .via(streamKillSwitch.flow)
           .viaMat(transport.outboundTestFlow(this))(Keep.both)
           .toMat(transport.outbound(this))(Keep.both)
           .run()(materializer)
@@ -483,9 +526,10 @@ private[remote] class Association(
       changeOutboundCompression = Vector(changeCompression)
 
       attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
-        completed, () ⇒ runOutboundOrdinaryMessagesStream())
+        streamKillSwitch, completed, () ⇒ runOutboundOrdinaryMessagesStream())
 
     } else {
+      log.debug("Starting outbound message stream to [{}] with [{}] lanes", remoteAddress, outboundLanes)
       val wrappers = (0 until outboundLanes).map { i ⇒
         val wrapper = getOrCreateQueueWrapper(OrdinaryQueueIndex + i, queueSize)
         queues(OrdinaryQueueIndex + i) = wrapper // use new underlying queue immediately for restarts
@@ -493,10 +537,10 @@
         wrapper
       }.toVector
 
-      val laneKillSwitch = KillSwitches.shared("outboundLaneKillSwitch")
+      val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")
 
       val lane = Source.fromGraph(new SendQueue[OutboundEnvelope])
-        .via(laneKillSwitch.flow)
+        .via(streamKillSwitch.flow)
         .via(transport.outboundTestFlow(this))
         .viaMat(transport.outboundLane(this))(Keep.both)
         .watchTermination()(Keep.both)
@@ -507,7 +551,7 @@
       }
 
       val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer]
-        .via(laneKillSwitch.flow)
+        .via(streamKillSwitch.flow)
        .toMat(transport.aeronSink(this))(Keep.both).run()(materializer)
 
       val values: Vector[(SendQueue.QueueValue[OutboundEnvelope], Encoder.ChangeOutboundCompression, Future[Done])] =
@@ -522,9 +566,9 @@
 
       // tear down all parts if one part fails or completes
       completed.onFailure {
-        case reason: Throwable ⇒ laneKillSwitch.abort(reason)
+        case reason: Throwable ⇒ streamKillSwitch.abort(reason)
       }
-      (laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ ⇒ laneKillSwitch.shutdown() })
+      (laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ ⇒ streamKillSwitch.shutdown() })
 
       queueValues.zip(wrappers).zipWithIndex.foreach {
         case ((q, w), i) ⇒
@@ -536,17 +580,21 @@
       changeOutboundCompression = changeCompressionValues
 
       attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
-        completed, () ⇒ runOutboundOrdinaryMessagesStream())
+        streamKillSwitch, completed, () ⇒ runOutboundOrdinaryMessagesStream())
     }
   }
 
   private def runOutboundLargeMessagesStream(): Unit = {
     if (transport.isShutdown) throw ShuttingDown
+    log.debug("Starting outbound large message stream to [{}]", remoteAddress)
     val wrapper = getOrCreateQueueWrapper(LargeQueueIndex, largeQueueSize)
     queues(LargeQueueIndex) = wrapper // use new underlying queue immediately for restarts
     queuesVisibility = true // volatile write for visibility of the queues array
 
+    val streamKillSwitch = KillSwitches.shared("outboundLargeMessagesKillSwitch")
+
     val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
+      .via(streamKillSwitch.flow)
       .via(transport.outboundTestFlow(this))
       .toMat(transport.outboundLarge(this))(Keep.both)
       .run()(materializer)
@@ -556,32 +604,38 @@ private[remote] class Association(
     queues(LargeQueueIndex) = queueValue
     queuesVisibility = true // volatile write for visibility of the queues array
     attachStreamRestart("Outbound large message stream", LargeQueueIndex, largeQueueSize,
-      completed, () ⇒ runOutboundLargeMessagesStream())
+      streamKillSwitch, completed, () ⇒ runOutboundLargeMessagesStream())
   }
 
   private def attachStreamRestart(streamName: String, queueIndex: Int, queueCapacity: Int,
-                                  streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = {
+                                  streamKillSwitch: SharedKillSwitch, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = {
 
     def lazyRestart(): Unit = {
       changeOutboundCompression = Vector.empty
-      if (queueIndex == ControlQueueIndex)
+      if (queueIndex == ControlQueueIndex) {
+        materializing = new CountDownLatch(1)
         _outboundControlIngress = OptionVal.None
+      }
       // LazyQueueWrapper will invoke the `restart` function when first message is offered
       queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity), restart)
       queuesVisibility = true // volatile write for visibility of the queues array
     }
 
     implicit val ec = materializer.executionContext
-    updateStreamCompletion(streamName, streamCompleted.recover { case _ ⇒ Done })
+    updateStreamCompletion(streamName, (streamKillSwitch, streamCompleted.recover { case _ ⇒ Done }))
     streamCompleted.onFailure {
       case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected
       case _: AeronTerminated             ⇒ // shutdown already in progress
       case cause if transport.isShutdown ⇒
         // don't restart after shutdown, but log some details so we notice
        log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
{}", streamName, remoteAddress, cause.getMessage) case _: AbruptTerminationException ⇒ // ActorSystem shutdown + case OutboundStreamStopSignal ⇒ + // stop as expected due to quarantine + log.debug("{} to [{}] stopped. It will be restarted if used again.", streamName, remoteAddress) + lazyRestart() case cause: GaveUpMessageException ⇒ - log.debug("{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) + log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) // restart unconditionally, without counting restarts lazyRestart() case cause ⇒ @@ -593,10 +647,10 @@ private[remote] class Association( } if (restartCounter.restart()) { - log.error(cause, "{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) + log.error(cause, "{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) lazyRestart() } else { - log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", + log.error(cause, s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds) transport.system.terminate() } @@ -605,10 +659,10 @@ private[remote] class Association( // set the future that completes when the current stream for a given name completes @tailrec - private def updateStreamCompletion(streamName: String, streamCompleted: Future[Done]): Unit = { + private def updateStreamCompletion(streamName: String, streamCompletion: (SharedKillSwitch, Future[Done])): Unit = { val prev = streamCompletions.get() - if (!streamCompletions.compareAndSet(prev, prev + (streamName → streamCompleted))) { - updateStreamCompletion(streamName, streamCompleted) + if (!streamCompletions.compareAndSet(prev, prev + (streamName → streamCompletion))) { + updateStreamCompletion(streamName, streamCompletion) } } @@ -618,7 +672,7 @@ private[remote] class Association( */ def streamsCompleted: Future[Done] = { implicit val ec = materializer.executionContext - Future.sequence(streamCompletions.get().values).map(_ ⇒ Done) + Future.sequence(streamCompletions.get().values.map(_._2)).map(_ ⇒ Done) } override def toString: String = diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index a470e5ae89..3e6ae64836 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -23,6 +23,7 @@ import akka.Done import akka.stream.stage.GraphStageWithMaterializedValue import scala.concurrent.Promise +import akka.event.Logging /** * INTERNAL API @@ -126,15 +127,18 @@ private[remote] class Encoder( bufferPool.release(envelope) outboundEnvelope.message match { case _: SystemMessageEnvelope ⇒ - log.error(e, "Failed to serialize system message [{}].", outboundEnvelope.message.getClass.getName) + log.error(e, "Failed to serialize system message [{}].", + Logging.messageClassName(outboundEnvelope.message)) throw e case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒ - val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${outboundEnvelope.recipient}: " + - s"max allowed size ${envelope.byteBuffer.limit()} bytes. 
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
index a470e5ae89..3e6ae64836 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
@@ -23,6 +23,7 @@ import akka.Done
 import akka.stream.stage.GraphStageWithMaterializedValue
 
 import scala.concurrent.Promise
+import akka.event.Logging
 
 /**
  * INTERNAL API
@@ -126,15 +127,18 @@ private[remote] class Encoder(
           bufferPool.release(envelope)
           outboundEnvelope.message match {
             case _: SystemMessageEnvelope ⇒
-              log.error(e, "Failed to serialize system message [{}].", outboundEnvelope.message.getClass.getName)
+              log.error(e, "Failed to serialize system message [{}].",
+                Logging.messageClassName(outboundEnvelope.message))
               throw e
             case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒
-              val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${outboundEnvelope.recipient}: " +
-                s"max allowed size ${envelope.byteBuffer.limit()} bytes. Message type [${outboundEnvelope.message.getClass.getName}].")
-              log.error(reason, "Failed to serialize oversized message [{}].", outboundEnvelope.message.getClass.getName)
+              val reason = new OversizedPayloadException("Discarding oversized payload sent to " +
+                s"${outboundEnvelope.recipient}: max allowed size ${envelope.byteBuffer.limit()} " +
+                s"bytes. Message type [${Logging.messageClassName(outboundEnvelope.message)}].")
+              log.error(reason, "Failed to serialize oversized message [{}].",
+                Logging.messageClassName(outboundEnvelope.message))
               pull(in)
             case _ ⇒
-              log.error(e, "Failed to serialize message [{}].", outboundEnvelope.message.getClass.getName)
+              log.error(e, "Failed to serialize message [{}].", Logging.messageClassName(outboundEnvelope.message))
               pull(in)
           }
         } finally {
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala
index d28df42a05..0e89ca46c5 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala
@@ -18,6 +18,7 @@ import akka.stream.stage.InHandler
 import akka.stream.stage.OutHandler
 import akka.remote.UniqueAddress
 import akka.util.OptionVal
+import akka.event.Logging
 
 /** INTERNAL API: marker trait for protobuf-serializable artery messages */
 private[akka] trait ArteryMessage extends Serializable
@@ -206,7 +207,7 @@ private[akka] class OutboundControlJunction(
       buffer.offer(wrap(message))
     else {
       // it's alright to drop control messages
-      log.debug("Dropping control message [{}] due to full buffer.", message.getClass.getName)
+      log.debug("Dropping control message [{}] due to full buffer.", Logging.messageClassName(message))
     }
   }
diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala
index a2b58991f5..8ac426bdca 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala
@@ -13,6 +13,9 @@ import akka.stream.stage.InHandler
 import akka.stream.stage.OutHandler
 import akka.remote.UniqueAddress
 import akka.util.OptionVal
+import akka.event.Logging
+import akka.remote.HeartbeatMessage
+import akka.actor.ActorSelectionMessage
 
 /**
  * INTERNAL API
@@ -23,7 +26,9 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten
   override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
-    new GraphStageLogic(shape) with InHandler with OutHandler {
+    new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
+
+      override protected def logSource = classOf[InboundQuarantineCheck]
 
       // InHandler
       override def onPush(): Unit = {
@@ -34,15 +39,27 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten
             push(out, env)
           case OptionVal.Some(association) ⇒
             if (association.associationState.isQuarantined(env.originUid)) {
-              inboundContext.sendControl(
-                association.remoteAddress,
-                Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
+              if (log.isDebugEnabled)
+                log.debug(
+                  "Dropping message [{}] from [{}#{}] because the system is quarantined",
+                  Logging.messageClassName(env.message), association.remoteAddress, env.originUid)
+              // avoid starting outbound stream for heartbeats
+              if (!env.message.isInstanceOf[Quarantined] && !isHeartbeat(env.message))
+                inboundContext.sendControl(
+                  association.remoteAddress,
+                  Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
               pull(in)
             } else
               push(out, env)
         }
       }
 
+      private def isHeartbeat(msg: Any): Boolean = msg match {
+        case _: HeartbeatMessage                              ⇒ true
+        case ActorSelectionMessage(_: HeartbeatMessage, _, _) ⇒ true
+        case _                                                ⇒ false
+      }
+
       // OutHandler
       override def onPull(): Unit = pull(in)
diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala
index b5dc011a6e..4a61c4b191 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala
@@ -64,7 +64,9 @@ private[akka] class MessageDispatcher(
         // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
         ActorSelection.deliverSelection(l, sender, sel)
       case msg: PossiblyHarmful if UntrustedMode ⇒
-        log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
+        log.debug(
+          "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]",
+          Logging.messageClassName(msg))
       case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
       case msg                ⇒ l.!(msg)(sender)
     }
@@ -76,7 +78,7 @@ private[akka] class MessageDispatcher(
       case r ⇒
         log.error(
           "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
-          message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
+          Logging.messageClassName(message), r, recipientAddress, provider.transport.addresses.mkString(", "))
     }
   }
diff --git a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
index a71bb341c0..2b7ca5e27c 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/TestStage.scala
@@ -20,6 +20,7 @@ import akka.stream.stage.InHandler
 import akka.stream.stage.OutHandler
 import akka.stream.stage.TimerGraphStageLogic
 import akka.util.OptionVal
+import akka.event.Logging
 
 /**
  * INTERNAL API: Thread safe mutable state that is shared among
@@ -98,7 +99,7 @@ private[remote] class OutboundTestStage(outboundContext: OutboundContext, state:
           if (state.isBlackhole(outboundContext.localAddress.address, outboundContext.remoteAddress)) {
             log.debug(
               "dropping outbound message [{}] to [{}] because of blackhole",
-              env.message.getClass.getName, outboundContext.remoteAddress)
+              Logging.messageClassName(env.message), outboundContext.remoteAddress)
             pull(in) // drop message
           } else
             push(out, env)
@@ -144,7 +145,7 @@ private[remote] class InboundTestStage(inboundContext: InboundContext, state: Sh
           if (state.isBlackhole(inboundContext.localAddress.address, association.remoteAddress)) {
             log.debug(
               "dropping inbound message [{}] from [{}] with UID [{}] because of blackhole",
-              env.message.getClass.getName, association.remoteAddress, env.originUid)
+              Logging.messageClassName(env.message), association.remoteAddress, env.originUid)
             pull(in) // drop message
           } else
             push(out, env)
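All outbound streams in this patch are threaded through a `SharedKillSwitch`, which is what both the coordinated lane teardown and the quarantine idle stop (`abort(OutboundStreamStopSignal)`) rely on. A minimal standalone sketch of that akka-stream mechanism, with an illustrative stream and stop signal:

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, KillSwitches }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import scala.concurrent.duration._
import scala.util.control.NoStackTrace

implicit val system = ActorSystem("killswitch-sketch")
implicit val materializer = ActorMaterializer()

// One shared switch can sit in several streams; aborting it fails them
// all with the given reason, which a supervisor can pattern match on.
object StopSignal extends RuntimeException with NoStackTrace

val killSwitch = KillSwitches.shared("outboundExampleKillSwitch")

val completed = Source.tick(0.seconds, 100.millis, "tick")
  .via(killSwitch.flow)
  .toMat(Sink.ignore)(Keep.right)
  .run()

// Later, e.g. from an idle timer:
killSwitch.abort(StopSignal) // `completed` fails with StopSignal
```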