ignore reachability records for invalid nodes (#30425)
Co-authored-by: Patrik Nordwall <patrik.nordwall@gmail.com>
This commit is contained in:
parent
33ea41e827
commit
83f049c047
2 changed files with 80 additions and 8 deletions
|
|
@ -256,9 +256,10 @@ import akka.coordination.lease.scaladsl.Lease
|
|||
|
||||
def nodesToDown(decision: Decision = decide()): Set[UniqueAddress] = {
|
||||
val downable = members
|
||||
.union(joining)
|
||||
.filterNot(m => m.status == MemberStatus.Down || m.status == MemberStatus.Exiting)
|
||||
.union(joining)
|
||||
.map(_.uniqueAddress)
|
||||
|
||||
decision match {
|
||||
case DownUnreachable | AcquireLeaseAndDownUnreachable(_) => downable.intersect(unreachable)
|
||||
case DownReachable => downable.diff(unreachable)
|
||||
|
|
@ -271,7 +272,7 @@ import akka.coordination.lease.scaladsl.Lease
|
|||
// failure detection observations between the indirectly connected nodes.
|
||||
// Also include nodes that corresponds to the decision without the unreachability observations from
|
||||
// the indirectly connected nodes
|
||||
downable.intersect(indirectlyConnected.union(additionalNodesToDownWhenIndirectlyConnected))
|
||||
downable.intersect(indirectlyConnected.union(additionalNodesToDownWhenIndirectlyConnected(downable)))
|
||||
case ReverseDownIndirectlyConnected =>
|
||||
// indirectly connected + all reachable
|
||||
downable.intersect(indirectlyConnected).union(downable.diff(unreachable))
|
||||
|
|
@ -281,23 +282,27 @@ import akka.coordination.lease.scaladsl.Lease
|
|||
}
|
||||
}
|
||||
|
||||
private def additionalNodesToDownWhenIndirectlyConnected: Set[UniqueAddress] = {
|
||||
private def additionalNodesToDownWhenIndirectlyConnected(downable: Set[UniqueAddress]): Set[UniqueAddress] = {
|
||||
if (unreachableButNotIndirectlyConnected.isEmpty)
|
||||
Set.empty
|
||||
else {
|
||||
val originalUnreachable = _unreachable
|
||||
val originalReachability = _reachability
|
||||
|
||||
try {
|
||||
val intersectionOfObserversAndSubjects = indirectlyConnectedFromIntersectionOfObserversAndSubjects
|
||||
val haveSeenCurrentGossip = indirectlyConnectedFromSeenCurrentGossip
|
||||
// remove records between the indirectly connected
|
||||
_reachability = reachability.filterRecords(
|
||||
r =>
|
||||
!((intersectionOfObserversAndSubjects(r.observer) && intersectionOfObserversAndSubjects(r.subject)) ||
|
||||
(haveSeenCurrentGossip(r.observer) && haveSeenCurrentGossip(r.subject))))
|
||||
_reachability = reachability.filterRecords { r =>
|
||||
// we only retain records for addresses that are still downable
|
||||
downable.contains(r.observer) && downable.contains(r.subject) &&
|
||||
// remove records between the indirectly connected
|
||||
!(intersectionOfObserversAndSubjects(r.observer) && intersectionOfObserversAndSubjects(r.subject) ||
|
||||
haveSeenCurrentGossip(r.observer) && haveSeenCurrentGossip(r.subject))
|
||||
}
|
||||
_unreachable = reachability.allUnreachableOrTerminated
|
||||
val additionalDecision = decide()
|
||||
|
||||
val additionalDecision = decide()
|
||||
if (additionalDecision.isIndirectlyConnected)
|
||||
throw new IllegalStateException(
|
||||
s"SBR double $additionalDecision decision, downing all instead. " +
|
||||
|
|
@ -305,6 +310,7 @@ import akka.coordination.lease.scaladsl.Lease
|
|||
s"still indirectlyConnected: [$indirectlyConnected], seenBy: [$seenBy]")
|
||||
|
||||
nodesToDown(additionalDecision)
|
||||
|
||||
} finally {
|
||||
_unreachable = originalUnreachable
|
||||
_reachability = originalReachability
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import akka.actor.Address
|
|||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.Props
|
||||
import akka.cluster.ClusterEvent.LeaderChanged
|
||||
import akka.cluster.ClusterEvent.MemberExited
|
||||
import akka.cluster.ClusterEvent.MemberRemoved
|
||||
import akka.cluster.ClusterEvent.MemberUp
|
||||
import akka.cluster.ClusterEvent.MemberWeaklyUp
|
||||
|
|
@ -935,7 +936,22 @@ class SplitBrainResolverSpec
|
|||
val reverseDecision3 = strategy3.reverseDecision(decision3.asInstanceOf[AcquireLeaseDecision])
|
||||
reverseDecision3 should ===(ReverseDownIndirectlyConnected)
|
||||
strategy3.nodesToDown(reverseDecision3) should ===(Set(memberB, memberC, memberD, memberE).map(_.uniqueAddress))
|
||||
}
|
||||
|
||||
"down indirectly connected to already downed node during partition: {A, B, C, D} | {(E, F)} => {A, B, C, D}" in new Setup2(
|
||||
role = None) {
|
||||
val memberELeaving = leaving(memberE)
|
||||
val memberFDown = downed(memberF)
|
||||
side1 = Set(memberA, memberB, memberC, memberD)
|
||||
side2 = Set(memberELeaving, memberFDown)
|
||||
// trouble when indirectly connected happens before clean partition
|
||||
indirectlyConnected = List(memberELeaving -> memberFDown)
|
||||
|
||||
// from side1 of the partition, majority
|
||||
assertDowningSide(side1, Set(memberELeaving))
|
||||
|
||||
// from side2 of the partition, minority
|
||||
assertDowningSide(side2, Set(memberA, memberB, memberC, memberD, memberELeaving))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1477,6 +1493,56 @@ class SplitBrainResolverSpec
|
|||
|
||||
}
|
||||
|
||||
"down indirectly connected when combined with partition and exiting: {A, B, C, D} | {E, F-exiting} => {A, B, C, D}" in {
|
||||
new SetupKeepMajority(stableAfter = Duration.Zero, memberA.uniqueAddress, role = None) {
|
||||
memberUp(memberA, memberB, memberC, memberD, memberE, memberF)
|
||||
val memberFExiting = exiting(memberF)
|
||||
a ! MemberExited(memberFExiting)
|
||||
leader(memberA)
|
||||
// indirectly connected: memberF
|
||||
// partition: memberA, memberB, memberC, memberD | memberE, memberF
|
||||
reachabilityChanged(
|
||||
memberA -> memberE,
|
||||
memberA -> memberFExiting,
|
||||
memberB -> memberE,
|
||||
memberB -> memberFExiting,
|
||||
memberC -> memberE,
|
||||
memberC -> memberFExiting,
|
||||
memberD -> memberE,
|
||||
memberD -> memberFExiting,
|
||||
memberE -> memberFExiting)
|
||||
tick()
|
||||
// keep fully connected members
|
||||
expectDownCalled(memberE)
|
||||
stop()
|
||||
}
|
||||
}
|
||||
|
||||
"down indirectly connected when combined with partition and exiting: {A, B, C, D} | {E-exiting, F} => {A, B, C, D}" in {
|
||||
new SetupKeepMajority(stableAfter = Duration.Zero, memberA.uniqueAddress, role = None) {
|
||||
memberUp(memberA, memberB, memberC, memberD, memberE, memberF)
|
||||
val memberEExiting = exiting(memberE)
|
||||
a ! MemberExited(memberEExiting)
|
||||
leader(memberA)
|
||||
// indirectly connected: memberF
|
||||
// partition: memberA, memberB, memberC, memberD | memberE, memberF
|
||||
reachabilityChanged(
|
||||
memberA -> memberEExiting,
|
||||
memberA -> memberF,
|
||||
memberB -> memberEExiting,
|
||||
memberB -> memberF,
|
||||
memberC -> memberEExiting,
|
||||
memberC -> memberF,
|
||||
memberD -> memberEExiting,
|
||||
memberD -> memberF,
|
||||
memberE -> memberF)
|
||||
tick()
|
||||
// keep fully connected members
|
||||
expectDownCalled(memberF)
|
||||
stop()
|
||||
}
|
||||
}
|
||||
|
||||
"down all in self data centers" in new SetupDownAllNodes(stableAfter = Duration.Zero, memberA.uniqueAddress) {
|
||||
memberUp(dataCenter(selfDc, memberA, memberB, memberC).toList: _*)
|
||||
// D and E not in self DC
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue