// Source path: pekko/akka-remote/src/main/scala/akka/remote/Remoting.scala
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote
import akka.Done
import akka.actor.SupervisorStrategy._
import akka.actor._
import akka.event.{ Logging, LoggingAdapter }
import akka.pattern.{ ask, gracefulStop, pipe }
import akka.remote.EndpointManager._
import akka.remote.Remoting.TransportSupervisor
import akka.remote.transport.Transport.{ ActorAssociationEventListener, AssociationEventListener, InboundAssociation }
import akka.remote.transport._
import com.typesafe.config.Config
import java.net.URLEncoder
import java.util.concurrent.TimeoutException
import scala.collection.immutable.{ HashMap, Seq }
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success }
import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.ByteString.UTF_8
import akka.util.OptionVal
import scala.collection.immutable
import akka.actor.ActorInitializationException
import akka.util.ccompat._
/**
 * INTERNAL API
 *
 * Encodes the `toString` rendering of an [[Address]] with `java.net.URLEncoder`,
 * using the `UTF_8` charset name.
 */
private[remote] object AddressUrlEncoder {
  def apply(address: Address): String = {
    val rendered = address.toString
    URLEncoder.encode(rendered, UTF_8)
  }
}
/**
 * INTERNAL API
 *
 * Extension that exposes the [[RemoteActorRefProvider]] of an actor system.
 */
private[akka] final case class RARP(provider: RemoteActorRefProvider) extends Extension {

  /** Applies the dispatcher configuration from the provider's remote settings to `props`. */
  def configureDispatcher(props: Props): Props = provider.remoteSettings.configureDispatcher(props)
}

/**
 * INTERNAL API
 *
 * Extension id for [[RARP]]. Creating the extension casts the system's provider to
 * [[RemoteActorRefProvider]]. That cast fails for systems whose provider is of another type.
 */
private[akka] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {
  override def lookup(): RARP.type = RARP

  override def createExtension(system: ExtendedActorSystem): RARP =
    RARP(system.provider.asInstanceOf[RemoteActorRefProvider])
}
/**
 * INTERNAL API
 * Messages marked with this trait will be sent before other messages when buffering is active.
 * This means that these messages don't obey normal message ordering.
 * It is used for failure detector heartbeat messages.
 *
 * In Artery this is not used, and instead a preconfigured set of destinations select the priority lane.
 */
private[akka] trait PriorityMessage
/**
 * INTERNAL API
 *
 * Failure detector heartbeat messages are marked with this trait.
 * Because it extends [[PriorityMessage]], heartbeats get the same "sent before other buffered
 * messages" treatment described there.
 */
private[akka] trait HeartbeatMessage extends PriorityMessage
/**
 * INTERNAL API
 *
 * Constants, address-selection helper and transport supervisor actor used by the classic
 * `Remoting` transport.
 */
private[remote] object Remoting {

  /** Name of the system actor that the remoting transport creates to manage endpoints. */
  final val EndpointManagerName = "endpointManager"

  /**
   * Selects the local address to use when communicating with `remote`.
   *
   * The transports registered under `remote.protocol` are filtered with `isResponsibleFor(remote)`.
   * Exactly one of them must remain, and the local address paired with it is returned.
   *
   * @param transportMapping transports (with their local addresses) keyed by protocol / scheme identifier
   * @param remote           the remote address to reach
   * @throws RemoteTransportException if no transport is loaded for the protocol, if none of the loaded
   *                                  transports is responsible for `remote`, or if more than one is
   */
  def localAddressForRemote(
      transportMapping: Map[String, Set[(AkkaProtocolTransport, Address)]],
      remote: Address): Address = {
    transportMapping.get(remote.protocol) match {
      case Some(transports) =>
        val responsibleTransports = transports.filter { case (t, _) => t.isResponsibleFor(remote) }
        responsibleTransports.size match {
          case 0 =>
            throw new RemoteTransportException(
              s"No transport is responsible for address: [$remote] although protocol [${remote.protocol}] is available." +
              " Make sure at least one transport is configured to be responsible for the address.",
              null)
          case 1 =>
            responsibleTransports.head._2
          case _ =>
            throw new RemoteTransportException(
              s"Multiple transports are available for [$remote]: [${responsibleTransports.mkString(",")}]. " +
              "Remoting cannot decide which transport to use to reach the remote system. Change your configuration " +
              "so that only one transport is responsible for the address.",
              null)
        }
      case None =>
        throw new RemoteTransportException(
          s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys
            .mkString(", ")}]",
          null)
    }
  }

  /** Asks the [[TransportSupervisor]] to create a child actor from `props` with the given `name`. */
  final case class RegisterTransportActor(props: Props, name: String) extends NoSerializationVerificationNeeded

  /**
   * Parent of the transport actors.
   *
   * Children failing with a non-fatal exception are restarted. On [[RegisterTransportActor]] it
   * creates the child with local deployment and the remoting dispatcher configuration, and replies
   * to the sender with the child's [[ActorRef]].
   */
  private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
    override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
      case NonFatal(_) => Restart
    }

    def receive: Receive = {
      case RegisterTransportActor(props, name) =>
        sender() ! context.actorOf(RARP(context.system).configureDispatcher(props.withDeploy(Deploy.local)), name)
    }
  }
}
/**
 * INTERNAL API
 *
 * Classic [[RemoteTransport]] implementation.
 *
 * The work is delegated to an `EndpointManager` system actor. `start()` creates that actor and
 * blocks until the transports are listening. `shutdown()` asks it to flush and stop.
 */
private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
    extends RemoteTransport(_system, _provider) {

  // Defined while remoting is running: set by start(), cleared when shutdown() completes.
  @volatile private var endpointManager: Option[ActorRef] = None
  // Transports and their local addresses keyed by scheme identifier; assigned by start(), null before that.
  @volatile private var transportMapping: Map[String, Set[(AkkaProtocolTransport, Address)]] = _
  // This is effectively a write-once variable similar to a lazy val. The reason for not using a lazy val is exception
  // handling.
  @volatile var addresses: Set[Address] = _
  // This variable has the same semantics as the addresses variable, in the sense it is written once, and emulates
  // a lazy val
  @volatile var defaultAddress: Address = _

  import provider.remoteSettings._

  val transportSupervisor = system.systemActorOf(configureDispatcher(Props[TransportSupervisor]), "transports")

  override def localAddressForRemote(remote: Address): Address =
    Remoting.localAddressForRemote(transportMapping, remote)

  val log: LoggingAdapter = Logging(system.eventStream, getClass.getName)
  val eventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel)

  /** Publishes a [[RemotingErrorEvent]] wrapping `msg` and `cause`. */
  private def notifyError(msg: String, cause: Throwable): Unit =
    eventPublisher.notifyListeners(RemotingErrorEvent(new RemoteTransportException(msg, cause)))

  /**
   * Asks the endpoint manager to flush and stop, waiting at most `ShutdownTimeout`.
   *
   * Whether the flush succeeds or fails, a [[RemotingShutdownEvent]] is published and remoting
   * is marked as stopped. A failure is also reported as a [[RemotingErrorEvent]]. If remoting is
   * not running, this only logs a warning.
   */
  override def shutdown(): Future[Done] = {
    endpointManager match {
      case Some(manager) =>
        implicit val timeout: akka.util.Timeout = ShutdownTimeout

        // Runs on both the success and the failure path. It is deliberately not called `finalize`,
        // to avoid confusion with java.lang.Object#finalize.
        def finishShutdown(): Unit = {
          eventPublisher.notifyListeners(RemotingShutdownEvent)
          endpointManager = None
        }

        import system.dispatcher

        (manager ? ShutdownAndFlush)
          .mapTo[Boolean]
          .andThen {
            case Success(flushSuccessful) =>
              if (!flushSuccessful)
                log.warning(
                  "Shutdown finished, but flushing might not have been successful and some messages might have been dropped. " +
                  "Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this.")
              finishShutdown()
            case Failure(e) =>
              notifyError("Failure during shutdown of remoting.", e)
              finishShutdown()
          }
          .map(_ => Done) // RARP needs only akka.Done, not a boolean
      case None =>
        log.warning("Remoting is not running. Ignoring shutdown attempt.")
        Future.successful(Done)
    }
  }

  /**
   * Creates the endpoint manager and blocks (up to `StartupTimeout`) until its transports are listening.
   *
   * Start assumes that it cannot be followed by another start() without having a shutdown() first.
   *
   * @throws java.util.concurrent.TimeoutException if the transports do not report in time
   * @throws RemoteTransportException if no transport drivers were loaded
   */
  override def start(): Unit = {
    endpointManager match {
      case None =>
        log.info("Starting remoting")
        val manager: ActorRef = system.systemActorOf(
          configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log))
            .withDeploy(Deploy.local),
          Remoting.EndpointManagerName)
        endpointManager = Some(manager)

        try {
          val addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]] = Promise()
          manager ! Listen(addressesPromise)

          val transports: Seq[(AkkaProtocolTransport, Address)] =
            Await.result(addressesPromise.future, StartupTimeout.duration)
          if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null)

          transportMapping = transports
            .groupBy { case (transport, _) => transport.schemeIdentifier }
            .map { case (k, v) => k -> v.toSet }

          // The address of the first listening transport becomes the default address.
          defaultAddress = transports.head._2
          addresses = transports.map { _._2 }.toSet

          log.info("Remoting started; listening on addresses :" + addresses.mkString("[", ", ", "]"))

          manager ! StartupFinished
          eventPublisher.notifyListeners(RemotingListenEvent(addresses))
        } catch {
          case e: TimeoutException =>
            notifyError(
              "Startup timed out. This is usually related to actor system host setting or host name resolution misconfiguration.",
              e)
            throw e
          case NonFatal(e) =>
            notifyError("Startup failed", e)
            throw e
        }
      case Some(_) =>
        log.warning("Remoting was already started. Ignoring start attempt.")
    }
  }

  /**
   * Hands `message` to the endpoint manager, using the original sender when there is one.
   *
   * @throws RemoteTransportExceptionNoStackTrace if remoting is not running
   */
  override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit =
    endpointManager match {
      case Some(manager) =>
        manager.tell(Send(message, senderOption, recipient), sender = senderOption.getOrElse(Actor.noSender))
      case None =>
        throw new RemoteTransportExceptionNoStackTrace(
          "Attempted to send remote message but Remoting is not running.",
          null)
    }

  /**
   * Forwards `cmd` to the endpoint manager and completes with the status it acknowledges,
   * waiting at most `CommandAckTimeout`.
   *
   * @throws RemoteTransportExceptionNoStackTrace if remoting is not running
   */
  override def managementCommand(cmd: Any): Future[Boolean] = endpointManager match {
    case Some(manager) =>
      import system.dispatcher
      implicit val timeout: akka.util.Timeout = CommandAckTimeout
      (manager ? ManagementCommand(cmd)).map { case ManagementCommandAck(status) => status }
    case None =>
      throw new RemoteTransportExceptionNoStackTrace(
        "Attempted to send management command but Remoting is not running.",
        null)
  }

  /**
   * Asks the endpoint manager to quarantine `remoteAddress`.
   *
   * The classic transport tracks UIDs as `Int`, so `uid` is narrowed with `toInt`.
   *
   * @throws RemoteTransportExceptionNoStackTrace if remoting is not running
   */
  override def quarantine(remoteAddress: Address, uid: Option[Long], reason: String): Unit = endpointManager match {
    case Some(manager) =>
      manager ! Quarantine(remoteAddress, uid.map(_.toInt))
    case None =>
      throw new RemoteTransportExceptionNoStackTrace(
        s"Attempted to quarantine address [$remoteAddress] with UID [$uid] but Remoting is not running",
        null)
  }

  /** Addresses the transports are actually bound to, grouped by scheme identifier (null-safe per transport). */
  private[akka] def boundAddresses: Map[String, Set[Address]] = {
    transportMapping.map {
      case (scheme, transports) =>
        scheme -> transports.flatMap {
          // Need to do like this for binary compatibility reasons
          case (t, _) => Option(t.boundAddress)
        }
    }
  }
}
/**
 * INTERNAL API
 *
 * Protocol messages, endpoint policies and the endpoint registry used by the EndpointManager actor.
 */
private[remote] object EndpointManager {

  // Messages between Remoting and EndpointManager
  sealed trait RemotingCommand extends NoSerializationVerificationNeeded

  /** Start listening on the transports; the promise receives the (transport, local address) pairs. */
  final case class Listen(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]]) extends RemotingCommand

  /** Startup is complete: the manager may start accepting work. */
  case object StartupFinished extends RemotingCommand

  /** Flush and stop all endpoints, shut down the transports, and reply with a Boolean success flag. */
  case object ShutdownAndFlush extends RemotingCommand

  /** Outbound message to `recipient`, optionally carrying a sequence number for acknowledged delivery. */
  final case class Send(
      message: Any,
      senderOption: OptionVal[ActorRef],
      recipient: RemoteActorRef,
      seqOpt: Option[SeqNo] = None)
      extends RemotingCommand
      with HasSequenceNumber {
    override def toString: String = s"Remote message $senderOption -> $recipient"

    // This MUST throw an exception to indicate that we attempted to put a nonsequenced message in one of the
    // acknowledged delivery buffers
    def seq: SeqNo = seqOpt.get
  }

  /** Quarantine `remoteAddress`; without a `uid` the address can only be gated. */
  final case class Quarantine(remoteAddress: Address, uid: Option[Int]) extends RemotingCommand

  /** Transport management command, forwarded to every transport. */
  final case class ManagementCommand(cmd: Any) extends RemotingCommand

  /** Reply to [[ManagementCommand]]. */
  final case class ManagementCommandAck(status: Boolean)

  // Messages internal to EndpointManager

  /** Periodic tick that drops expired gating / quarantine entries. */
  case object Prune extends NoSerializationVerificationNeeded

  /** Successful result of binding the transports, piped back to the manager itself. */
  final case class ListensResult(
      addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]],
      results: Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])])
      extends NoSerializationVerificationNeeded

  /** Failed result of binding the transports, piped back to the manager itself. */
  final case class ListensFailure(addressesPromise: Promise[Seq[(AkkaProtocolTransport, Address)]], cause: Throwable)
      extends NoSerializationVerificationNeeded

  // Helper class to store address pairs
  final case class Link(localAddress: Address, remoteAddress: Address)

  /** Receive buffer of a link, together with the remote UID it belongs to. */
  final case class ResendState(uid: Int, buffer: AckedReceiveBuffer[Message])

  /** What to do with outbound traffic to a remote address. */
  sealed trait EndpointPolicy {

    /**
     * Indicates that the policy does not contain an active endpoint, but it is a tombstone of a previous failure
     */
    def isTombstone: Boolean
  }

  /** An active writer `endpoint` exists; `uid` is the remote UID once it is known. */
  final case class Pass(endpoint: ActorRef, uid: Option[Int]) extends EndpointPolicy {
    override def isTombstone: Boolean = false
  }

  /** Outbound traffic is not allowed to reconnect before `timeOfRelease`. */
  final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy {
    override def isTombstone: Boolean = true
  }

  /** The remote system with `uid` is quarantined; `timeOfRelease` is only used for pruning. */
  final case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy {
    override def isTombstone: Boolean = true
  }

  /**
   * Tracks, per remote address:
   *  - the writable endpoint or the tombstone policy,
   *  - the read-only (inbound) endpoint,
   *  - the UID to refuse.
   *
   * Not thread-safe: it is only meant to be used from within the EndpointManager actor.
   */
  class EndpointRegistry {
    private var addressToRefuseUid = HashMap[Address, (Int, Deadline)]()
    private var addressToWritable = HashMap[Address, EndpointPolicy]()
    private var writableToAddress = HashMap[ActorRef, Address]()
    private var addressToReadonly = HashMap[Address, (ActorRef, Int)]()
    private var readonlyToAddress = HashMap[ActorRef, Address]()

    /**
     * Registers `endpoint` as the writer for `address`.
     *
     * Replaces a Gated/Quarantined tombstone, if any, and returns `endpoint`.
     *
     * @throws IllegalArgumentException if an active writer is already registered for `address`
     */
    def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef =
      addressToWritable.get(address) match {
        case Some(Pass(e, _)) =>
          throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]")
        case _ =>
          // note that this overwrites Quarantine marker,
          // but that is ok since we keep the quarantined uid in addressToRefuseUid
          addressToWritable += address -> Pass(endpoint, uid)
          writableToAddress += endpoint -> address
          endpoint
      }

    /** Records the remote UID learned by the active writer of `remoteAddress`. Does nothing if there is none. */
    def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit = {
      addressToWritable.get(remoteAddress) match {
        case Some(Pass(ep, _)) => addressToWritable += remoteAddress -> Pass(ep, Some(uid))
        case _                 => // no active writer (gated, quarantined or absent): nothing to update
      }
    }

    /** Remembers `refuseUid` as a UID to reject for `remoteAddress`; it is pruned after `timeOfRelease`. */
    def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int, timeOfRelease: Deadline): Unit = {
      addressToRefuseUid = addressToRefuseUid.updated(remoteAddress, (refuseUid, timeOfRelease))
    }

    /** Registers an inbound read-only `endpoint` for `address` and returns it. */
    def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef, uid: Int): ActorRef = {
      addressToReadonly += address -> ((endpoint, uid))
      readonlyToAddress += endpoint -> address
      endpoint
    }

    /** Forgets `endpoint`. A tombstone policy for its address and any refused UID are kept. */
    def unregisterEndpoint(endpoint: ActorRef): Unit =
      if (isWritable(endpoint)) {
        val address = writableToAddress(endpoint)
        addressToWritable.get(address) match {
          case Some(policy) if policy.isTombstone => // There is already a tombstone directive, leave it there
          case _                                  => addressToWritable -= address
        }
        writableToAddress -= endpoint
        // leave the refuseUid
      } else if (isReadOnly(endpoint)) {
        addressToReadonly -= readonlyToAddress(endpoint)
        readonlyToAddress -= endpoint
      }

    def addressForWriter(writer: ActorRef): Option[Address] = writableToAddress.get(writer)

    def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)

    def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match {
      case Some(_: Pass) => true
      case _             => false
    }

    def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address)

    def isWritable(endpoint: ActorRef): Boolean = writableToAddress contains endpoint

    def isReadOnly(endpoint: ActorRef): Boolean = readonlyToAddress contains endpoint

    /** True if `uid` at `address` is quarantined, either by the current policy or by a refused UID. */
    def isQuarantined(address: Address, uid: Int): Boolean = writableEndpointWithPolicyFor(address) match {
      // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
      // known fact that it is quarantined.
      case Some(Quarantined(`uid`, _)) => true
      case _ =>
        addressToRefuseUid.get(address).exists { case (refuseUid, _) => refuseUid == uid }
    }

    /** The UID that must be refused for `address`, if any. */
    def refuseUid(address: Address): Option[Int] = writableEndpointWithPolicyFor(address) match {
      // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
      // known fact that it is quarantined.
      case Some(Quarantined(uid, _)) => Some(uid)
      case _                         => addressToRefuseUid.get(address).map { case (refuseUid, _) => refuseUid }
    }

    /**
     * Marking an endpoint as failed means that we will not try to connect to the remote system within
     * the gated period but it is ok for the remote system to try to connect to us.
     */
    def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit =
      if (isWritable(endpoint)) {
        val address = writableToAddress(endpoint)
        addressToWritable.get(address) match {
          case Some(Quarantined(_, _)) => // don't overwrite Quarantined with Gated
          case Some(Pass(_, _)) =>
            addressToWritable += address -> Gated(timeOfRelease)
            writableToAddress -= endpoint
          case Some(Gated(_)) => // already gated
          case None =>
            addressToWritable += address -> Gated(timeOfRelease)
            writableToAddress -= endpoint
        }
      } else if (isReadOnly(endpoint)) {
        addressToReadonly -= readonlyToAddress(endpoint)
        readonlyToAddress -= endpoint
      }

    /** Sets the Quarantined policy for `address` and also records `uid` as refused until `timeOfRelease`. */
    def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit = {
      addressToWritable += address -> Quarantined(uid, timeOfRelease)
      addressToRefuseUid = addressToRefuseUid.updated(address, (uid, timeOfRelease))
    }

    def removePolicy(address: Address): Unit =
      addressToWritable -= address

    /** All registered writable and read-only endpoint actors. */
    def allEndpoints: collection.Iterable[ActorRef] = writableToAddress.keys ++ readonlyToAddress.keys

    /** Drops Gated / Quarantined policies and refused UIDs whose release deadline has passed. */
    def prune(): Unit = {
      addressToWritable = addressToWritable.collect {
        case entry @ (_, Gated(timeOfRelease)) if timeOfRelease.hasTimeLeft =>
          // Gated removed when no time left
          entry
        case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft =>
          // Quarantined removed when no time left
          entry
        case entry @ (_, _: Pass) => entry
      }

      addressToRefuseUid = addressToRefuseUid.collect {
        case entry @ (_, (_, timeOfRelease)) if timeOfRelease.hasTimeLeft =>
          // Quarantined/refuseUid removed when no time left
          entry
      }
    }
  }
}
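/**
 * INTERNAL API
 *
 * Actor that owns the classic remoting endpoints:
 *  - binds the transports on `Listen`,
 *  - routes outbound `Send` messages to writer endpoints,
 *  - handles inbound associations,
 *  - keeps the gating / quarantine bookkeeping,
 *  - performs the flush-and-shutdown sequence on `ShutdownAndFlush`.
 */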
private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
extends Actor
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import EndpointManager._
import context.dispatcher
val settings = new RemoteSettings(conf)
val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
val endpointId: Iterator[Int] = Iterator.from(0)

val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)

// Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections
// will be not part of this map!
val endpoints = new EndpointRegistry

// Mapping between transports and the local addresses they listen to
var transportMapping: Map[Address, AkkaProtocolTransport] = Map()

// Period of the Prune tick: twice the gate duration, clamped to the range [1 second, 10 seconds].
val pruneInterval: FiniteDuration = (settings.RetryGateClosedFor * 2).max(1.second).min(10.seconds)
val pruneTimerCancellable: Cancellable =
  context.system.scheduler.schedule(pruneInterval, pruneInterval, self, Prune)

// Inbound handles sent via TakeOver to an existing read-only endpoint, keyed by that endpoint.
var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
// Inbound associations postponed until the writer they are keyed by reports its UID, becomes idle or terminates.
var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
/**
 * Removes the inbound associations stashed under `endpoint` and handles them again, in the order
 * they were stashed.
 *
 * @param endpoint     the writer the associations were waiting on
 * @param writerIsIdle passed through to `handleInboundAssociation`
 */
def handleStashedInbound(endpoint: ActorRef, writerIsIdle: Boolean): Unit = {
  val stashed = stashedInbound.getOrElse(endpoint, Vector.empty)
  stashedInbound -= endpoint
  stashed.foreach(handleInboundAssociation(_, writerIsIdle))
}
/**
 * Keeps `remoteAddress` quarantined if a refused UID is still recorded for it; otherwise runs `body`.
 *
 * Keeping it quarantined means re-establishing the Quarantined marker for another `QuarantineDuration`.
 * The marker is needed because registering a new writer may have replaced it with a Pass entry.
 */
def keepQuarantinedOr(remoteAddress: Address)(body: => Unit): Unit = endpoints.refuseUid(remoteAddress) match {
  case Some(uid) =>
    log.info(
      "Quarantined address [{}] is still unreachable or has not been restarted. Keeping it quarantined.",
      remoteAddress)
    // Restoring Quarantine marker overwritten by a Pass(endpoint, refuseUid) pair while probing remote system.
    endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
  case None => body
}
/**
 * Supervision of the endpoint children. Every handled failure stops the failed child.
 *
 * Depending on the failure, the remote address is also:
 *  - quarantined, when a [[HopelessAssociation]] carries the remote UID, or
 *  - gated for `RetryGateClosedFor`, unless a refused UID is still recorded for it (see `keepQuarantinedOr`).
 *
 * Failures are logged here explicitly, so the strategy's own logging is disabled.
 */
override val supervisorStrategy = {
  // Quarantines the address when its UID is known; otherwise it can only be gated.
  def hopeless(e: HopelessAssociation): SupervisorStrategy.Directive = e match {
    case HopelessAssociation(_, remoteAddress, Some(uid), reason) =>
      log.error(
        reason,
        "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.",
        remoteAddress,
        uid)
      settings.QuarantineDuration match {
        case d: FiniteDuration =>
          endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
          eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
        case _ => // disabled
      }
      Stop

    case HopelessAssociation(_, remoteAddress, None, _) =>
      keepQuarantinedOr(remoteAddress) {
        log.warning(
          "Association to [{}] with unknown UID is irrecoverably failed. " +
          "Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
          remoteAddress,
          settings.RetryGateClosedFor.toMillis)
        endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
      }
      Stop
  }

  OneForOneStrategy(loggingEnabled = false) {
    case InvalidAssociation(localAddress, remoteAddress, reason, disassociationInfo) =>
      keepQuarantinedOr(remoteAddress) {
        val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]"
        log.warning(
          "Tried to associate with unreachable remote address [{}]. " +
          "Address is now gated for {} ms, all messages to this address will be delivered to dead letters. " +
          "Reason: [{}] {}",
          remoteAddress,
          settings.RetryGateClosedFor.toMillis,
          reason.getMessage,
          causedBy)
        endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
      }
      // The remote side told us that it quarantined this system: publish that on the event stream.
      disassociationInfo.foreach {
        case AssociationHandle.Quarantined =>
          context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress))
        case _ => // do nothing
      }
      Stop

    case ShutDownAssociation(_, remoteAddress, _) =>
      keepQuarantinedOr(remoteAddress) {
        log.debug(
          "Remote system with address [{}] has shut down. " +
          "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.",
          remoteAddress,
          settings.RetryGateClosedFor.toMillis)
        endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
      }
      Stop

    case e: HopelessAssociation =>
      hopeless(e)

    case e: ActorInitializationException if e.getCause.isInstanceOf[HopelessAssociation] =>
      hopeless(e.getCause.asInstanceOf[HopelessAssociation])

    case NonFatal(e) =>
      e match {
        case _: EndpointDisassociatedException | _: EndpointAssociationException => // no logging
        case _                                                                   => log.error(e, e.getMessage)
      }
      endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
      Stop
  }
}
// Structure for saving reliable delivery state across restarts of Endpoints.
// Keyed by the (local address, remote address) Link; each value holds the remote UID and its receive buffer.
// NOTE(review): a ConcurrentHashMap is used, presumably because endpoint actors access it outside this
// actor's message processing — confirm against createEndpoint (not visible in this section).
val receiveBuffers = new ConcurrentHashMap[Link, ResendState]()
/**
 * Initial behavior, used until startup has finished.
 *
 *  - `Listen`: binds the transports and completes the caller's promise with the
 *    (transport, local address) pairs, or with the failure.
 *  - `StartupFinished`: switches to `accepting`.
 *  - Inbound associations that arrive before then are re-sent to self after 10 ms.
 */
def receive: Receive = {
  case Listen(addressesPromise) =>
    listens
      .map { ListensResult(addressesPromise, _) }
      .recover {
        case NonFatal(e) => ListensFailure(addressesPromise, e)
      }
      .pipeTo(self)

  case ListensResult(addressesPromise, results) =>
    try {
      transportMapping = results
        .groupBy {
          case (_, transportAddress, _) => transportAddress
        }
        .map {
          case (a, t) if t.size > 1 =>
            throw new RemoteTransportException(
              s"There are more than one transports listening on local address [$a]",
              null)
          case (a, t) => a -> t.head._1
        }
    } catch {
      case NonFatal(e) =>
        // Complete the startup promise with the real cause. Otherwise Remoting.start() would block until
        // the startup timeout and then report a misleading timeout. Rethrow to keep the original
        // supervision behavior for this actor.
        addressesPromise.tryFailure(e)
        throw e
    }

    // Register to each transport as listener and collect mapping to addresses
    val transportsAndAddresses = results.map {
      case (transport, address, promise) =>
        promise.success(ActorAssociationEventListener(self))
        transport -> address
    }
    addressesPromise.success(transportsAndAddresses)

  case ListensFailure(addressesPromise, cause) =>
    addressesPromise.failure(cause)

  case ia: InboundAssociation =>
    // Not accepting yet: retry delivery to ourselves shortly.
    context.system.scheduler.scheduleOnce(10.milliseconds, self, ia)

  case ManagementCommand(_) =>
    sender() ! ManagementCommandAck(status = false)

  case StartupFinished =>
    context.become(accepting)

  case ShutdownAndFlush =>
    sender() ! true
    context.stop(self) // Nothing to flush at this point
}
/**
 * Main behavior once startup has finished.
 *
 *  - Routes outbound `Send` messages to writer endpoints, creating writers when needed.
 *  - Handles inbound associations and quarantine requests.
 *  - Reacts to endpoint lifecycle notifications and handles management commands.
 *  - On `ShutdownAndFlush`: flushes all endpoints, then shuts down the transports, and moves to `flushing`.
 */
val accepting: Receive = {
  case ManagementCommand(cmd) =>
    // The acknowledged status is true only if every transport reports true.
    val allStatuses: immutable.Seq[Future[Boolean]] =
      transportMapping.values.iterator.map(transport => transport.managementCommand(cmd)).to(immutable.IndexedSeq)
    akka.compat.Future.fold(allStatuses)(true)(_ && _).map(ManagementCommandAck).pipeTo(sender())

  case Quarantine(address, uidToQuarantineOption) =>
    // Stop writers
    (endpoints.writableEndpointWithPolicyFor(address), uidToQuarantineOption) match {
      case (Some(Pass(endpoint, _)), None) =>
        context.stop(endpoint)
        log.warning(
          "Association to [{}] with unknown UID is reported as quarantined, but " +
          "address cannot be quarantined without knowing the UID, gating instead for {} ms.",
          address,
          settings.RetryGateClosedFor.toMillis)
        endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor)

      case (Some(Pass(endpoint, uidOption)), Some(quarantineUid)) =>
        uidOption match {
          case Some(`quarantineUid`) =>
            endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration)
            eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid))
            context.stop(endpoint)
          case None if !endpoints.refuseUid(address).contains(quarantineUid) =>
            // The writer has not learned the remote UID yet. The quarantine UID may have come fresh from
            // cluster gossip, so remember it as refused; it is then enforced when the writer reports GotUid.
            endpoints.registerWritableEndpointRefuseUid(
              address,
              quarantineUid,
              Deadline.now + settings.QuarantineDuration)
          case _ =>
          // The writer's UID differs from the one to quarantine, or that UID is already refused:
          // the quarantine lost the race with some failure, so do nothing.
        }

      case (Some(Quarantined(uid, _)), Some(quarantineUid)) if uid == quarantineUid =>
      // the UID to be quarantined already exists, do nothing

      case (_, Some(quarantineUid)) =>
        // The current state is gated, quarantined with another UID, or absent, and we know the UID: update it.
        endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration)
        eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid))

      case _ => // the current state is Gated, WasGated or Quarantined, and we don't know the UID, do nothing.
    }

    // Stop inbound read-only associations
    (endpoints.readOnlyEndpointFor(address), uidToQuarantineOption) match {
      case (Some((endpoint, _)), None) => context.stop(endpoint)
      case (Some((endpoint, currentUid)), Some(quarantineUid)) if currentUid == quarantineUid =>
        context.stop(endpoint)
      case _ => // nothing to stop
    }

    // A handle matches when it is from `address` and, if a UID was given, carries that UID.
    def matchesQuarantine(handle: AkkaProtocolHandle): Boolean = {
      handle.remoteAddress == address &&
      uidToQuarantineOption.forall(_ == handle.handshakeInfo.uid)
    }

    // Stop all matching pending read handoffs
    pendingReadHandoffs = pendingReadHandoffs.filter {
      case (pendingActor, pendingHandle) =>
        val drop = matchesQuarantine(pendingHandle)
        // Side-effecting here
        if (drop) {
          pendingHandle.disassociate("the pending handle was quarantined", log)
          context.stop(pendingActor)
        }
        !drop
    }

    // Stop all matching stashed connections
    stashedInbound = stashedInbound.map {
      case (writer, associations) =>
        writer -> associations.filter { assoc =>
          val handle = assoc.association.asInstanceOf[AkkaProtocolHandle]
          val drop = matchesQuarantine(handle)
          if (drop) handle.disassociate("the stashed inbound handle was quarantined", log)
          !drop
        }
    }

  case s @ Send(_, _, recipientRef, _) =>
    val recipientAddress = recipientRef.path.address

    def createAndRegisterWritingEndpoint(): ActorRef = {
      endpoints.registerWritableEndpoint(
        recipientAddress,
        uid = None,
        createEndpoint(
          recipientAddress,
          recipientRef.localAddressToUse,
          transportMapping(recipientRef.localAddressToUse),
          settings,
          handleOption = None,
          writing = true))
    }

    endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
      case Some(Pass(endpoint, _)) =>
        endpoint ! s
      case Some(Gated(timeOfRelease)) =>
        if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s
        else extendedSystem.deadLetters ! s
      case Some(Quarantined(_, _)) =>
        // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have
        // the Quarantined tombstone and we know what UID we don't want to accept, so use it.
        createAndRegisterWritingEndpoint() ! s
      case None =>
        createAndRegisterWritingEndpoint() ! s
    }

  case ia @ InboundAssociation(_: AkkaProtocolHandle) =>
    handleInboundAssociation(ia, writerIsIdle = false)

  case EndpointWriter.StoppedReading(endpoint) =>
    acceptPendingReader(takingOverFrom = endpoint)

  case Terminated(endpoint) =>
    acceptPendingReader(takingOverFrom = endpoint)
    endpoints.unregisterEndpoint(endpoint)
    handleStashedInbound(endpoint, writerIsIdle = false)

  case EndpointWriter.TookOver(endpoint, handle) =>
    removePendingReader(takingOverFrom = endpoint, withHandle = handle)

  case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) =>
    val refuseUidOption = endpoints.refuseUid(remoteAddress)
    endpoints.writableEndpointWithPolicyFor(remoteAddress) match {
      case Some(Pass(endpoint, _)) =>
        if (refuseUidOption.contains(uid)) {
          endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
          eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
          context.stop(endpoint)
        } else endpoints.registerWritableEndpointUid(remoteAddress, uid)
        handleStashedInbound(sender(), writerIsIdle = false)
      case _ => // the GotUid might have lost the race with some failure
    }

  case ReliableDeliverySupervisor.Idle =>
    handleStashedInbound(sender(), writerIsIdle = true)

  case Prune =>
    endpoints.prune()

  case ShutdownAndFlush =>
    // Shutdown all endpoints and signal to sender() when ready (and whether all endpoints were shut down gracefully)
    def shutdownAll[T](resources: IterableOnce[T])(shutdown: T => Future[Boolean]): Future[Boolean] = {
      Future.sequence(resources.toList.map(shutdown)).map(_.forall(identity)).recover {
        case NonFatal(_) => false
      }
    }

    (for {
      // The construction of the future for shutdownStatus has to happen after the flushStatus future has been finished
      // so that endpoints are shut down before transports.
      flushStatus <- shutdownAll(endpoints.allEndpoints)(gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop))
      shutdownStatus <- shutdownAll(transportMapping.values)(_.shutdown())
    } yield flushStatus && shutdownStatus).pipeTo(sender())

    pendingReadHandoffs.valuesIterator.foreach(_.disassociate(AssociationHandle.Shutdown))

    // Ignore all other writes
    normalShutdown = true
    context.become(flushing)
}
/**
 * Behavior installed after ShutdownAndFlush: outgoing sends are routed to dead letters, new inbound
 * associations are refused with a Shutdown disassociation, and Terminated notifications are ignored.
 */
def flushing: Receive = {
  case send: Send =>
    extendedSystem.deadLetters ! send
  case InboundAssociation(handle: AkkaProtocolHandle) =>
    // No new connections are accepted while shutting down.
    handle.disassociate(AssociationHandle.Shutdown)
  case Terminated(_) =>
  // Endpoints are being torn down anyway, nothing to react to.
}
/**
 * Handles an inbound association carried by an AkkaProtocolHandle.
 *
 *  - If a read-only endpoint already exists for the remote address, the new handle replaces any pending read
 *    handoff for that endpoint (the previously pending handle is disassociated), the endpoint is told to take
 *    over the new handle, and a writable endpoint with a Pass policy (if any) is ungated.
 *  - Otherwise, if the remote address / handshake UID is quarantined, the handle is disassociated.
 *  - Otherwise the writable endpoint's policy decides:
 *     - Pass without a known UID: a writer not yet known to be idle is asked (IsIdle) and the association is
 *       stashed for it; if it is known to be idle, a new endpoint is created immediately.
 *     - Pass with the handshake's UID: the handle becomes the pending read handoff of the writer, which is
 *       asked to stop reading and is ungated.
 *     - Pass with a different UID: the old writer is stopped and unregistered, its UID quarantined, and a new
 *       endpoint is created for the handle.
 *     - anything else: a new endpoint is created.
 *
 * Associations whose handle is not an AkkaProtocolHandle are ignored.
 *
 * @param ia           the inbound association event
 * @param writerIsIdle whether the writable endpoint for the remote address is already known to be idle
 */
def handleInboundAssociation(ia: InboundAssociation, writerIsIdle: Boolean): Unit = ia match {
  case InboundAssociation(handle: AkkaProtocolHandle) =>
    val remote = handle.remoteAddress
    val handshakeUid = handle.handshakeInfo.uid

    endpoints.readOnlyEndpointFor(remote) match {
      case Some((reader, _)) =>
        pendingReadHandoffs
          .get(reader)
          .foreach(_.disassociate("the existing readOnly association was replaced by a new incoming one", log))
        pendingReadHandoffs += reader -> handle
        reader ! EndpointWriter.TakeOver(handle, self)
        endpoints.writableEndpointWithPolicyFor(remote) match {
          case Some(Pass(writer, _)) => writer ! ReliableDeliverySupervisor.Ungate
          case _                     =>
        }

      case None if endpoints.isQuarantined(remote, handshakeUid) =>
        handle.disassociate(AssociationHandle.Quarantined)

      case None =>
        endpoints.writableEndpointWithPolicyFor(remote) match {
          case Some(Pass(writer, None)) if !writerIsIdle =>
            // An idle writer will never send a GotUid or a Terminated, so it has to be provoked
            // into answering; the association waits in the stash until then.
            writer ! ReliableDeliverySupervisor.IsIdle
            stashedInbound += writer -> (stashedInbound.getOrElse(writer, Vector.empty) :+ ia)

          case Some(Pass(_, None)) =>
            createAndRegisterEndpoint(handle)

          case Some(Pass(writer, Some(knownUid))) if knownUid == handshakeUid =>
            pendingReadHandoffs
              .get(writer)
              .foreach(_.disassociate("the existing writable association was replaced by a new incoming one", log))
            pendingReadHandoffs += writer -> handle
            writer ! EndpointWriter.StopReading(writer, self)
            writer ! ReliableDeliverySupervisor.Ungate

          case Some(Pass(writer, Some(knownUid))) =>
            // The remote system restarted with a new UID: retire the old writer and quarantine its UID.
            context.stop(writer)
            endpoints.unregisterEndpoint(writer)
            pendingReadHandoffs -= writer
            endpoints.markAsQuarantined(remote, knownUid, Deadline.now + settings.QuarantineDuration)
            createAndRegisterEndpoint(handle)

          case _ =>
            createAndRegisterEndpoint(handle)
        }
    }

  case _ => // not an AkkaProtocolHandle: ignore
}
/**
 * Publishes an inbound AssociatedEvent for `handle` and creates an endpoint for it.
 *
 * The endpoint is a writing one when passive connections are enabled and no writable endpoint exists yet for
 * the remote address; it is then registered as writable with the handshake UID. Otherwise it is registered as
 * read-only, and the remote address' policy is removed when there is still no writable endpoint for it.
 */
private def createAndRegisterEndpoint(handle: AkkaProtocolHandle): Unit = {
  val remote = handle.remoteAddress
  val local = handle.localAddress
  val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(remote)

  eventPublisher.notifyListeners(AssociatedEvent(local, remote, inbound = true))

  val endpoint = createEndpoint(remote, local, transportMapping(local), settings, Some(handle), writing)
  val remoteUid = handle.handshakeInfo.uid

  if (!writing) {
    endpoints.registerReadOnlyEndpoint(remote, endpoint, remoteUid)
    if (!endpoints.hasWritableEndpointFor(remote))
      endpoints.removePolicy(remote)
  } else
    endpoints.registerWritableEndpoint(remote, Some(remoteUid), endpoint)
}
/**
 * Instantiates every transport configured in `settings.Transports` and starts listening on each of them.
 *
 * For every (driver class name, adapter names, config) entry a chain is built:
 *   AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
 * The driver is loaded reflectively, each adapter provider wraps the transport built so far, and an
 * AkkaProtocolTransport is put on top. Only these heads are kept.
 *
 * @return a future of (transport, listen address, listener promise) for every configured transport
 * @throws IllegalArgumentException (synchronously) if a driver class cannot be instantiated
 */
private def listens: Future[Seq[(AkkaProtocolTransport, Address, Promise[AssociationEventListener])]] = {
  val transports: Seq[AkkaProtocolTransport] = settings.Transports.map {
    case (fqn, adapters, config) =>
      val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config)

      // Bottom of the chain: the driver, loaded by its fully qualified class name.
      val driver: Transport = extendedSystem.dynamicAccess.createInstanceFor[Transport](fqn, args) match {
        case Success(loaded) => loaded
        case Failure(cause) =>
          throw new IllegalArgumentException(
            s"Cannot instantiate transport [$fqn]. " +
            "Make sure it extends [akka.remote.transport.Transport] and has constructor with " +
            "[akka.actor.ExtendedActorSystem] and [com.typesafe.config.Config] parameters",
            cause)
      }

      // Middle of the chain: every adapter provider wraps the transport below it.
      val adapterExtension = TransportAdaptersExtension.get(context.system)
      val wrapped = adapters.map(adapterExtension.getAdapterProvider).foldLeft(driver) {
        (inner: Transport, provider: TransportAdapterProvider) =>
          provider.create(inner, context.system.asInstanceOf[ExtendedActorSystem])
      }

      // Head of the chain.
      new AkkaProtocolTransport(wrapped, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec)
  }

  // Start listening on every transport and collect the results into a single future.
  Future.traverse(transports) { transport =>
    transport.listen.map { case (address, listenerPromise) => (transport, address, listenerPromise) }
  }
}
/**
 * Completes a pending read handoff from `takingOverFrom`, if one exists.
 *
 * The pending handle is removed from `pendingReadHandoffs`, an inbound AssociatedEvent is published, and a new
 * read-only endpoint is created and registered for the handle. Nothing happens when no handoff is pending.
 */
private def acceptPendingReader(takingOverFrom: ActorRef): Unit =
  pendingReadHandoffs.get(takingOverFrom).foreach { handle =>
    pendingReadHandoffs -= takingOverFrom
    val local = handle.localAddress
    val remote = handle.remoteAddress
    eventPublisher.notifyListeners(AssociatedEvent(local, remote, inbound = true))
    val reader = createEndpoint(remote, local, transportMapping(local), settings, Some(handle), writing = false)
    endpoints.registerReadOnlyEndpoint(remote, reader, handle.handshakeInfo.uid)
  }
/** Drops the pending read handoff registered for `takingOverFrom`, but only if it is still `withHandle`. */
private def removePendingReader(takingOverFrom: ActorRef, withHandle: AkkaProtocolHandle): Unit = {
  val stillPending = pendingReadHandoffs.get(takingOverFrom).contains(withHandle)
  if (stillPending) pendingReadHandoffs -= takingOverFrom
}
/**
 * Creates a new endpoint child actor for `remoteAddress` and watches it.
 *
 * A writing endpoint is a ReliableDeliverySupervisor named "reliableEndpointWriter-<address>-<id>". A read-only
 * endpoint is an EndpointWriter without a reliable delivery supervisor, named "endpointWriter-<address>-<id>".
 * In both cases the props are passed through RARP's configureDispatcher and deployed locally.
 *
 * @throws IllegalArgumentException if no transport is mapped to `localAddress`
 */
private def createEndpoint(
    remoteAddress: Address,
    localAddress: Address,
    transport: AkkaProtocolTransport,
    endpointSettings: RemoteSettings,
    handleOption: Option[AkkaProtocolHandle],
    writing: Boolean): ActorRef = {
  require(transportMapping contains localAddress, "Transport mapping is not defined for the address")
  // refuseUid is ignored for read-only endpoints since the UID of the remote system is already known and has passed
  // quarantine checks
  val refuseUid = endpoints.refuseUid(remoteAddress)

  val endpointProps =
    if (writing)
      ReliableDeliverySupervisor.props(
        handleOption,
        localAddress,
        remoteAddress,
        refuseUid,
        transport,
        endpointSettings,
        AkkaPduProtobufCodec,
        receiveBuffers)
    else
      EndpointWriter.props(
        handleOption,
        localAddress,
        remoteAddress,
        refuseUid,
        transport,
        endpointSettings,
        AkkaPduProtobufCodec,
        receiveBuffers,
        reliableDeliverySupervisor = None)
  val namePrefix = if (writing) "reliableEndpointWriter-" else "endpointWriter-"

  val deployedProps = RARP(extendedSystem).configureDispatcher(endpointProps).withDeploy(Deploy.local)
  val actorName = namePrefix + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()
  context.watch(context.actorOf(deployedProps, actorName))
}
// Set to true when ShutdownAndFlush is handled (just before switching to `flushing`). postStop only shuts the
// transports down itself while this is still false, i.e. when the actor was stopped without a graceful shutdown.
private var normalShutdown = false
/**
 * Cancels the prune timer and disassociates every handle still waiting for a read handoff.
 *
 * If the actor stops without a preceding ShutdownAndFlush (`normalShutdown` is false), an error is logged and
 * every transport's shutdown is triggered. The returned futures are not awaited.
 */
override def postStop(): Unit = {
  pruneTimerCancellable.cancel()
  pendingReadHandoffs.valuesIterator.foreach(_.disassociate(AssociationHandle.Shutdown))
  if (!normalShutdown) {
    // Remaining running endpoints are children, so they will clean up themselves.
    // We still need to clean up any remaining transports because handles might be in mailboxes, and for example
    // Netty is not part of the actor hierarchy, so its handles will not be cleaned up if no actor is taking
    // responsibility of them (because they are sitting in a mailbox).
    log.error("Remoting system has been terminated abruptly. Attempting to shut down transports")
    // The result of this shutdown is async, should we try to Await for a short duration?
    // `foreach`, not `map`: only the side effect of triggering shutdown is wanted here.
    transportMapping.values.foreach(_.shutdown())
  }
}
}