Telemetry SPI hooks for SBR decision, #29085

This commit is contained in:
Patrik Nordwall 2020-05-28 08:52:22 +02:00
parent 1472bd9e8c
commit cd9e9e960a
2 changed files with 32 additions and 13 deletions

View file

@ -10,6 +10,7 @@ import scala.concurrent.duration.FiniteDuration
import akka.actor.Address import akka.actor.Address
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.Member import akka.cluster.Member
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
@ -59,6 +60,7 @@ import akka.coordination.lease.scaladsl.Lease
// may contain Joining and WeaklyUp // may contain Joining and WeaklyUp
private var _unreachable: Set[UniqueAddress] = Set.empty[UniqueAddress] private var _unreachable: Set[UniqueAddress] = Set.empty[UniqueAddress]
@InternalStableApi
def unreachable: Set[UniqueAddress] = _unreachable def unreachable: Set[UniqueAddress] = _unreachable
def unreachable(m: Member): Boolean = _unreachable(m.uniqueAddress) def unreachable(m: Member): Boolean = _unreachable(m.uniqueAddress)
@ -79,11 +81,13 @@ import akka.coordination.lease.scaladsl.Lease
_allMembers.filter(m => m.status == MemberStatus.Joining || m.status == MemberStatus.WeaklyUp) _allMembers.filter(m => m.status == MemberStatus.Joining || m.status == MemberStatus.WeaklyUp)
// all members in self DC, both joining and up. // all members in self DC, both joining and up.
@InternalStableApi
def allMembersInDC: immutable.SortedSet[Member] = _allMembers def allMembersInDC: immutable.SortedSet[Member] = _allMembers
/** /**
* All members in self DC, but doesn't contain Joining, WeaklyUp, Down and Exiting. * All members in self DC, but doesn't contain Joining, WeaklyUp, Down and Exiting.
*/ */
@InternalStableApi
def members: immutable.SortedSet[Member] = def members: immutable.SortedSet[Member] =
members(includingPossiblyUp = false, excludingPossiblyExiting = false) members(includingPossiblyUp = false, excludingPossiblyExiting = false)
@ -193,6 +197,7 @@ import akka.coordination.lease.scaladsl.Lease
} }
} }
@InternalStableApi
def reachability: Reachability = def reachability: Reachability =
_reachability _reachability

View file

@ -17,6 +17,7 @@ import akka.actor.Props
import akka.actor.Stash import akka.actor.Stash
import akka.actor.Timers import akka.actor.Timers
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.ClusterEvent import akka.cluster.ClusterEvent
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
@ -130,7 +131,7 @@ import akka.pattern.pipe
* The implementation is split into two classes SplitBrainResolver and SplitBrainResolverBase to be * The implementation is split into two classes SplitBrainResolver and SplitBrainResolverBase to be
* able to unit test the logic without running cluster. * able to unit test the logic without running cluster.
*/ */
@InternalApi private[sbr] abstract class SplitBrainResolverBase(stableAfter: FiniteDuration, strategy: DowningStrategy) @InternalApi private[sbr] abstract class SplitBrainResolverBase(stableAfter: FiniteDuration, _strategy: DowningStrategy)
extends Actor extends Actor
with Stash with Stash
with Timers { with Timers {
@ -141,6 +142,10 @@ import akka.pattern.pipe
val log: DiagnosticMarkerBusLoggingAdapter = Logging.withMarker(this) val log: DiagnosticMarkerBusLoggingAdapter = Logging.withMarker(this)
@InternalStableApi
def strategy: DowningStrategy = _strategy
@InternalStableApi
def selfUniqueAddress: UniqueAddress def selfUniqueAddress: UniqueAddress
def selfDc: DataCenter def selfDc: DataCenter
@ -416,6 +421,27 @@ import akka.pattern.pipe
strategy.nodesToDown(DownAll) strategy.nodesToDown(DownAll)
} }
observeDecision(decision, nodesToDown, unreachableDataCenters)
if (nodesToDown.nonEmpty) {
val downMyself = nodesToDown.contains(selfUniqueAddress)
// downing is idempotent, and we also avoid calling down on nodes with status Down
// down selfAddress last, since it may shutdown itself if down alone
nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress, decision))
if (downMyself)
down(selfUniqueAddress, decision)
resetReachabilityChangedStats()
resetStableDeadline()
}
nodesToDown
}
@InternalStableApi
def observeDecision(
decision: Decision,
nodesToDown: Set[UniqueAddress],
unreachableDataCenters: Set[DataCenter]): Unit = {
val downMyself = nodesToDown.contains(selfUniqueAddress) val downMyself = nodesToDown.contains(selfUniqueAddress)
val indirectlyConnectedLogMessage = val indirectlyConnectedLogMessage =
@ -435,18 +461,6 @@ import akka.pattern.pipe
indirectlyConnectedLogMessage + indirectlyConnectedLogMessage +
s", all members in DC [${strategy.allMembersInDC.mkString(", ")}], full reachability status: ${strategy.reachability}" + s", all members in DC [${strategy.allMembersInDC.mkString(", ")}], full reachability status: ${strategy.reachability}" +
unreachableDataCentersLogMessage) unreachableDataCentersLogMessage)
if (nodesToDown.nonEmpty) {
// downing is idempotent, and we also avoid calling down on nodes with status Down
// down selfAddress last, since it may shutdown itself if down alone
nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress, decision))
if (downMyself)
down(selfUniqueAddress, decision)
resetReachabilityChangedStats()
resetStableDeadline()
}
nodesToDown
} }
def isResponsible: Boolean = leader && selfMemberAdded def isResponsible: Boolean = leader && selfMemberAdded