pekko/akka-cluster/src/main/scala/akka/cluster/Cluster.scala

/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import java.io.Closeable
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicBoolean
import akka.ConfigurationException
import akka.actor._
import akka.annotation.InternalApi
import akka.cluster.ClusterSettings.DataCenter
import akka.dispatch.MonitorableThreadFactory
import akka.event.{ Logging, LoggingAdapter }
import akka.japi.Util
import akka.pattern._
import akka.remote.{ UniqueAddress => _, _ }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.varargs
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext }
import scala.util.control.NonFatal
import akka.event.Logging.LogLevel
/**
* Cluster Extension Id and factory for creating Cluster extension.
*/
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def get(system: ActorSystem): Cluster = super.get(system)
override def lookup = Cluster
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
/**
* INTERNAL API
*/
private[cluster] final val isAssertInvariantsEnabled: Boolean =
System.getProperty("akka.cluster.assert", "off").toLowerCase match {
case "on" | "true" true
case _ false
}
}
/**
* This module is responsible for cluster membership information. Changes to the cluster
* information are retrieved through [[#subscribe]]. Commands to operate the cluster are
* available through methods in this class, such as [[#join]], [[#down]] and [[#leave]].
*
* Each cluster [[Member]] is identified by its [[akka.actor.Address]], and
* the cluster address of this actor system is [[#selfAddress]]. A member also has a status;
* initially [[MemberStatus]] `Joining` followed by [[MemberStatus]] `Up`.
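*
* An illustrative usage sketch (assuming an `ActorSystem` named `system` configured with
* `akka.actor.provider = cluster`, and a hypothetical `listener` ActorRef as subscriber):
* {{{
* val cluster = Cluster(system)
* // receive membership events as they happen
* cluster.subscribe(listener, classOf[ClusterEvent.MemberUp])
* // programmatic join, e.g. forming a single-node cluster with itself
* cluster.join(cluster.selfAddress)
* }}}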
*/
class Cluster(val system: ExtendedActorSystem) extends Extension {
import ClusterEvent._
val settings = new ClusterSettings(system.settings.config, system.name)
import ClusterLogger._
import settings._
private val joinConfigCompatChecker: JoinConfigCompatChecker = JoinConfigCompatChecker.load(system, settings)
/**
* The address including a `uid` of this cluster member.
* The `uid` is needed to be able to distinguish different
* incarnations of a member with same hostname and port.
*/
val selfUniqueAddress: UniqueAddress = system.provider match {
case c: ClusterActorRefProvider =>
UniqueAddress(c.transport.defaultAddress, AddressUidExtension(system).longAddressUid)
case other => throw new ConfigurationException(
s"ActorSystem [${system}] needs to have 'akka.actor.provider' set to 'cluster' in the configuration, currently uses [${other.getClass.getName}]")
}
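// Illustrative configuration counterpart to the provider requirement checked above (sketch):
//   akka.actor.provider = cluster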
/**
* The address of this cluster member.
*/
def selfAddress: Address = selfUniqueAddress.address
/** Data center to which this node belongs (defaults to "default" if not configured explicitly) */
def selfDataCenter: DataCenter = settings.SelfDataCenter
/**
* roles that this member has
*/
def selfRoles: Set[String] = settings.Roles
/**
* Java API: roles that this member has
*/
def getSelfRoles: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(selfRoles).asJava
private val _isTerminated = new AtomicBoolean(false)
private val log = Logging(system, getClass.getName)
// ClusterJmx is initialized as the last thing in the constructor
private var clusterJmx: Option[ClusterJmx] = None
logInfo("Starting up, Akka version [{}] ...", system.settings.ConfigVersion)
val failureDetector: FailureDetectorRegistry[Address] = {
val createFailureDetector = () =>
FailureDetectorLoader.load(settings.FailureDetectorImplementationClass, settings.FailureDetectorConfig, system)
new DefaultFailureDetectorRegistry(createFailureDetector)
}
val crossDcFailureDetector: FailureDetectorRegistry[Address] = {
val createFailureDetector = () =>
FailureDetectorLoader.load(
settings.MultiDataCenter.CrossDcFailureDetectorSettings.ImplementationClass,
settings.MultiDataCenter.CrossDcFailureDetectorSettings.config, system)
new DefaultFailureDetectorRegistry(createFailureDetector)
}
// needs to be lazy to allow downing provider impls to access Cluster (if not we get deadlock)
lazy val downingProvider: DowningProvider =
DowningProvider.load(settings.DowningProviderClassName, system)
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================
/**
* INTERNAL API
*/
private[cluster] val scheduler: Scheduler = {
if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) {
logInfo(
"Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
(1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis)
val cfg = ConfigFactory.parseString(
s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback(
system.settings.config)
val threadFactory = system.threadFactory match {
case tf: MonitorableThreadFactory => tf.withName(tf.name + "-cluster-scheduler")
case tf => tf
}
system.dynamicAccess.createInstanceFor[Scheduler](system.settings.SchedulerClass, immutable.Seq(
classOf[Config] -> cfg,
classOf[LoggingAdapter] -> log,
classOf[ThreadFactory] -> threadFactory)).get
} else {
// delegate to system.scheduler, but don't close over system
val systemScheduler = system.scheduler
new Scheduler with Closeable {
override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing
override def maxFrequency: Double = systemScheduler.maxFrequency
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.schedule(initialDelay, interval, runnable)
override def scheduleOnce(
delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay, runnable)
}
}
}
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons: ActorRef = {
system.systemActorOf(Props(classOf[ClusterDaemon], joinConfigCompatChecker).
withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster")
}
/**
* INTERNAL API
*/
private[cluster] val clusterCore: ActorRef = {
implicit val timeout = system.settings.CreationTimeout
try {
Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration)
} catch {
case NonFatal(e) =>
log.error(e, "Failed to startup Cluster. You can try to increase 'akka.actor.creation-timeout'.")
shutdown()
// don't re-throw, that would cause the extension to be re-recreated
// from shutdown() or other places, which may result in
// InvalidActorNameException: actor name [cluster] is not unique
system.deadLetters
}
}
private[cluster] val readView: ClusterReadView = new ClusterReadView(this)
system.registerOnTermination(shutdown())
if (JmxEnabled)
clusterJmx = {
val jmx = new ClusterJmx(this, log)
jmx.createMBean()
Some(jmx)
}
logInfo("Started up successfully")
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
/**
* Returns true if this cluster instance has been shut down.
*/
def isTerminated: Boolean = _isTerminated.get
/**
* Current snapshot state of the cluster.
*/
def state: CurrentClusterState = readView.state
/**
* Current snapshot of the member itself
*/
def selfMember: Member = readView.self
/**
* Subscribe to one or more cluster domain events.
* The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclasses.
*
* A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]]
* will be sent to the subscriber as the first message.
*/
@varargs def subscribe(subscriber: ActorRef, to: Class[_]*): Unit =
subscribe(subscriber, initialStateMode = InitialStateAsSnapshot, to: _*)
/**
* Subscribe to one or more cluster domain events.
* The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclasses.
*
* If `initialStateMode` is `ClusterEvent.InitialStateAsEvents` the events corresponding
* to the current state will be sent to the subscriber to mimic what you would
* have seen if you were listening to the events when they occurred in the past.
*
* If `initialStateMode` is `ClusterEvent.InitialStateAsSnapshot` a snapshot of
* [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the subscriber as the
* first message.
*
* Note that for large clusters it is more efficient to use `InitialStateAsSnapshot`.
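*
* Illustrative subscription (assuming `subscriber` is an ActorRef, e.g. `self` inside an actor):
* {{{
* Cluster(system).subscribe(
*   subscriber,
*   initialStateMode = ClusterEvent.InitialStateAsEvents,
*   classOf[ClusterEvent.MemberUp],
*   classOf[ClusterEvent.UnreachableMember])
* }}}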
*/
@varargs def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit = {
require(to.length > 0, "at least one `ClusterDomainEvent` class is required")
require(
to.forall(classOf[ClusterDomainEvent].isAssignableFrom),
s"subscribe to `akka.cluster.ClusterEvent.ClusterDomainEvent` or subclasses, was [${to.map(_.getName).mkString(", ")}]")
clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode, to.toSet)
}
/**
* Unsubscribe from all cluster domain events.
*/
def unsubscribe(subscriber: ActorRef): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber, None)
/**
* Unsubscribe from a specific type of cluster domain events,
* matching a previous `subscribe` registration.
*/
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber, Some(to))
/**
* Send current (full) state of the cluster to the specified
* receiver. If you want this to happen periodically you need to schedule
* a call to this method yourself. Note that you can also retrieve the current
* state with [[#state]].
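*
* For example, a periodic refresh could be scheduled manually (sketch; assumes a `receiver` ActorRef,
* `import scala.concurrent.duration._` and an implicit `ExecutionContext` such as `system.dispatcher`):
* {{{
* system.scheduler.schedule(0.seconds, 5.seconds) {
*   Cluster(system).sendCurrentClusterState(receiver)
* }
* }}}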
*/
def sendCurrentClusterState(receiver: ActorRef): Unit =
clusterCore ! InternalClusterAction.SendCurrentClusterState(receiver)
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(selfAddress)' command is sent to the node to join.
*
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
*
* The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
* cluster.
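*
* Illustrative example (assuming classic remoting on `akka.tcp`; host and port are placeholders):
* {{{
* val address = Address("akka.tcp", system.name, "host1", 2552)
* Cluster(system).join(address)
* }}}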
*/
def join(address: Address): Unit =
clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))
private def fillLocal(address: Address): Address = {
// local address might be used if grabbed from actorRef.path.address
if (address.hasLocalScope && address.system == selfAddress.system) selfAddress
else address
}
/**
* Join the specified seed nodes without defining them in config.
* Especially useful from tests when Addresses are unknown before startup time.
*
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
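*
* Illustrative example (the addresses are placeholders discovered at runtime, e.g. in a test):
* {{{
* Cluster(system).joinSeedNodes(List(seedAddress1, seedAddress2))
* }}}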
*/
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))
/**
* Java API
*
* Join the specified seed nodes without defining them in config.
* Especially useful from tests when Addresses are unknown before startup time.
*
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
*/
def joinSeedNodes(seedNodes: java.util.List[Address]): Unit =
joinSeedNodes(Util.immutableSeq(seedNodes))
/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
* The member will go through the status changes [[MemberStatus]] `Leaving` (not published to
* subscribers) followed by [[MemberStatus]] `Exiting` and finally [[MemberStatus]] `Removed`.
*
* Note that this command can be issued to any member in the cluster, not necessarily the
* one that is leaving. The cluster extension, but not the actor system or JVM, of the
* leaving member will be shut down after the leader has changed the status of the member to
* Exiting. Thereafter the member will be removed from the cluster. Normally this is
* handled automatically, but in case of network failures during this process it might
* still be necessary to set the node's status to Down in order to complete the removal.
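*
* Illustrative graceful departure of the local member:
* {{{
* val cluster = Cluster(system)
* cluster.leave(cluster.selfAddress)
* }}}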
*/
def leave(address: Address): Unit =
clusterCore ! ClusterUserAction.Leave(fillLocal(address))
/**
* Send command to DOWN the node specified by 'address'.
*
* When a member is considered by the failure detector to be unreachable the leader is not
* allowed to perform its duties, such as changing status of new joining members to 'Up'.
* The status of the unreachable member must be changed to 'Down', which can be done with
* this method.
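*
* For example (illustrative; `unreachableAddress` stands for the address of the observed unreachable member):
* {{{
* Cluster(system).down(unreachableAddress)
* }}}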
*/
def down(address: Address): Unit =
clusterCore ! ClusterUserAction.Down(fillLocal(address))
/**
* The supplied thunk will be run, once, when current cluster member is `Up`.
* Typically used together with configuration option `akka.cluster.min-nr-of-members`
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
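*
* Illustrative sketch (assuming a hypothetical `startApplicationActors()` defined elsewhere):
* {{{
* Cluster(system).registerOnMemberUp {
*   startApplicationActors()
* }
* }}}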
*/
def registerOnMemberUp[T](code: => T): Unit =
registerOnMemberUp(new Runnable { def run() = code })
/**
* Java API: The supplied callback will be run, once, when current cluster member is `Up`.
* Typically used together with configuration option `akka.cluster.min-nr-of-members`
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
*/
def registerOnMemberUp(callback: Runnable): Unit =
clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)
/**
* The supplied thunk will be run, once, when current cluster member is `Removed`.
* If the cluster has already been shutdown the thunk will run on the caller thread immediately.
* Typically used together with `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
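*
* Illustrative graceful-shutdown sketch:
* {{{
* val cluster = Cluster(system)
* cluster.registerOnMemberRemoved(system.terminate())
* cluster.leave(cluster.selfAddress)
* }}}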
*/
def registerOnMemberRemoved[T](code: => T): Unit =
registerOnMemberRemoved(new Runnable { override def run(): Unit = code })
/**
* Java API: The supplied thunk will be run, once, when current cluster member is `Removed`.
* If the cluster has already been shutdown the thunk will run on the caller thread immediately.
* Typically used together with `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
*/
def registerOnMemberRemoved(callback: Runnable): Unit = {
if (_isTerminated.get())
callback.run()
else
clusterDaemons ! InternalClusterAction.AddOnMemberRemovedListener(callback)
}
/**
* Generate the remote actor path by replacing the Address in the RootActor Path for the given
* ActorRef with the cluster's `selfAddress`, unless address' host is already defined
*/
def remotePathOf(actorRef: ActorRef): ActorPath = {
val path = actorRef.path
if (path.address.host.isDefined) {
path
} else {
path.root.copy(selfAddress) / path.elements withUid path.uid
}
}
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
/**
* INTERNAL API.
*
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*
* Should not be called by the user. The user can issue a LEAVE command which will tell the node
* to go through the graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
*/
@InternalApi private[cluster] def shutdown(): Unit = {
if (_isTerminated.compareAndSet(false, true)) {
logInfo("Shutting down...")
system.stop(clusterDaemons)
// readView might be null if init fails before it is created
if (readView != null)
readView.close()
closeScheduler()
clusterJmx foreach { _.unregisterMBean() }
logInfo("Successfully shut down")
}
}
private def closeScheduler(): Unit = scheduler match {
case x: Closeable => x.close()
case _ => // ignore, this is fine
}
/**
* INTERNAL API
*/
private[cluster] object ClusterLogger {
def isDebugEnabled: Boolean =
log.isDebugEnabled
def logDebug(message: String): Unit =
logAtLevel(Logging.DebugLevel, message)
def logDebug(template: String, arg1: Any): Unit =
logAtLevel(Logging.DebugLevel, template, arg1)
def logDebug(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.DebugLevel, template, arg1, arg2)
def logDebug(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.DebugLevel, template, arg1, arg2, arg3)
def logInfo(message: String): Unit =
logAtLevel(Logging.InfoLevel, message)
def logInfo(template: String, arg1: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1)
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1, arg2)
def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1, arg2, arg3)
def logWarning(message: String): Unit =
logAtLevel(Logging.WarningLevel, message)
def logWarning(template: String, arg1: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1)
def logWarning(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1, arg2)
def logWarning(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1, arg2, arg3)
def logError(message: String): Unit =
logAtLevel(Logging.ErrorLevel, message)
def logError(template: String, arg1: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1)
def logError(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1, arg2)
def logError(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1, arg2, arg3)
def logError(cause: Throwable, message: String): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - {}", selfAddress, message)
else
log.error(cause, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)
}
def logError(cause: Throwable, template: String, arg1: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1)
else
log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1)
}
def logError(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
else
log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2)
}
def logError(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
else
log.error(cause, "Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3)
}
private def logAtLevel(logLevel: LogLevel, message: String): Unit = {
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - {}", selfAddress, message)
else
log.log(logLevel, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)
}
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any): Unit = {
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1)
else
log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1)
}
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any): Unit =
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
else
log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2)
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
else
log.log(logLevel, "Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, selfAddress, arg1, arg2, arg3)
private def isLevelEnabled(logLevel: LogLevel): Boolean =
LogInfo || logLevel < Logging.InfoLevel
}
}