pekko/akka-remote/src/main/scala/akka/remote/artery/Association.scala

1147 lines
46 KiB
Scala
Raw Normal View History

/*
* Copyright (C) 2016-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import java.net.ConnectException
import java.util.Queue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
import akka.actor.Address
import akka.actor.Cancellable
import akka.actor.Dropped
import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging
import akka.remote.DaemonMsgCreate
import akka.remote.PriorityMessage
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.ArteryTransport.AeronTerminated
import akka.remote.artery.ArteryTransport.ShuttingDown
import akka.remote.artery.Encoder.OutboundCompressionAccess
2016-05-12 08:56:28 +02:00
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery
import akka.remote.artery.aeron.AeronSink.GaveUpMessageException
import akka.remote.artery.compress.CompressionTable
import akka.stream.AbruptTerminationException
import akka.stream.KillSwitches
import akka.stream.Materializer
import akka.stream.SharedKillSwitch
import akka.stream.StreamTcpException
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.MergeHub
import akka.stream.scaladsl.Source
import akka.util.OptionVal
import akka.util.PrettyDuration._
import akka.util.Unsafe
import akka.util.WildcardIndex
Various scala-2.13.0-M5 fixes fix akka-actor-tests compile errors some tests still fail though Fix test failures in akka-actor-test Manually work arround missing implicit Factory[Nothing, Seq[Nothing]] see https://github.com/scala/scala-collection-compat/issues/137 akka-remote scalafix changes Fix shutdownAll compile error test:akka-remote scalafix changes akka-multi-node-testkit scalafix Fix akka-remote-tests multi-jvm compile errors akka-stream-tests/test:scalafix Fix test:akka-stream-tests Crude implementation of ByteString.map scalafix akka-actor-typed, akka-actor-typed-tests akka-actor-typed-tests compile and succeed scalafix akka-camel scalafix akka-cluster akka-cluster compile & test scalafix akka-cluster-metrics Fix akka-cluster-metrics scalafix akka-cluster-tools akka-cluster-tools compile and test scalafix akka-distributed-data akka-distributed-data fixes scalafix akka-persistence scalafix akka-cluster-sharding fix akka-cluster-sharding scalafix akka-contrib Fix akka-cluster-sharding-typed test scalafix akka-docs Use scala-stm 0.9 (released for M5) akka-docs Remove dependency on collections-compat Cherry-pick the relevant constructs to our own private utils Shorten 'scala.collections.immutable' by importing it Duplicate 'immutable' imports Use 'foreach' on futures Replace MapLike with regular Map Internal API markers Simplify ccompat by moving PackageShared into object Since we don't currently need to differentiate between 2.11 and Avoid relying on 'union' (and ++) being left-biased Fix akka-actor/doc by removing -Ywarn-unused Make more things more private Copyright headers Use 'unsorted' to go from SortedSet to Set Duplicate import Use onComplete rather than failed.foreach Clarify why we partly duplicate scala-collection-compat
2018-11-22 16:18:10 +01:00
import akka.util.ccompat._
2019-04-16 20:26:09 +02:00
import com.github.ghik.silencer.silent
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
/**
* INTERNAL API
*/
private[remote] object Association {
sealed trait QueueWrapper extends SendQueue.ProducerApi[OutboundEnvelope] {
def queue: Queue[OutboundEnvelope]
}
final case class QueueWrapperImpl(queue: Queue[OutboundEnvelope]) extends QueueWrapper {
override def offer(message: OutboundEnvelope): Boolean = queue.offer(message)
override def isEnabled: Boolean = true
}
object DisabledQueueWrapper extends QueueWrapper {
override def queue: java.util.Queue[OutboundEnvelope] =
throw new UnsupportedOperationException("The Queue is disabled")
override def offer(message: OutboundEnvelope): Boolean =
throw new UnsupportedOperationException("The method offer() is illegal on a disabled queue")
override def isEnabled: Boolean = false
}
object RemovedQueueWrapper extends QueueWrapper {
override def queue: java.util.Queue[OutboundEnvelope] =
throw new UnsupportedOperationException("The Queue is removed")
override def offer(message: OutboundEnvelope): Boolean = false
override def isEnabled: Boolean = false
}
final case class LazyQueueWrapper(queue: Queue[OutboundEnvelope], materialize: () => Unit) extends QueueWrapper {
private val onlyOnce = new AtomicBoolean
2016-09-06 08:04:02 +02:00
def runMaterialize(): Unit = {
if (onlyOnce.compareAndSet(false, true))
materialize()
2016-09-06 08:04:02 +02:00
}
override def offer(message: OutboundEnvelope): Boolean = {
runMaterialize()
queue.offer(message)
}
override def isEnabled: Boolean = true
}
final val ControlQueueIndex = 0
final val LargeQueueIndex = 1
final val OrdinaryQueueIndex = 2
sealed trait StopSignal
case object OutboundStreamStopIdleSignal extends RuntimeException("") with StopSignal with NoStackTrace
case object OutboundStreamStopQuarantinedSignal extends RuntimeException("") with StopSignal with NoStackTrace
2019-03-13 10:56:20 +01:00
final case class OutboundStreamMatValues(
streamKillSwitch: OptionVal[SharedKillSwitch],
completed: Future[Done],
stopping: OptionVal[StopSignal])
}
/**
* INTERNAL API
*
* Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific
* remote address.
*/
2019-04-16 20:26:09 +02:00
@ccompatUsedUntil213
2019-03-13 10:56:20 +01:00
private[remote] class Association(
val transport: ArteryTransport,
val materializer: Materializer,
val controlMaterializer: Materializer,
override val remoteAddress: Address,
override val controlSubject: ControlMessageSubject,
largeMessageDestinations: WildcardIndex[NotUsed],
priorityMessageDestinations: WildcardIndex[NotUsed],
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])
2019-03-11 10:38:24 +01:00
extends AbstractAssociation
with OutboundContext {
import Association._
import FlightRecorderEvents._
require(remoteAddress.port.nonEmpty)
private val log = Logging(transport.system, getClass.getName)
private def flightRecorder = transport.topLevelFlightRecorder
2016-09-07 10:41:36 +02:00
override def settings = transport.settings
private def advancedSettings = transport.settings.Advanced
2019-03-11 10:38:24 +01:00
private val restartCounter =
new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout)
// We start with the raw wrapped queue and then it is replaced with the materialized value of
// the `SendQueue` after materialization. Using same underlying queue. This makes it possible to
// start sending (enqueuing) to the Association immediate after construction.
def createQueue(capacity: Int, queueIndex: Int): Queue[OutboundEnvelope] = {
val linked = queueIndex == ControlQueueIndex || queueIndex == LargeQueueIndex
if (linked)
new LinkedBlockingQueue[OutboundEnvelope](capacity) // less memory than ManyToOneConcurrentArrayQueue
else
new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity)
}
2016-09-07 10:41:36 +02:00
private val outboundLanes = advancedSettings.OutboundLanes
private val controlQueueSize = advancedSettings.OutboundControlQueueSize
private val queueSize = advancedSettings.OutboundMessageQueueSize
private val largeQueueSize = advancedSettings.OutboundLargeMessageQueueSize
private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = new Array(2 + outboundLanes)
queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize, ControlQueueIndex)) // control stream
queues(LargeQueueIndex) =
if (transport.largeMessageChannelEnabled) // large messages stream
QueueWrapperImpl(createQueue(largeQueueSize, LargeQueueIndex))
else
DisabledQueueWrapper
(0 until outboundLanes).foreach { i =>
queues(OrdinaryQueueIndex + i) = QueueWrapperImpl(createQueue(queueSize, OrdinaryQueueIndex + i)) // ordinary messages stream
}
@volatile private[this] var queuesVisibility = false
private def controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(ControlQueueIndex)
2016-09-06 08:04:02 +02:00
@volatile private[this] var _outboundControlIngress: OptionVal[OutboundControlIngress] = OptionVal.None
@volatile private[this] var materializing = new CountDownLatch(1)
@volatile private[this] var outboundCompressionAccess: Vector[OutboundCompressionAccess] = Vector.empty
2016-09-06 08:04:02 +02:00
// keyed by stream queue index
private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues])
private[this] val idleTimer = new AtomicReference[Option[Cancellable]](None)
private[this] val stopQuarantinedTimer = new AtomicReference[Option[Cancellable]](None)
private[remote] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] =
updateOutboundCompression(c => c.changeActorRefCompression(table))
private[remote] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] =
updateOutboundCompression(c => c.changeClassManifestCompression(table))
private def clearOutboundCompression(): Future[Done] =
updateOutboundCompression(c => c.clearCompression())
private def updateOutboundCompression(action: OutboundCompressionAccess => Future[Done]): Future[Done] = {
implicit val ec = transport.system.dispatchers.internalDispatcher
val c = outboundCompressionAccess
if (c.isEmpty) Future.successful(Done)
else if (c.size == 1) action(c.head)
else Future.sequence(c.map(action(_))).map(_ => Done)
}
private def clearInboundCompression(originUid: Long): Unit =
transport.inboundCompressionAccess match {
case OptionVal.Some(access) => access.closeCompressionFor(originUid)
case _ => // do nothing
}
private def deadletters = transport.system.deadLetters
2016-05-12 08:56:28 +02:00
def outboundControlIngress: OutboundControlIngress = {
2016-09-06 08:04:02 +02:00
_outboundControlIngress match {
case OptionVal.Some(o) => o
case OptionVal.None =>
2016-09-06 08:04:02 +02:00
controlQueue match {
case w: LazyQueueWrapper => w.runMaterialize()
case _ =>
2016-09-06 08:04:02 +02:00
}
// the outboundControlIngress may be accessed before the stream is materialized
// using CountDownLatch to make sure that materialization is completed
2016-09-06 08:04:02 +02:00
materializing.await(10, TimeUnit.SECONDS)
_outboundControlIngress match {
case OptionVal.Some(o) => o
case OptionVal.None =>
if (transport.isShutdown) throw ShuttingDown
else throw new IllegalStateException("outboundControlIngress not initialized yet")
2016-09-06 08:04:02 +02:00
}
}
}
override def localAddress: UniqueAddress = transport.localAddress
/**
* Holds reference to shared state of Association - *access only via helper methods*
*/
@volatile
2019-04-16 20:26:09 +02:00
@silent
private[this] var _sharedStateDoNotCallMeDirectly: AssociationState = AssociationState()
/**
* Helper method for access to underlying state via Unsafe
*
* @param oldState Previous state
* @param newState Next state on transition
* @return Whether the previous state matched correctly
*/
@inline
private[artery] def swapState(oldState: AssociationState, newState: AssociationState): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractAssociation.sharedStateOffset, oldState, newState)
/**
* @return Reference to current shared state
*/
def associationState: AssociationState =
Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState]
def setControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): Unit = {
val current = associationState
swapState(current, current.withControlIdleKillSwitch(killSwitch))
}
def completeHandshake(peer: UniqueAddress): Future[Done] = {
2019-03-13 10:56:20 +01:00
require(
remoteAddress == peer.address,
s"wrong remote address in completeHandshake, got ${peer.address}, expected $remoteAddress")
val current = associationState
current.uniqueRemoteAddressValue() match {
case Some(`peer`) =>
// handshake already completed
Future.successful(Done)
case _ =>
// clear outbound compression, it's safe to do that several times if someone else
// completes handshake at same time, but it's important to clear it before
// we signal that the handshake is completed (uniqueRemoteAddressPromise.trySuccess)
implicit val ec = transport.system.dispatchers.internalDispatcher
clearOutboundCompression().map { _ =>
current.uniqueRemoteAddressPromise.trySuccess(peer)
current.uniqueRemoteAddressValue() match {
case Some(`peer`) =>
// our value
case _ =>
val newState = current.newIncarnation(Promise.successful(peer))
if (swapState(current, newState)) {
current.uniqueRemoteAddressValue() match {
case Some(old) =>
cancelStopQuarantinedTimer()
2019-03-13 10:56:20 +01:00
log.debug(
"Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
newState.incarnation,
peer.address,
peer.uid,
old.uid)
clearInboundCompression(old.uid)
case None =>
// Failed, nothing to do
}
// if swap failed someone else completed before us, and that is fine
}
}
Done
}
}
}
// OutboundContext
override def sendControl(message: ControlMessage): Unit = {
try {
if (!transport.isShutdown && !isRemovedAfterQuarantined()) {
if (associationState.isQuarantined()) {
2019-03-11 10:38:24 +01:00
log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message), remoteAddress)
setupStopQuarantinedTimer()
}
outboundControlIngress.sendControlMessage(message)
}
} catch {
case ShuttingDown => // silence it
}
}
def send(message: Any, sender: OptionVal[ActorRef], recipient: OptionVal[RemoteActorRef]): Unit = {
2016-09-07 15:38:13 +02:00
def createOutboundEnvelope(): OutboundEnvelope =
outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender)
// volatile read to see latest queue array
2019-04-16 20:26:09 +02:00
@silent
val unused = queuesVisibility
def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = {
val removed = isRemovedAfterQuarantined()
if (removed) recipient match {
case OptionVal.Some(ref) => ref.cachedAssociation = null // don't use this Association instance any more
case OptionVal.None =>
}
val reason =
if (removed) "Due to removed unused quarantined association"
else s"Due to overflow of send queue, size [$qSize]"
transport.system.eventStream
.publish(Dropped(message, reason, env.sender.getOrElse(ActorRef.noSender), recipient.getOrElse(deadletters)))
flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex)
deadletters ! env
}
val state = associationState
val quarantined = state.isQuarantined()
val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery]
// allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || messageIsClearSystemMessageDelivery) {
if (quarantined && !messageIsClearSystemMessageDelivery) {
2019-03-13 10:56:20 +01:00
log.debug(
"Quarantine piercing attempt with message [{}] to [{}]",
Logging.messageClassName(message),
recipient.getOrElse(""))
setupStopQuarantinedTimer()
}
try {
val outboundEnvelope = createOutboundEnvelope()
message match {
case _: SystemMessage =>
if (!controlQueue.offer(outboundEnvelope)) {
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery =>
// ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
if (!controlQueue.offer(outboundEnvelope)) {
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
case _: DaemonMsgCreate =>
// DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
// remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages.
// First ordinary message may arrive earlier but then the resolve in the Decoder is retried
// so that the first message can be delivered after the remote actor has been created.
if (!controlQueue.offer(outboundEnvelope))
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
case _ =>
val queueIndex = selectQueue(recipient)
val queue = queues(queueIndex)
val offerOk = queue.offer(outboundEnvelope)
if (!offerOk)
dropped(queueIndex, queueSize, outboundEnvelope)
}
} catch {
case ShuttingDown => // silence it
}
} else if (log.isDebugEnabled)
2019-03-13 10:56:20 +01:00
log.debug(
"Dropping message [{}] from [{}] to [{}] due to quarantined system [{}]",
Logging.messageClassName(message),
sender.getOrElse(deadletters),
recipient.getOrElse(recipient),
remoteAddress)
}
private def selectQueue(recipient: OptionVal[RemoteActorRef]): Int = {
recipient match {
case OptionVal.Some(r) =>
r.cachedSendQueueIndex match {
case -1 =>
// only happens when messages are sent to new remote destination
// and is then cached on the RemoteActorRef
val elements = r.path.elements
val idx =
if (priorityMessageDestinations.find(elements).isDefined) {
log.debug("Using priority message stream for {}", r.path)
ControlQueueIndex
} else if (transport.largeMessageChannelEnabled && largeMessageDestinations.find(elements).isDefined) {
log.debug("Using large message stream for {}", r.path)
LargeQueueIndex
} else if (outboundLanes == 1) {
OrdinaryQueueIndex
} else {
// select lane based on destination, to preserve message order
OrdinaryQueueIndex + (math.abs(r.path.uid % outboundLanes))
}
r.cachedSendQueueIndex = idx
idx
case idx => idx
}
case OptionVal.None =>
OrdinaryQueueIndex
}
}
override def isOrdinaryMessageStreamActive(): Boolean =
isStreamActive(OrdinaryQueueIndex)
def isStreamActive(queueIndex: Int): Boolean = {
queues(queueIndex) match {
case _: LazyQueueWrapper => false
case DisabledQueueWrapper => false
case RemovedQueueWrapper => false
case _ => true
}
}
def sendTerminationHint(replyTo: ActorRef): Int = {
if (!associationState.isQuarantined()) {
val msg = ActorSystemTerminating(localAddress)
var sent = 0
queues.iterator.filter(q => q.isEnabled && !q.isInstanceOf[LazyQueueWrapper]).foreach { queue =>
try {
2019-03-11 10:38:24 +01:00
val envelope = outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.Some(replyTo))
queue.offer(envelope)
sent += 1
} catch {
case ShuttingDown => // can be thrown if `offer` triggers new materialization
}
}
sent
} else 0
}
// OutboundContext
override def quarantine(reason: String): Unit = {
val uid = associationState.uniqueRemoteAddressValue().map(_.uid)
quarantine(reason, uid, harmless = false)
}
@tailrec final def quarantine(reason: String, uid: Option[Long], harmless: Boolean): Unit = {
uid match {
case Some(u) =>
val current = associationState
current.uniqueRemoteAddressValue() match {
case Some(peer) if peer.uid == u =>
if (!current.isQuarantined(u)) {
val newState = current.newQuarantined()
if (swapState(current, newState)) {
// quarantine state change was performed
if (harmless) {
2019-03-13 10:56:20 +01:00
log.info(
"Association to [{}] having UID [{}] has been stopped. All " +
"messages to this UID will be delivered to dead letters. Reason: {}",
remoteAddress,
u,
reason)
2019-03-11 10:38:24 +01:00
transport.system.eventStream
.publish(GracefulShutdownQuarantinedEvent(UniqueAddress(remoteAddress, u), reason))
} else {
log.warning(
"Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " +
2019-03-11 10:38:24 +01:00
"messages to this UID will be delivered to dead letters. " +
"Remote ActorSystem must be restarted to recover from this situation. Reason: {}",
remoteAddress,
u,
reason)
transport.system.eventStream.publish(QuarantinedEvent(UniqueAddress(remoteAddress, u)))
}
flightRecorder.loFreq(Transport_Quarantined, s"$remoteAddress - $u")
clearOutboundCompression()
clearInboundCompression(u)
// end delivery of system messages to that incarnation after this point
send(ClearSystemMessageDelivery(current.incarnation), OptionVal.None, OptionVal.None)
if (!harmless) {
// try to tell the other system that we have quarantined it
sendControl(Quarantined(localAddress, peer))
}
setupStopQuarantinedTimer()
} else
quarantine(reason, uid, harmless) // recursive
}
case Some(peer) =>
log.info(
"Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}",
2019-03-11 10:38:24 +01:00
remoteAddress,
u,
peer.uid,
reason)
send(ClearSystemMessageDelivery(current.incarnation - 1), OptionVal.None, OptionVal.None)
case None =>
log.info(
"Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}",
2019-03-11 10:38:24 +01:00
remoteAddress,
reason)
}
case None =>
log.warning("Quarantine of [{}] ignored because unknown UID", remoteAddress)
}
}
/**
* After calling this no messages can be sent with this Association instance
*/
def removedAfterQuarantined(): Unit = {
if (!isRemovedAfterQuarantined()) {
flightRecorder.loFreq(Transport_RemovedQuarantined, remoteAddress.toString)
queues(ControlQueueIndex) = RemovedQueueWrapper
if (transport.largeMessageChannelEnabled)
queues(LargeQueueIndex) = RemovedQueueWrapper
(0 until outboundLanes).foreach { i =>
queues(OrdinaryQueueIndex + i) = RemovedQueueWrapper
}
queuesVisibility = true // volatile write for visibility of the queues array
// cleanup
_outboundControlIngress = OptionVal.None
outboundCompressionAccess = Vector.empty
cancelAllTimers()
abortQuarantined()
log.info("Unused association to [{}] removed after quarantine", remoteAddress)
}
}
def isRemovedAfterQuarantined(): Boolean =
queues(ControlQueueIndex) == RemovedQueueWrapper
private def cancelStopQuarantinedTimer(): Unit = {
val current = stopQuarantinedTimer.get
current.foreach(_.cancel())
stopQuarantinedTimer.compareAndSet(current, None)
}
private def setupStopQuarantinedTimer(): Unit = {
cancelStopQuarantinedTimer()
stopQuarantinedTimer.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) {
if (associationState.isQuarantined())
abortQuarantined()
}(transport.system.dispatchers.internalDispatcher)))
}
private def abortQuarantined(): Unit = {
cancelIdleTimer()
streamMatValues.get.foreach {
case (queueIndex, OutboundStreamMatValues(killSwitch, _, _)) =>
killSwitch match {
case OptionVal.Some(k) =>
setStopReason(queueIndex, OutboundStreamStopQuarantinedSignal)
clearStreamKillSwitch(queueIndex, k)
k.abort(OutboundStreamStopQuarantinedSignal)
case OptionVal.None => // already aborted
}
}
}
private def cancelIdleTimer(): Unit = {
val current = idleTimer.get
current.foreach(_.cancel())
idleTimer.compareAndSet(current, None)
}
private def setupIdleTimer(): Unit = {
if (idleTimer.get.isEmpty) {
val StopIdleOutboundAfter = settings.Advanced.StopIdleOutboundAfter
val QuarantineIdleOutboundAfter = settings.Advanced.QuarantineIdleOutboundAfter
val interval = StopIdleOutboundAfter / 2
val initialDelay = settings.Advanced.Tcp.ConnectionTimeout.max(StopIdleOutboundAfter) + 1.second
val task =
transport.system.scheduler.scheduleWithFixedDelay(initialDelay, interval) { () =>
val lastUsedDurationNanos = System.nanoTime() - associationState.lastUsedTimestamp.get
if (lastUsedDurationNanos >= QuarantineIdleOutboundAfter.toNanos && !associationState.isQuarantined()) {
// If idle longer than quarantine-idle-outbound-after and the low frequency HandshakeReq
// doesn't get through it will be quarantined to cleanup lingering associations to crashed systems.
quarantine(s"Idle longer than quarantine-idle-outbound-after [${QuarantineIdleOutboundAfter.pretty}]")
} else if (lastUsedDurationNanos >= StopIdleOutboundAfter.toNanos) {
streamMatValues.get.foreach {
case (queueIndex, OutboundStreamMatValues(streamKillSwitch, _, stopping)) =>
if (isStreamActive(queueIndex) && stopping.isEmpty) {
if (queueIndex != ControlQueueIndex) {
streamKillSwitch match {
case OptionVal.Some(k) =>
// for non-control streams we can stop the entire stream
log.info("Stopping idle outbound stream [{}] to [{}]", queueIndex, remoteAddress)
flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex")
setStopReason(queueIndex, OutboundStreamStopIdleSignal)
clearStreamKillSwitch(queueIndex, k)
k.abort(OutboundStreamStopIdleSignal)
case OptionVal.None => // already aborted
}
} else {
// only stop the transport parts of the stream because SystemMessageDelivery stage has
// state (seqno) and system messages might be sent at the same time
associationState.controlIdleKillSwitch match {
case OptionVal.Some(killSwitch) =>
log.info("Stopping idle outbound control stream to [{}]", remoteAddress)
flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex")
setControlIdleKillSwitch(OptionVal.None)
killSwitch.abort(OutboundStreamStopIdleSignal)
case OptionVal.None => // already stopped
}
}
}
}
}
}(transport.system.dispatcher)
if (!idleTimer.compareAndSet(None, Some(task))) {
// another thread did same thing and won
task.cancel()
}
}
}
private def cancelAllTimers(): Unit = {
cancelIdleTimer()
cancelStopQuarantinedTimer()
}
private def sendToDeadLetters[T](pending: Vector[OutboundEnvelope]): Unit = {
pending.foreach(transport.system.deadLetters ! _)
}
/**
* Called once after construction when the `Association` instance
* wins the CAS in the `AssociationRegistry`. It will materialize
* the streams. It is possible to sending (enqueuing) to the association
* before this method is called.
*
* @throws ShuttingDown if called while the transport is shutting down
*/
def associate(): Unit = {
if (!controlQueue.isInstanceOf[QueueWrapper])
throw new IllegalStateException("associate() must only be called once")
runOutboundStreams()
}
private def runOutboundStreams(): Unit = {
// it's important to materialize the outboundControl stream first,
// so that outboundControlIngress is ready when stages for all streams start
runOutboundControlStream()
runOutboundOrdinaryMessagesStream()
if (transport.largeMessageChannelEnabled)
runOutboundLargeMessagesStream()
}
private def runOutboundControlStream(): Unit = {
if (transport.isShutdown) throw ShuttingDown
log.debug("Starting outbound control stream to [{}]", remoteAddress)
val wrapper = getOrCreateQueueWrapper(ControlQueueIndex, queueSize)
queues(ControlQueueIndex) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array
val streamKillSwitch = KillSwitches.shared("outboundControlStreamKillSwitch")
def sendQueuePostStop[T](pending: Vector[OutboundEnvelope]): Unit = {
sendToDeadLetters(pending)
val systemMessagesCount = pending.count(env => env.message.isInstanceOf[SystemMessage])
if (systemMessagesCount > 0)
quarantine(s"SendQueue stopped with [$systemMessagesCount] pending system messages.")
}
val (queueValue, (control, completed)) =
2019-03-11 10:38:24 +01:00
Source
.fromGraph(new SendQueue[OutboundEnvelope](sendQueuePostStop))
.via(streamKillSwitch.flow)
.toMat(transport.outboundControl(this))(Keep.both)
.run()(materializer)
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
queues(ControlQueueIndex) = queueValue
queuesVisibility = true // volatile write for visibility of the queues array
2016-09-06 08:04:02 +02:00
_outboundControlIngress = OptionVal.Some(control)
materializing.countDown()
updateStreamMatValues(ControlQueueIndex, streamKillSwitch, completed)
setupIdleTimer()
2019-03-13 10:56:20 +01:00
attachOutboundStreamRestart(
"Outbound control stream",
ControlQueueIndex,
controlQueueSize,
completed,
() => runOutboundControlStream())
}
private def getOrCreateQueueWrapper(queueIndex: Int, capacity: Int): QueueWrapper = {
2019-04-16 20:26:09 +02:00
@silent
val unused = queuesVisibility // volatile read to see latest queues array
queues(queueIndex) match {
case existing: QueueWrapper => existing
2019-03-11 10:38:24 +01:00
case _ =>
// use new queue for restarts
QueueWrapperImpl(createQueue(capacity, queueIndex))
}
}
private def runOutboundOrdinaryMessagesStream(): Unit = {
if (transport.isShutdown) throw ShuttingDown
val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")
if (outboundLanes == 1) {
log.debug("Starting outbound message stream to [{}]", remoteAddress)
val queueIndex = OrdinaryQueueIndex
val wrapper = getOrCreateQueueWrapper(queueIndex, queueSize)
queues(queueIndex) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array
val (queueValue, _, changeCompression, completed) =
2019-03-11 10:38:24 +01:00
Source
.fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters))
.via(streamKillSwitch.flow)
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.toMat(transport.outbound(this))({ case ((a, b), (c, d)) => (a, b, c, d) }) // "keep all, exploded"
.run()(materializer)
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
queues(queueIndex) = queueValue
queuesVisibility = true // volatile write for visibility of the queues array
outboundCompressionAccess = Vector(changeCompression)
updateStreamMatValues(OrdinaryQueueIndex, streamKillSwitch, completed)
2019-03-13 10:56:20 +01:00
attachOutboundStreamRestart(
"Outbound message stream",
OrdinaryQueueIndex,
queueSize,
completed,
() => runOutboundOrdinaryMessagesStream())
} else {
log.debug("Starting outbound message stream to [{}] with [{}] lanes", remoteAddress, outboundLanes)
val wrappers = (0 until outboundLanes).map { i =>
val wrapper = getOrCreateQueueWrapper(OrdinaryQueueIndex + i, queueSize)
queues(OrdinaryQueueIndex + i) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array
wrapper
}.toVector
2019-03-11 10:38:24 +01:00
val lane = Source
.fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters))
.via(streamKillSwitch.flow)
.via(transport.outboundTestFlow(this))
.viaMat(transport.outboundLane(this))(Keep.both)
.watchTermination()(Keep.both)
// recover to avoid error logging by MergeHub
.recoverWithRetries(-1, { case _: Throwable => Source.empty })
.mapMaterializedValue {
case ((q, c), w) => (q, c, w)
}
2019-03-11 10:38:24 +01:00
val (mergeHub, transportSinkCompleted) = MergeHub
.source[EnvelopeBuffer]
.via(streamKillSwitch.flow)
2019-03-11 10:38:24 +01:00
.toMat(transport.outboundTransportSink(this))(Keep.both)
.run()(materializer)
val values: Vector[(SendQueue.QueueValue[OutboundEnvelope], Encoder.OutboundCompressionAccess, Future[Done])] =
2019-03-11 10:38:24 +01:00
(0 until outboundLanes).iterator
.map { _ =>
lane.to(mergeHub).run()(materializer)
}
.to(Vector)
val (queueValues, compressionAccessValues, laneCompletedValues) = values.unzip3
implicit val ec = transport.system.dispatchers.internalDispatcher
// tear down all parts if one part fails or completes
Future.firstCompletedOf(laneCompletedValues).failed.foreach { reason =>
streamKillSwitch.abort(reason)
}
2019-03-11 10:38:24 +01:00
(laneCompletedValues :+ transportSinkCompleted).foreach(_.foreach { _ =>
streamKillSwitch.shutdown()
})
val allCompleted = Future.sequence(laneCompletedValues).flatMap(_ => transportSinkCompleted)
queueValues.zip(wrappers).zipWithIndex.foreach {
case ((q, w), i) =>
q.inject(w.queue)
queues(OrdinaryQueueIndex + i) = q // replace with the materialized value, still same underlying queue
}
queuesVisibility = true // volatile write for visibility of the queues array
outboundCompressionAccess = compressionAccessValues
2019-03-13 10:56:20 +01:00
attachOutboundStreamRestart(
"Outbound message stream",
OrdinaryQueueIndex,
queueSize,
allCompleted,
() => runOutboundOrdinaryMessagesStream())
}
}
private def runOutboundLargeMessagesStream(): Unit = {
if (transport.isShutdown) throw ShuttingDown
log.debug("Starting outbound large message stream to [{}]", remoteAddress)
val wrapper = getOrCreateQueueWrapper(LargeQueueIndex, largeQueueSize)
queues(LargeQueueIndex) = wrapper // use new underlying queue immediately for restarts
queuesVisibility = true // volatile write for visibility of the queues array
val streamKillSwitch = KillSwitches.shared("outboundLargeMessagesKillSwitch")
2019-03-11 10:38:24 +01:00
val (queueValue, completed) = Source
.fromGraph(new SendQueue[OutboundEnvelope](sendToDeadLetters))
.via(streamKillSwitch.flow)
.via(transport.outboundTestFlow(this))
.toMat(transport.outboundLarge(this))(Keep.both)
.run()(materializer)
queueValue.inject(wrapper.queue)
// replace with the materialized value, still same underlying queue
queues(LargeQueueIndex) = queueValue
queuesVisibility = true // volatile write for visibility of the queues array
updateStreamMatValues(LargeQueueIndex, streamKillSwitch, completed)
2019-03-13 10:56:20 +01:00
attachOutboundStreamRestart(
"Outbound large message stream",
LargeQueueIndex,
largeQueueSize,
completed,
() => runOutboundLargeMessagesStream())
}
2019-03-13 10:56:20 +01:00
private def attachOutboundStreamRestart(
streamName: String,
queueIndex: Int,
queueCapacity: Int,
streamCompleted: Future[Done],
restart: () => Unit): Unit = {
def lazyRestart(): Unit = {
flightRecorder.loFreq(Transport_RestartOutbound, s"$remoteAddress - $streamName")
outboundCompressionAccess = Vector.empty
if (queueIndex == ControlQueueIndex) {
materializing = new CountDownLatch(1)
2016-09-06 08:04:02 +02:00
_outboundControlIngress = OptionVal.None
}
// LazyQueueWrapper will invoke the `restart` function when first message is offered
val wrappedRestartFun: () => Unit = () => {
restart()
}
if (!isRemovedAfterQuarantined())
queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), wrappedRestartFun)
queuesVisibility = true // volatile write for visibility of the queues array
}
implicit val ec = materializer.executionContext
streamCompleted.foreach { _ =>
// shutdown as expected
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
}
streamCompleted.failed.foreach {
case ArteryTransport.ShutdownSignal =>
// shutdown as expected
cancelAllTimers()
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
case cause if transport.isShutdown || isRemovedAfterQuarantined() =>
// don't restart after shutdown, but log some details so we notice
// for the TCP transport the ShutdownSignal is "converted" to StreamTcpException
if (!cause.isInstanceOf[StreamTcpException])
log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
cancelAllTimers()
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
case _: AeronTerminated =>
// shutdown already in progress
cancelAllTimers()
case _: AbruptTerminationException =>
// ActorSystem shutdown
cancelAllTimers()
case cause =>
// it might have been stopped as expected due to idle or quarantine
// for the TCP transport the exception is "converted" to StreamTcpException
val stoppedIdle = cause == OutboundStreamStopIdleSignal ||
getStopReason(queueIndex).contains(OutboundStreamStopIdleSignal)
val stoppedQuarantined = cause == OutboundStreamStopQuarantinedSignal ||
getStopReason(queueIndex).contains(OutboundStreamStopQuarantinedSignal)
// for some cases restart unconditionally, without counting restarts
val bypassRestartCounter = cause match {
case _: GaveUpMessageException => true
case _ => stoppedIdle || stoppedQuarantined
}
if (queueIndex == ControlQueueIndex && !stoppedQuarantined) {
2016-09-06 08:04:02 +02:00
cause match {
case _: HandshakeTimeoutException => // ok, quarantine not possible without UID
2019-03-11 10:38:24 +01:00
case _ =>
// Must quarantine in case all system messages haven't been delivered.
// See also comment in the stoppedIdle case below
quarantine(s"Outbound control stream restarted. $cause")
2016-09-06 08:04:02 +02:00
}
}
def isConnectException: Boolean =
cause.isInstanceOf[StreamTcpException] && cause.getCause != null && cause.getCause
.isInstanceOf[ConnectException]
if (stoppedIdle) {
log.debug("{} to [{}] was idle and stopped. It will be restarted if used again.", streamName, remoteAddress)
lazyRestart()
} else if (stoppedQuarantined) {
2019-03-13 10:56:20 +01:00
log.debug(
"{} to [{}] was quarantined and stopped. It will be restarted if used again.",
streamName,
remoteAddress)
lazyRestart()
} else if (bypassRestartCounter || restartCounter.restart()) {
// ConnectException may happen repeatedly and are already logged in connectionFlowWithRestart
if (isConnectException)
log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
else
log.warning("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause)
lazyRestart()
2016-05-19 08:24:27 +02:00
} else {
2019-03-13 10:56:20 +01:00
log.error(
cause,
s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
streamName,
remoteAddress,
advancedSettings.OutboundMaxRestarts,
advancedSettings.OutboundRestartTimeout.toSeconds)
cancelAllTimers()
2016-05-19 08:24:27 +02:00
transport.system.terminate()
}
}
}
2019-03-13 10:56:20 +01:00
private def updateStreamMatValues(
streamId: Int,
streamKillSwitch: SharedKillSwitch,
completed: Future[Done]): Unit = {
implicit val ec = materializer.executionContext
2019-03-11 10:38:24 +01:00
updateStreamMatValues(streamId, OutboundStreamMatValues(OptionVal.Some(streamKillSwitch), completed.recover {
case _ => Done
}, stopping = OptionVal.None))
}
@tailrec private def updateStreamMatValues(streamId: Int, values: OutboundStreamMatValues): Unit = {
val prev = streamMatValues.get()
if (!streamMatValues.compareAndSet(prev, prev + (streamId -> values))) {
updateStreamMatValues(streamId, values)
}
}
@tailrec private def setStopReason(streamId: Int, stopSignal: StopSignal): Unit = {
val prev = streamMatValues.get()
prev.get(streamId) match {
case Some(v) =>
if (!streamMatValues.compareAndSet(prev, prev.updated(streamId, v.copy(stopping = OptionVal.Some(stopSignal)))))
setStopReason(streamId, stopSignal)
case None => throw new IllegalStateException(s"Expected streamMatValues for [$streamId]")
}
}
private def getStopReason(streamId: Int): OptionVal[StopSignal] = {
streamMatValues.get().get(streamId) match {
case Some(OutboundStreamMatValues(_, _, stopping)) => stopping
2019-03-11 10:38:24 +01:00
case None => OptionVal.None
}
}
// after it has been used we remove the kill switch to cleanup some memory,
// not a "leak" but a KillSwitch is rather heavy
@tailrec private def clearStreamKillSwitch(streamId: Int, old: SharedKillSwitch): Unit = {
val prev = streamMatValues.get()
prev.get(streamId) match {
case Some(v) =>
if (v.streamKillSwitch.isDefined && (v.streamKillSwitch.get eq old)) {
if (!streamMatValues.compareAndSet(prev, prev.updated(streamId, v.copy(streamKillSwitch = OptionVal.None))))
clearStreamKillSwitch(streamId, old)
}
case None => throw new IllegalStateException(s"Expected streamMatValues for [$streamId]")
}
}
/**
* Exposed for orderly shutdown purposes, can not be trusted except for during shutdown as streams may restart.
* Will complete successfully even if one of the stream completion futures failed
*/
def streamsCompleted: Future[Done] = {
implicit val ec = materializer.executionContext
2019-03-11 10:38:24 +01:00
Future
.sequence(streamMatValues.get().values.map {
case OutboundStreamMatValues(_, done, _) => done
})
.map(_ => Done)
}
override def toString: String =
s"Association($localAddress -> $remoteAddress with $associationState)"
}
/**
* INTERNAL API
*/
private[remote] class AssociationRegistry(createAssociation: Address => Association) {
private[this] val associationsByAddress = new AtomicReference[Map[Address, Association]](Map.empty)
private[this] val associationsByUid = new AtomicReference[ImmutableLongMap[Association]](ImmutableLongMap.empty)
/**
* @throws ShuttingDown if called while the transport is shutting down
*/
@tailrec final def association(remoteAddress: Address): Association = {
val currentMap = associationsByAddress.get
currentMap.get(remoteAddress) match {
case Some(existing) => existing
case None =>
val newAssociation = createAssociation(remoteAddress)
val newMap = currentMap.updated(remoteAddress, newAssociation)
if (associationsByAddress.compareAndSet(currentMap, newMap)) {
newAssociation.associate() // start it, only once
newAssociation
} else
association(remoteAddress) // lost CAS, retry
}
}
def association(uid: Long): OptionVal[Association] =
associationsByUid.get.get(uid)
/**
* @throws ShuttingDown if called while the transport is shutting down
*/
@tailrec final def setUID(peer: UniqueAddress): Association = {
// Don't create a new association via this method. It's supposed to exist unless it was removed after quarantined.
val a = association(peer.address)
val currentMap = associationsByUid.get
currentMap.get(peer.uid) match {
case OptionVal.Some(previous) =>
if (previous eq a)
// associationsByUid Map already contains the right association
a
else
// make sure we don't overwrite same UID with different association
throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]")
case _ =>
// update associationsByUid Map with the uid -> association
val newMap = currentMap.updated(peer.uid, a)
if (associationsByUid.compareAndSet(currentMap, newMap))
a
else
setUID(peer) // lost CAS, retry
}
}
def allAssociations: Set[Association] =
associationsByAddress.get.values.toSet
def removeUnusedQuarantined(after: FiniteDuration): Unit = {
removeUnusedQuarantinedByAddress(after)
removeUnusedQuarantinedByUid(after)
}
@tailrec private def removeUnusedQuarantinedByAddress(after: FiniteDuration): Unit = {
val now = System.nanoTime()
val afterNanos = after.toNanos
val currentMap = associationsByAddress.get
val remove = currentMap.foldLeft(Map.empty[Address, Association]) {
case (acc, (address, association)) =>
val state = association.associationState
if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos))
acc.updated(address, association)
else
acc
}
if (remove.nonEmpty) {
val newMap = currentMap -- remove.keysIterator
if (associationsByAddress.compareAndSet(currentMap, newMap))
remove.valuesIterator.foreach(_.removedAfterQuarantined())
else
removeUnusedQuarantinedByAddress(after) // CAS fail, recursive
}
}
@tailrec private def removeUnusedQuarantinedByUid(after: FiniteDuration): Unit = {
val now = System.nanoTime()
val afterNanos = after.toNanos
val currentMap = associationsByUid.get
var remove = Map.empty[Long, Association]
currentMap.keysIterator.foreach { uid =>
val association = currentMap.get(uid).get
val state = association.associationState
if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos))
remove = remove.updated(uid, association)
}
if (remove.nonEmpty) {
val newMap = remove.keysIterator.foldLeft(currentMap)((acc, uid) => acc.remove(uid))
if (associationsByUid.compareAndSet(currentMap, newMap))
remove.valuesIterator.foreach(_.removedAfterQuarantined())
else
removeUnusedQuarantinedByUid(after) // CAS fail, recursive
}
}
}