simplify the SBR instability check (#29625)
This commit is contained in:
parent
aa4a0dbcbb
commit
b28d77b316
4 changed files with 66 additions and 71 deletions
|
|
@ -0,0 +1,2 @@
|
|||
# change to internal
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.sbr.DowningStrategy.setReachability")
|
||||
|
|
@ -205,26 +205,12 @@ import akka.coordination.lease.scaladsl.Lease
|
|||
_allMembers.exists(m => m.uniqueAddress == node && m.dataCenter == selfDc)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if it was changed
|
||||
*/
|
||||
private[sbr] def setReachability(r: Reachability): Boolean = {
|
||||
private[sbr] def setReachability(r: Reachability): Unit = {
|
||||
// skip records with Reachability.Reachable, and skip records related to other DC
|
||||
val newReachability = r.filterRecords(
|
||||
_reachability = r.filterRecords(
|
||||
record =>
|
||||
(record.status == Reachability.Unreachable || record.status == Reachability.Terminated) &&
|
||||
isInSelfDc(record.observer) && isInSelfDc(record.subject))
|
||||
val oldReachability = _reachability
|
||||
|
||||
val changed =
|
||||
if (oldReachability.records.size != newReachability.records.size)
|
||||
true
|
||||
else
|
||||
oldReachability.records.map(r => r.observer -> r.subject).toSet !=
|
||||
newReachability.records.map(r => r.observer -> r.subject).toSet
|
||||
|
||||
_reachability = newReachability
|
||||
changed
|
||||
}
|
||||
|
||||
def seenBy: Set[Address] =
|
||||
|
|
|
|||
|
|
@ -294,16 +294,17 @@ import akka.pattern.pipe
|
|||
val durationSinceLatestChange = (now - reachabilityChangedStats.latestChangeTimestamp).nanos
|
||||
val durationSinceFirstChange = (now - reachabilityChangedStats.firstChangeTimestamp).nanos
|
||||
|
||||
if (durationSinceLatestChange > (stableAfter * 2)) {
|
||||
log.debug("SBR no reachability changes within {} ms, resetting stats", (stableAfter * 2).toMillis)
|
||||
resetReachabilityChangedStats()
|
||||
} else if (downAllWhenUnstable > Duration.Zero &&
|
||||
durationSinceFirstChange > (stableAfter + downAllWhenUnstable)) {
|
||||
val downAllWhenUnstableEnabled = downAllWhenUnstable > Duration.Zero
|
||||
if (downAllWhenUnstableEnabled && durationSinceFirstChange > (stableAfter + downAllWhenUnstable)) {
|
||||
log.warning(
|
||||
ClusterLogMarker.sbrInstability,
|
||||
"SBR detected instability and will down all nodes: {}",
|
||||
reachabilityChangedStats)
|
||||
actOnDecision(DownAll)
|
||||
} else if (!downAllWhenUnstableEnabled && durationSinceLatestChange > (stableAfter * 2)) {
|
||||
// downAllWhenUnstable is disabled but reset for meaningful logging
|
||||
log.debug("SBR no reachability changes within {} ms, resetting stats", (stableAfter * 2).toMillis)
|
||||
resetReachabilityChangedStats()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -470,7 +471,10 @@ import akka.pattern.pipe
|
|||
log.debug("SBR unreachableMember [{}]", m)
|
||||
mutateMemberInfo(resetStable = true) { () =>
|
||||
strategy.addUnreachable(m)
|
||||
updateReachabilityChangedStats()
|
||||
resetReachabilityChangedStatsIfAllUnreachableDowned()
|
||||
if (!reachabilityChangedStats.isEmpty)
|
||||
log.debug("SBR noticed {}", reachabilityChangedStats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -480,19 +484,16 @@ import akka.pattern.pipe
|
|||
log.debug("SBR reachableMember [{}]", m)
|
||||
mutateMemberInfo(resetStable = true) { () =>
|
||||
strategy.addReachable(m)
|
||||
updateReachabilityChangedStats()
|
||||
resetReachabilityChangedStatsIfAllUnreachableDowned()
|
||||
if (!reachabilityChangedStats.isEmpty)
|
||||
log.debug("SBR noticed {}", reachabilityChangedStats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[sbr] def reachabilityChanged(r: Reachability): Unit = {
|
||||
if (strategy.setReachability(r)) {
|
||||
// resetStableDeadline is done from unreachableMember/reachableMember
|
||||
updateReachabilityChangedStats()
|
||||
// it may also change when members are removed and therefore the reset may be needed
|
||||
resetReachabilityChangedStatsIfAllUnreachableDowned()
|
||||
log.debug("SBR noticed {}", reachabilityChangedStats)
|
||||
}
|
||||
strategy.setReachability(r)
|
||||
}
|
||||
|
||||
private def updateReachabilityChangedStats(): Unit = {
|
||||
|
|
|
|||
|
|
@ -82,6 +82,12 @@ object SplitBrainResolverSpec {
|
|||
} else if (!leader)
|
||||
probe ! "down must only be done by leader"
|
||||
}
|
||||
|
||||
override def receive: Receive =
|
||||
({
|
||||
case UnreachableMember(m) if strategy.unreachable(m.uniqueAddress) => // already unreachable
|
||||
case ReachableMember(m) if !strategy.unreachable(m.uniqueAddress) => // already reachable
|
||||
}: Receive).orElse(super.receive)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1464,7 +1470,7 @@ class SplitBrainResolverSpec
|
|||
stop()
|
||||
}
|
||||
|
||||
"down all when unstable" in new SetupKeepMajority(
|
||||
"down all when unstable, scenario 1" in new SetupKeepMajority(
|
||||
stableAfter = 2.seconds,
|
||||
downAllWhenUnstable = 1.second,
|
||||
selfUniqueAddress = memberA.uniqueAddress,
|
||||
|
|
@ -1494,6 +1500,48 @@ class SplitBrainResolverSpec
|
|||
expectDownCalled(memberA, memberB, memberC, memberD, memberE)
|
||||
}
|
||||
|
||||
"down all when unstable, scenario 2" in new SetupKeepMajority(
|
||||
stableAfter = 2.seconds,
|
||||
downAllWhenUnstable = 500.millis,
|
||||
selfUniqueAddress = memberA.uniqueAddress,
|
||||
role = None,
|
||||
tickInterval = 100.seconds) {
|
||||
memberUp(memberA, memberB, memberC, memberD, memberE)
|
||||
leader(memberA)
|
||||
// E and D are unreachable
|
||||
reachabilityChanged(memberA -> memberE, memberB -> memberD, memberC -> memberD)
|
||||
tick()
|
||||
expectNoDecision(100.millis)
|
||||
|
||||
Thread.sleep(500)
|
||||
// E and D are still unreachable
|
||||
reachabilityChanged(memberA -> memberE, memberB -> memberD)
|
||||
tick()
|
||||
expectNoDecision(100.millis)
|
||||
// 600 ms has elapsed
|
||||
|
||||
Thread.sleep(500)
|
||||
reachabilityChanged(memberA -> memberE)
|
||||
reachable(memberD) // reset stableDeadline
|
||||
tick()
|
||||
expectNoDecision(100.millis)
|
||||
// 1200 ms has elapsed
|
||||
|
||||
Thread.sleep(500)
|
||||
// E and D are unreachable, reset stableDeadline
|
||||
reachabilityChanged(memberA -> memberE, memberB -> memberD, memberC -> memberD)
|
||||
tick()
|
||||
expectNoDecision(100.millis)
|
||||
// 1800 ms has elapsed
|
||||
|
||||
Thread.sleep(1000)
|
||||
// E and D are still unreachable
|
||||
reachabilityChanged(memberA -> memberE, memberB -> memberD)
|
||||
tick()
|
||||
// 2800 ms has elapsed and still no stability so downing all
|
||||
expectDownCalled(memberA, memberB, memberC, memberD, memberE)
|
||||
}
|
||||
|
||||
"not down all when becoming stable again" in new SetupKeepMajority(
|
||||
stableAfter = 2.seconds,
|
||||
downAllWhenUnstable = 1.second,
|
||||
|
|
@ -1594,46 +1642,4 @@ class SplitBrainResolverSpec
|
|||
}
|
||||
}
|
||||
|
||||
"Reachability changes" must {
|
||||
val strategy = new KeepMajority(defaultDataCenter, None)
|
||||
strategy.add(memberA)
|
||||
strategy.add(memberB)
|
||||
strategy.add(memberC)
|
||||
|
||||
val memberDInOtherDC = dcMember("otherDC", memberD)
|
||||
val memberEInOtherDC = dcMember("otherDC", memberE)
|
||||
|
||||
"be noticed when records added" in {
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB)))
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB, memberA -> memberC))) should ===(true)
|
||||
}
|
||||
|
||||
"be noticed when records removed" in {
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB, memberA -> memberC)))
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB))) should ===(true)
|
||||
strategy.setReachability(Reachability.empty) should ===(true)
|
||||
}
|
||||
|
||||
"be noticed when records change to Reachable" in {
|
||||
val r = createReachability(List(memberA -> memberB, memberA -> memberC))
|
||||
strategy.setReachability(r)
|
||||
strategy.setReachability(r.reachable(memberA.uniqueAddress, memberC.uniqueAddress)) should ===(true)
|
||||
}
|
||||
|
||||
"be noticed when records added and removed" in {
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB)))
|
||||
strategy.setReachability(createReachability(List(memberC -> memberB))) should ===(true)
|
||||
}
|
||||
|
||||
"be ignored when records for other DC added" in {
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB)))
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB, memberA -> memberDInOtherDC))) should ===(
|
||||
false)
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB, memberDInOtherDC -> memberB))) should ===(
|
||||
false)
|
||||
strategy.setReachability(createReachability(List(memberA -> memberB, memberDInOtherDC -> memberEInOtherDC))) should ===(
|
||||
false)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue