diff --git a/akka-actor/src/main/scala/akka/actor/ActorLogMarker.scala b/akka-actor/src/main/scala/akka/actor/ActorLogMarker.scala new file mode 100644 index 0000000000..863bf61c27 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/ActorLogMarker.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor + +import akka.annotation.ApiMayChange +import akka.event.LogMarker + +/** + * This is public with the purpose to document the used markers and properties of log events. + * No guarantee that it will remain binary compatible, but the marker names and properties + * are considered public API and will not be changed without notice. + */ +@ApiMayChange +object ActorLogMarker { + + /** + * Marker "akkaDeadLetter" of log event for dead letter messages. + * + * @param messageClass The message class of the DeadLetter. Included as property "akkaMessageClass". + */ + def deadLetter(messageClass: String): LogMarker = + LogMarker("akkaDeadLetter", Map(LogMarker.Properties.MessageClass -> messageClass)) + +} diff --git a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala index be2cf76c2f..3f87ecf34e 100644 --- a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala +++ b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala @@ -8,6 +8,7 @@ import scala.concurrent.duration.Deadline import scala.concurrent.duration.FiniteDuration import akka.actor.Actor +import akka.actor.ActorLogMarker import akka.actor.ActorRef import akka.actor.AllDeadLetters import akka.actor.DeadLetter @@ -116,7 +117,9 @@ class DeadLetterListener extends Actor { d.recipient.getClass, logMessage + "This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " + - "and 'akka.log-dead-letters-during-shutdown'.")) + "and 'akka.log-dead-letters-during-shutdown'.", + Logging.emptyMDC, + ActorLogMarker.deadLetter(messageStr))) } private def isReal(snd: ActorRef): Boolean = { diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index c4af856961..e4dc6db563 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -1627,17 +1627,36 @@ trait DiagnosticLoggingAdapter extends LoggingAdapter { /** DO NOT INHERIT: Class is open only for use by akka-slf4j*/ @DoNotInherit -class LogMarker(val name: String) +class LogMarker(val name: String, val properties: Map[String, Any]) { + + // for binary compatibility + def this(name: String) = this(name, Map.empty) + + /** Java API */ + def getProperties: java.util.Map[String, Object] = { + import akka.util.ccompat.JavaConverters._ + properties.map { case (k, v) => (k, v.asInstanceOf[AnyRef]) }.asJava + } +} + object LogMarker { /** The Marker is internally transferred via MDC using using this key */ private[akka] final val MDCKey = "marker" - def apply(name: String): LogMarker = new LogMarker(name) + def apply(name: String): LogMarker = new LogMarker(name, Map.empty) + + def apply(name: String, properties: Map[String, Any]): LogMarker = new LogMarker(name, properties) /** Java API */ def create(name: String): LogMarker = apply(name) + /** Java API */ + def create(name: String, properties: java.util.Map[String, Any]): LogMarker = { + import akka.util.ccompat.JavaConverters._ + apply(name, properties.asScala.toMap) + } + @Deprecated @deprecated("use akka.event.LogEventWithMarker#marker instead", since = "2.5.12") def extractFromMDC(mdc: MDC): Option[String] = @@ -1648,6 +1667,15 @@ object LogMarker { private[akka] final val Security = apply("SECURITY") + /** + * INTERNAL API + */ + @InternalApi private[akka] object Properties { + val MessageClass = "akkaMessageClass" + val RemoteAddress = "akkaRemoteAddress" + val RemoteAddressUid = "akkaRemoteAddressUid" + } + } /** @@ -1896,6 +1924,19 @@ class MarkerLoggingAdapter( if (isDebugEnabled(marker)) bus.publish(Debug(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker)) + /** + * Log message at the specified log level. + */ + def log(marker: LogMarker, level: Logging.LogLevel, message: String): Unit = { + level match { + case Logging.DebugLevel => debug(marker, message) + case Logging.InfoLevel => info(marker, message) + case Logging.WarningLevel => warning(marker, message) + case Logging.ErrorLevel => error(marker, message) + case _ => + } + } + // Copy of LoggingAdapter.format1 due to binary compatibility restrictions private def format1(t: String, arg: Any): String = arg match { case a: Array[_] if !a.getClass.getComponentType.isPrimitive => format(t, a.toIndexedSeq) diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.0.backwards.excludes/issue-28207-struct-log.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.0.backwards.excludes/issue-28207-struct-log.excludes new file mode 100644 index 0000000000..3174625e08 --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.6.0.backwards.excludes/issue-28207-struct-log.excludes @@ -0,0 +1,8 @@ +# #28207 logging with markers +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.sharding.PersistentShardCoordinator") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.sharding.DDataShardCoordinator") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.sharding.ShardCoordinator") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.sharding.ShardCoordinator.log") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.typeName") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.sharding.ShardRegion") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.sharding.ShardRegion.log") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 758f284b7a..67e5b28e5c 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -8,6 +8,7 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Success + import akka.actor._ import akka.actor.DeadLetterSuppression import akka.annotation.InternalApi @@ -25,6 +26,8 @@ import akka.cluster.ddata.GSetKey import akka.cluster.ddata.Key import akka.cluster.ddata.ReplicatedData import akka.cluster.ddata.SelfUniqueAddress +import akka.event.BusLogging +import akka.event.Logging import akka.util.Timeout import com.github.ghik.silencer.silent @@ -495,13 +498,14 @@ object ShardCoordinator { abstract class ShardCoordinator( settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) - extends Actor - with ActorLogging { + extends Actor { import ShardCoordinator._ import ShardCoordinator.Internal._ import ShardRegion.ShardId import settings.tuningParameters._ + val log = Logging.withMarker(context.system, this) + val cluster = Cluster(context.system) val removalMargin = cluster.downingProvider.downRemovalMargin val minMembers = settings.role match { @@ -527,6 +531,8 @@ abstract class ShardCoordinator( cluster.subscribe(self, initialStateMode = InitialStateAsEvents, ClusterShuttingDown.getClass) + protected def typeName: String + override def postStop(): Unit = { super.postStop() rebalanceTask.cancel() @@ -881,7 +887,11 @@ abstract class ShardCoordinator( if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) { update(ShardHomeAllocated(shard, region)) { evt => state = state.updated(evt) - log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region) + log.debug( + ShardingLogMarker.shardAllocated(typeName, shard, regionAddress(region)), + "Shard [{}] allocated at [{}]", + evt.shard, + evt.region) sendHostShardMsg(evt.shard, evt.region) getShardHomeSender ! ShardHome(evt.shard, evt.region) @@ -895,8 +905,13 @@ abstract class ShardCoordinator( } } + private def regionAddress(region: ActorRef): Address = { + if (region.path.address.host.isEmpty) cluster.selfAddress + else region.path.address + } + def continueRebalance(shards: Set[ShardId]): Unit = { - if (log.isInfoEnabled && (shards.nonEmpty || rebalanceInProgress.nonEmpty)) { + if ((log: BusLogging).isInfoEnabled && (shards.nonEmpty || rebalanceInProgress.nonEmpty)) { log.info( "Starting rebalance for shards [{}]. Current shards rebalancing: [{}]", shards.mkString(","), @@ -932,7 +947,7 @@ abstract class ShardCoordinator( */ @deprecated("Use `ddata` mode, persistence mode is deprecated.", "2.6.0") class PersistentShardCoordinator( - typeName: String, + override val typeName: String, settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) extends ShardCoordinator(settings, allocationStrategy) @@ -1041,7 +1056,7 @@ class PersistentShardCoordinator( * @see [[ClusterSharding$ ClusterSharding extension]] */ class DDataShardCoordinator( - typeName: String, + override val typeName: String, settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy, replicator: ActorRef, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index e4ebc87756..f3b3f8058b 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -25,6 +25,7 @@ import akka.cluster.MemberStatus import akka.cluster.ClusterSettings import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.Shard.ShardStats +import akka.event.Logging import akka.pattern.{ ask, pipe } import akka.util.{ MessageBufferMap, PrettyDuration, Timeout } @@ -490,7 +491,6 @@ private[akka] class ShardRegion( replicator: ActorRef, majorityMinCap: Int) extends Actor - with ActorLogging with Timers { import ShardingQueries.ShardsQueryResult @@ -499,6 +499,8 @@ private[akka] class ShardRegion( import settings._ import settings.tuningParameters._ + val log = Logging.withMarker(context.system, this) + val cluster = Cluster(context.system) // sort by age, oldest first @@ -1033,7 +1035,7 @@ private[akka] class ShardRegion( .get(id) .orElse(entityProps match { case Some(props) if !shardsByRef.values.exists(_ == id) => - log.debug("{}: Starting shard [{}] in region", typeName, id) + log.debug(ShardingLogMarker.shardStarted(typeName, id), "{}: Starting shard [{}] in region", typeName, id) val name = URLEncoder.encode(id, "utf-8") val shard = context.watch( diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingLogMarker.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingLogMarker.scala new file mode 100644 index 0000000000..b0afeb105f --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingLogMarker.scala @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.actor.Address +import akka.annotation.ApiMayChange +import akka.annotation.InternalApi +import akka.event.LogMarker + +/** + * This is public with the purpose to document the used markers and properties of log events. + * No guarantee that it will remain binary compatible, but the marker names and properties + * are considered public API and will not be changed without notice. + */ +@ApiMayChange +object ShardingLogMarker { + + /** + * INTERNAL API + */ + @InternalApi private[akka] object Properties { + val ShardTypeName = "akkaShardTypeName" + val ShardId = "akkaShardId" + } + + /** + * Marker "akkaShardAllocated" of log event when `ShardCoordinator` allocates a shard to a region. + * @param shardTypeName The `typeName` of the shard. Included as property "akkaShardTypeName". + * @param shardId The id of the shard. Included as property "akkaShardId". + * @param node The address of the node where the shard is allocated. Included as property "akkaRemoteAddress". + */ + def shardAllocated(shardTypeName: String, shardId: String, node: Address): LogMarker = + LogMarker( + "akkaShardAllocated", + Map( + Properties.ShardTypeName -> shardTypeName, + Properties.ShardId -> shardId, + LogMarker.Properties.RemoteAddress -> node)) + + /** + * Marker "akkaShardStarted" of log event when `ShardRegion` starts a shard. + * @param shardTypeName The `typeName` of the shard. Included as property "akkaShardTypeName". + * @param shardId The id of the shard. Included as property "akkaShardId". + */ + def shardStarted(shardTypeName: String, shardId: String): LogMarker = + LogMarker("akkaShardStarted", Map(Properties.ShardTypeName -> shardTypeName, Properties.ShardId -> shardId)) + +} diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index d7c29f302a..189f6dcc0a 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -32,6 +32,8 @@ import akka.coordination.lease.LeaseUsageSettings import akka.coordination.lease.scaladsl.Lease import akka.coordination.lease.scaladsl.LeaseProvider import akka.dispatch.Dispatchers +import akka.event.LogMarker +import akka.event.Logging import akka.pattern.ask import akka.pattern.pipe import akka.util.JavaDurationConverters._ @@ -494,6 +496,8 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se private val singletonLeaseName = s"${context.system.name}-singleton-${self.path}" + override val log = Logging.withMarker(context.system, this) + val lease: Option[Lease] = settings.leaseSettings.map( settings => LeaseProvider(context.system) @@ -555,12 +559,21 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se def logInfo(message: String): Unit = if (LogInfo) log.info(message) + def logInfo(marker: LogMarker, message: String): Unit = + if (LogInfo) log.info(marker, message) + def logInfo(template: String, arg1: Any): Unit = if (LogInfo) log.info(template, arg1) + def logInfo(marker: LogMarker, template: String, arg1: Any): Unit = + if (LogInfo) log.info(marker, template, arg1) + def logInfo(template: String, arg1: Any, arg2: Any): Unit = if (LogInfo) log.info(template, arg1, arg2) + def logInfo(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit = + if (LogInfo) log.info(marker, template, arg1, arg2) + def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = if (LogInfo) log.info(template, arg1, arg2, arg3) @@ -802,7 +815,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se stay.using(AcquiringLeaseData(leaseRequestInProgress = false, None)) } case Event(Terminated(ref), AcquiringLeaseData(_, Some(singleton))) if ref == singleton => - logInfo("Singleton actor terminated. Trying to acquire lease again before re-creating.") + logInfo( + ClusterLogMarker.singletonTerminated, + "Singleton actor terminated. Trying to acquire lease again before re-creating.") // tryAcquireLease sets the state to None for singleton actor tryAcquireLease() case Event(AcquireLeaseFailure(t), _) => @@ -834,8 +849,11 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se @InternalStableApi def gotoOldest(): State = { + logInfo( + ClusterLogMarker.singletonStarted, + "Singleton manager starting singleton actor [{}]", + self.path / singletonName) val singleton = context.watch(context.actorOf(singletonProps, singletonName)) - logInfo("Singleton manager starting singleton actor [{}]", singleton.path) goto(Oldest).using(OldestData(Some(singleton))) } @@ -874,7 +892,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se stay case Event(Terminated(ref), d @ OldestData(Some(singleton))) if ref == singleton => - logInfo("Singleton actor [{}] was terminated", singleton.path) + logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", singleton.path) stay.using(d.copy(singleton = None)) case Event(SelfExiting, _) => @@ -934,7 +952,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se gotoHandingOver(singleton, None) case Event(Terminated(ref), d @ WasOldestData(singleton, _)) if singleton.contains(ref) => - logInfo("Singleton actor [{}] was terminated", ref.path) + logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", ref.path) stay.using(d.copy(singleton = None)) case Event(SelfExiting, _) => @@ -984,7 +1002,11 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se def handOverDone(handOverTo: Option[ActorRef]): State = { val newOldest = handOverTo.map(_.path.address) - logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest) + logInfo( + ClusterLogMarker.singletonTerminated, + "Singleton terminated, hand-over done [{} -> {}]", + cluster.selfAddress, + newOldest) handOverTo.foreach { _ ! HandOverDone } memberExitingProgress.trySuccess(Done) if (removed.contains(cluster.selfUniqueAddress)) { @@ -1004,7 +1026,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se when(Stopping) { case Event(Terminated(ref), StoppingData(singleton)) if ref == singleton => - logInfo("Singleton actor [{}] was terminated", singleton.path) + logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", singleton.path) stop() } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index f6b87bf214..91bf56b206 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -24,8 +24,9 @@ import scala.concurrent.duration._ import scala.concurrent.{ Await, ExecutionContext } import scala.util.control.NonFatal +import akka.event.LogMarker import akka.event.Logging.LogLevel - +import akka.event.MarkerLoggingAdapter import com.github.ghik.silencer.silent /** @@ -101,7 +102,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { scala.collection.JavaConverters.setAsJavaSetConverter(selfRoles).asJava private val _isTerminated = new AtomicBoolean(false) - private val log = Logging(system, ClusterLogClass.ClusterCore) + private val log = Logging.withMarker(system, ClusterLogClass.ClusterCore) // ClusterJmx is initialized as the last thing in the constructor private var clusterJmx: Option[ClusterJmx] = None @@ -468,57 +469,101 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { /** * INTERNAL API */ - private[cluster] class ClusterLogger(log: LoggingAdapter) { + private[cluster] class ClusterLogger(log: MarkerLoggingAdapter) { def isDebugEnabled: Boolean = log.isDebugEnabled def logDebug(message: String): Unit = - logAtLevel(Logging.DebugLevel, message) + if (settings.LogInfo && log.isDebugEnabled) + logAtLevel(Logging.DebugLevel, message) def logDebug(template: String, arg1: Any): Unit = - logAtLevel(Logging.DebugLevel, template, arg1) + if (settings.LogInfo && log.isDebugEnabled) + logAtLevel(Logging.DebugLevel, log.format(template, arg1)) def logDebug(template: String, arg1: Any, arg2: Any): Unit = - logAtLevel(Logging.DebugLevel, template, arg1, arg2) + if (settings.LogInfo && log.isDebugEnabled) + logAtLevel(Logging.DebugLevel, log.format(template, arg1, arg2)) def logDebug(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = - logAtLevel(Logging.DebugLevel, template, arg1, arg2, arg3) + if (settings.LogInfo && log.isDebugEnabled) + logAtLevel(Logging.DebugLevel, log.format(template, arg1, arg2, arg3)) def logInfo(message: String): Unit = - logAtLevel(Logging.InfoLevel, message) + if (settings.LogInfo && log.isInfoEnabled) + logAtLevel(Logging.InfoLevel, message) + + def logInfo(marker: LogMarker, message: String): Unit = + if (settings.LogInfo && log.isInfoEnabled(marker)) + logAtLevel(marker, Logging.InfoLevel, message) def logInfo(template: String, arg1: Any): Unit = - logAtLevel(Logging.InfoLevel, template, arg1) + if (settings.LogInfo && log.isInfoEnabled) + logAtLevel(Logging.InfoLevel, log.format(template, arg1)) + + def logInfo(marker: LogMarker, template: String, arg1: Any): Unit = + if (settings.LogInfo && log.isInfoEnabled(marker)) + logAtLevel(marker, Logging.InfoLevel, log.format(template, arg1)) def logInfo(template: String, arg1: Any, arg2: Any): Unit = - logAtLevel(Logging.InfoLevel, template, arg1, arg2) + if (settings.LogInfo && log.isInfoEnabled) + logAtLevel(Logging.InfoLevel, log.format(template, arg1, arg2)) + + def logInfo(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit = + if (settings.LogInfo && log.isInfoEnabled(marker)) + logAtLevel(marker, Logging.InfoLevel, log.format(template, arg1, arg2)) def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = - logAtLevel(Logging.InfoLevel, template, arg1, arg2, arg3) + if (settings.LogInfo && log.isInfoEnabled) + logAtLevel(Logging.InfoLevel, log.format(template, arg1, arg2, arg3)) + + def logInfo(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + if (settings.LogInfo && log.isInfoEnabled(marker)) + logAtLevel(marker, Logging.InfoLevel, log.format(template, arg1, arg2, arg3)) def logWarning(message: String): Unit = - logAtLevel(Logging.WarningLevel, message) + if (log.isWarningEnabled) + logAtLevel(Logging.WarningLevel, message) def logWarning(template: String, arg1: Any): Unit = - logAtLevel(Logging.WarningLevel, template, arg1) + if (log.isWarningEnabled) + logAtLevel(Logging.WarningLevel, log.format(template, arg1)) + + def logWarning(marker: LogMarker, template: String, arg1: Any): Unit = + if (log.isWarningEnabled(marker)) + logAtLevel(marker, Logging.WarningLevel, log.format(template, arg1)) def logWarning(template: String, arg1: Any, arg2: Any): Unit = - logAtLevel(Logging.WarningLevel, template, arg1, arg2) + if (log.isWarningEnabled) + logAtLevel(Logging.WarningLevel, log.format(template, arg1, arg2)) + + def logWarning(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit = + if (log.isWarningEnabled(marker)) + logAtLevel(marker, Logging.WarningLevel, log.format(template, arg1, arg2)) def logWarning(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = - logAtLevel(Logging.WarningLevel, template, arg1, arg2, arg3) + if (log.isWarningEnabled) + logAtLevel(Logging.WarningLevel, log.format(template, arg1, arg2, arg3)) def logError(message: String): Unit = - logAtLevel(Logging.ErrorLevel, message) + if (log.isErrorEnabled) + logAtLevel(Logging.ErrorLevel, message) + + def logError(marker: LogMarker, message: String): Unit = + if (log.isErrorEnabled(marker)) + logAtLevel(marker, Logging.ErrorLevel, message) def logError(template: String, arg1: Any): Unit = - logAtLevel(Logging.ErrorLevel, template, arg1) + if (log.isErrorEnabled) + logAtLevel(Logging.ErrorLevel, log.format(template, arg1)) def logError(template: String, arg1: Any, arg2: Any): Unit = - logAtLevel(Logging.ErrorLevel, template, arg1, arg2) + if (log.isErrorEnabled) + logAtLevel(Logging.ErrorLevel, log.format(template, arg1, arg2)) def logError(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = - logAtLevel(Logging.ErrorLevel, template, arg1, arg2, arg3) + if (log.isErrorEnabled) + logAtLevel(Logging.ErrorLevel, log.format(template, arg1, arg2, arg3)) def logError(cause: Throwable, message: String): Unit = { if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) @@ -527,71 +572,32 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { log.error(cause, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message) } - def logError(cause: Throwable, template: String, arg1: Any): Unit = { - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1) - else - log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1) - } + def logError(cause: Throwable, template: String, arg1: Any): Unit = + logError(cause, log.format(template, arg1)) - def logError(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = { - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2) - else - log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2) - } + def logError(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = + logError(cause, log.format(template, arg1, arg2)) - def logError(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = { - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3) - else - log.error( - cause, - "Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, - selfAddress, - arg1, - arg2, - arg3) - } + def logError(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + logError(cause, log.format(template, arg1, arg2, arg3)) private def logAtLevel(logLevel: LogLevel, message: String): Unit = { - if (isLevelEnabled(logLevel)) - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.log(logLevel, "Cluster Node [{}] - {}", selfAddress, message) - else - log.log(logLevel, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message) + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.log(logLevel, "Cluster Node [{}] - {}", selfAddress, message) + else + log.log(logLevel, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message) } - private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any): Unit = { - if (isLevelEnabled(logLevel)) - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1) - else - log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1) + private def logAtLevel(marker: LogMarker, logLevel: LogLevel, message: String): Unit = { + if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) + log.log(marker, logLevel, log.format("Cluster Node [{}] - {}", selfAddress, message)) + else + log.log( + marker, + logLevel, + log.format("Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)) } - private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any): Unit = - if (isLevelEnabled(logLevel)) - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2) - else - log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2) - - private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = - if (isLevelEnabled(logLevel)) - if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter) - log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3) - else - log.log( - logLevel, - "Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template, - selfAddress, - arg1, - arg2, - arg3) - - private def isLevelEnabled(logLevel: LogLevel): Boolean = - LogInfo || logLevel < Logging.InfoLevel } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 7db2ba1adf..3d12df3e15 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -323,7 +323,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val selfDc = cluster.selfDataCenter private val gossipLogger = - new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterGossip))) + new cluster.ClusterLogger( + Logging.withMarker(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterGossip))) protected def selfUniqueAddress = cluster.selfUniqueAddress @@ -786,13 +787,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh if (joiningNode == selfUniqueAddress) { logInfo( + ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining), "Node [{}] is JOINING itself (with roles [{}]) and forming new cluster", joiningNode.address, roles.mkString(", ")) if (localMembers.isEmpty) leaderActions() // important for deterministic oldest when bootstrapping } else { - logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", ")) + logInfo( + ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining), + "Node [{}] is JOINING, roles [{}]", + joiningNode.address, + roles.mkString(", ")) sender() ! Welcome(selfUniqueAddress, latestGossip) } @@ -826,19 +832,23 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh */ def leaving(address: Address): Unit = { // only try to update if the node is available (in the member ring) - if (latestGossip.members.exists( - m => m.address == address && (m.status == Joining || m.status == WeaklyUp || m.status == Up))) { - val newMembers = latestGossip.members.map { m => - if (m.address == address) m.copy(status = Leaving) else m - } // mark node as LEAVING - val newGossip = latestGossip.copy(members = newMembers) + latestGossip.members.find(_.address == address).foreach { existingMember => + if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up) { + // mark node as LEAVING + val newMembers = latestGossip.members - existingMember + existingMember.copy(status = Leaving) + val newGossip = latestGossip.copy(members = newMembers) - updateLatestGossip(newGossip) + updateLatestGossip(newGossip) - logInfo("Marked address [{}] as [{}]", address, Leaving) - publishMembershipState() - // immediate gossip to speed up the leaving process - gossip() + logInfo( + ClusterLogMarker.memberChanged(existingMember.uniqueAddress, MemberStatus.Leaving), + "Marked address [{}] as [{}]", + address, + Leaving) + publishMembershipState() + // immediate gossip to speed up the leaving process + gossip() + } } } @@ -913,9 +923,17 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh localMembers.find(_.address == address) match { case Some(m) if m.status != Down => if (localReachability.isReachable(m.uniqueAddress)) - logInfo("Marking node [{}] as [{}]", m.address, Down) + logInfo( + ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Down), + "Marking node [{}] as [{}]", + m.address, + Down) else - logInfo("Marking unreachable node [{}] as [{}]", m.address, Down) + logInfo( + ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Down), + "Marking unreachable node [{}] as [{}]", + m.address, + Down) val newGossip = localGossip.markAsDown(m) updateLatestGossip(newGossip) @@ -935,6 +953,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val newGossip = localGossip.copy(overview = newOverview) updateLatestGossip(newGossip) logWarning( + ClusterLogMarker.unreachable(node.address), "Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]. " + "It must still be marked as down before it's removed.", node.address, @@ -1154,7 +1173,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val periodicNotice = 60 if (membershipState.convergence(exitingConfirmed)) { if (leaderActionCounter >= firstNotice) - logInfo("Leader can perform its duties again") + logInfo(ClusterLogMarker.leaderRestored, "Leader can perform its duties again") leaderActionCounter = 0 leaderActionsOnConvergence() } else { @@ -1164,6 +1183,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0) logInfo( + ClusterLogMarker.leaderIncapacitated, "Leader can currently not perform its duties, reachability status: [{}], member status: [{}]", membershipState.dcReachabilityExcludingDownedObservers, latestGossip.members @@ -1296,17 +1316,33 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed) changedMembers.foreach { m => - logInfo("Leader is moving node [{}] to [{}]", m.address, m.status) + logInfo( + ClusterLogMarker.memberChanged(m.uniqueAddress, m.status), + "Leader is moving node [{}] to [{}]", + m.address, + m.status) } removedUnreachable.foreach { m => val status = if (m.status == Exiting) "exiting" else "unreachable" - logInfo("Leader is removing {} node [{}]", status, m.address) + logInfo( + ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Removed), + "Leader is removing {} node [{}]", + status, + m.address) } removedExitingConfirmed.foreach { n => - logInfo("Leader is removing confirmed Exiting node [{}]", n.address) + logInfo( + ClusterLogMarker.memberChanged(n, MemberStatus.Removed), + "Leader is removing confirmed Exiting node [{}]", + n.address) } removedOtherDc.foreach { m => - logInfo("Leader is removing {} node [{}] in DC [{}]", m.status, m.address, m.dataCenter) + logInfo( + ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Removed), + "Leader is removing {} node [{}] in DC [{}]", + m.status, + m.address, + m.dataCenter) } newGossip @@ -1358,7 +1394,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh // log status changes changedMembers.foreach { m => - logInfo("Leader is moving node [{}] to [{}]", m.address, m.status) + logInfo( + ClusterLogMarker.memberChanged(m.uniqueAddress, m.status), + "Leader is moving node [{}] to [{}]", + m.address, + m.status) } publishMembershipState() @@ -1411,20 +1451,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh updateLatestGossip(newGossip) val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting) - if (nonExiting.nonEmpty) - logWarning( - "Marking node(s) as UNREACHABLE [{}]. Node roles [{}]", - nonExiting.mkString(", "), - selfRoles.mkString(", ")) + nonExiting.foreach { node => + logWarning(ClusterLogMarker.unreachable(node.address), "Marking node as UNREACHABLE [{}].", node) + } if (exiting.nonEmpty) logInfo( "Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.", exiting.mkString(", ")) - if (newlyDetectedReachableMembers.nonEmpty) - logInfo( - "Marking node(s) as REACHABLE [{}]. Node roles [{}]", - newlyDetectedReachableMembers.mkString(", "), - selfRoles.mkString(",")) + nonExiting.foreach { node => + logInfo(ClusterLogMarker.reachable(node.address), "Marking node as REACHABLE [{}].", node) + } publishMembershipState() } @@ -1741,6 +1777,7 @@ private[cluster] final class JoinSeedNodeProcess( context.become(done) } else { logError( + ClusterLogMarker.joinFailed, "Couldn't join seed nodes because of incompatible cluster configuration. " + "It's recommended to perform a full cluster shutdown in order to deploy this new version." + "If a cluster shutdown isn't an option, you may want to disable this protection by setting " + @@ -1756,6 +1793,7 @@ private[cluster] final class JoinSeedNodeProcess( case ReceiveTimeout => if (attempt >= 2) logWarning( + ClusterLogMarker.joinFailed, "Couldn't join seed nodes after [{}] attempts, will try again. seed-nodes=[{}]", attempt, seedNodes.filterNot(_ == selfAddress).mkString(", ")) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 0cebc860f3..dc6e19a6e0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -41,7 +41,8 @@ private[cluster] final class ClusterHeartbeatReceiver(getCluster: () => Cluster) lazy val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging private lazy val clusterLogger = - new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) + new cluster.ClusterLogger( + Logging.withMarker(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) def receive: Receive = { case hb: Heartbeat => @@ -108,7 +109,8 @@ private[cluster] class ClusterHeartbeatSender extends Actor { import context.dispatcher private val clusterLogger = - new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) + new cluster.ClusterLogger( + Logging.withMarker(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) import clusterLogger._ val filterInternalClusterMembers: Member => Boolean = @@ -227,6 +229,7 @@ private[cluster] class ClusterHeartbeatSender extends Actor { val now = System.nanoTime() if ((now - tickTimestamp) >= (HeartbeatInterval.toNanos * 2)) logWarning( + ClusterLogMarker.heartbeatStarvation, "Scheduled sending of heartbeat was delayed. " + "Previous heartbeat was sent [{}] ms ago, expected interval is [{}] ms. This may cause failure detection " + "to mark members as unreachable. The reason can be thread starvation, e.g. by running blocking tasks on the " + diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterLogMarker.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterLogMarker.scala new file mode 100644 index 0000000000..35233c830e --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterLogMarker.scala @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster + +import akka.actor.Address +import akka.annotation.ApiMayChange +import akka.annotation.InternalApi +import akka.event.LogMarker + +/** + * This is public with the purpose to document the used markers and properties of log events. + * No guarantee that it will remain binary compatible, but the marker names and properties + * are considered public API and will not be changed without notice. + */ +@ApiMayChange +object ClusterLogMarker { + + /** + * INTERNAL API + */ + @InternalApi private[akka] object Properties { + val MemberStatus = "akkaMemberStatus" + } + + /** + * Marker "akkaUnreachable" of log event when a node is marked as unreachable based no failure detector observation. + * @param node The address of the node that is marked as unreachable. Included as property "akkaRemoteAddress". + */ + def unreachable(node: Address): LogMarker = + LogMarker("akkaUnreachable", Map(LogMarker.Properties.RemoteAddress -> node)) + + /** + * Marker "akkaReachable" of log event when a node is marked as reachable again based no failure detector observation. + * @param node The address of the node that is marked as reachable. Included as property "akkaRemoteAddress". + */ + def reachable(node: Address): LogMarker = + LogMarker("akkaReachable", Map(LogMarker.Properties.RemoteAddress -> node)) + + /** + * Marker "akkaHeartbeatStarvation" of log event when scheduled heartbeat was delayed. + */ + val heartbeatStarvation: LogMarker = + LogMarker("akkaHeartbeatStarvation") + + /** + * Marker "akkaClusterLeaderIncapacitated" of log event when leader can't perform its duties. + * Typically because there are unreachable nodes that have not been downed. + */ + val leaderIncapacitated: LogMarker = + LogMarker("akkaClusterLeaderIncapacitated") + + /** + * Marker "akkaClusterLeaderRestored" of log event when leader can perform its duties again. + */ + val leaderRestored: LogMarker = + LogMarker("akkaClusterLeaderRestored") + + /** + * Marker "akkaJoinFailed" of log event when node couldn't join seed nodes. + */ + val joinFailed: LogMarker = + LogMarker("akkaJoinFailed") + + /** + * Marker "akkaMemberChanged" of log event when a member's status is changed by the leader. + * @param node The address of the node that is changed. Included as property "akkaRemoteAddress" + * and "akkaRemoteAddressUid". + * @param status New member status. Included as property "akkaMemberStatus". + */ + def memberChanged(node: UniqueAddress, status: MemberStatus): LogMarker = + LogMarker( + "akkaMemberChanged", + Map( + LogMarker.Properties.RemoteAddress -> node.address, + LogMarker.Properties.RemoteAddressUid -> node.longUid, + Properties.MemberStatus -> status)) + + /** + * Marker "akkaClusterSingletonStarted" of log event when Cluster Singleton + * instance has started. + */ + val singletonStarted: LogMarker = + LogMarker("akkaClusterSingletonStarted") + + /** + * Marker "akkaClusterSingletonTerminated" of log event when Cluster Singleton + * instance has terminated. + */ + val singletonTerminated: LogMarker = + LogMarker("akkaClusterSingletonTerminated") + +} diff --git a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala index adea65ffc4..59930799c7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala @@ -47,7 +47,8 @@ private[cluster] class CrossDcHeartbeatSender extends Actor { import context.dispatcher private val clusterLogger = - new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) + new cluster.ClusterLogger( + Logging.withMarker(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) import clusterLogger._ // For inspecting if in active state; allows avoiding "becoming active" when already active diff --git a/akka-docs/src/main/paradox/logging.md b/akka-docs/src/main/paradox/logging.md index d957b9232c..60aeb251ef 100644 --- a/akka-docs/src/main/paradox/logging.md +++ b/akka-docs/src/main/paradox/logging.md @@ -575,6 +575,14 @@ A more advanced (including most Akka added information) example pattern would be %date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n ``` +Akka is logging some events with markers. Some of these events also include structured MDC properties. + +* The "SECURITY" marker is used for highlighting security related events or incidents. +* Akka Actor is using the markers defined in @apidoc[akka.actor.ActorLogMarker]. +* Akka Cluster is using the markers defined in @apidoc[akka.cluster.ClusterLogMarker]. +* Akka Remoting is using the markers defined in @apidoc[akka.remote.RemoteLogMarker]. +* Akka Cluster Sharding is using the markers defined in @apidoc[akka.cluster.sharding.ShardingLogMarker]. + #### Using SLF4J's Markers It is also possible to use the `org.slf4j.Marker` with the `LoggingAdapter` when using slf4j. diff --git a/akka-docs/src/main/paradox/typed/logging.md b/akka-docs/src/main/paradox/typed/logging.md index 5702755a36..d9090b373b 100644 --- a/akka-docs/src/main/paradox/typed/logging.md +++ b/akka-docs/src/main/paradox/typed/logging.md @@ -433,6 +433,16 @@ With Logback the timestamp is available with `%X{akkaTimestamp}` specifier withi ``` +### Markers + +Akka is logging some events with markers. Some of these events also include structured MDC properties. + +* The "SECURITY" marker is used for highlighting security related events or incidents. +* Akka Actor is using the markers defined in @apidoc[akka.actor.ActorLogMarker]. +* Akka Cluster is using the markers defined in @apidoc[akka.cluster.ClusterLogMarker]. +* Akka Remoting is using the markers defined in @apidoc[akka.remote.RemoteLogMarker]. +* Akka Cluster Sharding is using the markers defined in @apidoc[akka.cluster.sharding.ShardingLogMarker]. + ### Logger names It can be useful to enable debug level or other SLF4J backend configuration for certain modules of Akka when diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index 45a92d623a..eefeeb18db 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -7,12 +7,15 @@ package akka.remote import akka.event.Logging.Warning import akka.remote.FailureDetector.Clock import java.util.concurrent.atomic.AtomicReference + import scala.annotation.tailrec import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import scala.collection.immutable + import com.typesafe.config.Config import akka.event.EventStream +import akka.event.Logging import akka.util.Helpers.ConfigOps /** @@ -144,7 +147,9 @@ class PhiAccrualFailureDetector( Warning( this.toString, getClass, - s"heartbeat interval is growing too large for address $address: $interval millis")) + s"heartbeat interval is growing too large for address $address: $interval millis", + Logging.emptyMDC, + RemoteLogMarker.failureDetectorGrowing(address))) oldState.history :+ interval } else oldState.history } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteLogMarker.scala b/akka-remote/src/main/scala/akka/remote/RemoteLogMarker.scala new file mode 100644 index 0000000000..46f240d05d --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteLogMarker.scala @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.remote + +import akka.actor.Address +import akka.annotation.ApiMayChange +import akka.event.LogMarker + +/** + * This is public with the purpose to document the used markers and properties of log events. + * No guarantee that it will remain binary compatible, but the marker names and properties + * are considered public API and will not be changed without notice. + */ +@ApiMayChange +object RemoteLogMarker { + + /** + * Marker "akkaFailureDetectorGrowing" of log event when failure detector heartbeat interval + * is growing too large. + * + * @param remoteAddress The address of the node that the failure detector is monitoring. Included as property "akkaRemoteAddress". + */ + def failureDetectorGrowing(remoteAddress: String): LogMarker = + LogMarker("akkaFailureDetectorGrowing", Map(LogMarker.Properties.RemoteAddress -> remoteAddress)) + + /** + * Marker "akkaQuarantine" of log event when a node is quarantined. + * + * @param remoteAddress The address of the node that is quarantined. Included as property "akkaRemoteAddress". + * @param remoteAddressUid The address of the node that is quarantined. Included as property "akkaRemoteAddressUid". + */ + def quarantine(remoteAddress: Address, remoteAddressUid: Option[Long]): LogMarker = + LogMarker( + "akkaQuarantine", + Map( + LogMarker.Properties.RemoteAddress -> remoteAddress, + LogMarker.Properties.RemoteAddressUid -> remoteAddressUid.getOrElse(""))) + + /** + * Marker "akkaConnect" of log event when outbound connection is attempted. + * + * @param remoteAddress The address of the connected node. Included as property "akkaRemoteAddress". + * @param remoteAddressUid The address of the connected node. Included as property "akkaRemoteAddressUid". + */ + def connect(remoteAddress: Address, remoteAddressUid: Option[Long]): LogMarker = + LogMarker( + "akkaConnect", + Map( + LogMarker.Properties.RemoteAddress -> remoteAddress, + LogMarker.Properties.RemoteAddressUid -> remoteAddressUid.getOrElse(""))) + + /** + * Marker "akkaDisconnected" of log event when outbound connection is closed. + * + * @param remoteAddress The address of the disconnected node. Included as property "akkaRemoteAddress". + * @param remoteAddressUid The address of the disconnected node. Included as property "akkaRemoteAddressUid". + */ + def disconnected(remoteAddress: Address, remoteAddressUid: Option[Long]): LogMarker = + LogMarker( + "akkaDisconnected", + Map( + LogMarker.Properties.RemoteAddress -> remoteAddress, + LogMarker.Properties.RemoteAddressUid -> remoteAddressUid.getOrElse(""))) +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index b1ad6b07fa..e21e60a35a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -11,7 +11,7 @@ import akka.{ Done, NotUsed } import akka.actor.{ Actor, ActorRef, Address, CoordinatedShutdown, Dropped, ExtendedActorSystem, Props } import akka.annotation.InternalStableApi import akka.dispatch.Dispatchers -import akka.event.{ Logging, LoggingAdapter } +import akka.event.{ Logging, MarkerLoggingAdapter } import akka.remote.AddressUidExtension import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider @@ -28,7 +28,6 @@ import akka.stream._ import akka.stream.scaladsl.{ Flow, Keep, Sink } import akka.util.{ unused, OptionVal, WildcardIndex } import com.github.ghik.silencer.silent - import scala.annotation.tailrec import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ @@ -308,7 +307,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ - override val log: LoggingAdapter = Logging(system, getClass) + override val log: MarkerLoggingAdapter = Logging.withMarker(system, getClass) /** * Compression tables must be created once, such that inbound lane restarts don't cause dropping of the tables. diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 3ac17f04e0..d42241e9ba 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -30,6 +30,7 @@ import akka.event.Logging import akka.remote.DaemonMsgCreate import akka.remote.PriorityMessage import akka.remote.RemoteActorRef +import akka.remote.RemoteLogMarker import akka.remote.UniqueAddress import akka.remote.artery.ArteryTransport.AeronTerminated import akka.remote.artery.ArteryTransport.ShuttingDown @@ -142,7 +143,7 @@ private[remote] class Association( require(remoteAddress.port.nonEmpty) - private val log = Logging(transport.system, getClass) + private val log = Logging.withMarker(transport.system, getClass) private def flightRecorder = transport.topLevelFlightRecorder override def settings = transport.settings @@ -493,6 +494,7 @@ private[remote] class Association( .publish(GracefulShutdownQuarantinedEvent(UniqueAddress(remoteAddress, u), reason)) } else { log.warning( + RemoteLogMarker.quarantine(remoteAddress, Some(u)), "Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " + "messages to this UID will be delivered to dead letters. " + "Remote ActorSystem must be restarted to recover from this situation. Reason: {}", @@ -516,6 +518,7 @@ private[remote] class Association( } case Some(peer) => log.info( + RemoteLogMarker.quarantine(remoteAddress, Some(u)), "Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}", remoteAddress, u, @@ -524,12 +527,16 @@ private[remote] class Association( send(ClearSystemMessageDelivery(current.incarnation - 1), OptionVal.None, OptionVal.None) case None => log.info( + RemoteLogMarker.quarantine(remoteAddress, Some(u)), "Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}", remoteAddress, reason) } case None => - log.warning("Quarantine of [{}] ignored because unknown UID", remoteAddress) + log.warning( + RemoteLogMarker.quarantine(remoteAddress, None), + "Quarantine of [{}] ignored because unknown UID", + remoteAddress) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index b16f9fb40a..e5cfd57f60 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -26,6 +26,7 @@ import akka.actor.ExtendedActorSystem import akka.dispatch.ExecutionContexts import akka.event.Logging import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteLogMarker import akka.remote.RemoteTransportException import akka.remote.artery.Decoder.InboundCompressionAccess import akka.remote.artery.compress._ @@ -145,6 +146,26 @@ private[remote] class ArteryTcpTransport( def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = { val restartCount = new AtomicInteger(0) + def logConnect(): Unit = { + if (log.isDebugEnabled) + log.debug( + RemoteLogMarker.connect( + outboundContext.remoteAddress, + outboundContext.associationState.uniqueRemoteAddressValue().map(_.uid)), + "Outbound connection opened to [{}]", + outboundContext.remoteAddress) + } + + def logDisconnected(): Unit = { + if (log.isDebugEnabled) + log.debug( + RemoteLogMarker.disconnected( + outboundContext.remoteAddress, + outboundContext.associationState.uniqueRemoteAddressValue().map(_.uid)), + "Outbound connection closed to [{}]", + outboundContext.remoteAddress) + } + val flowFactory = () => { val onFailureLogLevel = if (restartCount.incrementAndGet() == 1) Logging.WarningLevel else Logging.DebugLevel @@ -152,6 +173,7 @@ private[remote] class ArteryTcpTransport( Flow[ByteString] .via(Flow.lazyFlow(() => { // only open the actual connection if any new messages are sent + logConnect() afr.loFreq( TcpOutbound_Connected, s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " + @@ -161,6 +183,12 @@ private[remote] class ArteryTcpTransport( Flow[ByteString].prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))).via(connectionFlow) })) + .mapError { + case ArteryTransport.ShutdownSignal => ArteryTransport.ShutdownSignal + case e => + logDisconnected() + e + } .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal => Source.empty }) .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream") .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = onFailureLogLevel)) diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala index 3c14b51f53..ea03a4de89 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jLogger.scala @@ -107,12 +107,22 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg @inline final def withMdc(logSource: String, logEvent: LogEvent)(logStatement: => Unit): Unit = { + logEvent match { + case m: LogEventWithMarker if m.marker ne null => + val properties = m.marker.properties + if (properties.nonEmpty) { + properties.foreach { case (k, v) => MDC.put(k, String.valueOf(v)) } + } + case _ => + } + MDC.put(mdcAkkaSourceAttributeName, logSource) MDC.put(mdcThreadAttributeName, logEvent.thread.getName) MDC.put(mdcAkkaTimestamp, formatTimestamp(logEvent.timestamp)) MDC.put(mdcActorSystemAttributeName, context.system.name) MDC.put(mdcAkkaAddressAttributeName, akkaAddress) logEvent.mdc.foreach { case (k, v) => MDC.put(k, String.valueOf(v)) } + try logStatement finally { MDC.clear() @@ -174,7 +184,7 @@ class Slf4jLoggingFilter(@unused settings: ActorSystem.Settings, eventStream: Ev } /** Wraps [[org.slf4j.Marker]] */ -final class Slf4jLogMarker(val marker: org.slf4j.Marker) extends LogMarker(name = marker.getName) +final class Slf4jLogMarker(val marker: org.slf4j.Marker) extends LogMarker(name = marker.getName, Map.empty) /** Factory for creating [[LogMarker]] that wraps [[org.slf4j.Marker]] */ object Slf4jLogMarker { diff --git a/akka-slf4j/src/test/resources/logback-test.xml b/akka-slf4j/src/test/resources/logback-test.xml index 728ed77bff..f3847c005a 100644 --- a/akka-slf4j/src/test/resources/logback-test.xml +++ b/akka-slf4j/src/test/resources/logback-test.xml @@ -7,7 +7,7 @@ - %date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] akkaAddress=[%X{akkaAddress}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n + %date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] akkaAddress=[%X{akkaAddress}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc} p1: %X{p1} p2: %X{p2}] - msg=[%msg]%n----%n 1, "p2" -> "B"))) + + awaitCond(outputString.contains("----"), 5 seconds) + val s = outputString + s should include("marker=[testMarker]") + s should include("p1: 1 p2: B") + s should include("msg=[interesting message]") + } + "log info with slf4j marker" in { val slf4jMarker = MarkerFactory.getMarker("SLF") slf4jMarker.add(MarkerFactory.getMarker("ADDED")) // slf4j markers can have children @@ -143,7 +153,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft awaitCond(outputString.contains("----"), 5 seconds) val s = outputString s should include("marker=[SLF [ ADDED ]]") - s should include("mdc=[ticket-#3671: Custom MDC Values]") + s should include("mdc=[ticket-#3671: Custom MDC Values") s should include("msg=[security-wise interesting message]") } @@ -158,7 +168,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft s should include("level=[INFO]") s should include("logger=[akka.event.slf4j.Slf4jLoggerSpec$LogProducer]") (s should include).regex(sourceThreadRegex) - s should include("mdc=[ticket-#3671: Custom MDC Values]") + s should include("mdc=[ticket-#3671: Custom MDC Values") s should include("msg=[Message with custom MDC values]") } @@ -179,7 +189,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft s should include("level=[INFO]") s should include("logger=[akka.event.slf4j.Slf4jLoggerSpec$LogProducer]") (s should include).regex(sourceThreadRegex) - s should include("mdc=[ticket-#3671: null]") + s should include("mdc=[ticket-#3671: null") s should include("msg=[Message with null custom MDC values]") }