diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala index 98ff809885..bf370a0ea4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration.FiniteDuration import akka.actor.Address import akka.annotation.InternalApi +import akka.annotation.InternalStableApi import akka.cluster.ClusterSettings.DataCenter import akka.cluster.Member import akka.cluster.MemberStatus @@ -59,6 +60,7 @@ import akka.coordination.lease.scaladsl.Lease // may contain Joining and WeaklyUp private var _unreachable: Set[UniqueAddress] = Set.empty[UniqueAddress] + @InternalStableApi def unreachable: Set[UniqueAddress] = _unreachable def unreachable(m: Member): Boolean = _unreachable(m.uniqueAddress) @@ -79,11 +81,13 @@ import akka.coordination.lease.scaladsl.Lease _allMembers.filter(m => m.status == MemberStatus.Joining || m.status == MemberStatus.WeaklyUp) // all members in self DC, both joining and up. + @InternalStableApi def allMembersInDC: immutable.SortedSet[Member] = _allMembers /** * All members in self DC, but doesn't contain Joining, WeaklyUp, Down and Exiting. */ + @InternalStableApi def members: immutable.SortedSet[Member] = members(includingPossiblyUp = false, excludingPossiblyExiting = false) @@ -193,6 +197,7 @@ import akka.coordination.lease.scaladsl.Lease } } + @InternalStableApi def reachability: Reachability = _reachability diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala index f70a9530f0..4545836d2f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala @@ -17,6 +17,7 @@ import akka.actor.Props import akka.actor.Stash import akka.actor.Timers import akka.annotation.InternalApi +import akka.annotation.InternalStableApi import akka.cluster.Cluster import akka.cluster.ClusterEvent import akka.cluster.ClusterEvent._ @@ -130,7 +131,7 @@ import akka.pattern.pipe * The implementation is split into two classes SplitBrainResolver and SplitBrainResolverBase to be * able to unit test the logic without running cluster. */ -@InternalApi private[sbr] abstract class SplitBrainResolverBase(stableAfter: FiniteDuration, strategy: DowningStrategy) +@InternalApi private[sbr] abstract class SplitBrainResolverBase(stableAfter: FiniteDuration, _strategy: DowningStrategy) extends Actor with Stash with Timers { @@ -141,6 +142,10 @@ import akka.pattern.pipe val log: DiagnosticMarkerBusLoggingAdapter = Logging.withMarker(this) + @InternalStableApi + def strategy: DowningStrategy = _strategy + + @InternalStableApi def selfUniqueAddress: UniqueAddress def selfDc: DataCenter @@ -416,6 +421,27 @@ import akka.pattern.pipe strategy.nodesToDown(DownAll) } + observeDecision(decision, nodesToDown, unreachableDataCenters) + + if (nodesToDown.nonEmpty) { + val downMyself = nodesToDown.contains(selfUniqueAddress) + // downing is idempotent, and we also avoid calling down on nodes with status Down + // down selfAddress last, since it may shutdown itself if down alone + nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress, decision)) + if (downMyself) + down(selfUniqueAddress, decision) + + resetReachabilityChangedStats() + resetStableDeadline() + } + nodesToDown + } + + @InternalStableApi + def observeDecision( + decision: Decision, + nodesToDown: Set[UniqueAddress], + unreachableDataCenters: Set[DataCenter]): Unit = { val downMyself = nodesToDown.contains(selfUniqueAddress) val indirectlyConnectedLogMessage = @@ -435,18 +461,6 @@ import akka.pattern.pipe indirectlyConnectedLogMessage + s", all members in DC [${strategy.allMembersInDC.mkString(", ")}], full reachability status: ${strategy.reachability}" + unreachableDataCentersLogMessage) - - if (nodesToDown.nonEmpty) { - // downing is idempotent, and we also avoid calling down on nodes with status Down - // down selfAddress last, since it may shutdown itself if down alone - nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress, decision)) - if (downMyself) - down(selfUniqueAddress, decision) - - resetReachabilityChangedStats() - resetStableDeadline() - } - nodesToDown } def isResponsible: Boolean = leader && selfMemberAdded