diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala index cc5f57e7bb..e85f734d91 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala @@ -256,9 +256,10 @@ import akka.coordination.lease.scaladsl.Lease def nodesToDown(decision: Decision = decide()): Set[UniqueAddress] = { val downable = members - .union(joining) .filterNot(m => m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) + .union(joining) .map(_.uniqueAddress) + decision match { case DownUnreachable | AcquireLeaseAndDownUnreachable(_) => downable.intersect(unreachable) case DownReachable => downable.diff(unreachable) @@ -271,7 +272,7 @@ import akka.coordination.lease.scaladsl.Lease // failure detection observations between the indirectly connected nodes. // Also include nodes that corresponds to the decision without the unreachability observations from // the indirectly connected nodes - downable.intersect(indirectlyConnected.union(additionalNodesToDownWhenIndirectlyConnected)) + downable.intersect(indirectlyConnected.union(additionalNodesToDownWhenIndirectlyConnected(downable))) case ReverseDownIndirectlyConnected => // indirectly connected + all reachable downable.intersect(indirectlyConnected).union(downable.diff(unreachable)) @@ -281,23 +282,27 @@ import akka.coordination.lease.scaladsl.Lease } } - private def additionalNodesToDownWhenIndirectlyConnected: Set[UniqueAddress] = { + private def additionalNodesToDownWhenIndirectlyConnected(downable: Set[UniqueAddress]): Set[UniqueAddress] = { if (unreachableButNotIndirectlyConnected.isEmpty) Set.empty else { val originalUnreachable = _unreachable val originalReachability = _reachability + try { val intersectionOfObserversAndSubjects = indirectlyConnectedFromIntersectionOfObserversAndSubjects val haveSeenCurrentGossip = indirectlyConnectedFromSeenCurrentGossip // remove records between the indirectly connected - _reachability = reachability.filterRecords( - r => - !((intersectionOfObserversAndSubjects(r.observer) && intersectionOfObserversAndSubjects(r.subject)) || - (haveSeenCurrentGossip(r.observer) && haveSeenCurrentGossip(r.subject)))) + _reachability = reachability.filterRecords { r => + // we only retain records for addresses that are still downable + downable.contains(r.observer) && downable.contains(r.subject) && + // remove records between the indirectly connected + !(intersectionOfObserversAndSubjects(r.observer) && intersectionOfObserversAndSubjects(r.subject) || + haveSeenCurrentGossip(r.observer) && haveSeenCurrentGossip(r.subject)) + } _unreachable = reachability.allUnreachableOrTerminated - val additionalDecision = decide() + val additionalDecision = decide() if (additionalDecision.isIndirectlyConnected) throw new IllegalStateException( s"SBR double $additionalDecision decision, downing all instead. " + @@ -305,6 +310,7 @@ import akka.coordination.lease.scaladsl.Lease s"still indirectlyConnected: [$indirectlyConnected], seenBy: [$seenBy]") nodesToDown(additionalDecision) + } finally { _unreachable = originalUnreachable _reachability = originalReachability diff --git a/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala b/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala index faf26c87e1..be91ab3c7c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala @@ -15,6 +15,7 @@ import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.actor.Props import akka.cluster.ClusterEvent.LeaderChanged +import akka.cluster.ClusterEvent.MemberExited import akka.cluster.ClusterEvent.MemberRemoved import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberWeaklyUp @@ -935,7 +936,22 @@ class SplitBrainResolverSpec val reverseDecision3 = strategy3.reverseDecision(decision3.asInstanceOf[AcquireLeaseDecision]) reverseDecision3 should ===(ReverseDownIndirectlyConnected) strategy3.nodesToDown(reverseDecision3) should ===(Set(memberB, memberC, memberD, memberE).map(_.uniqueAddress)) + } + "down indirectly connected to already downed node during partition: {A, B, C, D} | {(E, F)} => {A, B, C, D}" in new Setup2( + role = None) { + val memberELeaving = leaving(memberE) + val memberFDown = downed(memberF) + side1 = Set(memberA, memberB, memberC, memberD) + side2 = Set(memberELeaving, memberFDown) + // trouble when indirectly connected happens before clean partition + indirectlyConnected = List(memberELeaving -> memberFDown) + + // from side1 of the partition, majority + assertDowningSide(side1, Set(memberELeaving)) + + // from side2 of the partition, minority + assertDowningSide(side2, Set(memberA, memberB, memberC, memberD, memberELeaving)) } } @@ -1477,6 +1493,56 @@ class SplitBrainResolverSpec } + "down indirectly connected when combined with partition and exiting: {A, B, C, D} | {E, F-exiting} => {A, B, C, D}" in { + new SetupKeepMajority(stableAfter = Duration.Zero, memberA.uniqueAddress, role = None) { + memberUp(memberA, memberB, memberC, memberD, memberE, memberF) + val memberFExiting = exiting(memberF) + a ! MemberExited(memberFExiting) + leader(memberA) + // indirectly connected: memberF + // partition: memberA, memberB, memberC, memberD | memberE, memberF + reachabilityChanged( + memberA -> memberE, + memberA -> memberFExiting, + memberB -> memberE, + memberB -> memberFExiting, + memberC -> memberE, + memberC -> memberFExiting, + memberD -> memberE, + memberD -> memberFExiting, + memberE -> memberFExiting) + tick() + // keep fully connected members + expectDownCalled(memberE) + stop() + } + } + + "down indirectly connected when combined with partition and exiting: {A, B, C, D} | {E-exiting, F} => {A, B, C, D}" in { + new SetupKeepMajority(stableAfter = Duration.Zero, memberA.uniqueAddress, role = None) { + memberUp(memberA, memberB, memberC, memberD, memberE, memberF) + val memberEExiting = exiting(memberE) + a ! MemberExited(memberEExiting) + leader(memberA) + // indirectly connected: memberF + // partition: memberA, memberB, memberC, memberD | memberE, memberF + reachabilityChanged( + memberA -> memberEExiting, + memberA -> memberF, + memberB -> memberEExiting, + memberB -> memberF, + memberC -> memberEExiting, + memberC -> memberF, + memberD -> memberEExiting, + memberD -> memberF, + memberE -> memberF) + tick() + // keep fully connected members + expectDownCalled(memberF) + stop() + } + } + "down all in self data centers" in new SetupDownAllNodes(stableAfter = Duration.Zero, memberA.uniqueAddress) { memberUp(dataCenter(selfDc, memberA, memberB, memberC).toList: _*) // D and E not in self DC