Quarantine and cleanup idle associations, #24972

* fix NPE in shutdownTransport
  * perhaps because shutdown before started
  * system.dispatcher is used in other places of the shutdown
* improve logging of compression advertisement progress
* adjust RestartFlow.withBackoff parameters
* quarantine after ActorSystemTerminating signal
  (will cleanup compressions)
* Quarantine idle associations
  * liveness checks by sending extra HandshakeReq and update the
    lastUsed when reply received
  * conservative default value to survive network partition, in
    case no other messages are sent
* Adjust logging and QuarantinedEvent for harmless quarantine
  * Harmless if it was via the shutdown signal or cluster leaving
This commit is contained in:
Patrik Nordwall 2018-04-25 08:38:27 +02:00
parent f976f8d793
commit 7fc7744049
15 changed files with 297 additions and 176 deletions

View file

@ -108,7 +108,8 @@ private[cluster] class ClusterRemoteWatcher(
clusterNodes -= m.address
if (previousStatus == MemberStatus.Down) {
quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
quarantine(m.address, Some(m.uniqueAddress.longUid),
s"Cluster member removed, previous status [$previousStatus]", harmless = false)
} else if (arteryEnabled) {
// Don't quarantine gracefully removed members (leaving) directly,
// give Cluster Singleton some time to exchange TakeOver/HandOver messages.
@ -128,14 +129,15 @@ private[cluster] class ClusterRemoteWatcher(
pendingDelayedQuarantine.find(_.address == newIncarnation.address).foreach { oldIncarnation
pendingDelayedQuarantine -= oldIncarnation
quarantine(oldIncarnation.address, Some(oldIncarnation.longUid),
s"Cluster member removed, new incarnation joined")
s"Cluster member removed, new incarnation joined", harmless = true)
}
}
def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit = {
if (pendingDelayedQuarantine(m.uniqueAddress)) {
pendingDelayedQuarantine -= m.uniqueAddress
quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]",
harmless = true)
}
}

View file

@ -1,3 +1,10 @@
# #24972 Artery internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.compress.InboundCompression.confirmAdvertisement")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.OutboundHandshake.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.terminationHintReplier")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.Association.quarantine")
# Internal API changes
ProblemFilters.exclude[MissingTypesProblem]("akka.remote.artery.ArteryTransport$InboundStreamMatValues$")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.ArteryTransport#InboundStreamMatValues.apply")

View file

