/**
 * Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.remote.artery

import java.util.Queue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NoStackTrace

import akka.{ Done, NotUsed }
import akka.actor.ActorRef
import akka.actor.ActorSelectionMessage
import akka.actor.Address
import akka.actor.Cancellable
import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging
import akka.pattern.after
import akka.remote._
import akka.remote.DaemonMsgCreate
import akka.remote.QuarantinedEvent
import akka.remote.artery.AeronSink.GaveUpMessageException
import akka.remote.artery.ArteryTransport.{ AeronTerminated, ShuttingDown }
import akka.remote.artery.Encoder.ChangeOutboundCompression
import akka.remote.artery.Encoder.ChangeOutboundCompressionFailed
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException
import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery
import akka.remote.artery.compress.CompressionProtocol._
import akka.remote.artery.compress.CompressionTable
import akka.stream.AbruptTerminationException
import akka.stream.KillSwitches
import akka.stream.Materializer
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.MergeHub
import akka.stream.scaladsl.Source
import akka.util.{ Unsafe, WildcardIndex }
import akka.util.OptionVal
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue

/**
 * INTERNAL API
 */
private[remote] object Association {

  sealed trait QueueWrapper extends SendQueue.ProducerApi[OutboundEnvelope] {
    def queue: Queue[OutboundEnvelope]
  }

  final case class QueueWrapperImpl(queue: Queue[OutboundEnvelope]) extends QueueWrapper {
    override def offer(message: OutboundEnvelope): Boolean = queue.offer(message)

    override def isEnabled: Boolean = true
  }

  object DisabledQueueWrapper extends QueueWrapper {
    override def queue: java.util.Queue[OutboundEnvelope] =
      throw new UnsupportedOperationException("The Queue is disabled")

    override def offer(message: OutboundEnvelope): Boolean =
      throw new UnsupportedOperationException("The method offer() is illegal on a disabled queue")

    override def isEnabled: Boolean = false
  }

  final case class LazyQueueWrapper(queue: Queue[OutboundEnvelope], materialize: () ⇒ Unit) extends QueueWrapper {
    private val onlyOnce = new AtomicBoolean

    def runMaterialize(): Unit = {
      if (onlyOnce.compareAndSet(false, true))
        materialize()
    }

    override def offer(message: OutboundEnvelope): Boolean = {
      runMaterialize()
      queue.offer(message)
    }

    override def isEnabled: Boolean = true
  }
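
  // LazyQueueWrapper defers materialization until the first `offer`: `runMaterialize`
  // uses an AtomicBoolean CAS so that `materialize` runs at most once even when several
  // producers offer concurrently. This is what makes restart-on-demand of a stopped
  // outbound stream transparent to senders.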

  final val ControlQueueIndex = 0
  final val LargeQueueIndex = 1
  final val OrdinaryQueueIndex = 2
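
  // Layout of the `queues` array in Association (illustrative, assuming 3 outbound lanes):
  //   index 0         -> control stream
  //   index 1         -> large message stream (DisabledQueueWrapper if not enabled)
  //   indices 2, 3, 4 -> ordinary message lanes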

  private object OutboundStreamStopSignal extends RuntimeException with NoStackTrace

  final case class OutboundStreamMatValues(streamKillSwitch: SharedKillSwitch, completed: Future[Done])
}

/**
 * INTERNAL API
 *
 * Thread-safe, mutable holder for association state. Main entry point for messages
 * destined for a specific remote address.
 */
private[remote] class Association(
  val transport: ArteryTransport,
  val materializer: Materializer,
  override val remoteAddress: Address,
  override val controlSubject: ControlMessageSubject,
  largeMessageDestinations: WildcardIndex[NotUsed],
  priorityMessageDestinations: WildcardIndex[NotUsed],
  outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])
  extends AbstractAssociation with OutboundContext {

  import Association._
  import FlightRecorderEvents._

  private val log = Logging(transport.system, getClass.getName)
  private val flightRecorder = transport.createFlightRecorderEventSink(synchr = true)

  override def settings = transport.settings
  private def advancedSettings = transport.settings.Advanced

  private val restartCounter = new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout)

  // We start with the raw wrapped queue and then it is replaced with the materialized value of
  // the `SendQueue` after materialization. The same underlying queue is used, which makes it
  // possible to start sending (enqueuing) to the Association immediately after construction.
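
  // Sketch of the queue hand-over (illustrative, using names from this file):
  //   val wrapper = QueueWrapperImpl(createQueue(queueSize))
  //   queues(i) = wrapper                  // offer() works already, backed by the raw queue
  //   ...stream is materialized, yielding a SendQueue queueValue...
  //   queueValue.inject(wrapper.queue)     // SendQueue drains the same underlying queue
  //   queues(i) = queueValue               // producers now go through the materialized value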
  def createQueue(capacity: Int): Queue[OutboundEnvelope] =
    new ManyToOneConcurrentArrayQueue[OutboundEnvelope](capacity)

  private val outboundLanes = advancedSettings.OutboundLanes
  private val controlQueueSize = advancedSettings.OutboundControlQueueSize
  private val queueSize = advancedSettings.OutboundMessageQueueSize
  private val largeQueueSize = advancedSettings.OutboundLargeMessageQueueSize

  private[this] val queues: Array[SendQueue.ProducerApi[OutboundEnvelope]] = Array.ofDim(2 + outboundLanes)
  queues(ControlQueueIndex) = QueueWrapperImpl(createQueue(controlQueueSize)) // control stream
  queues(LargeQueueIndex) =
    if (transport.largeMessageChannelEnabled) // large messages stream
      QueueWrapperImpl(createQueue(largeQueueSize))
    else
      DisabledQueueWrapper

  (0 until outboundLanes).foreach { i ⇒
    queues(OrdinaryQueueIndex + i) = QueueWrapperImpl(createQueue(queueSize)) // ordinary messages stream
  }
  @volatile private[this] var queuesVisibility = false

  private def controlQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(ControlQueueIndex)
  private def largeQueue: SendQueue.ProducerApi[OutboundEnvelope] = queues(LargeQueueIndex)

  @volatile private[this] var _outboundControlIngress: OptionVal[OutboundControlIngress] = OptionVal.None
  @volatile private[this] var materializing = new CountDownLatch(1)
  @volatile private[this] var changeOutboundCompression: Vector[ChangeOutboundCompression] = Vector.empty
  // in case there is a restart at the same time as a compression table update
  private val changeCompressionTimeout = 5.seconds

  private[artery] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = {
    import transport.system.dispatcher
    val c = changeOutboundCompression
    val result =
      if (c.isEmpty) Future.successful(Done)
      else if (c.size == 1) c.head.changeActorRefCompression(table)
      else Future.sequence(c.map(_.changeActorRefCompression(table))).map(_ ⇒ Done)
    timeoutAfter(result, changeCompressionTimeout, new ChangeOutboundCompressionFailed)
  }

  // keyed by stream queue index
  private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues])
  private[this] val idle = new AtomicReference[Option[Cancellable]](None)

  private[artery] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = {
    import transport.system.dispatcher
    val c = changeOutboundCompression
    val result =
      if (c.isEmpty) Future.successful(Done)
      else if (c.size == 1) c.head.changeClassManifestCompression(table)
      else Future.sequence(c.map(_.changeClassManifestCompression(table))).map(_ ⇒ Done)
    timeoutAfter(result, changeCompressionTimeout, new ChangeOutboundCompressionFailed)
  }

  private def clearOutboundCompression(): Future[Done] = {
    import transport.system.dispatcher
    val c = changeOutboundCompression
    val result =
      if (c.isEmpty) Future.successful(Done)
      else if (c.size == 1) c.head.clearCompression()
      else Future.sequence(c.map(_.clearCompression())).map(_ ⇒ Done)
    timeoutAfter(result, changeCompressionTimeout, new ChangeOutboundCompressionFailed)
  }

  private def clearInboundCompression(originUid: Long): Unit =
    transport.inboundCompressions.foreach(_.close(originUid))

  private def timeoutAfter[T](f: Future[T], timeout: FiniteDuration, e: ⇒ Throwable): Future[T] = {
    import transport.system.dispatcher
    val f2 = after(timeout, transport.system.scheduler)(Future.failed(e))
    Future.firstCompletedOf(List(f, f2))
  }
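
  // timeoutAfter races the real future against a scheduler-driven failure, e.g.
  //   timeoutAfter(result, changeCompressionTimeout, new ChangeOutboundCompressionFailed)
  // completes with `result` if it finishes within the timeout, otherwise fails with
  // the given exception.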

  private def deadletters = transport.system.deadLetters

  def outboundControlIngress: OutboundControlIngress = {
    _outboundControlIngress match {
      case OptionVal.Some(o) ⇒ o
      case OptionVal.None ⇒
        controlQueue match {
          case w: LazyQueueWrapper ⇒ w.runMaterialize()
          case _                   ⇒
        }
        // the outboundControlIngress may be accessed before the stream is materialized
        // using CountDownLatch to make sure that materialization is completed
        materializing.await(10, TimeUnit.SECONDS)
        _outboundControlIngress match {
          case OptionVal.Some(o) ⇒ o
          case OptionVal.None ⇒
            if (transport.isShutdown) throw ShuttingDown
            else throw new IllegalStateException("outboundControlIngress not initialized yet")
        }
    }
  }

  override def localAddress: UniqueAddress = transport.localAddress

  /**
   * Holds reference to shared state of Association - *access only via helper methods*
   */
  @volatile
  private[this] var _sharedStateDoNotCallMeDirectly: AssociationState = AssociationState()

  /**
   * Helper method for access to underlying state via Unsafe
   *
   * @param oldState Previous state
   * @param newState Next state on transition
   * @return Whether the previous state matched correctly
   */
  @inline
  private[this] def swapState(oldState: AssociationState, newState: AssociationState): Boolean =
    Unsafe.instance.compareAndSwapObject(this, AbstractAssociation.sharedStateOffset, oldState, newState)

  /**
   * @return Reference to current shared state
   */
  def associationState: AssociationState =
    Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState]
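
  // Typical compare-and-swap loop over the shared state (illustrative):
  //   val current = associationState
  //   val newState = current.newQuarantined()
  //   if (!swapState(current, newState)) { /* someone else won the race, retry or accept */ }
  // Readers always go through `associationState` (volatile read via Unsafe).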

  def completeHandshake(peer: UniqueAddress): Future[Done] = {
    require(
      remoteAddress == peer.address,
      s"wrong remote address in completeHandshake, got ${peer.address}, expected $remoteAddress")
    val current = associationState

    current.uniqueRemoteAddressValue() match {
      case Some(`peer`) ⇒
        // handshake already completed
        Future.successful(Done)
      case _ ⇒
        // clear outbound compression, it's safe to do that several times if someone else
        // completes handshake at same time, but it's important to clear it before
        // we signal that the handshake is completed (uniqueRemoteAddressPromise.trySuccess)
        import transport.system.dispatcher
        clearOutboundCompression().map { _ ⇒
          current.uniqueRemoteAddressPromise.trySuccess(peer)
          current.uniqueRemoteAddressValue() match {
            case Some(`peer`) ⇒
            // our value
            case _ ⇒
              val newState = current.newIncarnation(Promise.successful(peer))
              if (swapState(current, newState)) {
                current.uniqueRemoteAddressValue() match {
                  case Some(old) ⇒
                    cancelIdleTimer()
                    log.debug(
                      "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])",
                      newState.incarnation, peer.address, peer.uid, old.uid)
                    clearInboundCompression(old.uid)
                  case None ⇒
                  // Failed, nothing to do
                }
                // if swap failed someone else completed before us, and that is fine
              }
          }
          Done
        }
    }
  }

  // OutboundContext
  override def sendControl(message: ControlMessage): Unit = {
    try {
      if (!transport.isShutdown) {
        if (associationState.isQuarantined()) {
          log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message),
            remoteAddress)
          startIdleTimer()
        }
        outboundControlIngress.sendControlMessage(message)
      }
    } catch {
      case ShuttingDown ⇒ // silence it
    }
  }

  def send(message: Any, sender: OptionVal[ActorRef], recipient: OptionVal[RemoteActorRef]): Unit = {

    def createOutboundEnvelope(): OutboundEnvelope =
      outboundEnvelopePool.acquire().init(recipient, message.asInstanceOf[AnyRef], sender)

    // volatile read to see latest queue array
    val unused = queuesVisibility

    def dropped(queueIndex: Int, qSize: Int, env: OutboundEnvelope): Unit = {
      log.debug(
        "Dropping message [{}] from [{}] to [{}] due to overflow of send queue, size [{}]",
        Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(deadletters), qSize)
      flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex)
      deadletters ! env
    }

    val quarantined = associationState.isQuarantined()

    // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system
    if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || message == ClearSystemMessageDelivery) {
      if (quarantined && message != ClearSystemMessageDelivery) {
        log.debug("Quarantine piercing attempt with message [{}] to [{}]", Logging.messageClassName(message), recipient.getOrElse(""))
        startIdleTimer()
      }
      try {
        message match {
          case _: SystemMessage ⇒
            val outboundEnvelope = createOutboundEnvelope()
            if (!controlQueue.offer(outboundEnvelope)) {
              quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
              dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
            }
          case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | ClearSystemMessageDelivery ⇒
            // ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
            val outboundEnvelope = createOutboundEnvelope()
            if (!controlQueue.offer(outboundEnvelope)) {
              dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
            }
          case _: DaemonMsgCreate ⇒
            // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
            // remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages.
            // It must also be sent over the ordinary message stream so that it arrives (and creates the
            // destination) before the first ordinary message arrives.
            val outboundEnvelope1 = createOutboundEnvelope()
            if (!controlQueue.offer(outboundEnvelope1))
              dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope1)
            (0 until outboundLanes).foreach { i ⇒
              val outboundEnvelope2 = createOutboundEnvelope()
              if (!queues(OrdinaryQueueIndex + i).offer(outboundEnvelope2))
                dropped(OrdinaryQueueIndex + i, queueSize, outboundEnvelope2)
            }
          case _ ⇒
            val outboundEnvelope = createOutboundEnvelope()
            val queueIndex = selectQueue(recipient)
            val queue = queues(queueIndex)
            val offerOk = queue.offer(outboundEnvelope)
            if (!offerOk)
              dropped(queueIndex, queueSize, outboundEnvelope)
        }
      } catch {
        case ShuttingDown ⇒ // silence it
      }
    } else if (log.isDebugEnabled)
      log.debug(
        "Dropping message [{}] from [{}] to [{}] due to quarantined system [{}]",
        Logging.messageClassName(message), sender.getOrElse(deadletters), recipient.getOrElse(deadletters), remoteAddress)
  }
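
  // Routing recap for send(): SystemMessage, ControlMessage, ClearSystemMessageDelivery and
  // priority ActorSelectionMessage go to the control queue; DaemonMsgCreate is duplicated to
  // the control queue and every ordinary lane; everything else is routed via selectQueue.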

  private def selectQueue(recipient: OptionVal[RemoteActorRef]): Int = {
    recipient match {
      case OptionVal.Some(r) ⇒
        r.cachedSendQueueIndex match {
          case -1 ⇒
            // only happens when messages are sent to new remote destination
            // and is then cached on the RemoteActorRef
            val elements = r.path.elements
            val idx =
              if (priorityMessageDestinations.find(elements).isDefined) {
                log.debug("Using priority message stream for {}", r.path)
                ControlQueueIndex
              } else if (transport.largeMessageChannelEnabled && largeMessageDestinations.find(elements).isDefined) {
                log.debug("Using large message stream for {}", r.path)
                LargeQueueIndex
              } else if (outboundLanes == 1) {
                OrdinaryQueueIndex
              } else {
                // select lane based on destination, to preserve message order
                OrdinaryQueueIndex + (math.abs(r.path.uid) % outboundLanes)
              }
            r.cachedSendQueueIndex = idx
            idx
          case idx ⇒ idx
        }

      case OptionVal.None ⇒
        OrdinaryQueueIndex
    }
  }
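
  // Example (illustrative): with outboundLanes = 3, a recipient whose path uid is 17 maps to
  // queue index OrdinaryQueueIndex + (math.abs(17) % 3) = 2 + 2 = 4; all messages to that
  // recipient use the same lane, which preserves per-recipient ordering.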

  def sendTerminationHint(replyTo: ActorRef): Int = {
    if (!associationState.isQuarantined()) {
      val msg = ActorSystemTerminating(localAddress)
      var sent = 0
      queues.iterator.filter(_.isEnabled).foreach { queue ⇒
        try {
          val envelope = outboundEnvelopePool.acquire()
            .init(OptionVal.None, msg, OptionVal.Some(replyTo))

          queue.offer(envelope)
          sent += 1
        } catch {
          case ShuttingDown ⇒ // can be thrown if `offer` triggers new materialization
        }
      }
      sent
    } else 0
  }

  // OutboundContext
  override def quarantine(reason: String): Unit = {
    val uid = associationState.uniqueRemoteAddressValue().map(_.uid)
    quarantine(reason, uid)
  }

  @tailrec final def quarantine(reason: String, uid: Option[Long]): Unit = {
    uid match {
      case Some(u) ⇒
        val current = associationState
        current.uniqueRemoteAddressValue() match {
          case Some(peer) if peer.uid == u ⇒
            if (!current.isQuarantined(u)) {
              val newState = current.newQuarantined()
              if (swapState(current, newState)) {
                // quarantine state change was performed
                log.warning(
                  "Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " +
                    "messages to this UID will be delivered to dead letters. " +
                    "The remote actor system must be restarted to recover from this situation. {}",
                  remoteAddress, u, reason)
                transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u))
                clearOutboundCompression()
                clearInboundCompression(u)
                // end delivery of system messages to that incarnation after this point
                send(ClearSystemMessageDelivery, OptionVal.None, OptionVal.None)
                // try to tell the other system that we have quarantined it
                sendControl(Quarantined(localAddress, peer))
                startIdleTimer()
              } else
                quarantine(reason, uid) // recursive
            }
          case Some(peer) ⇒
            log.debug(
              "Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}",
              remoteAddress, u, peer.uid, reason)
          case None ⇒
            log.debug(
              "Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}",
              remoteAddress, reason)
        }
      case None ⇒
        log.warning("Quarantine of [{}] ignored because unknown UID", remoteAddress)
    }
  }

  private def cancelIdleTimer(): Unit = {
    val current = idle.get
    current.foreach(_.cancel())
    idle.compareAndSet(current, None)
  }

  private def startIdleTimer(): Unit = {
    cancelIdleTimer()
    idle.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) {
      if (associationState.isQuarantined())
        streamMatValues.get.valuesIterator.foreach {
          case OutboundStreamMatValues(killSwitch, _) ⇒ killSwitch.abort(OutboundStreamStopSignal)
        }
    }(transport.system.dispatcher)))
  }

  /**
   * Called once after construction when the `Association` instance
   * wins the CAS in the `AssociationRegistry`. It will materialize
   * the streams. It is possible to send (enqueue) to the association
   * before this method is called.
   *
   * @throws ShuttingDown if called while the transport is shutting down
   */
  def associate(): Unit = {
    if (!controlQueue.isInstanceOf[QueueWrapper])
      throw new IllegalStateException("associate() must only be called once")
    runOutboundStreams()
  }

  private def runOutboundStreams(): Unit = {

    // it's important to materialize the outboundControl stream first,
    // so that outboundControlIngress is ready when stages for all streams start
    runOutboundControlStream()
    runOutboundOrdinaryMessagesStream()

    if (transport.largeMessageChannelEnabled)
      runOutboundLargeMessagesStream()
  }
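
  // Outbound stream topology (illustrative):
  //   control:  SendQueue -> killSwitch -> transport.outboundControl
  //   ordinary: SendQueue -> killSwitch -> transport.outbound                       (1 lane)
  //   ordinary: SendQueue -> ... -> transport.outboundLane --\
  //                                                           +-> MergeHub -> transport.aeronSink  (N lanes)
  //   large:    SendQueue -> killSwitch -> transport.outboundLarge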

  private def runOutboundControlStream(): Unit = {
    if (transport.isShutdown) throw ShuttingDown
    log.debug("Starting outbound control stream to [{}]", remoteAddress)

    val wrapper = getOrCreateQueueWrapper(ControlQueueIndex, queueSize)
    queues(ControlQueueIndex) = wrapper // use new underlying queue immediately for restarts
    queuesVisibility = true // volatile write for visibility of the queues array

    val streamKillSwitch = KillSwitches.shared("outboundControlStreamKillSwitch")

    val (queueValue, (control, completed)) =
      Source.fromGraph(new SendQueue[OutboundEnvelope])
        .via(streamKillSwitch.flow)
        .toMat(transport.outboundControl(this))(Keep.both)
        .run()(materializer)

    queueValue.inject(wrapper.queue)
    // replace with the materialized value, still same underlying queue
    queues(ControlQueueIndex) = queueValue
    queuesVisibility = true // volatile write for visibility of the queues array
    _outboundControlIngress = OptionVal.Some(control)
    materializing.countDown()

    updateStreamMatValues(ControlQueueIndex, streamKillSwitch, completed)
    attachStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize,
      completed, () ⇒ runOutboundControlStream())
  }

  private def getOrCreateQueueWrapper(queueIndex: Int, capacity: Int): QueueWrapper = {
    val unused = queuesVisibility // volatile read to see latest queues array
    queues(queueIndex) match {
      case existing: QueueWrapper ⇒ existing
      case _ ⇒
        // use new queue for restarts
        QueueWrapperImpl(createQueue(capacity))
    }
  }

  private def runOutboundOrdinaryMessagesStream(): Unit = {
    if (transport.isShutdown) throw ShuttingDown
    if (outboundLanes == 1) {
      log.debug("Starting outbound message stream to [{}]", remoteAddress)
      val queueIndex = OrdinaryQueueIndex
      val wrapper = getOrCreateQueueWrapper(queueIndex, queueSize)
      queues(queueIndex) = wrapper // use new underlying queue immediately for restarts
      queuesVisibility = true // volatile write for visibility of the queues array

      val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")

      val ((queueValue, testMgmt), (changeCompression, completed)) =
        Source.fromGraph(new SendQueue[OutboundEnvelope])
          .via(streamKillSwitch.flow)
          .viaMat(transport.outboundTestFlow(this))(Keep.both)
          .toMat(transport.outbound(this))(Keep.both)
          .run()(materializer)

      queueValue.inject(wrapper.queue)
      // replace with the materialized value, still same underlying queue
      queues(queueIndex) = queueValue
      queuesVisibility = true // volatile write for visibility of the queues array
      changeOutboundCompression = Vector(changeCompression)

      updateStreamMatValues(OrdinaryQueueIndex, streamKillSwitch, completed)
      attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
        completed, () ⇒ runOutboundOrdinaryMessagesStream())

    } else {
      log.debug("Starting outbound message stream to [{}] with [{}] lanes", remoteAddress, outboundLanes)
      val wrappers = (0 until outboundLanes).map { i ⇒
        val wrapper = getOrCreateQueueWrapper(OrdinaryQueueIndex + i, queueSize)
        queues(OrdinaryQueueIndex + i) = wrapper // use new underlying queue immediately for restarts
        queuesVisibility = true // volatile write for visibility of the queues array
        wrapper
      }.toVector

      val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")

      val lane = Source.fromGraph(new SendQueue[OutboundEnvelope])
        .via(streamKillSwitch.flow)
        .via(transport.outboundTestFlow(this))
        .viaMat(transport.outboundLane(this))(Keep.both)
        .watchTermination()(Keep.both)
        // recover to avoid error logging by MergeHub
        .recoverWithRetries(-1, { case _: Throwable ⇒ Source.empty })
        .mapMaterializedValue {
          case ((q, c), w) ⇒ (q, c, w)
        }

      val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer]
        .via(streamKillSwitch.flow)
        .toMat(transport.aeronSink(this))(Keep.both).run()(materializer)

      val values: Vector[(SendQueue.QueueValue[OutboundEnvelope], Encoder.ChangeOutboundCompression, Future[Done])] =
        (0 until outboundLanes).map { _ ⇒
          lane.to(mergeHub).run()(materializer)
        }(collection.breakOut)

      val (queueValues, changeCompressionValues, laneCompletedValues) = values.unzip3

      import transport.system.dispatcher
      val completed = Future.sequence(laneCompletedValues).flatMap(_ ⇒ aeronSinkCompleted)

      // tear down all parts if one part fails or completes
      completed.onFailure {
        case reason: Throwable ⇒ streamKillSwitch.abort(reason)
      }
      (laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ ⇒ streamKillSwitch.shutdown() })

      queueValues.zip(wrappers).zipWithIndex.foreach {
        case ((q, w), i) ⇒
          q.inject(w.queue)
          queues(OrdinaryQueueIndex + i) = q // replace with the materialized value, still same underlying queue
      }
      queuesVisibility = true // volatile write for visibility of the queues array

      changeOutboundCompression = changeCompressionValues

      attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
        completed, () ⇒ runOutboundOrdinaryMessagesStream())
    }
  }

  private def runOutboundLargeMessagesStream(): Unit = {
    if (transport.isShutdown) throw ShuttingDown
    log.debug("Starting outbound large message stream to [{}]", remoteAddress)
    val wrapper = getOrCreateQueueWrapper(LargeQueueIndex, largeQueueSize)
    queues(LargeQueueIndex) = wrapper // use new underlying queue immediately for restarts
    queuesVisibility = true // volatile write for visibility of the queues array

    val streamKillSwitch = KillSwitches.shared("outboundLargeMessagesKillSwitch")

    val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
      .via(streamKillSwitch.flow)
      .via(transport.outboundTestFlow(this))
      .toMat(transport.outboundLarge(this))(Keep.both)
      .run()(materializer)

    queueValue.inject(wrapper.queue)
    // replace with the materialized value, still same underlying queue
    queues(LargeQueueIndex) = queueValue
    queuesVisibility = true // volatile write for visibility of the queues array

    updateStreamMatValues(LargeQueueIndex, streamKillSwitch, completed)
    attachStreamRestart("Outbound large message stream", LargeQueueIndex, largeQueueSize,
      completed, () ⇒ runOutboundLargeMessagesStream())
  }

  private def attachStreamRestart(streamName: String, queueIndex: Int, queueCapacity: Int,
                                  streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = {

    def lazyRestart(): Unit = {
      changeOutboundCompression = Vector.empty
      if (queueIndex == ControlQueueIndex) {
        materializing = new CountDownLatch(1)
        _outboundControlIngress = OptionVal.None
      }
      // LazyQueueWrapper will invoke the `restart` function when first message is offered
      queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity), restart)
      queuesVisibility = true // volatile write for visibility of the queues array
    }

    implicit val ec = materializer.executionContext
    streamCompleted.onFailure {
      case ArteryTransport.ShutdownSignal ⇒
        // shutdown as expected
        // countDown the latch in case threads are waiting on the latch in outboundControlIngress method
        materializing.countDown()
      case _: AeronTerminated ⇒ // shutdown already in progress
      case cause if transport.isShutdown ⇒
        // don't restart after shutdown, but log some details so we notice
        log.error(cause, "{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage)
        // countDown the latch in case threads are waiting on the latch in outboundControlIngress method
        materializing.countDown()
      case _: AbruptTerminationException ⇒ // ActorSystem shutdown
      case OutboundStreamStopSignal ⇒
        // stop as expected due to quarantine
        log.debug("{} to [{}] stopped. It will be restarted if used again.", streamName, remoteAddress)
        lazyRestart()
      case cause: GaveUpMessageException ⇒
        log.debug("{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
        // restart unconditionally, without counting restarts
        lazyRestart()
      case cause ⇒
        if (queueIndex == ControlQueueIndex) {
          cause match {
            case _: HandshakeTimeoutException ⇒ // ok, quarantine not possible without UID
            case _                            ⇒ quarantine("Outbound control stream restarted")
          }
        }

        if (restartCounter.restart()) {
          log.error(cause, "{} to [{}] failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage)
          lazyRestart()
        } else {
          log.error(cause, s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}",
            streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds)
          transport.system.terminate()
        }
    }
  }

  private def updateStreamMatValues(streamId: Int, streamKillSwitch: SharedKillSwitch, completed: Future[Done]): Unit = {
    implicit val ec = materializer.executionContext
    updateStreamMatValues(streamId, OutboundStreamMatValues(streamKillSwitch, completed.recover { case _ ⇒ Done }))
  }

  @tailrec private def updateStreamMatValues(streamId: Int, values: OutboundStreamMatValues): Unit = {
    val prev = streamMatValues.get()
    if (!streamMatValues.compareAndSet(prev, prev + (streamId → values))) {
      updateStreamMatValues(streamId, values)
    }
  }

  /**
   * Exposed for orderly shutdown purposes, cannot be trusted except during shutdown as streams may restart.
   * Will complete successfully even if one of the stream completion futures failed.
   */
  def streamsCompleted: Future[Done] = {
    implicit val ec = materializer.executionContext
    Future.sequence(streamMatValues.get().values.map {
      case OutboundStreamMatValues(_, done) ⇒ done
    }).map(_ ⇒ Done)
  }

  override def toString: String =
    s"Association($localAddress -> $remoteAddress with $associationState)"

}

/**
 * INTERNAL API
 */
private[remote] class AssociationRegistry(createAssociation: Address ⇒ Association) {
  private[this] val associationsByAddress = new AtomicReference[Map[Address, Association]](Map.empty)
  private[this] val associationsByUid = new AtomicReference[ImmutableLongMap[Association]](ImmutableLongMap.empty)

  /**
   * @throws ShuttingDown if called while the transport is shutting down
   */
  @tailrec final def association(remoteAddress: Address): Association = {
    val currentMap = associationsByAddress.get
    currentMap.get(remoteAddress) match {
      case Some(existing) ⇒ existing
      case None ⇒
        val newAssociation = createAssociation(remoteAddress)
        val newMap = currentMap.updated(remoteAddress, newAssociation)
        if (associationsByAddress.compareAndSet(currentMap, newMap)) {
          newAssociation.associate() // start it, only once
          newAssociation
        } else
          association(remoteAddress) // lost CAS, retry
    }
  }

  def association(uid: Long): OptionVal[Association] =
    associationsByUid.get.get(uid)

  /**
   * @throws ShuttingDown if called while the transport is shutting down
   */
  @tailrec final def setUID(peer: UniqueAddress): Association = {
    val currentMap = associationsByUid.get
    val a = association(peer.address)

    currentMap.get(peer.uid) match {
      case OptionVal.Some(previous) ⇒
        if (previous eq a)
          // associationsByUid Map already contains the right association
          a
        else
          // make sure we don't overwrite same UID with different association
          throw new IllegalArgumentException(s"UID collision old [$previous] new [$a]")
      case _ ⇒
        // update associationsByUid Map with the uid -> association
        val newMap = currentMap.updated(peer.uid, a)
        if (associationsByUid.compareAndSet(currentMap, newMap))
          a
        else
          setUID(peer) // lost CAS, retry
    }
  }

  def allAssociations: Set[Association] =
    associationsByAddress.get.values.toSet
}