Merge pull request #21492 from akka/wip-21407-stop-outbound-patriknw
stop outbound streams when quarantined, #21407
commit 7522a1db40
14 changed files with 171 additions and 80 deletions
@@ -396,6 +396,17 @@ object Logging {
       n.substring(i + 1)
   }

+  /**
+   * Class name representation of a message.
+   * `ActorSelectionMessage` representation includes class name of
+   * wrapped message.
+   */
+  def messageClassName(message: Any): String = message match {
+    case null                           ⇒ "null"
+    case ActorSelectionMessage(m, _, _) ⇒ s"ActorSelectionMessage(${m.getClass.getName})"
+    case m                              ⇒ m.getClass.getName
+  }
+
   /**
    * INTERNAL API
    */

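The new `Logging.messageClassName` helper unwraps `ActorSelectionMessage` so log statements name the wrapped message class rather than the envelope. A standalone sketch of the same pattern; the `Selection` wrapper below is a stand-in for Akka's internal `ActorSelectionMessage`, not the real type:

    // Sketch only: `Selection` stands in for the internal selection envelope.
    object MessageClassNameSketch extends App {
      final case class Selection(msg: Any)

      def messageClassName(message: Any): String = message match {
        case null         ⇒ "null"
        case Selection(m) ⇒ s"Selection(${m.getClass.getName})"
        case m            ⇒ m.getClass.getName
      }

      println(messageClassName(Selection("hello"))) // Selection(java.lang.String)
      println(messageClassName(42))                 // java.lang.Integer
    }
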
@@ -8,7 +8,7 @@ import scala.collection.immutable
 import akka.actor.{ ActorLogging, ActorSelection, Address, Actor, RootActorPath }
 import akka.cluster.ClusterEvent._
 import akka.remote.FailureDetectorRegistry
-import akka.remote.PriorityMessage
+import akka.remote.HeartbeatMessage
 import akka.actor.DeadLetterSuppression

 /**

@@ -36,12 +36,12 @@ private[cluster] object ClusterHeartbeatSender {
   /**
    * Sent at regular intervals for failure detection.
    */
-  final case class Heartbeat(from: Address) extends ClusterMessage with PriorityMessage with DeadLetterSuppression
+  final case class Heartbeat(from: Address) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression

   /**
    * Sent as reply to [[Heartbeat]] messages.
    */
-  final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with PriorityMessage with DeadLetterSuppression
+  final case class HeartbeatRsp(from: UniqueAddress) extends ClusterMessage with HeartbeatMessage with DeadLetterSuppression

   // sent to self only
   case object HeartbeatTick

@@ -12,6 +12,7 @@ import akka.cluster.ClusterEvent.MemberRemoved
 import akka.cluster.ClusterEvent.MemberWeaklyUp
 import akka.remote.FailureDetectorRegistry
 import akka.remote.RemoteWatcher
+import akka.remote.RARP

 /**
  * INTERNAL API

@@ -51,6 +52,7 @@ private[cluster] class ClusterRemoteWatcher(
   unreachableReaperInterval,
   heartbeatExpectedResponseAfter) {

+  private val arteryEnabled = RARP(context.system).provider.remoteSettings.Artery.Enabled
   val cluster = Cluster(context.system)
   import cluster.selfAddress

@@ -89,8 +91,10 @@ private[cluster] class ClusterRemoteWatcher(
   def memberRemoved(m: Member, previousStatus: MemberStatus): Unit =
     if (m.address != selfAddress) {
       clusterNodes -= m.address
-      if (previousStatus == MemberStatus.Down) {
-        quarantine(m.address, Some(m.uniqueAddress.uid), "Cluster member removed")
+      // TODO We should probably always quarantine when member is removed,
+      //      but keeping old behavior for old remoting for now
+      if (arteryEnabled || previousStatus == MemberStatus.Down) {
+        quarantine(m.address, Some(m.uniqueAddress.uid), s"Cluster member removed, previous status [$previousStatus]")
       }
       publishAddressTerminated(m.address)
     }

@@ -147,14 +147,6 @@ akka {
       # but must be resolved to ActorRefs first.
       large-message-destinations = []

-      # Sets the log granularity level at which Akka logs artery events. This setting
-      # can take the values OFF, ERROR, WARNING, INFO or DEBUG. Please note that the effective
-      # logging level is still determined by the global logging level of the actor system:
-      # for example debug level artery events will be only logged if the system
-      # is running with debug level logging.
-      # Failures to deserialize received messages also fall under this flag.
-      log-lifecycle-events = DEBUG
-
       # If set to a nonempty string artery will use the given dispatcher for
       # its internal actors otherwise the default dispatcher is used.
       use-dispatcher = "akka.remote.default-remote-dispatcher"

@@ -252,23 +244,29 @@ akka {
       # dropped and will trigger quarantine. The value should be longer than the length
       # of a network partition that you need to survive.
       give-up-system-message-after = 6 hours

       # during ActorSystem termination the remoting will wait this long for
       # an acknowledgment by the destination system that flushing of outstanding
       # remote messages has been completed
       shutdown-flush-timeout = 1 second

       # Timeout after which the inbound stream is going to be restarted.
       # See 'inbound-max-restarts'
       inbound-restart-timeout = 5 seconds

-      # Max number of restarts for the inbound stream.
+      # Max number of restarts within 'inbound-restart-timeout' for the inbound streams.
       # If more restarts occurs the ActorSystem will be terminated.
       inbound-max-restarts = 5

       # Timeout after which outbout stream is going to be restarted for every association.
       # See 'outbound-max-restarts'
       outbound-restart-timeout = 5 seconds

-      # Max number of restars of the outbound stream for every association.
+      # Max number of restarts within 'outbound-restart-timeout' for the outbound streams.
       # If more restarts occurs the ActorSystem will be terminated.
       outbound-max-restarts = 5

+      # Stop outbound stream of a quarantined association after this idle timeout, i.e.
+      # when not used any more.
+      stop-quarantined-after-idle = 3 seconds
+
       # Timeout after which aeron driver has not had keepalive messages
       # from a client before it considers the client dead.

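These advanced artery settings can be overridden when the ActorSystem is created. A minimal sketch, assuming the keys and section shown in the hunk above; the 10-second idle timeout is an arbitrary example value, not a recommendation:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // Sketch: override the restart and quarantine-idle-stop settings from code.
    val config = ConfigFactory.parseString("""
      akka.remote.artery.advanced {
        outbound-restart-timeout = 5 seconds
        outbound-max-restarts = 5
        stop-quarantined-after-idle = 10 seconds  # example value, default is 3 seconds
      }
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("example", config)
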
@@ -31,8 +31,8 @@ private[akka] object RemoteWatcher {
   final case class WatchRemote(watchee: InternalActorRef, watcher: InternalActorRef)
   final case class UnwatchRemote(watchee: InternalActorRef, watcher: InternalActorRef)

-  @SerialVersionUID(1L) case object Heartbeat extends PriorityMessage
-  @SerialVersionUID(1L) final case class HeartbeatRsp(addressUid: Int) extends PriorityMessage
+  @SerialVersionUID(1L) case object Heartbeat extends HeartbeatMessage
+  @SerialVersionUID(1L) final case class HeartbeatRsp(addressUid: Int) extends HeartbeatMessage

   // sent to self only
   case object HeartbeatTick

@@ -59,6 +59,11 @@ private[akka] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {
  */
 private[akka] trait PriorityMessage

+/**
+ * Failure detector heartbeat messages are marked with this trait.
+ */
+private[akka] trait HeartbeatMessage extends PriorityMessage
+
 /**
  * INTERNAL API
  */

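Because `HeartbeatMessage` extends `PriorityMessage`, a heartbeat keeps its priority handling while also becoming recognizable on its own. A minimal standalone sketch of the hierarchy; the `Heartbeat` case class here is illustrative, not Akka's:

    trait PriorityMessage
    trait HeartbeatMessage extends PriorityMessage

    final case class Heartbeat(from: String) extends HeartbeatMessage

    val msg: Any = Heartbeat("node-1")
    assert(msg.isInstanceOf[PriorityMessage])  // still handled with priority
    assert(msg.isInstanceOf[HeartbeatMessage]) // and recognizable as a heartbeat
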
@@ -53,10 +53,6 @@ private[akka] final class ArterySettings private (config: Config) {
     val segments = entry.split('/').tail
     tree.insert(segments, NotUsed)
   }
-  val LifecycleEventsLogLevel: LogLevel = Logging.levelFor(toRootLowerCase(getString("log-lifecycle-events"))) match {
-    case Some(level) ⇒ level
-    case None        ⇒ throw new ConfigurationException("Logging level must be one of (off, debug, info, warning, error)")
-  }
   val Dispatcher = getString("use-dispatcher")

   object Advanced {

@@ -103,6 +99,8 @@ private[akka] final class ArterySettings private (config: Config) {
     val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒
       interval > Duration.Zero, "outbound-restart-timeout must be more than zero")
     val OutboundMaxRestarts = getInt("outbound-max-restarts")
+    val StopQuarantinedAfterIdle = config.getMillisDuration("stop-quarantined-after-idle").requiring(interval ⇒
+      interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero")
     val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout").requiring(interval ⇒
       interval > Duration.Zero, "client-liveness-timeout must be more than zero")
     val ImageLivenessTimeoutNs = config.getMillisDuration("image-liveness-timeout").requiring(interval ⇒

@@ -29,7 +29,6 @@ import akka.actor.Props
 import akka.event.Logging
 import akka.event.LoggingAdapter
 import akka.remote.AddressUidExtension
-import akka.remote.EventPublisher
 import akka.remote.RemoteActorRef
 import akka.remote.RemoteActorRefProvider
 import akka.remote.RemoteTransport

@@ -303,7 +302,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
   override def addresses: Set[Address] = _addresses
   override def localAddressForRemote(remote: Address): Address = defaultAddress
   override val log: LoggingAdapter = Logging(system, getClass.getName)
-  val eventPublisher = new EventPublisher(system, log, settings.LifecycleEventsLogLevel)

   private val codec: AkkaPduCodec = AkkaPduProtobufCodec
   private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")

@@ -621,7 +619,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
         "prepared for another incarnation with uid [{}] than current uid [{}], table: [{}]",
         from, table.originUid, localAddress.uid, table)
     case ActorRefCompressionAdvertisementAck(from, tableVersion) ⇒
-      inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
+      _inboundCompressions.foreach(_.confirmActorRefCompressionAdvertisement(from.uid, tableVersion))
     case ClassManifestCompressionAdvertisement(from, table) ⇒
       if (table.originUid == localAddress.uid) {
         log.debug("Incoming Class Manifest compression advertisement from [{}], table: [{}]", from, table)

@@ -649,7 +647,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
       // Instead, the downing strategy should act on ThisActorSystemQuarantinedEvent, e.g.
       // use it as a STONITH signal.
       val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address)
-      publishLifecycleEvent(lifecycleEvent)
+      system.eventStream.publish(lifecycleEvent)

     case _: ActorSystemTerminating ⇒
       inboundEnvelope.sender match {

@@ -760,6 +758,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R

   override def shutdown(): Future[Done] = {
     if (hasBeenShutdown.compareAndSet(false, true)) {
+      log.debug("Shutting down [{}]", localAddress)
       val allAssociations = associationRegistry.allAssociations
       val flushing: Future[Done] =
         if (allAssociations.isEmpty) Future.successful(Done)

@@ -886,9 +885,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
     }
   }

-  private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit =
-    eventPublisher.notifyListeners(event)
-
   override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = {
     try {
       // FIXME use Long uid

@@ -1026,7 +1022,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R

   /** INTERNAL API: for testing only. */
   private[remote] def triggerCompressionAdvertisements(actorRef: Boolean, manifest: Boolean) = {
-    inboundCompressions.foreach {
+    _inboundCompressions.foreach {
       case c: InboundCompressionsImpl if actorRef || manifest ⇒
         log.info("Triggering compression table advertisement for {}", c)
         if (actorRef) c.runNextActorRefAdvertisement()

@@ -43,6 +43,9 @@ import akka.stream.scaladsl.Source
 import akka.util.{ Unsafe, WildcardIndex }
 import akka.util.OptionVal
 import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
+import akka.stream.SharedKillSwitch
+import scala.util.control.NoStackTrace
+import akka.actor.Cancellable

 /**
  * INTERNAL API

@@ -73,6 +76,8 @@ private[remote] object Association {
   final val ControlQueueIndex = 0
   final val LargeQueueIndex = 1
   final val OrdinaryQueueIndex = 2
+
+  private object OutboundStreamStopSignal extends RuntimeException with NoStackTrace
 }

 /**

@@ -139,7 +144,8 @@ private[remote] class Association(
     else Future.sequence(c.map(_.changeActorRefCompression(table))).map(_ ⇒ Done)
     timeoutAfter(result, changeCompressionTimeout, new ChangeOutboundCompressionFailed)
   }
-  private[this] val streamCompletions = new AtomicReference(Map.empty[String, Future[Done]])
+  private[this] val streamCompletions = new AtomicReference(Map.empty[String, (SharedKillSwitch, Future[Done])])
+  private[this] val idle = new AtomicReference[Option[Cancellable]](None)

   private[artery] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = {
     import transport.system.dispatcher

@@ -180,7 +186,8 @@ private[remote] class Association(
       case w: LazyQueueWrapper ⇒ w.runMaterialize()
       case _                   ⇒
     }
-    // materialization not completed yet
+    // the outboundControlIngress may be accessed before the stream is materialized
+    // using CountDownLatch to make sure that materialization is completed
     materializing.await(10, TimeUnit.SECONDS)
     _outboundControlIngress match {
       case OptionVal.Some(o) ⇒ o

@@ -239,6 +246,7 @@ private[remote] class Association(
     if (swapState(current, newState)) {
       current.uniqueRemoteAddressValue() match {
         case Some(old) ⇒
+          cancelIdleTimer()
           log.debug(
             "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
             newState.incarnation, peer.address, peer.uid, old.uid)

@@ -258,9 +266,14 @@ private[remote] class Association(
   override def sendControl(message: ControlMessage): Unit = {
     try {
-      if (!transport.isShutdown)
-        outboundControlIngress.sendControlMessage(message)
+      if (associationState.isQuarantined()) {
+        log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message),
+          remoteAddress)
+        startIdleTimer()
+      }
+      outboundControlIngress.sendControlMessage(message)
     } catch {
-      case ShuttingDown => // silence it
+      case ShuttingDown ⇒ // silence it
     }
   }

@@ -275,13 +288,19 @@ private[remote] class Association(
     def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = {
       log.debug(
         "Dropping message [{}] from [{}] to [{}] due to overflow of send queue, size [{}]",
-        message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), qSize)
+        Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(recipient), qSize)
       flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex)
       deadletters ! env
     }

+    val quarantined = associationState.isQuarantined()
+
     // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
-    if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
+    if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || message == ClearSystemMessageDelivery) {
+      if (quarantined && message != ClearSystemMessageDelivery) {
+        log.debug("Quarantine piercing attempt with message [{}] to [{}]", Logging.messageClassName(message), recipient.getOrElse(""))
+        startIdleTimer()
+      }
       try {
         message match {
           case _: SystemMessage ⇒

@@ -318,12 +337,12 @@ private[remote] class Association(
             dropped(queueIndex, queueSize, outboundEnvelope)
         }
       } catch {
-        case ShuttingDown => // silence it
+        case ShuttingDown ⇒ // silence it
       }
     } else if (log.isDebugEnabled)
       log.debug(
         "Dropping message [{}] from [{}] to [{}] due to quarantined system [{}]",
-        message.getClass, sender.getOrElse(deadletters), recipient.getOrElse(recipient), remoteAddress)
+        Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(recipient), remoteAddress)
   }

   private def selectQueue(recipient: OptionVal[RemoteActorRef]): Int = {

@@ -374,16 +393,19 @@ private[remote] class Association(
       if (swapState(current, newState)) {
         // quarantine state change was performed
         log.warning(
-          "Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}",
+          "Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " +
+            "messages to this UID will be delivered to dead letters. " +
+            "Remote actorsystem must be restarted to recover from this situation. {}",
           remoteAddress, u, reason)
-        // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644
-        transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u.toInt))
         clearOutboundCompression()
         clearInboundCompression(u)
+        // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644
+        transport.eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, u.toInt))
         // end delivery of system messages to that incarnation after this point
         send(ClearSystemMessageDelivery, OptionVal.None, OptionVal.None)
         // try to tell the other system that we have quarantined it
         sendControl(Quarantined(localAddress, peer))
+        startIdleTimer()
       } else
         quarantine(reason, uid) // recursive
     }

@@ -402,6 +424,22 @@ private[remote] class Association(

   }

+  private def cancelIdleTimer(): Unit = {
+    val current = idle.get
+    current.foreach(_.cancel())
+    idle.compareAndSet(current, None)
+  }
+
+  private def startIdleTimer(): Unit = {
+    cancelIdleTimer()
+    idle.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) {
+      if (associationState.isQuarantined())
+        streamCompletions.get.valuesIterator.foreach {
+          case (killSwitch, _) ⇒ killSwitch.abort(OutboundStreamStopSignal)
+        }
+    }(transport.system.dispatcher)))
+  }
+
   /**
    * Called once after construction when the `Association` instance
    * wins the CAS in the `AssociationRegistry`. It will materialize

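The idle timer above follows a small, reusable pattern: an AtomicReference[Option[Cancellable]] holds at most one scheduled task, and starting a timer first cancels any previous one. A self-contained sketch of the same discipline with our own names (IdleTimer is not an Akka class):

    import java.util.concurrent.atomic.AtomicReference
    import scala.concurrent.duration.FiniteDuration
    import akka.actor.{ ActorSystem, Cancellable }

    // Sketch: same cancel-then-schedule discipline as cancelIdleTimer/startIdleTimer above.
    class IdleTimer(system: ActorSystem, timeout: FiniteDuration)(onIdle: ⇒ Unit) {
      private val idle = new AtomicReference[Option[Cancellable]](None)

      def cancel(): Unit = {
        val current = idle.get
        current.foreach(_.cancel())
        idle.compareAndSet(current, None)
      }

      def start(): Unit = {
        cancel() // at most one outstanding timer
        idle.set(Some(system.scheduler.scheduleOnce(timeout)(onIdle)(system.dispatcher)))
      }
    }
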
@@ -429,16 +467,17 @@ private[remote] class Association(

   private def runOutboundControlStream(): Unit = {
     if (transport.isShutdown) throw ShuttingDown
-    // stage in the control stream may access the outboundControlIngress before returned here
-    // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress
-    materializing = new CountDownLatch(1)
     log.debug("Starting outbound control stream to [{}]", remoteAddress)

     val wrapper = getOrCreateQueueWrapper(ControlQueueIndex, queueSize)
     queues(ControlQueueIndex) = wrapper // use new underlying queue immediately for restarts
     queuesVisibility = true // volatile write for visibility of the queues array

+    val streamKillSwitch = KillSwitches.shared("outboundControlStreamKillSwitch")
+
     val (queueValue, (control, completed)) =
       Source.fromGraph(new SendQueue[OutboundEnvelope])
+        .via(streamKillSwitch.flow)
         .toMat(transport.outboundControl(this))(Keep.both)
         .run()(materializer)

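This wiring is the heart of the change: every outbound stream now runs through a SharedKillSwitch, so the idle timer can abort it with the OutboundStreamStopSignal marker. A minimal runnable sketch of that mechanism; the names and the tick source are ours, not Akka's:

    import akka.actor.ActorSystem
    import akka.stream.{ ActorMaterializer, KillSwitches }
    import akka.stream.scaladsl.{ Keep, Sink, Source }
    import scala.concurrent.duration._
    import scala.util.control.NoStackTrace

    object StopSignal extends RuntimeException with NoStackTrace

    implicit val system = ActorSystem("demo")
    implicit val mat = ActorMaterializer()

    val killSwitch = KillSwitches.shared("outboundKillSwitch")

    // The completion future fails with StopSignal once the switch is aborted.
    val completed = Source.tick(0.seconds, 1.second, "msg")
      .via(killSwitch.flow)
      .toMat(Sink.ignore)(Keep.right)
      .run()

    killSwitch.abort(StopSignal) // stops the stream; `completed` fails with StopSignal
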
@@ -449,7 +488,7 @@ private[remote] class Association(
     _outboundControlIngress = OptionVal.Some(control)
     materializing.countDown()
     attachStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize,
-      completed, () ⇒ runOutboundControlStream())
+      streamKillSwitch, completed, () ⇒ runOutboundControlStream())
   }

   private def getOrCreateQueueWrapper(queueIndex: Int, capacity: Int): QueueWrapper = {

@@ -465,13 +504,17 @@ private[remote] class Association(
   private def runOutboundOrdinaryMessagesStream(): Unit = {
     if (transport.isShutdown) throw ShuttingDown
     if (outboundLanes == 1) {
       log.debug("Starting outbound message stream to [{}]", remoteAddress)
       val queueIndex = OrdinaryQueueIndex
       val wrapper = getOrCreateQueueWrapper(queueIndex, queueSize)
       queues(queueIndex) = wrapper // use new underlying queue immediately for restarts
       queuesVisibility = true // volatile write for visibility of the queues array

+      val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")
+
       val ((queueValue, testMgmt), (changeCompression, completed)) =
         Source.fromGraph(new SendQueue[OutboundEnvelope])
+          .via(streamKillSwitch.flow)
           .viaMat(transport.outboundTestFlow(this))(Keep.both)
           .toMat(transport.outbound(this))(Keep.both)
           .run()(materializer)

@@ -483,9 +526,10 @@ private[remote] class Association(
       changeOutboundCompression = Vector(changeCompression)

       attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
-        completed, () ⇒ runOutboundOrdinaryMessagesStream())
+        streamKillSwitch, completed, () ⇒ runOutboundOrdinaryMessagesStream())

     } else {
       log.debug("Starting outbound message stream to [{}] with [{}] lanes", remoteAddress, outboundLanes)
       val wrappers = (0 until outboundLanes).map { i ⇒
         val wrapper = getOrCreateQueueWrapper(OrdinaryQueueIndex + i, queueSize)
         queues(OrdinaryQueueIndex + i) = wrapper // use new underlying queue immediately for restarts

@@ -493,10 +537,10 @@ private[remote] class Association(
         wrapper
       }.toVector

-      val laneKillSwitch = KillSwitches.shared("outboundLaneKillSwitch")
+      val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")

       val lane = Source.fromGraph(new SendQueue[OutboundEnvelope])
-        .via(laneKillSwitch.flow)
+        .via(streamKillSwitch.flow)
         .via(transport.outboundTestFlow(this))
         .viaMat(transport.outboundLane(this))(Keep.both)
         .watchTermination()(Keep.both)

@@ -507,7 +551,7 @@ private[remote] class Association(
       }

       val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer]
-        .via(laneKillSwitch.flow)
+        .via(streamKillSwitch.flow)
         .toMat(transport.aeronSink(this))(Keep.both).run()(materializer)

       val values: Vector[(SendQueue.QueueValue[OutboundEnvelope], Encoder.ChangeOutboundCompression, Future[Done])] =

@@ -522,9 +566,9 @@ private[remote] class Association(

       // tear down all parts if one part fails or completes
       completed.onFailure {
-        case reason: Throwable ⇒ laneKillSwitch.abort(reason)
+        case reason: Throwable ⇒ streamKillSwitch.abort(reason)
       }
-      (laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ ⇒ laneKillSwitch.shutdown() })
+      (laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ ⇒ streamKillSwitch.shutdown() })

       queueValues.zip(wrappers).zipWithIndex.foreach {
         case ((q, w), i) ⇒

@@ -536,17 +580,21 @@ private[remote] class Association(
       changeOutboundCompression = changeCompressionValues

       attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
-        completed, () ⇒ runOutboundOrdinaryMessagesStream())
+        streamKillSwitch, completed, () ⇒ runOutboundOrdinaryMessagesStream())
     }
   }

   private def runOutboundLargeMessagesStream(): Unit = {
     if (transport.isShutdown) throw ShuttingDown
     log.debug("Starting outbound large message stream to [{}]", remoteAddress)
     val wrapper = getOrCreateQueueWrapper(LargeQueueIndex, largeQueueSize)
     queues(LargeQueueIndex) = wrapper // use new underlying queue immediately for restarts
     queuesVisibility = true // volatile write for visibility of the queues array

+    val streamKillSwitch = KillSwitches.shared("outboundLargeMessagesKillSwitch")
+
     val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
+      .via(streamKillSwitch.flow)
       .via(transport.outboundTestFlow(this))
       .toMat(transport.outboundLarge(this))(Keep.both)
       .run()(materializer)

@@ -556,32 +604,38 @@ private[remote] class Association(
     queues(LargeQueueIndex) = queueValue
     queuesVisibility = true // volatile write for visibility of the queues array
     attachStreamRestart("Outbound large message stream", LargeQueueIndex, largeQueueSize,
-      completed, () ⇒ runOutboundLargeMessagesStream())
+      streamKillSwitch, completed, () ⇒ runOutboundLargeMessagesStream())
   }

   private def attachStreamRestart(streamName: String, queueIndex: Int, queueCapacity: Int,
-    streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = {
+    streamKillSwitch: SharedKillSwitch, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = {

     def lazyRestart(): Unit = {
       changeOutboundCompression = Vector.empty
-      if (queueIndex == ControlQueueIndex)
+      if (queueIndex == ControlQueueIndex) {
+        materializing = new CountDownLatch(1)
         _outboundControlIngress = OptionVal.None
+      }
       // LazyQueueWrapper will invoke the `restart` function when first message is offered
       queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity), restart)
       queuesVisibility = true // volatile write for visibility of the queues array
     }

     implicit val ec = materializer.executionContext
-    updateStreamCompletion(streamName, streamCompleted.recover { case _ ⇒ Done })
+    updateStreamCompletion(streamName, (streamKillSwitch, streamCompleted.recover { case _ ⇒ Done }))
     streamCompleted.onFailure {
       case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected
       case _: AeronTerminated             ⇒ // shutdown already in progress
       case cause if transport.isShutdown ⇒
         // don't restart after shutdown, but log some details so we notice
-        log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
+        log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
       case _: AbruptTerminationException ⇒ // ActorSystem shutdown
+      case OutboundStreamStopSignal ⇒
+        // stop as expected due to quarantine
+        log.debug("{} to [{}] stopped. It will be restarted if used again.", streamName, remoteAddress)
+        lazyRestart()
       case cause: GaveUpMessageException ⇒
-        log.debug("{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
+        log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
         // restart unconditionally, without counting restarts
         lazyRestart()
       case cause ⇒

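The restart policy above distinguishes expected stops (the quarantine idle stop restarts lazily, without counting) from real failures, which are counted against outbound-max-restarts within outbound-restart-timeout. A minimal sketch of such a windowed restart counter; Akka's internal RestartCounter has a similar contract, but this implementation is our own:

    import scala.concurrent.duration._

    // Sketch: allow at most `maxRestarts` restarts within a sliding `window`.
    final class WindowedRestartCounter(maxRestarts: Int, window: FiniteDuration) {
      private var windowStart = System.nanoTime()
      private var count = 0

      def restart(): Boolean = synchronized {
        val now = System.nanoTime()
        if ((now - windowStart) > window.toNanos) { // window expired: start a new one
          windowStart = now
          count = 0
        }
        count += 1
        count <= maxRestarts // false means: give up and terminate
      }
    }
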
@@ -593,10 +647,10 @@ private[remote] class Association(
         }

         if (restartCounter.restart()) {
-          log.error(cause, "{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
+          log.error(cause, "{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
           lazyRestart()
         } else {
-          log.error(cause, s"{} to {} failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
+          log.error(cause, s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
             streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds)
           transport.system.terminate()
         }

@@ -605,10 +659,10 @@ private[remote] class Association(

   // set the future that completes when the current stream for a given name completes
   @tailrec
-  private def updateStreamCompletion(streamName: String, streamCompleted: Future[Done]): Unit = {
+  private def updateStreamCompletion(streamName: String, streamCompletion: (SharedKillSwitch, Future[Done])): Unit = {
     val prev = streamCompletions.get()
-    if (!streamCompletions.compareAndSet(prev, prev + (streamName → streamCompleted))) {
-      updateStreamCompletion(streamName, streamCompleted)
+    if (!streamCompletions.compareAndSet(prev, prev + (streamName → streamCompletion))) {
+      updateStreamCompletion(streamName, streamCompletion)
     }
   }

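updateStreamCompletion is a classic lock-free read-modify-write loop: read the map, attempt compareAndSet, and retry if another thread changed the reference in between. The same pattern written generically, with our own names:

    import java.util.concurrent.atomic.AtomicReference
    import scala.annotation.tailrec

    // Sketch: retry-on-contention update of an immutable map held in an AtomicReference.
    @tailrec
    def updateEntry[K, V](ref: AtomicReference[Map[K, V]], key: K, value: V): Unit = {
      val prev = ref.get()
      if (!ref.compareAndSet(prev, prev + (key → value)))
        updateEntry(ref, key, value) // another thread won the race; retry on the new map
    }
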
@@ -618,7 +672,7 @@ private[remote] class Association(
    */
   def streamsCompleted: Future[Done] = {
     implicit val ec = materializer.executionContext
-    Future.sequence(streamCompletions.get().values).map(_ ⇒ Done)
+    Future.sequence(streamCompletions.get().values.map(_._2)).map(_ ⇒ Done)
   }

   override def toString: String =

@@ -23,6 +23,7 @@ import akka.Done
 import akka.stream.stage.GraphStageWithMaterializedValue

 import scala.concurrent.Promise
+import akka.event.Logging

 /**
  * INTERNAL API

@@ -126,15 +127,18 @@ private[remote] class Encoder(
             bufferPool.release(envelope)
             outboundEnvelope.message match {
               case _: SystemMessageEnvelope ⇒
-                log.error(e, "Failed to serialize system message [{}].", outboundEnvelope.message.getClass.getName)
+                log.error(e, "Failed to serialize system message [{}].",
+                  Logging.messageClassName(outboundEnvelope.message))
                 throw e
               case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒
-                val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${outboundEnvelope.recipient}: " +
-                  s"max allowed size ${envelope.byteBuffer.limit()} bytes. Message type [${outboundEnvelope.message.getClass.getName}].")
-                log.error(reason, "Failed to serialize oversized message [{}].", outboundEnvelope.message.getClass.getName)
+                val reason = new OversizedPayloadException("Discarding oversized payload sent to " +
+                  s"${outboundEnvelope.recipient}: max allowed size ${envelope.byteBuffer.limit()} " +
+                  s"bytes. Message type [${Logging.messageClassName(outboundEnvelope.message)}].")
+                log.error(reason, "Failed to serialize oversized message [{}].",
+                  Logging.messageClassName(outboundEnvelope.message))
                 pull(in)
               case _ ⇒
-                log.error(e, "Failed to serialize message [{}].", outboundEnvelope.message.getClass.getName)
+                log.error(e, "Failed to serialize message [{}].", Logging.messageClassName(outboundEnvelope.message))
                 pull(in)
             }
           } finally {

@@ -18,6 +18,7 @@ import akka.stream.stage.InHandler
 import akka.stream.stage.OutHandler
 import akka.remote.UniqueAddress
 import akka.util.OptionVal
+import akka.event.Logging

 /** INTERNAL API: marker trait for protobuf-serializable artery messages */
 private[akka] trait ArteryMessage extends Serializable

@@ -206,7 +207,7 @@ private[akka] class OutboundControlJunction(
         buffer.offer(wrap(message))
       else {
         // it's alright to drop control messages
-        log.debug("Dropping control message [{}] due to full buffer.", message.getClass.getName)
+        log.debug("Dropping control message [{}] due to full buffer.", Logging.messageClassName(message))
       }
     }

@@ -13,6 +13,9 @@ import akka.stream.stage.InHandler
 import akka.stream.stage.OutHandler
 import akka.remote.UniqueAddress
 import akka.util.OptionVal
+import akka.event.Logging
+import akka.remote.HeartbeatMessage
+import akka.actor.ActorSelectionMessage

 /**
  * INTERNAL API

@@ -23,7 +26,9 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten
   override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)

   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
-    new GraphStageLogic(shape) with InHandler with OutHandler {
+    new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
+
+      override protected def logSource = classOf[InboundQuarantineCheck]

       // InHandler
       override def onPush(): Unit = {

@@ -34,15 +39,27 @@ private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) exten
             push(out, env)
           case OptionVal.Some(association) ⇒
             if (association.associationState.isQuarantined(env.originUid)) {
-              inboundContext.sendControl(
-                association.remoteAddress,
-                Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
+              if (log.isDebugEnabled)
+                log.debug(
+                  "Dropping message [{}] from [{}#{}] because the system is quarantined",
+                  Logging.messageClassName(env.message), association.remoteAddress, env.originUid)
+              // avoid starting outbound stream for heartbeats
+              if (!env.message.isInstanceOf[Quarantined] && !isHeartbeat(env.message))
+                inboundContext.sendControl(
+                  association.remoteAddress,
+                  Quarantined(inboundContext.localAddress, UniqueAddress(association.remoteAddress, env.originUid)))
               pull(in)
             } else
               push(out, env)
         }
       }

+      private def isHeartbeat(msg: Any): Boolean = msg match {
+        case _: HeartbeatMessage ⇒ true
+        case ActorSelectionMessage(_: HeartbeatMessage, _, _) ⇒ true
+        case _ ⇒ false
+      }
+
       // OutHandler
       override def onPull(): Unit = pull(in)

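isHeartbeat has to look through ActorSelectionMessage because failure-detector heartbeats may arrive wrapped in a selection envelope. A standalone sketch of the classification, with stand-in types for the internal HeartbeatMessage and ActorSelectionMessage:

    // Stand-ins for akka.remote.HeartbeatMessage and akka.actor.ActorSelectionMessage.
    trait HeartbeatMessage
    final case class Heartbeat(from: String) extends HeartbeatMessage
    final case class SelectionEnvelope(msg: Any, path: List[String])

    def isHeartbeat(msg: Any): Boolean = msg match {
      case _: HeartbeatMessage                       ⇒ true
      case SelectionEnvelope(_: HeartbeatMessage, _) ⇒ true
      case _                                         ⇒ false
    }

    // isHeartbeat(Heartbeat("a"))                         == true
    // isHeartbeat(SelectionEnvelope(Heartbeat("a"), Nil)) == true
    // isHeartbeat("ordinary message")                     == false
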
@@ -64,7 +64,9 @@ private[akka] class MessageDispatcher(
       // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
       ActorSelection.deliverSelection(l, sender, sel)
     case msg: PossiblyHarmful if UntrustedMode ⇒
-      log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
+      log.debug(
+        "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]",
+        Logging.messageClassName(msg))
     case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
     case msg                ⇒ l.!(msg)(sender)
   }

@@ -76,7 +78,7 @@ private[akka] class MessageDispatcher(

       case r ⇒ log.error(
         "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
-        message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
+        Logging.messageClassName(message), r, recipientAddress, provider.transport.addresses.mkString(", "))

   }
 }

@@ -20,6 +20,7 @@ import akka.stream.stage.InHandler
 import akka.stream.stage.OutHandler
 import akka.stream.stage.TimerGraphStageLogic
 import akka.util.OptionVal
+import akka.event.Logging

 /**
  * INTERNAL API: Thread safe mutable state that is shared among

@@ -98,7 +99,7 @@ private[remote] class OutboundTestStage(outboundContext: OutboundContext, state:
         if (state.isBlackhole(outboundContext.localAddress.address, outboundContext.remoteAddress)) {
           log.debug(
             "dropping outbound message [{}] to [{}] because of blackhole",
-            env.message.getClass.getName, outboundContext.remoteAddress)
+            Logging.messageClassName(env.message), outboundContext.remoteAddress)
           pull(in) // drop message
         } else
           push(out, env)

@@ -144,7 +145,7 @@ private[remote] class InboundTestStage(inboundContext: InboundContext, state: Sh
         if (state.isBlackhole(inboundContext.localAddress.address, association.remoteAddress)) {
           log.debug(
             "dropping inbound message [{}] from [{}] with UID [{}] because of blackhole",
-            env.message.getClass.getName, association.remoteAddress, env.originUid)
+            Logging.messageClassName(env.message), association.remoteAddress, env.originUid)
           pull(in) // drop message
         } else
           push(out, env)