@ -967,16 +967,17 @@ akka {
# Only used when transport is tcp or tls-tcp.
connection-timeout = 5 seconds
# The timeout for outbound associations to perform the handshake.
# This timeout must be greater than the 'image-liveness-timeout'.
# The timeout for outbound associations to perform the initial handshake.
# This timeout must be greater than the 'image-liveness-timeout' when
# transport is aeron-udp.
handshake-timeout = 20 seconds
# incomplete handshake attempt is retried with this interval
# incomplete initial handshake attempt is retried with this interval
handshake-retry-interval = 1 second
# handshake requests are performed periodically with this interval,
# Handshake requests are performed periodically with this interval,
# also after the handshake has been completed to be able to establish
# a new session with a restarted destination system
# a new session with a restarted destination system.
inject-handshake-interval = 1 second
# messages that are not accepted by Aeron are dropped after retrying for this period
@ -988,6 +989,27 @@ akka {
# of a network partition that you need to survive.
give-up-system-message-after = 6 hours
# Outbound streams are stopped when they haven't been used for this duration.
# They are started again when new messages are sent.
stop-idle-outbound-after = 5 minutes
# Outbound streams are quarantined when they haven't been used for this duration
# to cleanup resources used by the association, such as compression tables.
# This will cleanup association to crashed systems that didn't announce their
# termination.
# The value should be longer than the length of a network partition that you
# need to survive.
# The value must also be greater than stop-idle-outbound-after.
# Once every 1/10 of this duration an extra handshake message will be sent.
# Therefore it's also recommended to use a value that is greater than 10 times
# the stop-idle-outbound-after, since otherwise the idle streams will not be
# stopped.
quarantine-idle-outbound-after = 6 hours
# Stop outbound stream of a quarantined association after this idle timeout, i.e.
# when not used any more.
stop-quarantined-after-idle = 3 seconds
# After catastrophic communication failures that could result in the loss of system
# messages or after the remote DeathWatch triggers the remote system gets
# quarantined to prevent inconsistent behavior.
@ -999,10 +1021,6 @@ akka {
# if it wakes up again. Therefore this shouldn't be set too low.
remove-quarantined-association-after = 1 h
# Outbound streams are stopped when they haven't been used for this duration.
# They are started again when new messages are sent.
stop-idle-outbound-after = 5.minutes
# during ActorSystem termination the remoting will wait this long for
# an acknowledgment by the destination system that flushing of outstanding
# remote messages has been completed
@ -1026,10 +1044,6 @@ akka {
# If more restarts occurs the ActorSystem will be terminated.
outbound-max-restarts = 5
# Stop outbound stream of a quarantined association after this idle timeout, i.e.
# when not used any more.
stop-quarantined-after-idle = 3 seconds
# Timeout after which aeron driver has not had keepalive messages
# from a client before it considers the client dead.
# Only used when transport is aeron-udp.

View file

@ -9,10 +9,11 @@ import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch }
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.event.AddressTerminatedTopic
import akka.remote.artery.ArteryMessage
import scala.collection.mutable
import scala.concurrent.duration._
import akka.remote.artery.ArteryTransport
/**
* INTERNAL API
*/
@ -163,7 +164,7 @@ private[akka] class RemoteWatcher(
watchingNodes foreach { a
if (!unreachable(a) && !failureDetector.isAvailable(a)) {
log.warning("Detected unreachable: [{}]", a)
quarantine(a, addressUids.get(a), "Deemed unreachable by remote failure detector")
quarantine(a, addressUids.get(a), "Deemed unreachable by remote failure detector", harmless = false)
publishAddressTerminated(a)
unreachable += a
}
@ -172,8 +173,12 @@ private[akka] class RemoteWatcher(
def publishAddressTerminated(address: Address): Unit =
AddressTerminatedTopic(context.system).publish(AddressTerminated(address))
def quarantine(address: Address, uid: Option[Long], reason: String): Unit =
remoteProvider.quarantine(address, uid, reason)
def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = {
remoteProvider.transport match {
case t: ArteryTransport if harmless t.quarantine(address, uid, reason, harmless)
case _ remoteProvider.quarantine(address, uid, reason)
}
}
def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = {
assert(watcher != self)

View file

@ -95,7 +95,7 @@ final case class QuarantinedEvent(address: Address, longUid: Long) extends Remot
override def logLevel: Logging.LogLevel = Logging.WarningLevel
override val toString: String =
s"Association to [$address] having UID [$longUid] is irrecoverably failed. UID is now quarantined and all " +
"messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " +
"messages to this UID will be delivered to dead letters. Remote ActorSystem must be restarted to recover " +
"from this situation."
// For binary compatibility
@ -110,6 +110,17 @@ final case class QuarantinedEvent(address: Address, longUid: Long) extends Remot
def copy(address: Address = address, uid: Int = uid) = new QuarantinedEvent(address, uid)
}
/**
* The `uniqueAddress` was quarantined but it was due to normal shutdown or cluster leaving/exiting.
*/
@SerialVersionUID(1L)
final case class GracefulShutdownQuarantinedEvent(uniqueAddress: UniqueAddress, reason: String) extends RemotingLifecycleEvent {
override def logLevel: Logging.LogLevel = Logging.InfoLevel
override val toString: String =
s"Association to [${uniqueAddress.address}] having UID [${uniqueAddress.uid}] has been stopped. All " +
s"messages to this UID will be delivered to dead letters. Reason: $reason "
}
@SerialVersionUID(1L)
final case class ThisActorSystemQuarantinedEvent(localAddress: Address, remoteAddress: Address) extends RemotingLifecycleEvent {
override def logLevel: LogLevel = Logging.WarningLevel

View file

@ -144,11 +144,18 @@ private[akka] final class ArterySettings private (config: Config) {
val GiveUpSystemMessageAfter: FiniteDuration =
config.getMillisDuration("give-up-system-message-after").requiring(interval
interval > Duration.Zero, "give-up-system-message-after must be more than zero")
val StopIdleOutboundAfter: FiniteDuration = config.getMillisDuration("stop-idle-outbound-after")
.requiring(interval interval > Duration.Zero, "stop-idle-outbound-after must be more than zero")
val QuarantineIdleOutboundAfter: FiniteDuration = config.getMillisDuration("quarantine-idle-outbound-after")
.requiring(
interval interval > StopIdleOutboundAfter,
"quarantine-idle-outbound-after must be greater than stop-idle-outbound-after")
val StopQuarantinedAfterIdle: FiniteDuration =
config.getMillisDuration("stop-quarantined-after-idle").requiring(interval
interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero")
val RemoveQuarantinedAssociationAfter: FiniteDuration =
config.getMillisDuration("remove-quarantined-association-after").requiring(interval
interval > Duration.Zero, "remove-quarantined-association-after must be more than zero")
val StopIdleOutboundAfter: FiniteDuration = config.getMillisDuration("stop-idle-outbound-after").requiring(interval
interval > Duration.Zero, "stop-idle-outbound-after must be more than zero")
val ShutdownFlushTimeout: FiniteDuration =
config.getMillisDuration("shutdown-flush-timeout").requiring(interval
interval > Duration.Zero, "shutdown-flush-timeout must be more than zero")
@ -163,9 +170,6 @@ private[akka] final class ArterySettings private (config: Config) {
config.getMillisDuration("outbound-restart-timeout").requiring(interval
interval > Duration.Zero, "outbound-restart-timeout must be more than zero")
val OutboundMaxRestarts: Int = getInt("outbound-max-restarts")
val StopQuarantinedAfterIdle: FiniteDuration =
config.getMillisDuration("stop-quarantined-after-idle").requiring(interval
interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero")
val ClientLivenessTimeout: FiniteDuration =
config.getMillisDuration("client-liveness-timeout").requiring(interval
interval > Duration.Zero, "client-liveness-timeout must be more than zero")

View file

@ -736,8 +736,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
}
override def quarantine(remoteAddress: Address, uid: Option[Long], reason: String): Unit = {
quarantine(remoteAddress, uid, reason, harmless = false)
}
def quarantine(remoteAddress: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = {
try {
association(remoteAddress).quarantine(reason, uid)
association(remoteAddress).quarantine(reason, uid, harmless)
} catch {
case ShuttingDown // silence it
}
@ -772,15 +776,16 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout,
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval))
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval, Duration.Undefined))
.viaMat(createEncoder(bufferPool, streamId))(Keep.right)
}
def outboundControl(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = {
val livenessProbeInterval = (settings.Advanced.QuarantineIdleOutboundAfter / 10)
.max(settings.Advanced.HandshakeRetryInterval)
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout,
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval))
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval, livenessProbeInterval))
.via(new SystemMessageDelivery(outboundContext, system.deadLetters, settings.Advanced.SystemMessageResendInterval,
settings.Advanced.SysMsgBufferSize))
// note that System messages must not be dropped before the SystemMessageDelivery stage
@ -813,13 +818,20 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
// Checks for termination hint messages and sends an ACK for those (not processing them further)
// Purpose of this stage is flushing, the sender can wait for the ACKs up to try flushing
// pending messages.
def terminationHintReplier(): Flow[InboundEnvelope, InboundEnvelope, NotUsed] = {
def terminationHintReplier(inControlStream: Boolean): Flow[InboundEnvelope, InboundEnvelope, NotUsed] = {
Flow[InboundEnvelope].filter { envelope
envelope.message match {
case _: ActorSystemTerminating
case ActorSystemTerminating(from)
envelope.sender match {
case OptionVal.Some(snd) snd.tell(ActorSystemTerminatingAck(localAddress), ActorRef.noSender)
case OptionVal.None log.error("Expected sender for ActorSystemTerminating message")
case OptionVal.Some(snd)
snd.tell(ActorSystemTerminatingAck(localAddress), ActorRef.noSender)
if (inControlStream)
system.scheduler.scheduleOnce(settings.Advanced.ShutdownFlushTimeout) {
if (!isShutdown)
quarantine(from.address, Some(from.uid), "ActorSystem terminated", harmless = true)
}(materializer.executionContext)
case OptionVal.None
log.error("Expected sender for ActorSystemTerminating message from [{}]", from)
}
false
case _ true
@ -831,7 +843,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
Flow[InboundEnvelope]
.via(createDeserializer(bufferPool))
.via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope])
.via(terminationHintReplier())
.via(terminationHintReplier(inControlStream = false))
.via(new InboundHandshake(this, inControlStream = false))
.via(new InboundQuarantineCheck(this))
.toMat(messageDispatcherSink)(Keep.right)
@ -850,7 +862,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
Flow[InboundEnvelope]
.via(createDeserializer(envelopeBufferPool))
.via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope])
.via(terminationHintReplier())
.via(terminationHintReplier(inControlStream = true))
.via(new InboundHandshake(this, inControlStream = true))
.via(new InboundQuarantineCheck(this))
.viaMat(new InboundControlJunction)(Keep.right)

View file

@ -4,6 +4,7 @@
package akka.remote.artery
import akka.util.PrettyDuration._
import java.util.Queue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
@ -180,8 +181,8 @@ private[remote] class Association(
// keyed by stream queue index
private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues])
private[this] val idleTask = new AtomicReference[Option[Cancellable]](None)
private[this] val quarantinedIdleTask = new AtomicReference[Option[Cancellable]](None)
private[this] val idleTimer = new AtomicReference[Option[Cancellable]](None)
private[this] val stopQuarantinedTimer = new AtomicReference[Option[Cancellable]](None)
private[remote] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] =
updateOutboundCompression(c c.changeActorRefCompression(table))
@ -256,8 +257,6 @@ private[remote] class Association(
def setControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): Unit = {
val current = associationState
swapState(current, current.withControlIdleKillSwitch(killSwitch))
if (killSwitch.isDefined)
startIdleTimer()
}
def completeHandshake(peer: UniqueAddress): Future[Done] = {
@ -285,7 +284,7 @@ private[remote] class Association(
if (swapState(current, newState)) {
current.uniqueRemoteAddressValue() match {
case Some(old)
cancelQuarantinedIdleTimer()
cancelStopQuarantinedTimer()
log.debug(
"Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
newState.incarnation, peer.address, peer.uid, old.uid)
@ -308,7 +307,7 @@ private[remote] class Association(
if (associationState.isQuarantined()) {
log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message),
remoteAddress)
startQuarantinedIdleTimer()
setupStopQuarantinedTimer()
}
outboundControlIngress.sendControlMessage(message)
}
@ -351,7 +350,7 @@ private[remote] class Association(
if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || messageIsClearSystemMessageDelivery) {
if (quarantined && !messageIsClearSystemMessageDelivery) {
log.debug("Quarantine piercing attempt with message [{}] to [{}]", Logging.messageClassName(message), recipient.getOrElse(""))
startQuarantinedIdleTimer()
setupStopQuarantinedTimer()
}
try {
val outboundEnvelope = createOutboundEnvelope()
@ -454,10 +453,10 @@ private[remote] class Association(
// OutboundContext
override def quarantine(reason: String): Unit = {
val uid = associationState.uniqueRemoteAddressValue().map(_.uid)
quarantine(reason, uid)
quarantine(reason, uid, harmless = false)
}
@tailrec final def quarantine(reason: String, uid: Option[Long]): Unit = {
@tailrec final def quarantine(reason: String, uid: Option[Long], harmless: Boolean): Unit = {
uid match {
case Some(u)
val current = associationState
@ -467,22 +466,32 @@ private[remote] class Association(
val newState = current.newQuarantined()
if (swapState(current, newState)) {
// quarantine state change was performed
if (harmless) {
log.info(
"Association to [{}] having UID [{}] has been stopped. All " +
"messages to this UID will be delivered to dead letters. Reason: {}",
remoteAddress, u, reason)
transport.system.eventStream.publish(GracefulShutdownQuarantinedEvent(UniqueAddress(remoteAddress, u), reason))
} else {
log.warning(
"Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " +
"messages to this UID will be delivered to dead letters. " +
"Remote actorsystem must be restarted to recover from this situation. {}",
"Remote ActorSystem must be restarted to recover from this situation. Reason: {}",
remoteAddress, u, reason)
transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u))
}
flightRecorder.loFreq(Transport_Quarantined, s"$remoteAddress - $u")
clearOutboundCompression()
clearInboundCompression(u)
// end delivery of system messages to that incarnation after this point
send(ClearSystemMessageDelivery(current.incarnation), OptionVal.None, OptionVal.None)
if (!harmless) {
// try to tell the other system that we have quarantined it
sendControl(Quarantined(localAddress, peer))
startQuarantinedIdleTimer()
}
setupStopQuarantinedTimer()
} else
quarantine(reason, uid) // recursive
quarantine(reason, uid, harmless) // recursive
}
case Some(peer)
log.info(
@ -519,8 +528,7 @@ private[remote] class Association(
// cleanup
_outboundControlIngress = OptionVal.None
outboundCompressionAccess = Vector.empty
cancelIdleTimer()
cancelQuarantinedIdleTimer()
cancelAllTimers()
abortQuarantined()
log.info("Unused association to [{}] removed after quarantine", remoteAddress)
@ -530,21 +538,22 @@ private[remote] class Association(
def isRemovedAfterQuarantined(): Boolean =
queues(ControlQueueIndex) == RemovedQueueWrapper
private def cancelQuarantinedIdleTimer(): Unit = {
val current = quarantinedIdleTask.get
private def cancelStopQuarantinedTimer(): Unit = {
val current = stopQuarantinedTimer.get
current.foreach(_.cancel())
quarantinedIdleTask.compareAndSet(current, None)
stopQuarantinedTimer.compareAndSet(current, None)
}
private def startQuarantinedIdleTimer(): Unit = {
cancelQuarantinedIdleTimer()
quarantinedIdleTask.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) {
private def setupStopQuarantinedTimer(): Unit = {
cancelStopQuarantinedTimer()
stopQuarantinedTimer.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) {
if (associationState.isQuarantined())
abortQuarantined()
}(transport.system.dispatcher)))
}
private def abortQuarantined(): Unit = {
cancelIdleTimer()
streamMatValues.get.foreach {
case (queueIndex, OutboundStreamMatValues(killSwitch, _, _))
killSwitch match {
@ -558,19 +567,24 @@ private[remote] class Association(
}
private def cancelIdleTimer(): Unit = {
val current = idleTask.get
val current = idleTimer.get
current.foreach(_.cancel())
idleTask.compareAndSet(current, None)
idleTimer.compareAndSet(current, None)
}
private def startIdleTimer(): Unit = {
cancelIdleTimer()
private def setupIdleTimer(): Unit = {
if (idleTimer.get.isEmpty) {
val StopIdleOutboundAfter = settings.Advanced.StopIdleOutboundAfter
val QuarantineIdleOutboundAfter = settings.Advanced.QuarantineIdleOutboundAfter
val interval = StopIdleOutboundAfter / 2
val stopIdleOutboundAfterNanos = StopIdleOutboundAfter.toNanos
val initialDelay = settings.Advanced.ConnectionTimeout.max(StopIdleOutboundAfter) + 1.second
val task: Cancellable = transport.system.scheduler.schedule(initialDelay, interval) {
if (System.nanoTime() - associationState.lastUsedTimestamp.get >= stopIdleOutboundAfterNanos) {
val task = transport.system.scheduler.schedule(initialDelay, interval) {
val lastUsedDurationNanos = System.nanoTime() - associationState.lastUsedTimestamp.get
if (lastUsedDurationNanos >= QuarantineIdleOutboundAfter.toNanos && !associationState.isQuarantined()) {
// If idle longer than quarantine-idle-outbound-after and the low frequency HandshakeReq
// doesn't get through it will be quarantined to cleanup lingering associations to crashed systems.
quarantine(s"Idle longer than quarantine-idle-outbound-after [${QuarantineIdleOutboundAfter.pretty}]")
} else if (lastUsedDurationNanos >= StopIdleOutboundAfter.toNanos) {
streamMatValues.get.foreach {
case (queueIndex, OutboundStreamMatValues(streamKillSwitch, _, stopping))
if (isStreamActive(queueIndex) && stopping.isEmpty) {
@ -595,24 +609,24 @@ private[remote] class Association(
flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex")
setControlIdleKillSwitch(OptionVal.None)
killSwitch.abort(OutboundStreamStopIdleSignal)
case OptionVal.None
log.debug(
"Couldn't stop idle outbound control stream to [{}] due to missing KillSwitch.",
remoteAddress)
case OptionVal.None // already stopped
}
}
}
}
cancelIdleTimer()
}
}(transport.system.dispatcher)
if (!idleTask.compareAndSet(None, Some(task))) {
if (!idleTimer.compareAndSet(None, Some(task))) {
// another thread did same thing and won
task.cancel()
}
}
}
private def cancelAllTimers(): Unit = {
cancelIdleTimer()
cancelStopQuarantinedTimer()
}
private def sendToDeadLetters[T](pending: Vector[OutboundEnvelope]): Unit = {
@ -631,7 +645,6 @@ private[remote] class Association(
if (!controlQueue.isInstanceOf[QueueWrapper])
throw new IllegalStateException("associate() must only be called once")
runOutboundStreams()
startIdleTimer()
}
private def runOutboundStreams(): Unit = {
@ -676,6 +689,7 @@ private[remote] class Association(
materializing.countDown()
updateStreamMatValues(ControlQueueIndex, streamKillSwitch, completed)
setupIdleTimer()
attachOutboundStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize,
completed, () runOutboundControlStream())
}
@ -810,13 +824,12 @@ private[remote] class Association(
_outboundControlIngress = OptionVal.None
}
// LazyQueueWrapper will invoke the `restart` function when first message is offered
val restartAndStartIdleTimer: () Unit = () {
val wrappedRestartFun: () Unit = () {
restart()
startIdleTimer()
}
if (!isRemovedAfterQuarantined())
queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), restartAndStartIdleTimer)
queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), wrappedRestartFun)
queuesVisibility = true // volatile write for visibility of the queues array
}
@ -830,7 +843,7 @@ private[remote] class Association(
streamCompleted.failed.foreach {
case ArteryTransport.ShutdownSignal
// shutdown as expected
cancelIdleTimer()
cancelAllTimers()
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
case cause if transport.isShutdown || isRemovedAfterQuarantined()
@ -838,15 +851,15 @@ private[remote] class Association(
// for the TCP transport the ShutdownSignal is "converted" to StreamTcpException
if (!cause.isInstanceOf[StreamTcpException])
log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
cancelIdleTimer()
cancelAllTimers()
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
case _: AeronTerminated
// shutdown already in progress
cancelIdleTimer()
cancelAllTimers()
case _: AbruptTerminationException
// ActorSystem shutdown
cancelIdleTimer()
cancelAllTimers()
case cause
// it might have been stopped as expected due to idle or quarantine
@ -884,7 +897,7 @@ private[remote] class Association(
} else {
log.error(cause, s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds)
cancelIdleTimer()
cancelAllTimers()
transport.system.terminate()
}
}

View file

@ -42,6 +42,7 @@ private[remote] object OutboundHandshake {
private case object HandshakeTimeout
private case object HandshakeRetryTick
private case object InjectHandshakeTick
private case object LivenessProbeTick
}
@ -53,7 +54,9 @@ private[remote] class OutboundHandshake(
outboundContext: OutboundContext,
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
timeout: FiniteDuration,
retryInterval: FiniteDuration, injectHandshakeInterval: FiniteDuration)
retryInterval: FiniteDuration,
injectHandshakeInterval: FiniteDuration,
livenessProbeInterval: Duration)
extends GraphStage[FlowShape[OutboundEnvelope, OutboundEnvelope]] {
val in: Inlet[OutboundEnvelope] = Inlet("OutboundHandshake.in")
@ -61,29 +64,42 @@ private[remote] class OutboundHandshake(
override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
import OutboundHandshake._
private var handshakeState: HandshakeState = Start
private var pendingMessage: OutboundEnvelope = null
private var pendingMessage: OptionVal[OutboundEnvelope] = OptionVal.None
private var injectHandshakeTickScheduled = false
override protected def logSource: Class[_] = classOf[OutboundHandshake]
override def preStart(): Unit = {
scheduleOnce(HandshakeTimeout, timeout)
livenessProbeInterval match {
case d: FiniteDuration schedulePeriodically(LivenessProbeTick, d)
case _ // only used in control stream
}
}
// InHandler
override def onPush(): Unit = {
if (handshakeState != Completed)
throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState]")
throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState].")
// inject a HandshakeReq once in a while to trigger a new handshake when destination
// system has been restarted
if (injectHandshakeTickScheduled) {
// out is always available here, except when a liveness HandshakeReq was just pushed
if (isAvailable(out))
push(out, grab(in))
else {
if (pendingMessage.isDefined)
throw new IllegalStateException(s"pendingMessage expected to be empty")
pendingMessage = OptionVal.Some(grab(in))
}
} else {
pushHandshakeReq()
pendingMessage = grab(in)
pendingMessage = OptionVal.Some(grab(in))
}
}
@ -91,11 +107,13 @@ private[remote] class OutboundHandshake(
override def onPull(): Unit = {
handshakeState match {
case Completed
if (pendingMessage eq null)
pendingMessage match {
case OptionVal.None
if (!hasBeenPulled(in))
pull(in)
else {
push(out, pendingMessage)
pendingMessage = null
case OptionVal.Some(p)
push(out, p)
pendingMessage = OptionVal.None
}
case Start
@ -131,10 +149,29 @@ private[remote] class OutboundHandshake(
private def pushHandshakeReq(): Unit = {
injectHandshakeTickScheduled = true
scheduleOnce(InjectHandshakeTick, injectHandshakeInterval)
val env: OutboundEnvelope = outboundEnvelopePool.acquire().init(
recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress), sender = OptionVal.None)
outboundContext.associationState.lastUsedTimestamp.set(System.nanoTime())
push(out, env)
if (isAvailable(out))
push(out, createHandshakeReqEnvelope())
}
private def pushLivenessProbeReq(): Unit = {
// The associationState.lastUsedTimestamp will be updated when the HandshakeRsp is received
// and that is the confirmation that the other system is alive, and will not be quarantined
// by the quarantine-idle-outbound-after even though no real messages have been sent.
if (handshakeState == Completed && isAvailable(out) && pendingMessage.isEmpty) {
val lastUsedDuration = (System.nanoTime() - outboundContext.associationState.lastUsedTimestamp.get()).nanos
if (lastUsedDuration >= livenessProbeInterval) {
log.info(
"Association to [{}] has been idle for [{}] seconds, sending HandshakeReq to validate liveness",
outboundContext.remoteAddress, lastUsedDuration.toSeconds)
push(out, createHandshakeReqEnvelope())
}
}
}
private def createHandshakeReqEnvelope(): OutboundEnvelope = {
outboundEnvelopePool.acquire().init(
recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress), sender = OptionVal.None)
}
private def handshakeCompleted(): Unit = {
@ -148,6 +185,8 @@ private[remote] class OutboundHandshake(
case InjectHandshakeTick
// next onPush message will trigger sending of HandshakeReq
injectHandshakeTickScheduled = false
case LivenessProbeTick
pushLivenessProbeReq()
case HandshakeRetryTick
if (isAvailable(out))
pushHandshakeReq()
@ -181,6 +220,11 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl
env.message match {
case HandshakeReq(from, to) onHandshakeReq(from, to)
case HandshakeRsp(from)
// Touch the lastUsedTimestamp here also because when sending the extra low frequency HandshakeReq
// the timestamp is not supposed to be updated when sending but when receiving reply, which confirms
// that the other system is alive.
inboundContext.association(from.address).associationState.lastUsedTimestamp.set(System.nanoTime())
after(inboundContext.completeHandshake(from)) {
pull(in)
}

View file

@ -93,7 +93,7 @@ private[remote] final class InboundCompressionsImpl(
// Handles a confirmation of an actor-ref compression table advertisement from `originUid`:
// looks up the entry for that uid in `_actorRefsIns` and, if one exists, forwards the confirmed
// `tableVersion` to it. A missing (null) entry is ignored.
// NOTE(review): the two `case a` lines look like the before/after sides of this diff hunk; the
// second one (explicit `gaveUp = false`) is presumably the current call - confirm.
override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Byte): Unit = {
_actorRefsIns.get(originUid) match {
case null // ignore
case a a.confirmAdvertisement(tableVersion)
case a a.confirmAdvertisement(tableVersion, gaveUp = false)
}
}
/** Send compression table advertisement over control stream. Should be called from Decoder. */
@ -124,7 +124,7 @@ private[remote] final class InboundCompressionsImpl(
// Handles a confirmation of a class-manifest compression table advertisement from `originUid`:
// looks up the entry for that uid in `_classManifestsIns` and, if one exists, forwards the
// confirmed `tableVersion` to it. A missing (null) entry is ignored.
// NOTE(review): the two `case a` lines look like the before/after sides of this diff hunk; the
// second one (explicit `gaveUp = false`) is presumably the current call - confirm.
override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Byte): Unit = {
_classManifestsIns.get(originUid) match {
case null // ignore
case a a.confirmAdvertisement(tableVersion)
case a a.confirmAdvertisement(tableVersion, gaveUp = false)
}
}
/** Send compression table advertisement over control stream. Should be called from Decoder. */
@ -296,6 +296,7 @@ private[remote] abstract class InboundCompression[T >: Null](
// We should not continue sending advertisements to an association that might be dead (not quarantined yet)
@volatile private[this] var alive = true
private[this] var resendCount = 0
private[this] val maxResendCount = 3
private[this] val cms = new CountMinSketch(16, 1024, System.currentTimeMillis().toInt)
@ -338,7 +339,7 @@ private[remote] abstract class InboundCompression[T >: Null](
"Received first value from originUid [{}] compressed using the advertised compression table, " +
"flipping to it (version: {})",
originUid, current.nextTable.version)
confirmAdvertisement(incomingTableVersion)
confirmAdvertisement(incomingTableVersion, gaveUp = false)
decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse
case _
@ -354,15 +355,17 @@ private[remote] abstract class InboundCompression[T >: Null](
}
}
final def confirmAdvertisement(tableVersion: Byte): Unit = {
final def confirmAdvertisement(tableVersion: Byte, gaveUp: Boolean): Unit = {
tables.advertisementInProgress match {
case Some(inProgress) if tableVersion == inProgress.version
tables = tables.startUsingNextTable()
log.debug("Confirmed compression table version [{}] for originUid [{}]", tableVersion, originUid)
log.debug(
"{} compression table version [{}] for originUid [{}]",
if (gaveUp) "Gave up" else "Confirmed", tableVersion, originUid)
case Some(inProgress) if tableVersion != inProgress.version
log.debug(
"Confirmed compression table version [{}] for originUid [{}] but other version in progress [{}]",
tableVersion, originUid, inProgress.version)
"{} compression table version [{}] for originUid [{}] but other version in progress [{}]",
if (gaveUp) "Gave up" else "Confirmed", tableVersion, originUid, inProgress.version)
case None
// already confirmed
}
@ -410,7 +413,7 @@ private[remote] abstract class InboundCompression[T >: Null](
alive = false // will be set to true on first incoming message
resendCount = 0
advertiseCompressionTable(association, table)
} else {
} else if (association.isOrdinaryMessageStreamActive()) {
log.debug("{} for originUid [{}] not changed, no need to advertise same.", Logging.simpleName(tables.activeTable), originUid)
}
@ -422,23 +425,23 @@ private[remote] abstract class InboundCompression[T >: Null](
case Some(inProgress)
resendCount += 1
if (resendCount <= 5) {
if (resendCount <= maxResendCount) {
// The ActorRefCompressionAdvertisement message is resent because it can be lost
inboundContext.association(originUid) match {
case OptionVal.Some(association)
log.debug(
"Advertisement in progress for originUid [{}] version {}, resending",
originUid, inProgress.version)
"Advertisement in progress for originUid [{}] version [{}], resending [{}:{}]",
originUid, inProgress.version, resendCount, maxResendCount)
advertiseCompressionTable(association, inProgress) // resend
case OptionVal.None
}
} else {
// give up, it might be dead
log.debug(
"Advertisement in progress for originUid [{}] version {} but no confirmation after retries.",
"Advertisement in progress for originUid [{}] version [{}] but no confirmation after retries.",
originUid, inProgress.version)
confirmAdvertisement(inProgress.version)
confirmAdvertisement(inProgress.version, gaveUp = true)
}
}
}

View file

@ -128,7 +128,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = {
val flowFactory = () {
val flow =
def flow(controlIdleKillSwitch: OptionVal[SharedKillSwitch]) =
Flow[ByteString]
.via(Flow.lazyInitAsync(() {
// only open the actual connection if any new messages are sent
@ -136,6 +136,8 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
TcpOutbound_Connected,
s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " +
s"/ ${streamName(streamId)}")
if (controlIdleKillSwitch.isDefined)
outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(controlIdleKillSwitch)
Future.successful(
Flow[ByteString]
.prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId)))
@ -150,29 +152,19 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
val controlIdleKillSwitch = KillSwitches.shared("outboundControlStreamIdleKillSwitch")
Flow[ByteString]
.via(controlIdleKillSwitch.flow)
.via(flow)
.mapMaterializedValue { _
outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(OptionVal.Some(controlIdleKillSwitch))
NotUsed
}
.via(flow(OptionVal.Some(controlIdleKillSwitch)))
} else {
flow
flow(OptionVal.None)
}
}
if (streamId == ControlStreamId) {
// restart of inner connection part important in control flow, since system messages
// are buffered and resent from the outer SystemMessageDelivery stage.
val maxRestarts = if (streamId == ControlStreamId) Int.MaxValue else 3
// Restart of inner connection part important in control stream, since system messages
// are buffered and resent from the outer SystemMessageDelivery stage. No maxRestarts limit for control
// stream. For message stream it's best effort retry a few times.
RestartFlow.withBackoff[ByteString, ByteString](
settings.Advanced.OutboundRestartBackoff,
settings.Advanced.GiveUpSystemMessageAfter, 0.1)(flowFactory)
} else {
// Best effort retry a few times
RestartFlow.withBackoff[ByteString, ByteString](
settings.Advanced.OutboundRestartBackoff,
settings.Advanced.OutboundRestartBackoff * 5, 0.1, maxRestarts = 3)(flowFactory)
}
settings.Advanced.OutboundRestartBackoff * 5, 0.1, maxRestarts)(flowFactory)
}
Flow[EnvelopeBuffer]
@ -416,7 +408,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
}
override protected def shutdownTransport(): Future[Done] = {
implicit val ec: ExecutionContext = materializer.executionContext
import system.dispatcher
inboundKillSwitch.shutdown()
unbind().map { _
topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData)
@ -425,9 +417,9 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
}
private def unbind(): Future[Done] = {
implicit val ec: ExecutionContext = materializer.executionContext
serverBinding match {
case Some(binding)
import system.dispatcher
for {
b binding
_ b.unbind()

View file

@ -54,7 +54,7 @@ object RemoteWatcherSpec {
// that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
// Test override of quarantine: publishes a TestRemoteWatcher.Quarantined(address, uid) event on the
// system event stream. Only `address` and `uid` are used by the body; `reason` (and `harmless`
// in the longer signature) are not read here.
// NOTE(review): the two signature lines look like the before/after sides of this diff hunk; the
// one with `harmless: Boolean` is presumably the current signature - confirm.
override def quarantine(address: Address, uid: Option[Long], reason: String): Unit = {
override def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
}

View file

@ -33,11 +33,13 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
private def setupStream(
outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds,
retryInterval: FiniteDuration = 10.seconds,
injectHandshakeInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
injectHandshakeInterval: FiniteDuration = 10.seconds,
livenessProbeInterval: Duration = Duration.Undefined): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = {
TestSource.probe[String]
.map(msg outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.None))
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, timeout, retryInterval, injectHandshakeInterval))
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, timeout, retryInterval,
injectHandshakeInterval, livenessProbeInterval))
.map(env env.message)
.toMat(TestSink.probe[Any])(Keep.both)
.run()
@ -130,6 +132,21 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
downstream.cancel()
}
"send HandshakeReq for liveness probing" in {
  val inbound = new TestInboundContext(localAddress = addressA)
  val association = inbound.association(addressB.address)
  val (_, sink) = setupStream(association, livenessProbeInterval = 200.millis)
  val expectedReq = HandshakeReq(addressA, addressB.address)

  sink.request(10)
  // the first HandshakeReq is the regular initial handshake request
  sink.expectNext(expectedReq)
  inbound.completeHandshake(addressB)
  // the following ones are emitted because of the livenessProbeInterval
  sink.expectNext(expectedReq)
  sink.expectNext(expectedReq)
  sink.cancel()
}
}
}

View file

@ -105,9 +105,6 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s"""
assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false)
}
Thread.sleep(2000)
// localArtery.quarantine(remoteAddress, Some(remoteUid), "Test")
// the outbound streams are inactive and association quarantined, then it's completely removed
eventually {
localArtery.remoteAddresses should not contain remoteAddress

View file

@ -55,7 +55,7 @@ object RemoteWatcherSpec {
// that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
// Test override of quarantine: publishes a TestRemoteWatcher.Quarantined(address, uid) event on the
// system event stream. Only `address` and `uid` are used by the body; `reason` (and `harmless`
// in the longer signature) are not read here.
// NOTE(review): the two signature lines look like the before/after sides of this diff hunk; the
// one with `harmless: Boolean` is presumably the current signature - confirm.
override def quarantine(address: Address, uid: Option[Long], reason: String): Unit = {
override def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
}