From 1472bd9e8caa87193776858567b27997507102a2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 27 May 2020 14:01:53 +0200 Subject: [PATCH] Log markers for SBR, #29085 --- .../scala/akka/cluster/ClusterLogMarker.scala | 51 +++++++++++++++++++ .../akka/cluster/sbr/DowningStrategy.scala | 4 +- .../akka/cluster/sbr/SplitBrainResolver.scala | 48 +++++++++++------ .../cluster/sbr/SplitBrainResolverSpec.scala | 8 +-- 4 files changed, 90 insertions(+), 21 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterLogMarker.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterLogMarker.scala index c904589c3d..1a0c3ad784 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterLogMarker.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterLogMarker.scala @@ -7,6 +7,7 @@ package akka.cluster import akka.actor.Address import akka.annotation.ApiMayChange import akka.annotation.InternalApi +import akka.cluster.sbr.DowningStrategy import akka.event.LogMarker /** @@ -22,6 +23,7 @@ object ClusterLogMarker { */ @InternalApi private[akka] object Properties { val MemberStatus = "akkaMemberStatus" + val SbrDecision = "akkaSbrDecision" } /** @@ -91,4 +93,53 @@ object ClusterLogMarker { val singletonTerminated: LogMarker = LogMarker("akkaClusterSingletonTerminated") + /** + * Marker "akkaSbrDowning" of log event when Split Brain Resolver has made a downing decision. Followed + * by [[ClusterLogMarker.sbrDowningNode]] for each node that is downed. + * @param decision The downing decision. Included as property "akkaSbrDecision". + */ + def sbrDowning(decision: DowningStrategy.Decision): LogMarker = + LogMarker("akkaSbrDowning", Map(Properties.SbrDecision -> decision)) + + /** + * Marker "akkaSbrDowningNode" of log event when a member is downed by Split Brain Resolver. + * @param node The address of the node that is downed. Included as property "akkaRemoteAddress" + * and "akkaRemoteAddressUid". + * @param decision The downing decision. 
Included as property "akkaSbrDecision". + */ + def sbrDowningNode(node: UniqueAddress, decision: DowningStrategy.Decision): LogMarker = + LogMarker( + "akkaSbrDowningNode", + Map( + LogMarker.Properties.RemoteAddress -> node.address, + LogMarker.Properties.RemoteAddressUid -> node.longUid, + Properties.SbrDecision -> decision)) + + /** + * Marker "akkaSbrInstability" of log event when Split Brain Resolver has detected too much instability + * and will down all nodes. + */ + val sbrInstability: LogMarker = + LogMarker("akkaSbrInstability") + + /** + * Marker "akkaSbrLeaseAcquired" of log event when Split Brain Resolver has acquired the lease. + * @param decision The downing decision. Included as property "akkaSbrDecision". + */ + def sbrLeaseAcquired(decision: DowningStrategy.Decision): LogMarker = + LogMarker("akkaSbrLeaseAcquired", Map(Properties.SbrDecision -> decision)) + + /** + * Marker "akkaSbrLeaseDenied" of log event when Split Brain Resolver could not acquire the lease. + * @param reverseDecision The (reverse) downing decision. Included as property "akkaSbrDecision". + */ + def sbrLeaseDenied(reverseDecision: DowningStrategy.Decision): LogMarker = + LogMarker("akkaSbrLeaseDenied", Map(Properties.SbrDecision -> reverseDecision)) + + /** + * Marker "akkaSbrLeaseReleased" of log event when Split Brain Resolver has released the lease. 
+ */ + val sbrLeaseReleased: LogMarker = + LogMarker("akkaSbrLeaseReleased") + } diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala index 72028b7a2e..98ff809885 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/DowningStrategy.scala @@ -20,7 +20,7 @@ import akka.coordination.lease.scaladsl.Lease /** * INTERNAL API */ -@InternalApi private[sbr] object DowningStrategy { +@InternalApi private[akka] object DowningStrategy { sealed trait Decision { def isIndirectlyConnected: Boolean } @@ -53,7 +53,7 @@ import akka.coordination.lease.scaladsl.Lease /** * INTERNAL API */ -@InternalApi private[sbr] abstract class DowningStrategy(val selfDc: DataCenter) { +@InternalApi private[akka] abstract class DowningStrategy(val selfDc: DataCenter) { import DowningStrategy._ // may contain Joining and WeaklyUp diff --git a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala index 4cdb9137b8..f70a9530f0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala +++ b/akka-cluster/src/main/scala/akka/cluster/sbr/SplitBrainResolver.scala @@ -11,7 +11,6 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import akka.actor.Actor -import akka.actor.ActorLogging import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.actor.Props @@ -21,10 +20,14 @@ import akka.annotation.InternalApi import akka.cluster.Cluster import akka.cluster.ClusterEvent import akka.cluster.ClusterEvent._ +import akka.cluster.ClusterLogMarker import akka.cluster.ClusterSettings.DataCenter import akka.cluster.Member import akka.cluster.Reachability import akka.cluster.UniqueAddress +import akka.cluster.sbr.DowningStrategy.Decision +import akka.event.DiagnosticMarkerBusLoggingAdapter 
+import akka.event.Logging import akka.pattern.pipe /** @@ -97,7 +100,7 @@ import akka.pattern.pipe log.info( "SBR started. Config: stableAfter: {} ms, strategy: {}, selfUniqueAddress: {}, selfDc: {}", stableAfter.toMillis, - strategy.getClass.getSimpleName, + Logging.simpleName(strategy.getClass), selfUniqueAddress, selfDc) @@ -114,8 +117,9 @@ import akka.pattern.pipe super.postStop() } - override def down(node: Address): Unit = { - cluster.down(node) + override def down(node: UniqueAddress, decision: Decision): Unit = { + log.info(ClusterLogMarker.sbrDowningNode(node, decision), "SBR is downing [{}]", node) + cluster.down(node.address) } } @@ -128,7 +132,6 @@ import akka.pattern.pipe */ @InternalApi private[sbr] abstract class SplitBrainResolverBase(stableAfter: FiniteDuration, strategy: DowningStrategy) extends Actor - with ActorLogging with Stash with Timers { @@ -136,11 +139,13 @@ import akka.pattern.pipe import SplitBrainResolver.ReleaseLeaseCondition.NoLease import SplitBrainResolver._ + val log: DiagnosticMarkerBusLoggingAdapter = Logging.withMarker(this) + def selfUniqueAddress: UniqueAddress def selfDc: DataCenter - def down(node: Address): Unit + def down(node: UniqueAddress, decision: Decision): Unit // would be better as constructor parameter, but don't want to break Cinnamon instrumentation private val settings = new SplitBrainResolverSettings(context.system.settings.config) @@ -289,7 +294,10 @@ import akka.pattern.pipe resetReachabilityChangedStats() } else if (downAllWhenUnstable > Duration.Zero && durationSinceFirstChange > (stableAfter + downAllWhenUnstable)) { - log.warning("SBR detected instability and will down all nodes: {}", reachabilityChangedStats) + log.warning( + ClusterLogMarker.sbrInstability, + "SBR detected instability and will down all nodes: {}", + reachabilityChangedStats) actOnDecision(DownAll) } } @@ -300,7 +308,10 @@ import akka.pattern.pipe strategy.lease match { case Some(lease) => if (lease.checkLease()) { - log.info("SBR has 
acquired lease for decision [{}]", decision) + log.info( + ClusterLogMarker.sbrLeaseAcquired(decision), + "SBR has acquired lease for decision [{}]", + decision) actOnDecision(decision) } else { if (decision.acquireDelay == Duration.Zero) @@ -349,7 +360,7 @@ import akka.pattern.pipe case AcquireLeaseResult(holdingLease) => if (holdingLease) { - log.info("SBR acquired lease for decision [{}]", decision) + log.info(ClusterLogMarker.sbrLeaseAcquired(decision), "SBR acquired lease for decision [{}]", decision) val downedNodes = actOnDecision(decision) releaseLeaseCondition = releaseLeaseCondition match { case ReleaseLeaseCondition.WhenMembersRemoved(nodes) => @@ -362,7 +373,11 @@ import akka.pattern.pipe } } else { val reverseDecision = strategy.reverseDecision(decision) - log.info("SBR couldn't acquire lease, reverse decision [{}] to [{}]", decision, reverseDecision) + log.info( + ClusterLogMarker.sbrLeaseDenied(reverseDecision), + "SBR couldn't acquire lease, reverse decision [{}] to [{}]", + decision, + reverseDecision) actOnDecision(reverseDecision) releaseLeaseCondition = NoLease } @@ -379,8 +394,10 @@ import akka.pattern.pipe private def releaseLeaseResult(released: Boolean): Unit = { releaseLeaseCondition match { case ReleaseLeaseCondition.WhenTimeElapsed(deadline) => - if (released && deadline.isOverdue()) + if (released && deadline.isOverdue()) { + log.info(ClusterLogMarker.sbrLeaseReleased, "SBR released lease.") releaseLeaseCondition = NoLease // released successfully + } case _ => // no lease or first waiting for downed nodes to be removed } @@ -411,6 +428,7 @@ import akka.pattern.pipe else "" log.warning( + ClusterLogMarker.sbrDowning(decision), s"SBR took decision $decision and is downing [${nodesToDown.map(_.address).mkString(", ")}]${if (downMyself) " including myself," else ""}, " + s"[${strategy.unreachable.size}] unreachable of [${strategy.members.size}] members" + @@ -421,9 +439,9 @@ import akka.pattern.pipe if (nodesToDown.nonEmpty) { // downing is 
idempotent, and we also avoid calling down on nodes with status Down // down selfAddress last, since it may shutdown itself if down alone - nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress.address)) + nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress, decision)) if (downMyself) - down(selfUniqueAddress.address) + down(selfUniqueAddress, decision) resetReachabilityChangedStats() resetStableDeadline() @@ -484,7 +502,7 @@ import akka.pattern.pipe def reachableDataCenter(dc: DataCenter): Unit = { unreachableDataCenters -= dc - log.info("Data center [] observed as reachable again", dc) + log.info("Data center [{}] observed as reachable again", dc) } def seenChanged(seenBy: Set[Address]): Unit = { @@ -569,7 +587,7 @@ import akka.pattern.pipe implicit val ec: ExecutionContext = internalDispatcher strategy.lease.foreach { l => if (releaseLeaseCondition != NoLease) { - log.info("SBR releasing lease") + log.debug("SBR releasing lease") l.release().recover { case _ => false }.map(ReleaseLeaseResult.apply).pipeTo(self) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala b/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala index a60cde4113..cae903c8ef 100644 --- a/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/sbr/SplitBrainResolverSpec.scala @@ -75,10 +75,10 @@ object SplitBrainResolverSpec { var downed = Set.empty[Address] - override def down(node: Address): Unit = { - if (leader && !downed(node)) { - downed += node - probe ! DownCalled(node) + override def down(node: UniqueAddress, decision: DowningStrategy.Decision): Unit = { + if (leader && !downed(node.address)) { + downed += node.address + probe ! DownCalled(node.address) } else if (!leader) probe ! "down must only be done by leader" }