pekko/akka-cluster/src/main/scala/akka/cluster/Cluster.scala


/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor._
import akka.actor.Status._
import akka.ConfigurationException
import akka.dispatch.Await
import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import akka.jsr166y.ThreadLocalRandom
import akka.pattern._
import akka.remote._
import akka.routing._
import akka.util._
import akka.util.duration._
import akka.util.internal.HashedWheelTimer
import com.google.protobuf.ByteString
import java.io.Closeable
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit._
import javax.management._
import MemberStatus._
import scala.annotation.tailrec
import scala.collection.immutable.{ Map, SortedSet }
import scala.collection.GenTraversableOnce
import java.util.concurrent.atomic.AtomicLong
import java.security.MessageDigest
/**
* Interface for membership change listener.
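*
* A minimal usage sketch (assuming a started `ActorSystem` named `system`):
* {{{
* Cluster(system).registerListener(new MembershipChangeListener {
*   def notify(members: SortedSet[Member]): Unit =
*     println("Cluster members changed: " + members.mkString(", "))
* })
* }}}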
*/
trait MembershipChangeListener {
def notify(members: SortedSet[Member]): Unit
}
/**
* Interface for meta data change listener.
*/
trait MetaDataChangeListener {
def notify(meta: Map[String, Array[Byte]]): Unit
}
/**
* Base trait for all cluster messages. All ClusterMessages are serializable.
*
* FIXME Protobuf all ClusterMessages
*/
sealed trait ClusterMessage extends Serializable
/**
* Cluster commands sent by the USER.
*/
object ClusterUserAction {
/**
* Command to initiate joining another node (represented by 'address').
* Join will be sent to the other node.
*/
case class JoinTo(address: Address) extends ClusterMessage
/**
* Command to join the cluster. Sent when a node (represented by 'address')
* wants to join another node (the receiver).
*/
case class Join(address: Address) extends ClusterMessage
/**
* Command to leave the cluster.
*/
case class Leave(address: Address) extends ClusterMessage
/**
* Command to mark node as temporary down.
*/
case class Down(address: Address) extends ClusterMessage
}
/**
* INTERNAL API
*/
object InternalClusterAction {
/**
* Start message of the process to join one of the seed nodes.
* The node sends `InitJoin` to all seed nodes, which reply
* with `InitJoinAck`. The first reply is used, the others are discarded.
* The node sends `Join` command to the seed node that replied first.
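*
* A sketch of the handshake (first `InitJoinAck` wins):
* {{{
* joining node                              seed nodes
*   JoinSeedNode (to self)
*   InitJoin            ----------------->  (all seed nodes)
*                       <-----------------  InitJoinAck(seedAddress)
*   Join(selfAddress)   ----------------->  (seed node that replied first)
* }}}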
*/
case object JoinSeedNode extends ClusterMessage
/**
* @see JoinSeedNode
*/
case object InitJoin extends ClusterMessage
/**
* @see JoinSeedNode
*/
case class InitJoinAck(address: Address) extends ClusterMessage
/**
*
* Command to [[akka.cluster.ClusterHeartbeatSender]], which will send [[akka.cluster.Heartbeat]]
* to the other node.
* Local only, no need to serialize.
*/
case class SendHeartbeat(heartbeatMsg: Heartbeat, to: Address, deadline: Deadline)
case object GossipTick
case object HeartbeatTick
case object ReapUnreachableTick
case object LeaderActionsTick
case object PublishStateTick
case class SendClusterMessage(to: Address, msg: ClusterMessage)
case class SendGossipTo(address: Address)
case object GetClusterCoreRef
case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage
case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage
}
/**
* Cluster commands sent by the LEADER.
*/
object ClusterLeaderAction {
/**
* INTERNAL API.
*
* Command to mark a node to be removed from the cluster immediately.
* Can only be sent by the leader.
*/
private[cluster] case class Exit(address: Address) extends ClusterMessage
/**
* INTERNAL API.
*
* Command to remove a node from the cluster immediately.
*/
private[cluster] case class Remove(address: Address) extends ClusterMessage
}
/**
* Represents the address and the current status of a cluster member node.
*
* Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`.
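*
* For example (hypothetical address, for illustration only):
* {{{
* val addr = Address("akka", "MySystem", "host1", 2552)
* Member(addr, MemberStatus.Joining) == Member(addr, MemberStatus.Up) // true, status is ignored
* }}}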
*/
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
override def hashCode = address.##
override def equals(other: Any) = Member.unapply(this) == Member.unapply(other)
override def toString = "Member(address = %s, status = %s)" format (address, status)
def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status)
}
/**
* Module with factory and ordering methods for Member instances.
*/
object Member {
/**
* `Address` ordering type class, sorts addresses by host and port.
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
else false
}
/**
* `Member` ordering type class, sorts members by host and port with the exception that
* it puts all members that are in MemberStatus.EXITING last.
*/
implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒
if (a.status == Exiting && b.status != Exiting) false
else if (a.status != Exiting && b.status == Exiting) true
else addressOrdering.compare(a.address, b.address) < 0
}
def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
def unapply(other: Any) = other match {
case m: Member ⇒ Some(m.address)
case _ ⇒ None
}
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
// pick highest MemberStatus
(Set.empty[Member] /: groupedByAddress) {
case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf)
}
}
/**
* Picks the Member with the highest "priority" MemberStatus.
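*
* For example (with some address `a`):
* {{{
* Member.highestPriorityOf(Member(a, Up), Member(a, Removed)) // Member(a, Removed)
* }}}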
*/
def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
case (Removed, _) ⇒ m1
case (_, Removed) ⇒ m2
case (Down, _) ⇒ m1
case (_, Down) ⇒ m2
case (Exiting, _) ⇒ m1
case (_, Exiting) ⇒ m2
case (Leaving, _) ⇒ m1
case (_, Leaving) ⇒ m2
case (Up, Joining) ⇒ m2
case (Joining, Up) ⇒ m1
case (Joining, Joining) ⇒ m1
case (Up, Up) ⇒ m1
}
// FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986
// SortedSet + and ++ operators replace the existing element
// Use these :+ and :++ operators for the Gossip members
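// Example (illustrative): given ms = SortedSet(Member(a, Up)), `ms + Member(a, Joining)`
// would replace the existing element (SI-5986), while `ms :+ Member(a, Joining)` leaves
// the Up member untouched because the set already contains that address.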
implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet)
class SortedSetWorkaround(sortedSet: SortedSet[Member]) {
implicit def :+(elem: Member): SortedSet[Member] = {
if (sortedSet.contains(elem)) sortedSet
else sortedSet + elem
}
implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] =
sortedSet ++ (elems.toSet diff sortedSet)
}
}
/**
* Envelope adding a sender address to the gossip.
*/
case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage
/**
* When conflicting versions of the received and local [[akka.cluster.Gossip]] are detected
* the gossip is forwarded to the leader for conflict resolution.
*/
case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage
/**
* Defines the current status of a cluster member node
*
* Can be one of: Joining, Up, Leaving, Exiting, Down and Removed.
*/
sealed trait MemberStatus extends ClusterMessage {
/**
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
*/
def isUnavailable: Boolean = this == Down
}
object MemberStatus {
case object Joining extends MemberStatus
case object Up extends MemberStatus
case object Leaving extends MemberStatus
case object Exiting extends MemberStatus
case object Down extends MemberStatus
case object Removed extends MemberStatus
}
/**
* Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
*/
case class GossipOverview(
seen: Map[Address, VectorClock] = Map.empty,
unreachable: Set[Member] = Set.empty) {
def isNonDownUnreachable(address: Address): Boolean =
unreachable.exists { m ⇒ m.address == address && m.status != Down }
override def toString =
"GossipOverview(seen = [" + seen.mkString(", ") +
"], unreachable = [" + unreachable.mkString(", ") +
"])"
}
object Gossip {
val emptyMembers: SortedSet[Member] = SortedSet.empty
}
/**
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data -
* all versioned by a vector clock.
*
* When a node is joining, a `Member` with status `Joining` is added to `members`.
* If the joining node was downed it is moved from `overview.unreachable` (status `Down`)
* to `members` (status `Joining`); it cannot rejoin unless it has first been downed.
*
* When convergence is reached the leader changes the status of `members` from `Joining`
* to `Up`.
*
* When the failure detector considers a node unavailable it is moved from
* `members` to `overview.unreachable`.
*
* When a node is downed, either manually or automatically, its status is changed to `Down`.
* It is also removed from the `overview.seen` table. The node will reside as `Down` in the
* `overview.unreachable` set until it joins again, at which point it goes through the normal
* joining procedure.
*
* When a `Gossip` is received, its version (vector clock) is used to determine whether it
* is newer or older than the current local `Gossip`. The received and local `Gossip`
* are merged in case of conflicting versions, i.e. vector clocks without a common history.
* When merged, the seen table is cleared.
*
* When a node is told by the user to leave the cluster the leader will move it to `Leaving`,
* then rebalance and repartition the cluster and start hand-off by migrating the actors
* from the leaving node to the new partitions. Once this process is complete the leader will
* move the node to the `Exiting` state and, once convergence is reached, move the node to
* `Removed` by removing it from the `members` set and sending a `Removed` command to the
* removed node, telling it to shut itself down.
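*
* A small sketch of merging two conflicting gossips (hypothetical addresses, for
* illustration only):
* {{{
* val a = Address("akka", "sys", "hostA", 2552)
* val b = Address("akka", "sys", "hostB", 2552)
* val g1 = (Gossip() :+ Member(a, Up)) :+ VectorClock.Node(a.toString)
* val g2 = (Gossip() :+ Member(b, Up)) :+ VectorClock.Node(b.toString)
* val merged = g1 merge g2 // members of both, merged vector clocks, empty seen table
* }}}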
*/
case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address
meta: Map[String, Array[Byte]] = Map.empty,
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
with Versioned[Gossip] {
// FIXME can be disabled as optimization
assertInvariants
private def assertInvariants: Unit = {
val unreachableAndLive = members.intersect(overview.unreachable)
if (unreachableAndLive.nonEmpty)
throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]"
format unreachableAndLive.mkString(", "))
val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting)
def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status)
if (members exists hasNotAllowedLiveMemberStatus)
throw new IllegalArgumentException("Live members must have status [%s], got [%s]"
format (allowedLiveMemberStatuses.mkString(", "),
(members filter hasNotAllowedLiveMemberStatus).mkString(", ")))
val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address)
if (seenButNotMember.nonEmpty)
throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]"
format seenButNotMember.mkString(", "))
}
/**
* Increments the version for this 'Node'.
*/
def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node)
/**
* Adds a member to the member node ring.
*/
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members :+ member)
}
/**
* Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock (version) for the new gossip.
*/
def seen(address: Address): Gossip = {
if (overview.seen.contains(address) && overview.seen(address) == version) this
else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
}
/**
* Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories.
*/
def merge(that: Gossip): Gossip = {
import Member.ordering
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
// 2. merge meta-data
val mergedMeta = this.meta ++ that.meta
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 5. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock)
}
/**
* Checks if we have cluster convergence. If there are any unreachable nodes not yet marked DOWN
* then we can't have convergence - we are waiting for the user to act (issuing DOWN) or the leader
* to act (issuing DOWN through auto-down).
*
* @return true if convergence has been reached, false if not
*/
def convergence: Boolean = {
val unreachable = overview.unreachable
val seen = overview.seen
// First check that:
// 1. we don't have any members that are unreachable, or
// 2. all unreachable members in the set have status DOWN
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
// and that all members exist in the seen table
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down }
val allMembersInSeen = members.forall(m ⇒ seen.contains(m.address))
if (hasUnreachable) false
else if (!allMembersInSeen) false
else seen.values.toSet.size == 1
}
def isLeader(address: Address): Boolean =
members.nonEmpty && (address == members.head.address)
def leader: Option[Address] = members.headOption.map(_.address)
def isSingletonCluster: Boolean = members.size == 1
/**
* Returns true if the node is UP or JOINING.
*/
def isAvailable(address: Address): Boolean = !isUnavailable(address)
def isUnavailable(address: Address): Boolean = {
val isUnreachable = overview.unreachable exists { _.address == address }
val hasUnavailableMemberStatus = members exists { m ⇒ (m.address == address) && m.status.isUnavailable }
isUnreachable || hasUnavailableMemberStatus
}
def member(address: Address): Member = {
members.find(_.address == address)
.getOrElse {
overview.unreachable
.find(_.address == address)
.getOrElse(Member(address, Removed))
}
}
override def toString =
"Gossip(" +
"overview = " + overview +
", members = [" + members.mkString(", ") +
"], meta = [" + meta.mkString(", ") +
"], version = " + version +
")"
}
/**
* Sent at regular intervals for failure detection.
*/
case class Heartbeat(from: Address) extends ClusterMessage
/**
* INTERNAL API
*/
private[cluster] case class ClusterStats(
receivedGossipCount: Long = 0L,
mergeConflictCount: Long = 0L,
mergeCount: Long = 0L,
mergeDetectedCount: Long = 0L) {
def incrementReceivedGossipCount(): ClusterStats =
copy(receivedGossipCount = receivedGossipCount + 1)
def incrementMergeConflictCount(): ClusterStats =
copy(mergeConflictCount = mergeConflictCount + 1)
def incrementMergeCount(): ClusterStats =
copy(mergeCount = mergeCount + 1)
def incrementMergeDetectedCount(): ClusterStats =
copy(mergeDetectedCount = mergeDetectedCount + 1)
}
/**
* INTERNAL API.
*
* Receives Heartbeat messages and delegates to Cluster.
* Instantiated as a single instance for each Cluster - i.e. heartbeats are serialized
* to the Cluster, message after message, but concurrent with other types of messages.
*/
private[cluster] final class ClusterHeartbeatDaemon(cluster: Cluster) extends Actor with ActorLogging {
def receive = {
case Heartbeat(from) ⇒ cluster.failureDetector heartbeat from
}
}
/*
* This actor is responsible for sending the heartbeat messages to
* other nodes. Netty blocks when sending to broken connections. This actor
* isolates sending to different nodes by using child workers for each target
* address and thereby reduces the risk of irregular heartbeats to healthy
* nodes due to broken connections to other nodes.
*/
private[cluster] final class ClusterHeartbeatSender(cluster: Cluster) extends Actor with ActorLogging {
import InternalClusterAction._
/**
* Looks up and returns the remote cluster heartbeat connection for the specific address.
*/
def clusterHeartbeatConnectionFor(address: Address): ActorRef =
context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "heartbeat")
val digester = MessageDigest.getInstance("MD5")
/**
* Child name is MD5 hash of the address.
* FIXME Change to URLEncode when ticket #2123 has been fixed
*/
def encodeChildName(name: String): String = {
digester update name.getBytes("UTF-8")
digester.digest.map { h ⇒ "%02x".format(0xFF & h) }.mkString
}
def receive = {
case msg @ SendHeartbeat(from, to, deadline) ⇒
val workerName = encodeChildName(to.toString)
val worker = context.actorFor(workerName) match {
case notFound if notFound.isTerminated ⇒
context.actorOf(Props(new ClusterHeartbeatSenderWorker(
cluster.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName)
case child ⇒ child
}
worker ! msg
}
}
/**
* Responsible for sending [[akka.cluster.Heartbeat]] to one specific address.
*
* Netty blocks when sending to broken connections, and this actor uses
* a configurable circuit breaker to reduce connect attempts to broken
* connections.
*
* @see ClusterHeartbeatSender
*/
private[cluster] final class ClusterHeartbeatSenderWorker(
cbSettings: CircuitBreakerSettings, toRef: ActorRef)
extends Actor with ActorLogging {
import InternalClusterAction._
val breaker = CircuitBreaker(context.system.scheduler,
cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout).
onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)).
onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)).
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
context.setReceiveTimeout(30 seconds)
def receive = {
case SendHeartbeat(heartbeatMsg, _, deadline) ⇒
log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
if (!deadline.isOverdue) {
// the CircuitBreaker will measure elapsed time and open if too many long calls
try breaker.withSyncCircuitBreaker {
log.debug("Cluster Node [{}] - Heartbeat to [{}]", heartbeatMsg.from, toRef)
toRef ! heartbeatMsg
if (deadline.isOverdue) log.debug("Sending heartbeat to [{}] took longer than expected", toRef)
} catch { case e: CircuitBreakerOpenException ⇒ /* skip sending heartbeat to broken connection */ }
// make sure it will cleanup when not used any more
context.setReceiveTimeout(30 seconds)
}
case ReceiveTimeout ⇒ context.stop(self) // cleanup when not used
}
}
/**
* INTERNAL API.
*/
private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging {
import InternalClusterAction._
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
private def clusterCoreConnectionFor(address: Address): ActorRef =
context.system.actorFor(RootActorPath(address) / "system" / "cluster" / "core")
def receive = {
case SendClusterMessage(to, msg) ⇒
log.debug("Cluster Node [{}] - Trying to send [{}] to [{}]", selfAddress, msg.getClass.getSimpleName, to)
clusterCoreConnectionFor(to) ! msg
}
}
/**
* INTERNAL API.
*/
private[cluster] final class ClusterCore(cluster: Cluster) extends Actor with ActorLogging {
// FIXME break up the cluster constructor parameter into something that is easier to test without Cluster
import ClusterLeaderAction._
import InternalClusterAction._
import cluster.settings._
import cluster.selfAddress
import cluster.clusterScheduler
val vclockNode = VectorClock.Node(selfAddress.toString)
val selfHeartbeat = Heartbeat(selfAddress)
// note that self is not initially a member,
// and the Gossip is not versioned for this 'Node' yet
var latestGossip: Gossip = Gossip()
var joinInProgress: Map[Address, Deadline] = Map.empty
var stats = ClusterStats()
val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(cluster)).
withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)).
withDispatcher(UseDispatcher), name = "coreSender")
// start periodic gossip to random nodes in cluster
val gossipTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
self ! GossipTick
}
// start periodic heartbeat to all nodes in cluster
val heartbeatTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
self ! HeartbeatTick
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
val failureDetectorReaperTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
self ! ReapUnreachableTick
}
// start periodic leader action management (only applies for the current leader)
private val leaderActionsTask =
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
self ! LeaderActionsTick
}
// start periodic publish of current state
private val publishStateTask: Option[Cancellable] =
if (PublishStateInterval == Duration.Zero) None
else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStateInterval), PublishStateInterval) {
self ! PublishStateTick
})
override def preStart(): Unit = {
if (AutoJoin) self ! InternalClusterAction.JoinSeedNode
}
override def postStop(): Unit = {
gossipTask.cancel()
heartbeatTask.cancel()
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
publishStateTask foreach { _.cancel() }
}
def receive = {
case JoinSeedNode ⇒ joinSeedNode()
case InitJoin ⇒ initJoin()
case InitJoinAck(address) ⇒ join(address)
case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout()
case ClusterUserAction.JoinTo(address) ⇒ join(address)
case ClusterUserAction.Join(address) ⇒ joining(address)
case ClusterUserAction.Down(address) ⇒ downing(address)
case ClusterUserAction.Leave(address) ⇒ leaving(address)
case Exit(address) ⇒ exiting(address)
case Remove(address) ⇒ removing(address)
case msg: GossipEnvelope ⇒ receiveGossip(msg)
case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg)
case GossipTick ⇒ gossip()
case HeartbeatTick ⇒ heartbeat()
case ReapUnreachableTick ⇒ reapUnreachableMembers()
case LeaderActionsTick ⇒ leaderActions()
case SendGossipTo(address) ⇒ gossipTo(address)
case PublishStateTick ⇒ publishState()
case p: Ping ⇒ ping(p)
}
def joinSeedNode(): Unit = {
val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress)
yield self.path.toStringWithAddress(address)
if (seedRoutees.isEmpty) {
cluster join cluster.selfAddress
} else {
implicit val within = Timeout(cluster.settings.SeedNodeTimeout)
val seedRouter = context.actorOf(
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
routees = seedRoutees, within = within.duration)))
seedRouter ? InitJoin pipeTo self
seedRouter ! PoisonPill
}
}
def initJoin(): Unit = sender ! InitJoinAck(cluster.selfAddress)
def joinSeedNodeTimeout(): Unit = cluster join cluster.selfAddress
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
*/
def join(address: Address): Unit = {
val localGossip = latestGossip
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip()
joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout))
// wipe the failure detector since we are starting fresh and shouldn't care about the past
cluster.failureDetector.reset()
notifyListeners(localGossip)
val command = ClusterUserAction.Join(selfAddress)
coreSender ! SendClusterMessage(address, command)
}
/**
* State transition to JOINING - new node joining.
*/
def joining(node: Address): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localUnreachable = localGossip.overview.unreachable
val alreadyMember = localMembers.exists(_.address == node)
val isUnreachable = localGossip.overview.isNonDownUnreachable(node)
if (!alreadyMember && !isUnreachable) {
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
// remove the node from the failure detector if it is a DOWN node that is rejoining cluster
if (rejoiningMember.nonEmpty) cluster.failureDetector.remove(node)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
latestGossip = seenVersionedGossip
log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
if (node != selfAddress) {
cluster.failureDetector heartbeat node
gossipTo(node)
}
notifyListeners(localGossip)
}
}
/**
* State transition to LEAVING.
*/
def leaving(address: Address): Unit = {
val localGossip = latestGossip
if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring)
val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
latestGossip = seenVersionedGossip
log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
publishState()
notifyListeners(localGossip)
}
}
/**
* State transition to EXITING.
*/
def exiting(address: Address): Unit = {
log.info("Cluster Node [{}] - Marked node [{}] as EXITING", selfAddress, address)
// FIXME implement when we implement hand-off
}
/**
* State transition to REMOVED.
*
* This method is for now only called after the LEADER has sent a Removed message - telling the node
* to shut itself down.
*
* In the future we might change this to allow the USER to send a Removed(address) message telling an
* arbitrary node to be moved directly from UP -> REMOVED.
*/
def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
publishState()
cluster.shutdown()
}
/**
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
* and its status is set to DOWN. The node is also removed from the 'seen' table.
*
* The node will reside as DOWN in the 'unreachable' set until an explicit JOIN command is sent directly
* to this node and it will then go through the normal JOINING procedure.
*/
def downing(address: Address): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
// 1. check if the node to DOWN is in the 'members' set
val downedMember: Option[Member] = localMembers.collectFirst {
case m if m.address == address ⇒ m.copy(status = Down)
}
val newMembers = downedMember match {
case Some(m) ⇒
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
localMembers - m
case None ⇒ localMembers
}
// 2. check if the node to DOWN is in the 'unreachable' set
val newUnreachableMembers =
localUnreachableMembers.map { member ⇒
// no need to DOWN members already DOWN
if (member.address == address && member.status != Down) {
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
member copy (status = Down)
} else member
}
// 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
// 4. remove nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect {
case m if m.status == Down ⇒ m.address
}
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip :+ vclockNode
latestGossip = versionedGossip seen selfAddress
notifyListeners(localGossip)
}
/**
* When conflicting versions of the received and local [[akka.cluster.Gossip]] are detected
* the gossip is forwarded to the leader for conflict resolution. Trying to simultaneously
* resolve conflicts at several nodes creates new conflicts. Therefore the leader resolves
* conflicts to limit divergence. To avoid overload there is also a configurable rate
* limit on how many conflicts are handled per second. If the limit is
* exceeded the conflicting gossip messages are dropped and will reappear later.
*/
def receiveGossipMerge(merge: GossipMergeConflict): Unit = {
stats = stats.incrementMergeConflictCount
val rate = mergeRate(stats.mergeConflictCount)
if (rate <= MaxGossipMergeRate) {
receiveGossip(merge.a.copy(conversation = false))
receiveGossip(merge.b.copy(conversation = false))
// use one-way gossip from leader to reduce load of leader
def sendBack(to: Address): Unit = {
if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to))
oneWayGossipTo(to)
}
sendBack(merge.a.from)
sendBack(merge.b.from)
} else {
log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate)
}
}
/**
* Receive new gossip.
*/
def receiveGossip(envelope: GossipEnvelope): Unit = {
val from = envelope.from
val remoteGossip = envelope.gossip
val localGossip = latestGossip
if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
// FIXME how should we handle this situation?
log.debug("Received gossip with self as unreachable, from [{}]", from)
} else if (!localGossip.overview.isNonDownUnreachable(from)) {
// leader handles merge conflicts, or when the nodes have different views of who the leader is
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
val conflict = remoteGossip.version <> localGossip.version
if (conflict && !handleMerge) {
// delegate merge resolution to leader to reduce number of simultaneous resolves,
// which will result in new conflicts
stats = stats.incrementMergeDetectedCount
log.debug("Merge conflict [{}] detected [{}] <> [{}]", stats.mergeDetectedCount, selfAddress, from)
stats = stats.incrementMergeConflictCount
val rate = mergeRate(stats.mergeConflictCount)
if (rate <= MaxGossipMergeRate) {
coreSender ! SendClusterMessage(
to = localGossip.leader.get,
msg = GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope))
} else {
log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate)
}
} else {
val winningGossip =
if (conflict) {
// conflicting versions, merge, and new version
val mergedGossip = remoteGossip merge localGossip
mergedGossip :+ vclockNode
} else if (remoteGossip.version < localGossip.version) {
// local gossip is newer
localGossip
} else if (!remoteGossip.members.exists(_.address == selfAddress)) {
// FIXME This is very strange. It can happen when many nodes join at the same time.
// It's not detected as an ordinary version conflict <>
// If we don't handle this situation there will be IllegalArgumentException when marking this as seen
// merge, and new version
val mergedGossip = remoteGossip merge (localGossip :+ Member(selfAddress, Joining))
mergedGossip :+ vclockNode
} else {
// remote gossip is newer
remoteGossip
}
val newJoinInProgress =
if (joinInProgress.isEmpty) joinInProgress
else joinInProgress --
winningGossip.members.map(_.address) --
winningGossip.overview.unreachable.map(_.address)
latestGossip = winningGossip seen selfAddress
joinInProgress = newJoinInProgress
// for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).filter(_.status == Joining).foreach {
case node ⇒ cluster.failureDetector.remove(node.address)
}
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if (conflict) {
stats = stats.incrementMergeCount
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
}
stats = stats.incrementReceivedGossipCount
notifyListeners(localGossip)
if (envelope.conversation &&
(conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) {
// send back gossip to the sender when the sender had a different view, i.e. a merge was
// needed, or the sender had an older or newer version
gossipTo(from)
}
}
}
}
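// mergeRate converts the number of merge conflicts seen since the last GossipTick into a
// per-second rate, e.g. (illustrative) 5 conflicts with GossipInterval = 1 second gives
// 5 * 1000.0 / 1000 = 5.0 conflicts/s, which is compared against MaxGossipMergeRate.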
def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis
/**
* Initiates a new round of gossip.
*/
def gossip(): Unit = {
stats = stats.copy(mergeConflictCount = 0)
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
if (!isSingletonCluster && isAvailable) {
val localGossip = latestGossip
// important to not accidentally use `map` of the SortedSet, since the original order is not preserved
val localMembers = localGossip.members.toIndexedSeq
val localMembersSize = localMembers.size
val localMemberAddresses = localMembers map { _.address }
val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq
val localUnreachableSize = localUnreachableMembers.size
// 1. gossip to a random alive member with preference to a member
// with older or newer gossip version
val nodesWithDifferentView = {
val localMemberAddressesSet = localGossip.members map { _.address }
for {
(address, version) ← localGossip.overview.seen
if localMemberAddressesSet contains address
if version != localGossip.version
} yield address
}
val gossipedToAlive =
if (nodesWithDifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability)
gossipToRandomNodeOf(nodesWithDifferentView.toIndexedSeq)
else
gossipToRandomNodeOf(localMemberAddresses)
// 2. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodes(localMemberAddresses)
val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) {
val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size)
if (ThreadLocalRandom.current.nextDouble() < probability)
gossipToRandomNodeOf(deputies)
}
}
}
/**
* Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc.
*/
def leaderActions(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
if (isLeader && isAvailable) {
// only run the leader actions if we are the LEADER and available
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localUnreachableMembers = localOverview.unreachable
val hasPartitionHandoffCompletedSuccessfully: Boolean = {
// FIXME implement partition handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
true
}
// Leader actions are as follows:
// 1. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table
// 2. Move JOINING => UP -- When a node joins the cluster
// 3. Move LEAVING => EXITING -- When all partition handoff has completed
// 4. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader
// 5. Store away all stuff needed for the side-effecting processing in 10.
// 6. Updating the vclock version for the changes
// 7. Updating the 'seen' table
// 8. Try to update the state with the new gossip
// 9. If failure - retry
// 10. If success - run all the side-effecting processing
val (
newGossip: Gossip,
hasChangedState: Boolean,
upMembers,
exitingMembers,
removedMembers,
unreachableButNotDownedMembers) =
if (localGossip.convergence) {
// we have convergence - so we can't have unreachable nodes
// transform the node member ring - filterNot/map/map
val newMembers =
localMembers filterNot { member ⇒
// ----------------------
// 1. Move EXITING => REMOVED - e.g. remove the nodes from the 'members' set/node ring and seen table
// ----------------------
member.status == MemberStatus.Exiting
} map { member ⇒
// ----------------------
// 2. Move JOINING => UP (once all nodes have seen that this node is JOINING e.g. we have a convergence)
// ----------------------
if (member.status == Joining) member copy (status = Up)
else member
} map { member ⇒
// ----------------------
// 3. Move LEAVING => EXITING (once we have a convergence on LEAVING *and* if we have a successful partition handoff)
// ----------------------
if (member.status == Leaving && hasPartitionHandoffCompletedSuccessfully) member copy (status = Exiting)
else member
}
// ----------------------
// 5. Store away all stuff needed for the side-effecting processing in 10.
// ----------------------
// Check for the need to do side-effecting on successful state change
// Repeat the checking for transitions between JOINING -> UP, LEAVING -> EXITING, EXITING -> REMOVED
// to check for state-changes and to store away removed and exiting members for later notification
// 1. check for state-changes to update
// 2. store away removed and exiting members so we can separate the pure state changes (that can be retried on collision) and the side-effecting message sending
val (removedMembers, newMembers1) = localMembers partition (_.status == Exiting)
val (upMembers, newMembers2) = newMembers1 partition (_.status == Joining)
val (exitingMembers, newMembers3) = newMembers2 partition (_.status == Leaving && hasPartitionHandoffCompletedSuccessfully)
val hasChangedState = removedMembers.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty
// removing REMOVED nodes from the 'seen' table
val newSeen = localSeen -- removedMembers.map(_.address)
// removing REMOVED nodes from the 'unreachable' set
val newUnreachableMembers = localUnreachableMembers -- removedMembers
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
(newGossip, hasChangedState, upMembers, exitingMembers, removedMembers, Set.empty[Member])
} else if (AutoDown) {
// we don't have convergence - so we might have unreachable nodes
// if 'auto-down' is turned on, then try to auto-down any unreachable nodes
val newUnreachableMembers = localUnreachableMembers.map { member ⇒
// ----------------------
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
if (member.status == Down) member // no need to DOWN members already DOWN
else member copy (status = Down)
}
// Check for the need to do side-effecting on successful state change
val (unreachableButNotDownedMembers, _) = localUnreachableMembers partition (_.status != Down)
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address }
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
val newGossip = localGossip copy (overview = newOverview) // update gossip
(newGossip, unreachableButNotDownedMembers.nonEmpty, Set.empty[Member], Set.empty[Member], Set.empty[Member], unreachableButNotDownedMembers)
} else (localGossip, false, Set.empty[Member], Set.empty[Member], Set.empty[Member], Set.empty[Member])
if (hasChangedState) { // we have a change of state - version it and try to update
// ----------------------
// 6. Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// 7. Updating the 'seen' table
// Unless the leader (this node) is part of the removed members, i.e. the leader has moved itself from EXITING -> REMOVED
// ----------------------
val seenVersionedGossip =
if (removedMembers.exists(_.address == selfAddress)) versionedGossip
else versionedGossip seen selfAddress
// ----------------------
// 8. Update the state with the new gossip
// ----------------------
latestGossip = seenVersionedGossip
// ----------------------
// 9. Run all the side-effecting processing
// ----------------------
// log the move of members from joining to up
upMembers foreach { member ⇒ log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", selfAddress, member.address) }
// tell all removed members to remove and shut down themselves
removedMembers foreach { member ⇒
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED - and removing node from node ring", selfAddress, address)
coreSender ! SendClusterMessage(
to = address,
msg = ClusterLeaderAction.Remove(address))
}
// tell all exiting members to exit
exitingMembers foreach { member ⇒
val address = member.address
log.info("Cluster Node [{}] - Leader is moving node [{}] from LEAVING to EXITING", selfAddress, address)
coreSender ! SendClusterMessage(
to = address,
msg = ClusterLeaderAction.Exit(address)) // FIXME should use ? to await completion of handoff?
}
// log the auto-downing of the unreachable nodes
unreachableButNotDownedMembers foreach { member ⇒
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
}
notifyListeners(localGossip)
}
}
}
def heartbeat(): Unit = {
removeOverdueJoinInProgress()
val beatTo = latestGossip.members.toSeq.map(_.address) ++ joinInProgress.keys
val deadline = Deadline.now + HeartbeatInterval
for (address ← beatTo; if address != selfAddress)
heartbeatSender ! SendHeartbeat(selfHeartbeat, address, deadline)
}
/**
* Removes overdue joinInProgress from State.
*/
def removeOverdueJoinInProgress(): Unit = {
val overdueJoins = joinInProgress collect {
case (address, deadline) if deadline.isOverdue ⇒ address
}
if (overdueJoins.nonEmpty) {
joinInProgress = joinInProgress -- overdueJoins
}
}
/**
* Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict.
*/
def reapUnreachableMembers(): Unit = {
if (!isSingletonCluster && isAvailable) {
// only scrutinize if we are a non-singleton cluster and available
val localGossip = latestGossip
val localOverview = localGossip.overview
val localMembers = localGossip.members
val localUnreachableMembers = localGossip.overview.unreachable
val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ cluster.failureDetector.isAvailable(member.address) }
if (newlyDetectedUnreachableMembers.nonEmpty) {
val newMembers = localMembers -- newlyDetectedUnreachableMembers
val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
// updating vclock and 'seen' table
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
latestGossip = seenVersionedGossip
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
notifyListeners(localGossip)
}
}
}
/**
* Gets the addresses of all the 'deputy' nodes - excluding this node if it is part of the group.
*/
def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] =
addresses filterNot (_ == selfAddress) intersect seedNodes
def seedNodes: IndexedSeq[Address] = cluster.seedNodes
def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
def isAvailable: Boolean = latestGossip.isAvailable(selfAddress)
/**
* Gossips latest gossip to a random member in the set of members passed in as argument.
*
* @return the used [[akka.actor.Address]] if any
*/
private def gossipToRandomNodeOf(addresses: IndexedSeq[Address]): Option[Address] = {
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", "))
val peers = addresses filterNot (_ == selfAddress) // filter out myself
val peer = selectRandomNode(peers)
peer foreach gossipTo
peer
}
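/**
* Probability used when deciding whether to gossip to a deputy (seed) node.
* A worked example, purely illustrative: with 10 members, 2 unreachable nodes and
* 3 deputy nodes the probability is (3 + 2) / (10 + 2) = 5/12, roughly 0.42; it is 1.0
* when there are more deputies than members and 0.0 when there are no deputies.
*/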
private[cluster] def gossipToDeputyProbablity(membersSize: Int, unreachableSize: Int, nrOfDeputyNodes: Int): Double = {
if (nrOfDeputyNodes > membersSize) 1.0
else if (nrOfDeputyNodes == 0) 0.0
else (membersSize + unreachableSize) match {
case 0 ⇒ 0.0
case sum ⇒ (nrOfDeputyNodes + unreachableSize).toDouble / sum
}
}
/**
* Gossips latest gossip to an address.
*/
def gossipTo(address: Address): Unit =
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true))
def oneWayGossipTo(address: Address): Unit =
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) {
coreSender ! SendClusterMessage(address, gossipMsg)
}
def notifyListeners(oldGossip: Gossip): Unit = {
if (PublishStateInterval == Duration.Zero) publishState
val oldMembersStatus = oldGossip.members.map(m ⇒ (m.address, m.status))
val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status))
if (newMembersStatus != oldMembersStatus)
cluster notifyMembershipChangeListeners latestGossip.members
}
def publishState(): Unit = {
cluster._latestGossip = latestGossip
cluster._latestStats = stats
}
def ping(p: Ping): Unit = sender ! Pong(p)
}
/**
* INTERNAL API.
*
* Supervisor managing the different Cluster daemons.
*/
private[cluster] final class ClusterDaemonSupervisor(cluster: Cluster) extends Actor with ActorLogging {
val configuredDispatcher = cluster.settings.UseDispatcher
val core = context.actorOf(Props(new ClusterCore(cluster)).
withDispatcher(configuredDispatcher), name = "core")
val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(cluster)).
withDispatcher(configuredDispatcher), name = "heartbeat")
def receive = Actor.emptyBehavior
override def unhandled(unknown: Any): Unit = log.error("[{}] can not respond to messages - received [{}]",
self.path, unknown)
}
/**
* Cluster Extension Id and factory for creating Cluster extension.
* Example:
* {{{
* if (Cluster(system).isLeader) { ... }
* }}}
*/
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def get(system: ActorSystem): Cluster = super.get(system)
override def lookup = Cluster
override def createExtension(system: ExtendedActorSystem): Cluster = {
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
val failureDetector = {
import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn }
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).fold(
e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString),
identity)
}
new Cluster(system, failureDetector)
}
}
/**
* This module is responsible for gossiping cluster information. The abstraction maintains the list of live
* and dead members. Periodically, i.e. every 1 second, this module chooses a random member and initiates a round
* of Gossip with it.
* <p/>
* During each of these runs the member initiates gossip exchange according to following rules:
* <pre>
* 1) Gossip to random live member (if any)
* 2) If the member gossiped to at (1) was not a deputy, or the number of live members is less than the number of deputy nodes,
* gossip to a random deputy with a certain probability depending on the number of unreachable, deputy and live members.
* </pre>
*
* Example:
* {{{
* if (Cluster(system).isLeader) { ... }
* }}}
*/
class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension { clusterNode ⇒
/**
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
* All state is represented by this immutable case class and managed by an AtomicReference.
*/
private case class State(memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty)
if (!system.provider.isInstanceOf[RemoteActorRefProvider])
throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
private val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider]
val remoteSettings = new RemoteSettings(system.settings.config, system.name)
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
val selfAddress = remote.transport.address
private val _isRunning = new AtomicBoolean(true)
private val log = Logging(system, "Cluster")
log.info("Cluster Node [{}] - is starting up...", selfAddress)
private val state = new AtomicReference[State](State())
/**
* Read only view of cluster state, updated periodically by
* ClusterCore. Access with `latestGossip`.
*/
@volatile
private[cluster] var _latestGossip: Gossip = Gossip()
/**
* INTERNAL API
* Read only view of internal cluster stats, updated periodically by
* ClusterCore. Access with `latestStats`.
*/
@volatile
private[cluster] var _latestStats = ClusterStats()
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================
/**
* INTERNAL API
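*
* A dedicated scheduler is only created when the cluster needs finer-grained ticks than the
* system scheduler provides, e.g. (illustrative values):
* {{{
* akka.scheduler.tick-duration = 100ms
* akka.cluster.scheduler.tick-duration = 33ms
* }}}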
*/
private[cluster] val clusterScheduler: Scheduler with Closeable = {
// FIXME consider moving clusterScheduler to ClusterCore actor
if (system.settings.SchedulerTickDuration > SchedulerTickDuration) {
log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis)
val threadFactory = system.threadFactory match {
case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler")
case tf ⇒ tf
}
val hwt = new HashedWheelTimer(log,
threadFactory,
SchedulerTickDuration, SchedulerTicksPerWheel)
new DefaultScheduler(hwt, log, system.dispatcher)
} else {
// delegate to system.scheduler, but don't close
val systemScheduler = system.scheduler
new Scheduler with Closeable {
// we are using system.scheduler, which we are not responsible for closing
def close(): Unit = ()
def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable =
systemScheduler.schedule(initialDelay, frequency, receiver, message)
def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable =
systemScheduler.schedule(initialDelay, frequency)(f)
def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable =
systemScheduler.schedule(initialDelay, frequency, runnable)
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
systemScheduler.scheduleOnce(delay, runnable)
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable =
systemScheduler.scheduleOnce(delay, receiver, message)
def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable =
systemScheduler.scheduleOnce(delay)(f)
}
}
}
// create supervisor for daemons under path "/system/cluster"
private val clusterDaemons: ActorRef = {
implicit val timeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
val createChild = CreateChild(Props(new ClusterDaemonSupervisor(this)).
withDispatcher(UseDispatcher), name = "cluster")
Await.result(system.systemGuardian ? createChild, timeout.duration) match {
case a: ActorRef ⇒ a
case e: Exception ⇒ throw e
}
}
/**
* INTERNAL API
*/
private[cluster] def clusterCore: ActorRef =
system.actorFor(clusterDaemons.path / "core")
system.registerOnTermination(shutdown())
log.info("Cluster Node [{}] - has started up successfully", selfAddress)
// ======================================================
// ===================== PUBLIC API =====================
// ======================================================
def self: Member = latestGossip.member(selfAddress)
/**
* Returns true if the cluster node is up and running, false if it is shut down.
*/
def isRunning: Boolean = _isRunning.get
/**
* Latest gossip.
*/
def latestGossip: Gossip = _latestGossip
/**
* Member status for this node ([[akka.cluster.MemberStatus]]).
*
* NOTE: If the node has been removed from the cluster (and shut down) then its status is set to the 'REMOVED' tombstone state
* and is no longer present in the node ring or any other part of the gossiping state. However in order to maintain the
* model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`.
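*
* For example:
* {{{
* if (Cluster(system).status == MemberStatus.Up) {
*   // this node is a full member of the cluster
* }
* }}}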
*/
def status: MemberStatus = self.status
/**
* Is this node the leader?
*/
def isLeader: Boolean = latestGossip.isLeader(selfAddress)
/**
* Get the address of the current leader.
*/
def leader: Address = latestGossip.leader match {
case Some(x) ⇒ x
case None ⇒ throw new IllegalStateException("There is no leader in this cluster")
}
/**
* Is this node a singleton cluster?
*/
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
/**
* Checks if we have a cluster convergence.
*
* @return Some(convergedGossip) if convergence has been reached and None if not
*/
def convergence: Option[Gossip] = latestGossip match {
case gossip if gossip.convergence ⇒ Some(gossip)
case _ ⇒ None
}
/**
* Returns true if the node is UP or JOINING.
*/
def isAvailable: Boolean = latestGossip.isAvailable(selfAddress)
/**
* Make it possible to override/configure seedNodes from tests without
* specifying in config. Addresses are unknown before startup time.
*/
def seedNodes: IndexedSeq[Address] = SeedNodes
/**
* Registers a listener to subscribe to cluster membership changes.
*/
@tailrec
final def registerListener(listener: MembershipChangeListener): Unit = {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners + listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur
}
/**
* Unsubscribes from cluster membership changes.
*/
@tailrec
final def unregisterListener(listener: MembershipChangeListener): Unit = {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners - listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur
}
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
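*
* A minimal usage sketch (hypothetical seed address, for illustration only):
* {{{
* val seed = Address("akka", system.name, "seedhost", 2552)
* Cluster(system).join(seed)
* }}}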
*/
def join(address: Address): Unit =
clusterCore ! ClusterUserAction.JoinTo(address)
/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
*/
def leave(address: Address): Unit =
clusterCore ! ClusterUserAction.Leave(address)
/**
* Send command to DOWN the node specified by 'address'.
*/
def down(address: Address): Unit =
clusterCore ! ClusterUserAction.Down(address)
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================
/**
* INTERNAL API.
*
* Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
*
* Should not be called by the user. The user can issue a LEAVE command which will tell the node
* to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
*/
private[cluster] def shutdown(): Unit = {
if (_isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
// FIXME isTerminated check can be removed when ticket #2221 is fixed
// now it prevents logging if system is shutdown (or in progress of shutdown)
if (!clusterDaemons.isTerminated)
system.stop(clusterDaemons)
clusterScheduler.close()
log.info("Cluster Node [{}] - Cluster node successfully shut down", selfAddress)
}
}
/**
* INTERNAL API
*/
private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit = {
// FIXME run callbacks async (to not block the cluster)
state.get.memberMembershipChangeListeners foreach { _ notify members }
}
/**
* INTERNAL API
*/
private[cluster] def latestStats: ClusterStats = _latestStats
// FIXME add back JMX
}