Merge pull request #23583 from akka/wip-multi-dc-merge-master-patriknw

merge wip-multi-dc-dev back to master
This commit is contained in:
Patrik Nordwall 2017-09-01 17:08:28 +02:00 committed by GitHub
commit 1e4e7cbba2
55 changed files with 4839 additions and 633 deletions

View file

@ -3,25 +3,23 @@
*/
package akka.cluster
import language.existentials
import scala.collection.immutable
import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom
import scala.util.control.NonFatal
import akka.actor._
import akka.annotation.InternalApi
import akka.actor.SupervisorStrategy.Stop
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import scala.collection.breakOut
import akka.remote.QuarantinedEvent
import java.util.ArrayList
import java.util.Collections
import akka.pattern.ask
import akka.util.Timeout
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.Done
import akka.pattern.ask
import akka.remote.QuarantinedEvent
import akka.util.Timeout
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.control.NonFatal
import language.existentials
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -152,7 +150,7 @@ private[cluster] object InternalClusterAction {
final case class SendCurrentClusterState(receiver: ActorRef) extends SubscriptionMessage
sealed trait PublishMessage
final case class PublishChanges(newGossip: Gossip) extends PublishMessage
final case class PublishChanges(state: MembershipState) extends PublishMessage
final case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
final case object ExitingCompleted
@ -229,7 +227,6 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
*/
private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
// Important - don't use Cluster(context.system) in constructor because that would
// cause deadlock. The Cluster extension is currently being created and is waiting
@ -266,26 +263,46 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
/**
* INTERNAL API.
*/
private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfRoles, scheduler, failureDetector }
import cluster.settings._
import cluster.InfoLogger._
protected def selfUniqueAddress = cluster.selfUniqueAddress
@InternalApi
private[cluster] object ClusterCoreDaemon {
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
val MaxGossipsBeforeShuttingDownMyself = 5
def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}"
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
}
/**
* INTERNAL API.
*/
@InternalApi
private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
import ClusterCoreDaemon._
import MembershipState._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfRoles, scheduler, failureDetector, crossDcFailureDetector }
import cluster.settings._
import cluster.InfoLogger._
val selfDc = cluster.selfDataCenter
protected def selfUniqueAddress = cluster.selfUniqueAddress
val vclockNode = VectorClock.Node(Gossip.vclockName(selfUniqueAddress))
val gossipTargetSelector = new GossipTargetSelector(
ReduceGossipDifferentViewProbability,
cluster.settings.MultiDataCenter.CrossDcGossipProbability)
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
var latestGossip: Gossip = Gossip.empty
var membershipState = MembershipState(
Gossip.empty,
cluster.selfUniqueAddress,
cluster.settings.SelfDataCenter,
cluster.settings.MultiDataCenter.CrossDcConnections)
def latestGossip: Gossip = membershipState.latestGossip
val statsEnabled = PublishStatsInterval.isFinite
var gossipStats = GossipStats()
@ -411,8 +428,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def becomeInitialized(): Unit = {
// start heartbeatSender here, and not in constructor to make sure that
// heartbeating doesn't start before Welcome is received
context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
val internalHeartbeatSenderProps = Props(new ClusterHeartbeatSender()).withDispatcher(UseDispatcher)
context.actorOf(internalHeartbeatSenderProps, name = "heartbeatSender")
val externalHeartbeatProps = Props(new CrossDcHeartbeatSender()).withDispatcher(UseDispatcher)
context.actorOf(externalHeartbeatProps, name = "crossDcHeartbeatSender")
// make sure that join process is stopped
stopSeedNodeProcess()
context.become(initialized)
@ -462,7 +483,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def initJoin(): Unit = {
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus)) {
if (removeUnreachableWithMemberStatus.contains(selfStatus)) {
// prevents a Down and Exiting node from being used for joining
logInfo("Sending InitJoinNack message from node [{}] to [{}]", selfAddress, sender())
sender() ! InitJoinNack(selfAddress)
@ -544,28 +565,28 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Received `Join` message and replies with `Welcome` message, containing
* current gossip state, including the new joining member.
*/
def joining(node: UniqueAddress, roles: Set[String]): Unit = {
def joining(joiningNode: UniqueAddress, roles: Set[String]): Unit = {
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (node.address.protocol != selfAddress.protocol)
if (joiningNode.address.protocol != selfAddress.protocol)
log.warning(
"Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.protocol, node.address.protocol)
else if (node.address.system != selfAddress.system)
selfAddress.protocol, joiningNode.address.protocol)
else if (joiningNode.address.system != selfAddress.system)
log.warning(
"Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.system, node.address.system)
else if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", node, selfStatus)
selfAddress.system, joiningNode.address.system)
else if (removeUnreachableWithMemberStatus.contains(selfStatus))
logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", joiningNode, selfStatus)
else {
val localMembers = latestGossip.members
// check by address without uid to make sure that node with same host:port is not allowed
// to join until previous node with that host:port has been removed from the cluster
localMembers.find(_.address == node.address) match {
case Some(m) if m.uniqueAddress == node
localMembers.find(_.address == joiningNode.address) match {
case Some(m) if m.uniqueAddress == joiningNode
// node retried join attempt, probably due to lost Welcome message
logInfo("Existing member [{}] is joining again.", m)
if (node != selfUniqueAddress)
if (joiningNode != selfUniqueAddress)
sender() ! Welcome(selfUniqueAddress, latestGossip)
case Some(m)
// node restarted, same host:port as existing member, but with different uid
@ -584,23 +605,24 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
case None
// remove the node from the failure detector
failureDetector.remove(node.address)
failureDetector.remove(joiningNode.address)
crossDcFailureDetector.remove(joiningNode.address)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
val newMembers = localMembers + Member(joiningNode, roles) + Member(selfUniqueAddress, cluster.selfRoles)
val newGossip = latestGossip copy (members = newMembers)
updateLatestGossip(newGossip)
logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
if (node == selfUniqueAddress) {
logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", "))
if (joiningNode == selfUniqueAddress) {
if (localMembers.isEmpty)
leaderActions() // important for deterministic oldest when bootstrapping
} else
sender() ! Welcome(selfUniqueAddress, latestGossip)
publish(latestGossip)
publishMembershipState()
}
}
}
@ -613,10 +635,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (joinWith != from.address)
logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith)
else {
membershipState = membershipState.copy(latestGossip = gossip).seen()
logInfo("Welcome from [{}]", from.address)
latestGossip = gossip seen selfUniqueAddress
assertLatestGossip()
publish(latestGossip)
publishMembershipState()
if (from != selfUniqueAddress)
gossipTo(from, sender())
becomeInitialized()
@ -637,7 +659,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
updateLatestGossip(newGossip)
logInfo("Marked address [{}] as [{}]", address, Leaving)
publish(latestGossip)
publishMembershipState()
// immediate gossip to speed up the leaving process
gossip()
}
@ -648,9 +670,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// ExitingCompleted sent via CoordinatedShutdown to continue the leaving process.
exitingTasksInProgress = false
// mark as seen
latestGossip = latestGossip seen selfUniqueAddress
membershipState = membershipState.seen()
assertLatestGossip()
publish(latestGossip)
publishMembershipState()
// Let others know (best effort) before shutdown. Otherwise they will not see
// convergence of the Exiting state until they have detected this node as
@ -663,11 +685,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits)
// send ExitingConfirmed to two potential leaders
val membersWithoutSelf = latestGossip.members.filterNot(_.uniqueAddress == selfUniqueAddress)
latestGossip.leaderOf(membersWithoutSelf, selfUniqueAddress) match {
val membersExceptSelf = latestGossip.members.filter(_.uniqueAddress != selfUniqueAddress)
membershipState.leaderOf(membersExceptSelf) match {
case Some(node1)
clusterCore(node1.address) ! ExitingConfirmed(selfUniqueAddress)
latestGossip.leaderOf(membersWithoutSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
membershipState.leaderOf(membersExceptSelf.filterNot(_.uniqueAddress == node1)) match {
case Some(node2)
clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress)
case None // no more potential leader
@ -704,29 +727,19 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def downing(address: Address): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localReachability = localOverview.reachability
val localReachability = membershipState.dcReachability
// check if the node to DOWN is in the `members` set
localMembers.find(_.address == address) match {
case Some(m) if (m.status != Down)
case Some(m) if m.status != Down
if (localReachability.isReachable(m.uniqueAddress))
logInfo("Marking node [{}] as [{}]", m.address, Down)
else
logInfo("Marking unreachable node [{}] as [{}]", m.address, Down)
// replace member (changed status)
val newMembers = localMembers - m + m.copy(status = Down)
// remove nodes marked as DOWN from the `seen` table
val newSeen = localSeen - m.uniqueAddress
// update gossip overview
val newOverview = localOverview copy (seen = newSeen)
val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip
val newGossip = localGossip.markAsDown(m)
updateLatestGossip(newGossip)
publish(latestGossip)
publishMembershipState()
case Some(_) // already down
case None
logInfo("Ignoring down of unknown node [{}]", address)
@ -744,14 +757,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
log.warning(
"Cluster Node [{}] - Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]",
selfAddress, node.address, selfRoles.mkString(","))
publish(latestGossip)
publishMembershipState()
downing(node.address)
}
}
def receiveGossipStatus(status: GossipStatus): Unit = {
val from = status.from
if (!latestGossip.overview.reachability.isReachable(selfUniqueAddress, from))
if (!latestGossip.isReachable(selfUniqueAddress, from))
logInfo("Ignoring received gossip status from unreachable [{}] ", from)
else if (latestGossip.members.forall(_.uniqueAddress != from))
log.debug("Cluster Node [{}] - Ignoring received gossip status from unknown [{}]", selfAddress, from)
@ -778,6 +791,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Receive new gossip.
*/
def receiveGossip(envelope: GossipEnvelope): ReceiveGossipType = {
val from = envelope.from
val remoteGossip = envelope.gossip
val localGossip = latestGossip
@ -788,7 +802,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} else if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
Ignored
} else if (!localGossip.overview.reachability.isReachable(selfUniqueAddress, from)) {
} else if (!localGossip.isReachable(selfUniqueAddress, from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from)
Ignored
} else if (localGossip.members.forall(_.uniqueAddress != from)) {
@ -819,16 +833,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// Perform the same pruning (clear of VectorClock) as the leader did when removing a member.
// Removal of member itself is handled in merge (pickHighestPriority)
val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m)
if (Gossip.removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m)
g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress)))
} else
g
}
val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m)
if (Gossip.removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m)
g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress)))
} else
g
}
@ -839,20 +853,23 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// Don't mark gossip state as seen while exiting is in progress, e.g.
// shutting down singleton actors. This delays removal of the member until
// the exiting tasks have been completed.
if (exitingTasksInProgress)
latestGossip = winningGossip
else
latestGossip = winningGossip seen selfUniqueAddress
membershipState = membershipState.copy(latestGossip =
if (exitingTasksInProgress) winningGossip
else winningGossip seen selfUniqueAddress)
assertLatestGossip()
// for all new joining nodes we remove them from the failure detector
latestGossip.members foreach {
node if (node.status == Joining && !localGossip.members(node)) failureDetector.remove(node.address)
node
if (node.status == Joining && !localGossip.members(node)) {
failureDetector.remove(node.address)
crossDcFailureDetector.remove(node.address)
}
}
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if (comparison == VectorClock.Concurrent) {
if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) {
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
@ -868,7 +885,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
}
publish(latestGossip)
publishMembershipState()
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (selfStatus == Exiting && !exitingTasksInProgress) {
@ -908,98 +925,35 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
*/
def gossipRandomN(n: Int): Unit = {
if (!isSingletonCluster && n > 0) {
val localGossip = latestGossip
// using ArrayList to be able to shuffle
val possibleTargets = new ArrayList[UniqueAddress](localGossip.members.size)
localGossip.members.foreach { m
if (validNodeForGossip(m.uniqueAddress))
possibleTargets.add(m.uniqueAddress)
}
val randomTargets =
if (possibleTargets.size <= n)
possibleTargets
else {
Collections.shuffle(possibleTargets, ThreadLocalRandom.current())
possibleTargets.subList(0, n)
}
val iter = randomTargets.iterator
while (iter.hasNext)
gossipTo(iter.next())
gossipTargetSelector.randomNodesForFullGossip(membershipState, n).foreach(gossipTo)
}
}
/**
* Initiates a new round of gossip.
*/
def gossip(): Unit = {
def gossip(): Unit =
if (!isSingletonCluster) {
val localGossip = latestGossip
val preferredGossipTargets: Vector[UniqueAddress] =
if (ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability) {
// If it's time to try to gossip to some nodes with a different view
// gossip to a random alive member with preference to a member with older gossip version
localGossip.members.collect {
case m if !localGossip.seenByNode(m.uniqueAddress) && validNodeForGossip(m.uniqueAddress)
m.uniqueAddress
}(breakOut)
} else Vector.empty
if (preferredGossipTargets.nonEmpty) {
val peer = selectRandomNode(preferredGossipTargets)
// send full gossip because it has different view
peer foreach gossipTo
} else {
// Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved)
val peer = selectRandomNode(localGossip.members.toIndexedSeq.collect {
case m if validNodeForGossip(m.uniqueAddress) m.uniqueAddress
})
peer foreach { node
if (localGossip.seenByNode(node)) gossipStatusTo(node)
else gossipTo(node)
}
gossipTargetSelector.gossipTarget(membershipState) match {
case Some(peer)
if (!membershipState.isInSameDc(peer) || latestGossip.seenByNode(peer))
// avoid transferring the full state if possible
gossipStatusTo(peer)
else
gossipTo(peer)
case None // nothing to see here
}
}
}
/**
* For large clusters we should avoid shooting down individual
* nodes. Therefore the probability is reduced for large clusters.
*/
def adjustedGossipDifferentViewProbability: Double = {
val size = latestGossip.members.size
val low = ReduceGossipDifferentViewProbability
val high = low * 3
// start reduction when cluster is larger than configured ReduceGossipDifferentViewProbability
if (size <= low)
GossipDifferentViewProbability
else {
// don't go lower than 1/10 of the configured GossipDifferentViewProbability
val minP = GossipDifferentViewProbability / 10
if (size >= high)
minP
else {
// linear reduction of the probability with increasing number of nodes
// from ReduceGossipDifferentViewProbability at ReduceGossipDifferentViewProbability nodes
// to ReduceGossipDifferentViewProbability / 10 at ReduceGossipDifferentViewProbability * 3 nodes
// i.e. default from 0.8 at 400 nodes, to 0.08 at 1600 nodes
val k = (minP - GossipDifferentViewProbability) / (high - low)
GossipDifferentViewProbability + (size - low) * k
}
}
}
/**
* Runs periodic leader actions, such as member status transitions, assigning partitions etc.
*/
def leaderActions(): Unit = {
if (latestGossip.isLeader(selfUniqueAddress, selfUniqueAddress)) {
// only run the leader actions if we are the LEADER
if (membershipState.isLeader(selfUniqueAddress)) {
// only run the leader actions if we are the LEADER of the data center
val firstNotice = 20
val periodicNotice = 60
if (latestGossip.convergence(selfUniqueAddress, exitingConfirmed)) {
if (membershipState.convergence(exitingConfirmed)) {
if (leaderActionCounter >= firstNotice)
logInfo("Leader can perform its duties again")
leaderActionCounter = 0
@ -1012,9 +966,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
logInfo(
"Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
latestGossip.reachabilityExcludingDownedObservers,
latestGossip.members.map(m
s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", "))
membershipState.dcReachabilityExcludingDownedObservers,
latestGossip.members.collect {
case m if m.dataCenter == selfDc
s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}"
}.mkString(", "))
}
}
cleanupExitingConfirmed()
@ -1025,8 +981,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (latestGossip.member(selfUniqueAddress).status == Down) {
// When all reachable have seen the state this member will shutdown itself when it has
// status Down. The down commands should spread before we shutdown.
val unreachable = latestGossip.overview.reachability.allUnreachableOrTerminated
val downed = latestGossip.members.collect { case m if m.status == Down m.uniqueAddress }
val unreachable = membershipState.dcReachability.allUnreachableOrTerminated
val downed = membershipState.dcMembers.collect { case m if m.status == Down m.uniqueAddress }
if (downed.forall(node unreachable(node) || latestGossip.seenByNode(node))) {
// the reason for not shutting down immediately is to give the gossip a chance to spread
// the downing information to other downed nodes, so that they can shutdown themselves
@ -1059,95 +1015,85 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* 9. Update the state with the new gossip
*/
def leaderActionsOnConvergence(): Unit = {
val localGossip = latestGossip
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
val removedUnreachable = for {
node localOverview.reachability.allUnreachableOrTerminated
m = localGossip.member(node)
if Gossip.removeUnreachableWithMemberStatus(m.status)
node membershipState.dcReachability.allUnreachableOrTerminated
m = latestGossip.member(node)
if m.dataCenter == selfDc && removeUnreachableWithMemberStatus(m.status)
} yield m
val removedExitingConfirmed = exitingConfirmed.filter(n localGossip.member(n).status == Exiting)
val removedExitingConfirmed = exitingConfirmed.filter { n
val member = latestGossip.member(n)
member.dataCenter == selfDc && member.status == Exiting
}
val changedMembers = localMembers collect {
var upNumber = 0
val changedMembers = {
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
def isJoiningToUp(m: Member): Boolean = (m.status == Joining || m.status == WeaklyUp) && enoughMembers
{
case m if isJoiningToUp(m)
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = localGossip.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1
}
m.copyUp(upNumber)
latestGossip.members collect {
var upNumber = 0
case m if m.status == Leaving
// Move LEAVING => EXITING (once we have a convergence on LEAVING)
m copy (status = Exiting)
{
case m if m.dataCenter == selfDc && isJoiningToUp(m)
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
val youngest = latestGossip.youngestMember
upNumber = 1 + (if (youngest.upNumber == Int.MaxValue) 0 else youngest.upNumber)
} else {
upNumber += 1
}
m.copyUp(upNumber)
case m if m.dataCenter == selfDc && m.status == Leaving
// Move LEAVING => EXITING (once we have a convergence on LEAVING)
m copy (status = Exiting)
}
}
}
if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) {
// handle changes
val updatedGossip: Gossip =
if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) {
// replace changed members
val newMembers = changedMembers.union(localMembers).diff(removedUnreachable)
.filterNot(m removedExitingConfirmed(m.uniqueAddress))
// replace changed members
val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
val newGossip =
latestGossip.update(changedMembers).removeAll(removed, System.currentTimeMillis())
// removing REMOVED nodes from the `seen` table
val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
val newSeen = localSeen diff removed
// removing REMOVED nodes from the `reachability` table
val newReachability = localOverview.reachability.remove(removed)
val newOverview = localOverview copy (seen = newSeen, reachability = newReachability)
// Clear the VectorClock when member is removed. The change made by the leader is stamped
// and will propagate as is if there are no other changes on other nodes.
// If other concurrent changes on other nodes (e.g. join) the pruning is also
// taken care of when receiving gossips.
val newVersion = removed.foldLeft(localGossip.version) { (v, node)
v.prune(VectorClock.Node(vclockName(node)))
}
val newGossip = localGossip copy (members = newMembers, overview = newOverview, version = newVersion)
if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
// Leader is moving itself from Leaving to Exiting.
// ExitingCompleted will be received via CoordinatedShutdown to continue
// the leaving process. Meanwhile the gossip state is not marked as seen.
exitingTasksInProgress = true
logInfo("Exiting (leader), starting coordinated shutdown")
selfExiting.trySuccess(Done)
coordShutdown.run()
}
if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
// Leader is moving itself from Leaving to Exiting.
// ExitingCompleted will be received via CoordinatedShutdown to continue
// the leaving process. Meanwhile the gossip state is not marked as seen.
exitingTasksInProgress = true
logInfo("Exiting (leader), starting coordinated shutdown")
selfExiting.trySuccess(Done)
coordShutdown.run()
}
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
updateLatestGossip(newGossip)
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
changedMembers foreach { m
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
}
removedUnreachable foreach { m
val status = if (m.status == Exiting) "exiting" else "unreachable"
logInfo("Leader is removing {} node [{}]", status, m.address)
}
removedExitingConfirmed.foreach { n
logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
}
// log status changes
changedMembers foreach { m
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
}
newGossip
} else
latestGossip
// log the removal of the unreachable nodes
removedUnreachable foreach { m
val status = if (m.status == Exiting) "exiting" else "unreachable"
logInfo("Leader is removing {} node [{}]", status, m.address)
}
removedExitingConfirmed.foreach { n
logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
}
publish(latestGossip)
val pruned = updatedGossip.pruneTombstones(System.currentTimeMillis() - PruneGossipTombstonesAfter.toMillis)
if (pruned ne latestGossip) {
updateLatestGossip(pruned)
publishMembershipState()
}
}
@ -1157,7 +1103,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val enoughMembers: Boolean = isMinNrOfMembersFulfilled
def isJoiningToWeaklyUp(m: Member): Boolean =
m.status == Joining && enoughMembers && latestGossip.reachabilityExcludingDownedObservers.isReachable(m.uniqueAddress)
m.dataCenter == selfDc &&
m.status == Joining &&
enoughMembers &&
membershipState.dcReachabilityExcludingDownedObservers.isReachable(m.uniqueAddress)
val changedMembers = localMembers.collect {
case m if isJoiningToWeaklyUp(m) m.copy(status = WeaklyUp)
}
@ -1173,7 +1122,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
}
publish(latestGossip)
publishMembershipState()
}
}
@ -1189,24 +1138,29 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val localOverview = localGossip.overview
val localMembers = localGossip.members
def isAvailable(member: Member): Boolean = {
if (member.dataCenter == SelfDataCenter) failureDetector.isAvailable(member.address)
else crossDcFailureDetector.isAvailable(member.address)
}
val newlyDetectedUnreachableMembers = localMembers filterNot { member
member.uniqueAddress == selfUniqueAddress ||
localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Unreachable ||
localOverview.reachability.status(selfUniqueAddress, member.uniqueAddress) == Reachability.Terminated ||
failureDetector.isAvailable(member.address)
isAvailable(member)
}
val newlyDetectedReachableMembers = localOverview.reachability.allUnreachableFrom(selfUniqueAddress) collect {
case node if node != selfUniqueAddress && failureDetector.isAvailable(node.address)
case node if node != selfUniqueAddress && isAvailable(localGossip.member(node))
localGossip.member(node)
}
if (newlyDetectedUnreachableMembers.nonEmpty || newlyDetectedReachableMembers.nonEmpty) {
val newReachability1 = (localOverview.reachability /: newlyDetectedUnreachableMembers) {
val newReachability1 = newlyDetectedUnreachableMembers.foldLeft(localOverview.reachability) {
(reachability, m) reachability.unreachable(selfUniqueAddress, m.uniqueAddress)
}
val newReachability2 = (newReachability1 /: newlyDetectedReachableMembers) {
val newReachability2 = newlyDetectedReachableMembers.foldLeft(newReachability1) {
(reachability, m) reachability.reachable(selfUniqueAddress, m.uniqueAddress)
}
@ -1226,16 +1180,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (newlyDetectedReachableMembers.nonEmpty)
logInfo("Marking node(s) as REACHABLE [{}]. Node roles [{}]", newlyDetectedReachableMembers.mkString(", "), selfRoles.mkString(","))
publish(latestGossip)
publishMembershipState()
}
}
}
}
def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] =
if (nodes.isEmpty) None
else Some(nodes(ThreadLocalRandom.current nextInt nodes.size))
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
// needed for tests
@ -1249,40 +1199,38 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Gossips latest gossip to a node.
*/
def gossipTo(node: UniqueAddress): Unit =
if (validNodeForGossip(node))
if (membershipState.validNodeForGossip(node))
clusterCore(node.address) ! GossipEnvelope(selfUniqueAddress, node, latestGossip)
def gossipTo(node: UniqueAddress, destination: ActorRef): Unit =
if (validNodeForGossip(node))
if (membershipState.validNodeForGossip(node))
destination ! GossipEnvelope(selfUniqueAddress, node, latestGossip)
def gossipStatusTo(node: UniqueAddress, destination: ActorRef): Unit =
if (validNodeForGossip(node))
if (membershipState.validNodeForGossip(node))
destination ! GossipStatus(selfUniqueAddress, latestGossip.version)
def gossipStatusTo(node: UniqueAddress): Unit =
if (validNodeForGossip(node))
if (membershipState.validNodeForGossip(node))
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
def validNodeForGossip(node: UniqueAddress): Boolean =
(node != selfUniqueAddress && latestGossip.hasMember(node) &&
latestGossip.reachabilityExcludingDownedObservers.isReachable(node))
def updateLatestGossip(newGossip: Gossip): Unit = {
def updateLatestGossip(gossip: Gossip): Unit = {
// Updating the vclock version for the changes
val versionedGossip = newGossip :+ vclockNode
val versionedGossip = gossip :+ vclockNode
// Don't mark gossip state as seen while exiting is in progress, e.g.
// shutting down singleton actors. This delays removal of the member until
// the exiting tasks have been completed.
if (exitingTasksInProgress)
latestGossip = versionedGossip.clearSeen()
else {
// Nobody else has seen this gossip but us
val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
// Update the state with the new gossip
latestGossip = seenVersionedGossip
}
val newGossip =
if (exitingTasksInProgress)
versionedGossip.clearSeen()
else {
// Nobody else has seen this gossip but us
val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
// Update the state with the new gossip
seenVersionedGossip
}
membershipState = membershipState.copy(newGossip)
assertLatestGossip()
}
@ -1290,8 +1238,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (Cluster.isAssertInvariantsEnabled && latestGossip.version.versions.size > latestGossip.members.size)
throw new IllegalStateException(s"Too many vector clock entries in gossip state ${latestGossip}")
def publish(newGossip: Gossip): Unit = {
publisher ! PublishChanges(newGossip)
def publishMembershipState(): Unit = {
if (cluster.settings.Debug.VerboseGossipLogging)
log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.SelfDataCenter, membershipState.latestGossip)
publisher ! PublishChanges(membershipState)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()
}