Refactoring of Gossip class, #23290

* move methods that depend on selfUniqueAddress and selfDc
  to a separate MembershipState class, which also holds the
  latest gossip (see the sketch below)
* this removes the need to pass those parameters in everywhere and
  makes it easier to cache some results
* makes it clear that those parameters are always selfUniqueAddress
  and selfDc, instead of some arbitrary node/dc
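
To make the shape of the new class concrete, here is a minimal, self-contained
Scala sketch of the idea. It is not the actual akka.cluster.MembershipState
introduced by this commit: the types (UniqueAddress, Member, Gossip), the
leader ordering and the method bodies are simplified placeholders; only the
fixed selfUniqueAddress/selfDc receiver and the caching opportunity are meant
to carry over.

// Simplified stand-ins for the real akka.cluster types, for illustration only.
final case class UniqueAddress(host: String, port: Int, uid: Long)
final case class Member(uniqueAddress: UniqueAddress, dataCenter: String, status: String)
final case class Gossip(members: Set[Member])

// The state object bundles the latest gossip with the two parameters that
// previously had to be passed into every Gossip method.
final case class MembershipState(
  latestGossip:      Gossip,
  selfUniqueAddress: UniqueAddress,
  selfDc:            String) {

  // Members of this node's own data center. Because selfDc is fixed, the
  // result can be cached (lazy val), which is the caching benefit mentioned
  // in the commit message.
  lazy val dcMembers: Set[Member] =
    latestGossip.members.filter(_.dataCenter == selfDc)

  // Leader among the given members of this data center; a toy ordering by
  // address is used here instead of Akka's real leader ordering.
  def leaderOf(members: Set[Member]): Option[UniqueAddress] =
    members.filter(_.dataCenter == selfDc)
      .toSeq
      .sortBy(m ⇒ (m.uniqueAddress.host, m.uniqueAddress.port))
      .headOption
      .map(_.uniqueAddress)

  // Callers ask "is this node the leader?" without threading selfDc through.
  def isLeader(node: UniqueAddress): Boolean =
    leaderOf(dcMembers).contains(node)
}

object MembershipStateExample extends App {
  val self   = UniqueAddress("a", 2552, 1L)
  val other  = UniqueAddress("b", 2552, 2L)
  val gossip = Gossip(Set(
    Member(self, "dc-default", "Up"),
    Member(other, "dc-default", "Up")))
  val state = MembershipState(gossip, self, "dc-default")
  println(state.isLeader(self)) // true: "a" sorts before "b" in the toy ordering
}
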
Patrik Nordwall 2017-07-04 21:58:03 +02:00
parent dee14c5b20
commit 867cc97bdd
10 changed files with 386 additions and 321 deletions

@@ -13,6 +13,7 @@ import akka.actor._
import akka.actor.SupervisorStrategy.Stop
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.cluster.ClusterSettings.DataCenter
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import scala.collection.breakOut
@@ -157,7 +158,7 @@ private[cluster] object InternalClusterAction {
final case class SendCurrentClusterState(receiver: ActorRef) extends SubscriptionMessage
sealed trait PublishMessage
final case class PublishChanges(newGossip: Gossip) extends PublishMessage
final case class PublishChanges(state: MembershipState) extends PublishMessage
final case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
final case object ExitingCompleted
@@ -277,6 +278,7 @@ private[cluster] object ClusterCoreDaemon {
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
val MaxGossipsBeforeShuttingDownMyself = 5
}
/**
@@ -287,6 +289,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
import ClusterCoreDaemon._
import MembershipState._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfRoles, scheduler, failureDetector }
@@ -299,7 +302,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
var latestGossip: Gossip = Gossip.empty
var membershipState = MembershipState(Gossip.empty, cluster.selfUniqueAddress, cluster.settings.DataCenter)
def latestGossip: Gossip = membershipState.latestGossip
val statsEnabled = PublishStatsInterval.isFinite
var gossipStats = GossipStats()
@@ -478,7 +482,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def initJoin(): Unit = {
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus)) {
if (removeUnreachableWithMemberStatus.contains(selfStatus)) {
// prevents a Down and Exiting node from being used for joining
logInfo("Sending InitJoinNack message from node [{}] to [{}]", selfAddress, sender())
sender() ! InitJoinNack(selfAddress)
@@ -570,7 +574,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
log.warning(
"Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
selfAddress.system, joiningNode.address.system)
else if (Gossip.removeUnreachableWithMemberStatus.contains(selfStatus))
else if (removeUnreachableWithMemberStatus.contains(selfStatus))
logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", joiningNode, selfStatus)
else {
val localMembers = latestGossip.members
@@ -616,7 +620,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} else
sender() ! Welcome(selfUniqueAddress, latestGossip)
publish(latestGossip)
publishMembershipState()
}
}
}
@@ -629,10 +633,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (joinWith != from.address)
logInfo("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith)
else {
latestGossip = gossip seen selfUniqueAddress
membershipState = membershipState.copy(latestGossip = gossip).seen()
logInfo("Welcome from [{}]", from.address)
assertLatestGossip()
publish(latestGossip)
publishMembershipState()
if (from != selfUniqueAddress)
gossipTo(from, sender())
becomeInitialized()
@@ -653,7 +657,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
updateLatestGossip(newGossip)
logInfo("Marked address [{}] as [{}]", address, Leaving)
publish(latestGossip)
publishMembershipState()
// immediate gossip to speed up the leaving process
gossip()
}
@@ -664,9 +668,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// ExitingCompleted sent via CoordinatedShutdown to continue the leaving process.
exitingTasksInProgress = false
// mark as seen
latestGossip = latestGossip seen selfUniqueAddress
membershipState = membershipState.seen()
assertLatestGossip()
publish(latestGossip)
publishMembershipState()
// Let others know (best effort) before shutdown. Otherwise they will not see
// convergence of the Exiting state until they have detected this node as
@@ -681,10 +685,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// send ExitingConfirmed to two potential leaders
val membersExceptSelf = latestGossip.members.filter(_.uniqueAddress != selfUniqueAddress)
latestGossip.leaderOf(selfDc, membersExceptSelf, selfUniqueAddress) match {
membershipState.leaderOf(membersExceptSelf) match {
case Some(node1) ⇒
clusterCore(node1.address) ! ExitingConfirmed(selfUniqueAddress)
latestGossip.leaderOf(selfDc, membersExceptSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
membershipState.leaderOf(membersExceptSelf.filterNot(_.uniqueAddress == node1)) match {
case Some(node2) ⇒
clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress)
case None ⇒ // no more potential leader
@@ -723,7 +727,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val localMembers = localGossip.members
val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localReachability = localGossip.dcReachability(selfDc)
val localReachability = membershipState.dcReachability
// check if the node to DOWN is in the `members` set
localMembers.find(_.address == address) match {
@@ -735,7 +739,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val newGossip = localGossip.markAsDown(m)
updateLatestGossip(newGossip)
publish(latestGossip)
publishMembershipState()
case Some(_) ⇒ // already down
case None ⇒
logInfo("Ignoring down of unknown node [{}]", address)
@@ -753,7 +757,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
log.warning(
"Cluster Node [{}] - Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]",
selfAddress, node.address, selfRoles.mkString(","))
publish(latestGossip)
publishMembershipState()
downing(node.address)
}
}
@@ -829,14 +833,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// Perform the same pruning (clear of VectorClock) as the leader did when removing a member.
// Removal of member itself is handled in merge (pickHighestPriority)
val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) ⇒
if (Gossip.removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m)
g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
} else
g
}
val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) ⇒
if (Gossip.removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m)
g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
} else
@@ -849,9 +853,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// Don't mark gossip state as seen while exiting is in progress, e.g.
// shutting down singleton actors. This delays removal of the member until
// the exiting tasks have been completed.
latestGossip =
membershipState = membershipState.copy(latestGossip =
if (exitingTasksInProgress) winningGossip
else winningGossip seen selfUniqueAddress
else winningGossip seen selfUniqueAddress)
assertLatestGossip()
// for all new joining nodes we remove them from the failure detector
@@ -877,7 +881,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
}
publish(latestGossip)
publishMembershipState()
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (selfStatus == Exiting && !exitingTasksInProgress) {
@@ -1004,11 +1008,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
* Runs periodic leader actions, such as member status transitions, assigning partitions etc.
*/
def leaderActions(): Unit = {
if (latestGossip.isDcLeader(selfDc, selfUniqueAddress, selfUniqueAddress)) {
if (membershipState.isLeader(selfUniqueAddress)) {
// only run the leader actions if we are the LEADER of the data center
val firstNotice = 20
val periodicNotice = 60
if (latestGossip.convergence(selfDc, selfUniqueAddress, exitingConfirmed)) {
if (membershipState.convergence(exitingConfirmed)) {
if (leaderActionCounter >= firstNotice)
logInfo("Leader can perform its duties again")
leaderActionCounter = 0
@@ -1021,7 +1025,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
logInfo(
"Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
latestGossip.dcReachabilityExcludingDownedObservers(selfDc),
membershipState.dcReachabilityExcludingDownedObservers,
latestGossip.members.collect {
case m if m.dataCenter == selfDc ⇒
s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}"
@@ -1036,8 +1040,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (latestGossip.member(selfUniqueAddress).status == Down) {
// When all reachable have seen the state this member will shutdown itself when it has
// status Down. The down commands should spread before we shutdown.
val unreachable = latestGossip.dcReachability(selfDc).allUnreachableOrTerminated
val downed = latestGossip.dcMembers(selfDc).collect { case m if m.status == Down ⇒ m.uniqueAddress }
val unreachable = membershipState.dcReachability.allUnreachableOrTerminated
val downed = membershipState.dcMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress }
if (downed.forall(node ⇒ unreachable(node) || latestGossip.seenByNode(node))) {
// the reason for not shutting down immediately is to give the gossip a chance to spread
// the downing information to other downed nodes, so that they can shutdown themselves
@@ -1072,9 +1076,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def leaderActionsOnConvergence(): Unit = {
val removedUnreachable = for {
node ← latestGossip.dcReachability(selfDc).allUnreachableOrTerminated
node ← membershipState.dcReachability.allUnreachableOrTerminated
m = latestGossip.member(node)
if m.dataCenter == selfDc && Gossip.removeUnreachableWithMemberStatus(m.status)
if m.dataCenter == selfDc && removeUnreachableWithMemberStatus(m.status)
} yield m
val removedExitingConfirmed = exitingConfirmed.filter { n ⇒
@@ -1148,7 +1152,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val pruned = updatedGossip.pruneTombstones(System.currentTimeMillis() - PruneGossipTombstonesAfter.toMillis)
if (pruned ne latestGossip) {
updateLatestGossip(pruned)
publish(pruned)
publishMembershipState()
}
}
@@ -1161,7 +1165,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
m.dataCenter == selfDc &&
m.status == Joining &&
enoughMembers &&
latestGossip.dcReachabilityExcludingDownedObservers(selfDc).isReachable(m.uniqueAddress)
membershipState.dcReachabilityExcludingDownedObservers.isReachable(m.uniqueAddress)
val changedMembers = localMembers.collect {
case m if isJoiningToWeaklyUp(m) ⇒ m.copy(status = WeaklyUp)
}
@@ -1177,7 +1181,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
}
publish(latestGossip)
publishMembershipState()
}
}
@@ -1230,7 +1234,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (newlyDetectedReachableMembers.nonEmpty)
logInfo("Marking node(s) as REACHABLE [{}]. Node roles [{}]", newlyDetectedReachableMembers.mkString(", "), selfRoles.mkString(","))
publish(latestGossip)
publishMembershipState()
}
}
}
@@ -1269,23 +1273,25 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
def validNodeForGossip(node: UniqueAddress): Boolean =
node != selfUniqueAddress && latestGossip.isReachableExcludingDownedObservers(selfDc, node)
node != selfUniqueAddress && membershipState.isReachableExcludingDownedObservers(node)
def updateLatestGossip(newGossip: Gossip): Unit = {
def updateLatestGossip(gossip: Gossip): Unit = {
// Updating the vclock version for the changes
val versionedGossip = newGossip :+ vclockNode
val versionedGossip = gossip :+ vclockNode
// Don't mark gossip state as seen while exiting is in progress, e.g.
// shutting down singleton actors. This delays removal of the member until
// the exiting tasks have been completed.
if (exitingTasksInProgress)
latestGossip = versionedGossip.clearSeen()
else {
// Nobody else has seen this gossip but us
val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
// Update the state with the new gossip
latestGossip = seenVersionedGossip
}
val newGossip =
if (exitingTasksInProgress)
versionedGossip.clearSeen()
else {
// Nobody else has seen this gossip but us
val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
// Update the state with the new gossip
seenVersionedGossip
}
membershipState = membershipState.copy(newGossip)
assertLatestGossip()
}
@@ -1293,11 +1299,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (Cluster.isAssertInvariantsEnabled && latestGossip.version.versions.size > latestGossip.members.size)
throw new IllegalStateException(s"Too many vector clock entries in gossip state ${latestGossip}")
def publish(newGossip: Gossip): Unit = {
def publishMembershipState(): Unit = {
if (cluster.settings.Debug.VerboseGossipLogging)
log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.DataCenter, newGossip)
log.debug("Cluster Node [{}] dc [{}] - New gossip published [{}]", selfAddress, cluster.settings.DataCenter, membershipState.latestGossip)
publisher ! PublishChanges(newGossip)
publisher ! PublishChanges(membershipState)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()
}