Merge pull request #29140 from akka/wip-29085-log-patriknw
Log markers for SBR, #29085
This commit is contained in:
commit
2a536d7065
4 changed files with 90 additions and 21 deletions
|
|
@ -7,6 +7,7 @@ package akka.cluster
|
|||
import akka.actor.Address
|
||||
import akka.annotation.ApiMayChange
|
||||
import akka.annotation.InternalApi
|
||||
import akka.cluster.sbr.DowningStrategy
|
||||
import akka.event.LogMarker
|
||||
|
||||
/**
|
||||
|
|
@ -22,6 +23,7 @@ object ClusterLogMarker {
|
|||
*/
|
||||
@InternalApi private[akka] object Properties {
|
||||
val MemberStatus = "akkaMemberStatus"
|
||||
val SbrDecision = "akkaSbrDecision"
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -91,4 +93,53 @@ object ClusterLogMarker {
|
|||
val singletonTerminated: LogMarker =
|
||||
LogMarker("akkaClusterSingletonTerminated")
|
||||
|
||||
/**
|
||||
* Marker "akkaSbrDowning" of log event when Split Brain Resolver has made a downing decision. Followed
|
||||
* by [[ClusterLogMarker.sbrDowningNode]] for each node that is downed.
|
||||
* @param decision The downing decision. Included as property "akkaSbrDecision".
|
||||
*/
|
||||
def sbrDowning(decision: DowningStrategy.Decision): LogMarker =
|
||||
LogMarker("akkaSbrDowning", Map(Properties.SbrDecision -> decision))
|
||||
|
||||
/**
|
||||
* Marker "akkaSbrDowningNode" of log event when a member is downed by Split Brain Resolver.
|
||||
* @param node The address of the node that is downed. Included as property "akkaRemoteAddress"
|
||||
* and "akkaRemoteAddressUid".
|
||||
* @param decision The downing decision. Included as property "akkaSbrDecision".
|
||||
*/
|
||||
def sbrDowningNode(node: UniqueAddress, decision: DowningStrategy.Decision): LogMarker =
|
||||
LogMarker(
|
||||
"akkaSbrDowningNode",
|
||||
Map(
|
||||
LogMarker.Properties.RemoteAddress -> node.address,
|
||||
LogMarker.Properties.RemoteAddressUid -> node.longUid,
|
||||
Properties.SbrDecision -> decision))
|
||||
|
||||
/**
|
||||
* Marker "akkaSbrInstability" of log event when Split Brain Resolver has detected too much instability
|
||||
* and will down all nodes.
|
||||
*/
|
||||
val sbrInstability: LogMarker =
|
||||
LogMarker("akkaSbrInstability")
|
||||
|
||||
/**
|
||||
* Marker "akkaSbrLeaseAcquired" of log event when Split Brain Resolver has acquired the lease.
|
||||
* @param decision The downing decision. Included as property "akkaSbrDecision".
|
||||
*/
|
||||
def sbrLeaseAcquired(decision: DowningStrategy.Decision): LogMarker =
|
||||
LogMarker("akkaSbrLeaseAcquired", Map(Properties.SbrDecision -> decision))
|
||||
|
||||
/**
|
||||
* Marker "akkaSbrLeaseDenied" of log event when Split Brain Resolver could not acquire the lease.
|
||||
* @param reverseDecision The (reverse) downing decision. Included as property "akkaSbrDecision".
|
||||
*/
|
||||
def sbrLeaseDenied(reverseDecision: DowningStrategy.Decision): LogMarker =
|
||||
LogMarker("akkaSbrLeaseDenied", Map(Properties.SbrDecision -> reverseDecision))
|
||||
|
||||
/**
|
||||
* Marker "akkaSbrLeaseReleased" of log event when Split Brain Resolver has released the lease.
|
||||
*/
|
||||
val sbrLeaseReleased: LogMarker =
|
||||
LogMarker("akkaSbrLeaseReleased")
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import akka.coordination.lease.scaladsl.Lease
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[sbr] object DowningStrategy {
|
||||
@InternalApi private[akka] object DowningStrategy {
|
||||
sealed trait Decision {
|
||||
def isIndirectlyConnected: Boolean
|
||||
}
|
||||
|
|
@ -53,7 +53,7 @@ import akka.coordination.lease.scaladsl.Lease
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[sbr] abstract class DowningStrategy(val selfDc: DataCenter) {
|
||||
@InternalApi private[akka] abstract class DowningStrategy(val selfDc: DataCenter) {
|
||||
import DowningStrategy._
|
||||
|
||||
// may contain Joining and WeaklyUp
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ import scala.concurrent.ExecutionContext
|
|||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorLogging
|
||||
import akka.actor.Address
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.Props
|
||||
|
|
@ -21,10 +20,14 @@ import akka.annotation.InternalApi
|
|||
import akka.cluster.Cluster
|
||||
import akka.cluster.ClusterEvent
|
||||
import akka.cluster.ClusterEvent._
|
||||
import akka.cluster.ClusterLogMarker
|
||||
import akka.cluster.ClusterSettings.DataCenter
|
||||
import akka.cluster.Member
|
||||
import akka.cluster.Reachability
|
||||
import akka.cluster.UniqueAddress
|
||||
import akka.cluster.sbr.DowningStrategy.Decision
|
||||
import akka.event.DiagnosticMarkerBusLoggingAdapter
|
||||
import akka.event.Logging
|
||||
import akka.pattern.pipe
|
||||
|
||||
/**
|
||||
|
|
@ -97,7 +100,7 @@ import akka.pattern.pipe
|
|||
log.info(
|
||||
"SBR started. Config: stableAfter: {} ms, strategy: {}, selfUniqueAddress: {}, selfDc: {}",
|
||||
stableAfter.toMillis,
|
||||
strategy.getClass.getSimpleName,
|
||||
Logging.simpleName(strategy.getClass),
|
||||
selfUniqueAddress,
|
||||
selfDc)
|
||||
|
||||
|
|
@ -114,8 +117,9 @@ import akka.pattern.pipe
|
|||
super.postStop()
|
||||
}
|
||||
|
||||
override def down(node: Address): Unit = {
|
||||
cluster.down(node)
|
||||
override def down(node: UniqueAddress, decision: Decision): Unit = {
|
||||
log.info(ClusterLogMarker.sbrDowningNode(node, decision), "SBR is downing [{}]", node)
|
||||
cluster.down(node.address)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -128,7 +132,6 @@ import akka.pattern.pipe
|
|||
*/
|
||||
@InternalApi private[sbr] abstract class SplitBrainResolverBase(stableAfter: FiniteDuration, strategy: DowningStrategy)
|
||||
extends Actor
|
||||
with ActorLogging
|
||||
with Stash
|
||||
with Timers {
|
||||
|
||||
|
|
@ -136,11 +139,13 @@ import akka.pattern.pipe
|
|||
import SplitBrainResolver.ReleaseLeaseCondition.NoLease
|
||||
import SplitBrainResolver._
|
||||
|
||||
val log: DiagnosticMarkerBusLoggingAdapter = Logging.withMarker(this)
|
||||
|
||||
def selfUniqueAddress: UniqueAddress
|
||||
|
||||
def selfDc: DataCenter
|
||||
|
||||
def down(node: Address): Unit
|
||||
def down(node: UniqueAddress, decision: Decision): Unit
|
||||
|
||||
// would be better as constructor parameter, but don't want to break Cinnamon instrumentation
|
||||
private val settings = new SplitBrainResolverSettings(context.system.settings.config)
|
||||
|
|
@ -289,7 +294,10 @@ import akka.pattern.pipe
|
|||
resetReachabilityChangedStats()
|
||||
} else if (downAllWhenUnstable > Duration.Zero &&
|
||||
durationSinceFirstChange > (stableAfter + downAllWhenUnstable)) {
|
||||
log.warning("SBR detected instability and will down all nodes: {}", reachabilityChangedStats)
|
||||
log.warning(
|
||||
ClusterLogMarker.sbrInstability,
|
||||
"SBR detected instability and will down all nodes: {}",
|
||||
reachabilityChangedStats)
|
||||
actOnDecision(DownAll)
|
||||
}
|
||||
}
|
||||
|
|
@ -300,7 +308,10 @@ import akka.pattern.pipe
|
|||
strategy.lease match {
|
||||
case Some(lease) =>
|
||||
if (lease.checkLease()) {
|
||||
log.info("SBR has acquired lease for decision [{}]", decision)
|
||||
log.info(
|
||||
ClusterLogMarker.sbrLeaseAcquired(decision),
|
||||
"SBR has acquired lease for decision [{}]",
|
||||
decision)
|
||||
actOnDecision(decision)
|
||||
} else {
|
||||
if (decision.acquireDelay == Duration.Zero)
|
||||
|
|
@ -349,7 +360,7 @@ import akka.pattern.pipe
|
|||
|
||||
case AcquireLeaseResult(holdingLease) =>
|
||||
if (holdingLease) {
|
||||
log.info("SBR acquired lease for decision [{}]", decision)
|
||||
log.info(ClusterLogMarker.sbrLeaseAcquired(decision), "SBR acquired lease for decision [{}]", decision)
|
||||
val downedNodes = actOnDecision(decision)
|
||||
releaseLeaseCondition = releaseLeaseCondition match {
|
||||
case ReleaseLeaseCondition.WhenMembersRemoved(nodes) =>
|
||||
|
|
@ -362,7 +373,11 @@ import akka.pattern.pipe
|
|||
}
|
||||
} else {
|
||||
val reverseDecision = strategy.reverseDecision(decision)
|
||||
log.info("SBR couldn't acquire lease, reverse decision [{}] to [{}]", decision, reverseDecision)
|
||||
log.info(
|
||||
ClusterLogMarker.sbrLeaseDenied(reverseDecision),
|
||||
"SBR couldn't acquire lease, reverse decision [{}] to [{}]",
|
||||
decision,
|
||||
reverseDecision)
|
||||
actOnDecision(reverseDecision)
|
||||
releaseLeaseCondition = NoLease
|
||||
}
|
||||
|
|
@ -379,8 +394,10 @@ import akka.pattern.pipe
|
|||
private def releaseLeaseResult(released: Boolean): Unit = {
|
||||
releaseLeaseCondition match {
|
||||
case ReleaseLeaseCondition.WhenTimeElapsed(deadline) =>
|
||||
if (released && deadline.isOverdue())
|
||||
if (released && deadline.isOverdue()) {
|
||||
log.info(ClusterLogMarker.sbrLeaseReleased, "SBR released lease.")
|
||||
releaseLeaseCondition = NoLease // released successfully
|
||||
}
|
||||
case _ =>
|
||||
// no lease or first waiting for downed nodes to be removed
|
||||
}
|
||||
|
|
@ -411,6 +428,7 @@ import akka.pattern.pipe
|
|||
else ""
|
||||
|
||||
log.warning(
|
||||
ClusterLogMarker.sbrDowning(decision),
|
||||
s"SBR took decision $decision and is downing [${nodesToDown.map(_.address).mkString(", ")}]${if (downMyself) " including myself,"
|
||||
else ""}, " +
|
||||
s"[${strategy.unreachable.size}] unreachable of [${strategy.members.size}] members" +
|
||||
|
|
@ -421,9 +439,9 @@ import akka.pattern.pipe
|
|||
if (nodesToDown.nonEmpty) {
|
||||
// downing is idempotent, and we also avoid calling down on nodes with status Down
|
||||
// down selfAddress last, since it may shutdown itself if down alone
|
||||
nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress.address))
|
||||
nodesToDown.foreach(uniqueAddress => if (uniqueAddress != selfUniqueAddress) down(uniqueAddress, decision))
|
||||
if (downMyself)
|
||||
down(selfUniqueAddress.address)
|
||||
down(selfUniqueAddress, decision)
|
||||
|
||||
resetReachabilityChangedStats()
|
||||
resetStableDeadline()
|
||||
|
|
@ -484,7 +502,7 @@ import akka.pattern.pipe
|
|||
|
||||
def reachableDataCenter(dc: DataCenter): Unit = {
|
||||
unreachableDataCenters -= dc
|
||||
log.info("Data center [] observed as reachable again", dc)
|
||||
log.info("Data center [{}] observed as reachable again", dc)
|
||||
}
|
||||
|
||||
def seenChanged(seenBy: Set[Address]): Unit = {
|
||||
|
|
@ -569,7 +587,7 @@ import akka.pattern.pipe
|
|||
implicit val ec: ExecutionContext = internalDispatcher
|
||||
strategy.lease.foreach { l =>
|
||||
if (releaseLeaseCondition != NoLease) {
|
||||
log.info("SBR releasing lease")
|
||||
log.debug("SBR releasing lease")
|
||||
l.release().recover { case _ => false }.map(ReleaseLeaseResult.apply).pipeTo(self)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,10 +75,10 @@ object SplitBrainResolverSpec {
|
|||
|
||||
var downed = Set.empty[Address]
|
||||
|
||||
override def down(node: Address): Unit = {
|
||||
if (leader && !downed(node)) {
|
||||
downed += node
|
||||
probe ! DownCalled(node)
|
||||
override def down(node: UniqueAddress, decision: DowningStrategy.Decision): Unit = {
|
||||
if (leader && !downed(node.address)) {
|
||||
downed += node.address
|
||||
probe ! DownCalled(node.address)
|
||||
} else if (!leader)
|
||||
probe ! "down must only be done by leader"
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue