Structured log events, #28207 (#28209)

* Expanding the LogMarker to optionally include Map of additional
  properties
* The name of the marker shows up as `tags` in Kibana
* The properties of the LogMarker are included as MDC entries, which
  Logstash encoder automatically understands
* Implemented with classic eventStream logging so that it can be used
  from all places without dependencies to Slf4j
  * also means that it's possible to subscribe to LogEventWithMarker
    to consume these events via the eventStream
* move marker definitions to RemoteLogMarker and ClusterLogMarker
* marker for dead letters
* marker for leader detained/allowed
* marker for member status changed
* markers for shard allocated and started
* test LogMarker with properties in Slf4jLoggerSpec
* doc note of the LogMarker definitions
This commit is contained in:
Patrik Nordwall 2019-12-05 11:36:21 +01:00 committed by GitHub
parent 2f80042cd8
commit 89165badbb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 590 additions and 138 deletions

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import akka.annotation.ApiMayChange
import akka.event.LogMarker
/**
* This is public with the purpose to document the used markers and properties of log events.
* No guarantee that it will remain binary compatible, but the marker names and properties
* are considered public API and will not be changed without notice.
*/
@ApiMayChange
object ActorLogMarker {
/**
* Marker "akkaDeadLetter" of log event for dead letter messages.
*
* @param messageClass The message class of the DeadLetter. Included as property "akkaMessageClass".
*/
def deadLetter(messageClass: String): LogMarker =
LogMarker("akkaDeadLetter", Map(LogMarker.Properties.MessageClass -> messageClass))
}

View file

@ -8,6 +8,7 @@ import scala.concurrent.duration.Deadline
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor
import akka.actor.ActorLogMarker
import akka.actor.ActorRef
import akka.actor.AllDeadLetters
import akka.actor.DeadLetter
@ -116,7 +117,9 @@ class DeadLetterListener extends Actor {
d.recipient.getClass,
logMessage +
"This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " +
"and 'akka.log-dead-letters-during-shutdown'."))
"and 'akka.log-dead-letters-during-shutdown'.",
Logging.emptyMDC,
ActorLogMarker.deadLetter(messageStr)))
}
private def isReal(snd: ActorRef): Boolean = {

View file

@ -1627,17 +1627,36 @@ trait DiagnosticLoggingAdapter extends LoggingAdapter {
/** DO NOT INHERIT: Class is open only for use by akka-slf4j*/
@DoNotInherit
class LogMarker(val name: String)
class LogMarker(val name: String, val properties: Map[String, Any]) {
// for binary compatibility
def this(name: String) = this(name, Map.empty)
/** Java API */
def getProperties: java.util.Map[String, Object] = {
import akka.util.ccompat.JavaConverters._
properties.map { case (k, v) => (k, v.asInstanceOf[AnyRef]) }.asJava
}
}
object LogMarker {
/** The Marker is internally transferred via MDC using using this key */
private[akka] final val MDCKey = "marker"
def apply(name: String): LogMarker = new LogMarker(name)
def apply(name: String): LogMarker = new LogMarker(name, Map.empty)
def apply(name: String, properties: Map[String, Any]): LogMarker = new LogMarker(name, properties)
/** Java API */
def create(name: String): LogMarker = apply(name)
/** Java API */
def create(name: String, properties: java.util.Map[String, Any]): LogMarker = {
import akka.util.ccompat.JavaConverters._
apply(name, properties.asScala.toMap)
}
@Deprecated
@deprecated("use akka.event.LogEventWithMarker#marker instead", since = "2.5.12")
def extractFromMDC(mdc: MDC): Option[String] =
@ -1648,6 +1667,15 @@ object LogMarker {
private[akka] final val Security = apply("SECURITY")
/**
* INTERNAL API
*/
@InternalApi private[akka] object Properties {
val MessageClass = "akkaMessageClass"
val RemoteAddress = "akkaRemoteAddress"
val RemoteAddressUid = "akkaRemoteAddressUid"
}
}
/**
@ -1896,6 +1924,19 @@ class MarkerLoggingAdapter(
if (isDebugEnabled(marker))
bus.publish(Debug(logSource, logClass, format(template, arg1, arg2, arg3, arg4), mdc, marker))
/**
* Log message at the specified log level.
*/
def log(marker: LogMarker, level: Logging.LogLevel, message: String): Unit = {
level match {
case Logging.DebugLevel => debug(marker, message)
case Logging.InfoLevel => info(marker, message)
case Logging.WarningLevel => warning(marker, message)
case Logging.ErrorLevel => error(marker, message)
case _ =>
}
}
// Copy of LoggingAdapter.format1 due to binary compatibility restrictions
private def format1(t: String, arg: Any): String = arg match {
case a: Array[_] if !a.getClass.getComponentType.isPrimitive => format(t, a.toIndexedSeq)

View file

@ -0,0 +1,8 @@
# #28207 logging with markers
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.sharding.PersistentShardCoordinator")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.sharding.DDataShardCoordinator")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.sharding.ShardCoordinator")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.sharding.ShardCoordinator.log")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.typeName")
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.sharding.ShardRegion")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.sharding.ShardRegion.log")

View file

@ -8,6 +8,7 @@ import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Success
import akka.actor._
import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi
@ -25,6 +26,8 @@ import akka.cluster.ddata.GSetKey
import akka.cluster.ddata.Key
import akka.cluster.ddata.ReplicatedData
import akka.cluster.ddata.SelfUniqueAddress
import akka.event.BusLogging
import akka.event.Logging
import akka.util.Timeout
import com.github.ghik.silencer.silent
@ -495,13 +498,14 @@ object ShardCoordinator {
abstract class ShardCoordinator(
settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
extends Actor
with ActorLogging {
extends Actor {
import ShardCoordinator._
import ShardCoordinator.Internal._
import ShardRegion.ShardId
import settings.tuningParameters._
val log = Logging.withMarker(context.system, this)
val cluster = Cluster(context.system)
val removalMargin = cluster.downingProvider.downRemovalMargin
val minMembers = settings.role match {
@ -527,6 +531,8 @@ abstract class ShardCoordinator(
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, ClusterShuttingDown.getClass)
protected def typeName: String
override def postStop(): Unit = {
super.postStop()
rebalanceTask.cancel()
@ -881,7 +887,11 @@ abstract class ShardCoordinator(
if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) {
update(ShardHomeAllocated(shard, region)) { evt =>
state = state.updated(evt)
log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)
log.debug(
ShardingLogMarker.shardAllocated(typeName, shard, regionAddress(region)),
"Shard [{}] allocated at [{}]",
evt.shard,
evt.region)
sendHostShardMsg(evt.shard, evt.region)
getShardHomeSender ! ShardHome(evt.shard, evt.region)
@ -895,8 +905,13 @@ abstract class ShardCoordinator(
}
}
private def regionAddress(region: ActorRef): Address = {
if (region.path.address.host.isEmpty) cluster.selfAddress
else region.path.address
}
def continueRebalance(shards: Set[ShardId]): Unit = {
if (log.isInfoEnabled && (shards.nonEmpty || rebalanceInProgress.nonEmpty)) {
if ((log: BusLogging).isInfoEnabled && (shards.nonEmpty || rebalanceInProgress.nonEmpty)) {
log.info(
"Starting rebalance for shards [{}]. Current shards rebalancing: [{}]",
shards.mkString(","),
@ -932,7 +947,7 @@ abstract class ShardCoordinator(
*/
@deprecated("Use `ddata` mode, persistence mode is deprecated.", "2.6.0")
class PersistentShardCoordinator(
typeName: String,
override val typeName: String,
settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
extends ShardCoordinator(settings, allocationStrategy)
@ -1041,7 +1056,7 @@ class PersistentShardCoordinator(
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
class DDataShardCoordinator(
typeName: String,
override val typeName: String,
settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy,
replicator: ActorRef,

View file

@ -25,6 +25,7 @@ import akka.cluster.MemberStatus
import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.Shard.ShardStats
import akka.event.Logging
import akka.pattern.{ ask, pipe }
import akka.util.{ MessageBufferMap, PrettyDuration, Timeout }
@ -490,7 +491,6 @@ private[akka] class ShardRegion(
replicator: ActorRef,
majorityMinCap: Int)
extends Actor
with ActorLogging
with Timers {
import ShardingQueries.ShardsQueryResult
@ -499,6 +499,8 @@ private[akka] class ShardRegion(
import settings._
import settings.tuningParameters._
val log = Logging.withMarker(context.system, this)
val cluster = Cluster(context.system)
// sort by age, oldest first
@ -1033,7 +1035,7 @@ private[akka] class ShardRegion(
.get(id)
.orElse(entityProps match {
case Some(props) if !shardsByRef.values.exists(_ == id) =>
log.debug("{}: Starting shard [{}] in region", typeName, id)
log.debug(ShardingLogMarker.shardStarted(typeName, id), "{}: Starting shard [{}] in region", typeName, id)
val name = URLEncoder.encode(id, "utf-8")
val shard = context.watch(

View file

@ -0,0 +1,50 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.Address
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.event.LogMarker
/**
* This is public with the purpose to document the used markers and properties of log events.
* No guarantee that it will remain binary compatible, but the marker names and properties
* are considered public API and will not be changed without notice.
*/
@ApiMayChange
object ShardingLogMarker {
/**
* INTERNAL API
*/
@InternalApi private[akka] object Properties {
val ShardTypeName = "akkaShardTypeName"
val ShardId = "akkaShardId"
}
/**
* Marker "akkaShardAllocated" of log event when `ShardCoordinator` allocates a shard to a region.
* @param shardTypeName The `typeName` of the shard. Included as property "akkaShardTypeName".
* @param shardId The id of the shard. Included as property "akkaShardId".
* @param node The address of the node where the shard is allocated. Included as property "akkaRemoteAddress".
*/
def shardAllocated(shardTypeName: String, shardId: String, node: Address): LogMarker =
LogMarker(
"akkaShardAllocated",
Map(
Properties.ShardTypeName -> shardTypeName,
Properties.ShardId -> shardId,
LogMarker.Properties.RemoteAddress -> node))
/**
* Marker "akkaShardStarted" of log event when `ShardRegion` starts a shard.
* @param shardTypeName The `typeName` of the shard. Included as property "akkaShardTypeName".
* @param shardId The id of the shard. Included as property "akkaShardId".
*/
def shardStarted(shardTypeName: String, shardId: String): LogMarker =
LogMarker("akkaShardStarted", Map(Properties.ShardTypeName -> shardTypeName, Properties.ShardId -> shardId))
}

View file

@ -32,6 +32,8 @@ import akka.coordination.lease.LeaseUsageSettings
import akka.coordination.lease.scaladsl.Lease
import akka.coordination.lease.scaladsl.LeaseProvider
import akka.dispatch.Dispatchers
import akka.event.LogMarker
import akka.event.Logging
import akka.pattern.ask
import akka.pattern.pipe
import akka.util.JavaDurationConverters._
@ -494,6 +496,8 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
private val singletonLeaseName = s"${context.system.name}-singleton-${self.path}"
override val log = Logging.withMarker(context.system, this)
val lease: Option[Lease] = settings.leaseSettings.map(
settings =>
LeaseProvider(context.system)
@ -555,12 +559,21 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
def logInfo(message: String): Unit =
if (LogInfo) log.info(message)
def logInfo(marker: LogMarker, message: String): Unit =
if (LogInfo) log.info(marker, message)
def logInfo(template: String, arg1: Any): Unit =
if (LogInfo) log.info(template, arg1)
def logInfo(marker: LogMarker, template: String, arg1: Any): Unit =
if (LogInfo) log.info(marker, template, arg1)
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
if (LogInfo) log.info(template, arg1, arg2)
def logInfo(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit =
if (LogInfo) log.info(marker, template, arg1, arg2)
def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (LogInfo) log.info(template, arg1, arg2, arg3)
@ -802,7 +815,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
stay.using(AcquiringLeaseData(leaseRequestInProgress = false, None))
}
case Event(Terminated(ref), AcquiringLeaseData(_, Some(singleton))) if ref == singleton =>
logInfo("Singleton actor terminated. Trying to acquire lease again before re-creating.")
logInfo(
ClusterLogMarker.singletonTerminated,
"Singleton actor terminated. Trying to acquire lease again before re-creating.")
// tryAcquireLease sets the state to None for singleton actor
tryAcquireLease()
case Event(AcquireLeaseFailure(t), _) =>
@ -834,8 +849,11 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
@InternalStableApi
def gotoOldest(): State = {
logInfo(
ClusterLogMarker.singletonStarted,
"Singleton manager starting singleton actor [{}]",
self.path / singletonName)
val singleton = context.watch(context.actorOf(singletonProps, singletonName))
logInfo("Singleton manager starting singleton actor [{}]", singleton.path)
goto(Oldest).using(OldestData(Some(singleton)))
}
@ -874,7 +892,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
stay
case Event(Terminated(ref), d @ OldestData(Some(singleton))) if ref == singleton =>
logInfo("Singleton actor [{}] was terminated", singleton.path)
logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", singleton.path)
stay.using(d.copy(singleton = None))
case Event(SelfExiting, _) =>
@ -934,7 +952,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
gotoHandingOver(singleton, None)
case Event(Terminated(ref), d @ WasOldestData(singleton, _)) if singleton.contains(ref) =>
logInfo("Singleton actor [{}] was terminated", ref.path)
logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", ref.path)
stay.using(d.copy(singleton = None))
case Event(SelfExiting, _) =>
@ -984,7 +1002,11 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
def handOverDone(handOverTo: Option[ActorRef]): State = {
val newOldest = handOverTo.map(_.path.address)
logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest)
logInfo(
ClusterLogMarker.singletonTerminated,
"Singleton terminated, hand-over done [{} -> {}]",
cluster.selfAddress,
newOldest)
handOverTo.foreach { _ ! HandOverDone }
memberExitingProgress.trySuccess(Done)
if (removed.contains(cluster.selfUniqueAddress)) {
@ -1004,7 +1026,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
when(Stopping) {
case Event(Terminated(ref), StoppingData(singleton)) if ref == singleton =>
logInfo("Singleton actor [{}] was terminated", singleton.path)
logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", singleton.path)
stop()
}

View file

@ -24,8 +24,9 @@ import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext }
import scala.util.control.NonFatal
import akka.event.LogMarker
import akka.event.Logging.LogLevel
import akka.event.MarkerLoggingAdapter
import com.github.ghik.silencer.silent
/**
@ -101,7 +102,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
scala.collection.JavaConverters.setAsJavaSetConverter(selfRoles).asJava
private val _isTerminated = new AtomicBoolean(false)
private val log = Logging(system, ClusterLogClass.ClusterCore)
private val log = Logging.withMarker(system, ClusterLogClass.ClusterCore)
// ClusterJmx is initialized as the last thing in the constructor
private var clusterJmx: Option[ClusterJmx] = None
@ -468,57 +469,101 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
/**
* INTERNAL API
*/
private[cluster] class ClusterLogger(log: LoggingAdapter) {
private[cluster] class ClusterLogger(log: MarkerLoggingAdapter) {
def isDebugEnabled: Boolean =
log.isDebugEnabled
def logDebug(message: String): Unit =
if (settings.LogInfo && log.isDebugEnabled)
logAtLevel(Logging.DebugLevel, message)
def logDebug(template: String, arg1: Any): Unit =
logAtLevel(Logging.DebugLevel, template, arg1)
if (settings.LogInfo && log.isDebugEnabled)
logAtLevel(Logging.DebugLevel, log.format(template, arg1))
def logDebug(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.DebugLevel, template, arg1, arg2)
if (settings.LogInfo && log.isDebugEnabled)
logAtLevel(Logging.DebugLevel, log.format(template, arg1, arg2))
def logDebug(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.DebugLevel, template, arg1, arg2, arg3)
if (settings.LogInfo && log.isDebugEnabled)
logAtLevel(Logging.DebugLevel, log.format(template, arg1, arg2, arg3))
def logInfo(message: String): Unit =
if (settings.LogInfo && log.isInfoEnabled)
logAtLevel(Logging.InfoLevel, message)
def logInfo(marker: LogMarker, message: String): Unit =
if (settings.LogInfo && log.isInfoEnabled(marker))
logAtLevel(marker, Logging.InfoLevel, message)
def logInfo(template: String, arg1: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1)
if (settings.LogInfo && log.isInfoEnabled)
logAtLevel(Logging.InfoLevel, log.format(template, arg1))
def logInfo(marker: LogMarker, template: String, arg1: Any): Unit =
if (settings.LogInfo && log.isInfoEnabled(marker))
logAtLevel(marker, Logging.InfoLevel, log.format(template, arg1))
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1, arg2)
if (settings.LogInfo && log.isInfoEnabled)
logAtLevel(Logging.InfoLevel, log.format(template, arg1, arg2))
def logInfo(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit =
if (settings.LogInfo && log.isInfoEnabled(marker))
logAtLevel(marker, Logging.InfoLevel, log.format(template, arg1, arg2))
def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.InfoLevel, template, arg1, arg2, arg3)
if (settings.LogInfo && log.isInfoEnabled)
logAtLevel(Logging.InfoLevel, log.format(template, arg1, arg2, arg3))
def logInfo(marker: LogMarker, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (settings.LogInfo && log.isInfoEnabled(marker))
logAtLevel(marker, Logging.InfoLevel, log.format(template, arg1, arg2, arg3))
def logWarning(message: String): Unit =
if (log.isWarningEnabled)
logAtLevel(Logging.WarningLevel, message)
def logWarning(template: String, arg1: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1)
if (log.isWarningEnabled)
logAtLevel(Logging.WarningLevel, log.format(template, arg1))
def logWarning(marker: LogMarker, template: String, arg1: Any): Unit =
if (log.isWarningEnabled(marker))
logAtLevel(marker, Logging.WarningLevel, log.format(template, arg1))
def logWarning(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1, arg2)
if (log.isWarningEnabled)
logAtLevel(Logging.WarningLevel, log.format(template, arg1, arg2))
def logWarning(marker: LogMarker, template: String, arg1: Any, arg2: Any): Unit =
if (log.isWarningEnabled(marker))
logAtLevel(marker, Logging.WarningLevel, log.format(template, arg1, arg2))
def logWarning(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.WarningLevel, template, arg1, arg2, arg3)
if (log.isWarningEnabled)
logAtLevel(Logging.WarningLevel, log.format(template, arg1, arg2, arg3))
def logError(message: String): Unit =
if (log.isErrorEnabled)
logAtLevel(Logging.ErrorLevel, message)
def logError(marker: LogMarker, message: String): Unit =
if (log.isErrorEnabled(marker))
logAtLevel(marker, Logging.ErrorLevel, message)
def logError(template: String, arg1: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1)
if (log.isErrorEnabled)
logAtLevel(Logging.ErrorLevel, log.format(template, arg1))
def logError(template: String, arg1: Any, arg2: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1, arg2)
if (log.isErrorEnabled)
logAtLevel(Logging.ErrorLevel, log.format(template, arg1, arg2))
def logError(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logAtLevel(Logging.ErrorLevel, template, arg1, arg2, arg3)
if (log.isErrorEnabled)
logAtLevel(Logging.ErrorLevel, log.format(template, arg1, arg2, arg3))
def logError(cause: Throwable, message: String): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
@ -527,71 +572,32 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
log.error(cause, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)
}
def logError(cause: Throwable, template: String, arg1: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1)
else
log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1)
}
def logError(cause: Throwable, template: String, arg1: Any): Unit =
logError(cause, log.format(template, arg1))
def logError(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
else
log.error(cause, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2)
}
def logError(cause: Throwable, template: String, arg1: Any, arg2: Any): Unit =
logError(cause, log.format(template, arg1, arg2))
def logError(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.error(cause, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
else
log.error(
cause,
"Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template,
selfAddress,
arg1,
arg2,
arg3)
}
def logError(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
logError(cause, log.format(template, arg1, arg2, arg3))
private def logAtLevel(logLevel: LogLevel, message: String): Unit = {
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - {}", selfAddress, message)
else
log.log(logLevel, "Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message)
}
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any): Unit = {
if (isLevelEnabled(logLevel))
private def logAtLevel(marker: LogMarker, logLevel: LogLevel, message: String): Unit = {
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1)
else
log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1)
}
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any): Unit =
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2)
else
log.log(logLevel, "Cluster Node [{}] dc [{}] - " + template, selfAddress, settings.SelfDataCenter, arg1, arg2)
private def logAtLevel(logLevel: LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (isLevelEnabled(logLevel))
if (settings.SelfDataCenter == ClusterSettings.DefaultDataCenter)
log.log(logLevel, "Cluster Node [{}] - " + template, selfAddress, arg1, arg2, arg3)
log.log(marker, logLevel, log.format("Cluster Node [{}] - {}", selfAddress, message))
else
log.log(
marker,
logLevel,
"Cluster Node [{}] dc [" + settings.SelfDataCenter + "] - " + template,
selfAddress,
arg1,
arg2,
arg3)
log.format("Cluster Node [{}] dc [{}] - {}", selfAddress, settings.SelfDataCenter, message))
}
private def isLevelEnabled(logLevel: LogLevel): Boolean =
LogInfo || logLevel < Logging.InfoLevel
}
}

View file

@ -323,7 +323,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val selfDc = cluster.selfDataCenter
private val gossipLogger =
new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterGossip)))
new cluster.ClusterLogger(
Logging.withMarker(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterGossip)))
protected def selfUniqueAddress = cluster.selfUniqueAddress
@ -786,13 +787,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
if (joiningNode == selfUniqueAddress) {
logInfo(
ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining),
"Node [{}] is JOINING itself (with roles [{}]) and forming new cluster",
joiningNode.address,
roles.mkString(", "))
if (localMembers.isEmpty)
leaderActions() // important for deterministic oldest when bootstrapping
} else {
logInfo("Node [{}] is JOINING, roles [{}]", joiningNode.address, roles.mkString(", "))
logInfo(
ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining),
"Node [{}] is JOINING, roles [{}]",
joiningNode.address,
roles.mkString(", "))
sender() ! Welcome(selfUniqueAddress, latestGossip)
}
@ -826,21 +832,25 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
*/
def leaving(address: Address): Unit = {
// only try to update if the node is available (in the member ring)
if (latestGossip.members.exists(
m => m.address == address && (m.status == Joining || m.status == WeaklyUp || m.status == Up))) {
val newMembers = latestGossip.members.map { m =>
if (m.address == address) m.copy(status = Leaving) else m
} // mark node as LEAVING
latestGossip.members.find(_.address == address).foreach { existingMember =>
if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up) {
// mark node as LEAVING
val newMembers = latestGossip.members - existingMember + existingMember.copy(status = Leaving)
val newGossip = latestGossip.copy(members = newMembers)
updateLatestGossip(newGossip)
logInfo("Marked address [{}] as [{}]", address, Leaving)
logInfo(
ClusterLogMarker.memberChanged(existingMember.uniqueAddress, MemberStatus.Leaving),
"Marked address [{}] as [{}]",
address,
Leaving)
publishMembershipState()
// immediate gossip to speed up the leaving process
gossip()
}
}
}
def exitingCompleted() = {
logInfo("Exiting completed")
@ -913,9 +923,17 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
localMembers.find(_.address == address) match {
case Some(m) if m.status != Down =>
if (localReachability.isReachable(m.uniqueAddress))
logInfo("Marking node [{}] as [{}]", m.address, Down)
logInfo(
ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Down),
"Marking node [{}] as [{}]",
m.address,
Down)
else
logInfo("Marking unreachable node [{}] as [{}]", m.address, Down)
logInfo(
ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Down),
"Marking unreachable node [{}] as [{}]",
m.address,
Down)
val newGossip = localGossip.markAsDown(m)
updateLatestGossip(newGossip)
@ -935,6 +953,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val newGossip = localGossip.copy(overview = newOverview)
updateLatestGossip(newGossip)
logWarning(
ClusterLogMarker.unreachable(node.address),
"Marking node as TERMINATED [{}], due to quarantine. Node roles [{}]. " +
"It must still be marked as down before it's removed.",
node.address,
@ -1154,7 +1173,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val periodicNotice = 60
if (membershipState.convergence(exitingConfirmed)) {
if (leaderActionCounter >= firstNotice)
logInfo("Leader can perform its duties again")
logInfo(ClusterLogMarker.leaderRestored, "Leader can perform its duties again")
leaderActionCounter = 0
leaderActionsOnConvergence()
} else {
@ -1164,6 +1183,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
logInfo(
ClusterLogMarker.leaderIncapacitated,
"Leader can currently not perform its duties, reachability status: [{}], member status: [{}]",
membershipState.dcReachabilityExcludingDownedObservers,
latestGossip.members
@ -1296,17 +1316,33 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
changedMembers.foreach { m =>
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
logInfo(
ClusterLogMarker.memberChanged(m.uniqueAddress, m.status),
"Leader is moving node [{}] to [{}]",
m.address,
m.status)
}
removedUnreachable.foreach { m =>
val status = if (m.status == Exiting) "exiting" else "unreachable"
logInfo("Leader is removing {} node [{}]", status, m.address)
logInfo(
ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Removed),
"Leader is removing {} node [{}]",
status,
m.address)
}
removedExitingConfirmed.foreach { n =>
logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
logInfo(
ClusterLogMarker.memberChanged(n, MemberStatus.Removed),
"Leader is removing confirmed Exiting node [{}]",
n.address)
}
removedOtherDc.foreach { m =>
logInfo("Leader is removing {} node [{}] in DC [{}]", m.status, m.address, m.dataCenter)
logInfo(
ClusterLogMarker.memberChanged(m.uniqueAddress, MemberStatus.Removed),
"Leader is removing {} node [{}] in DC [{}]",
m.status,
m.address,
m.dataCenter)
}
newGossip
@ -1358,7 +1394,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
// log status changes
changedMembers.foreach { m =>
logInfo("Leader is moving node [{}] to [{}]", m.address, m.status)
logInfo(
ClusterLogMarker.memberChanged(m.uniqueAddress, m.status),
"Leader is moving node [{}] to [{}]",
m.address,
m.status)
}
publishMembershipState()
@ -1411,20 +1451,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
updateLatestGossip(newGossip)
val (exiting, nonExiting) = newlyDetectedUnreachableMembers.partition(_.status == Exiting)
if (nonExiting.nonEmpty)
logWarning(
"Marking node(s) as UNREACHABLE [{}]. Node roles [{}]",
nonExiting.mkString(", "),
selfRoles.mkString(", "))
nonExiting.foreach { node =>
logWarning(ClusterLogMarker.unreachable(node.address), "Marking node as UNREACHABLE [{}].", node)
}
if (exiting.nonEmpty)
logInfo(
"Marking exiting node(s) as UNREACHABLE [{}]. This is expected and they will be removed.",
exiting.mkString(", "))
if (newlyDetectedReachableMembers.nonEmpty)
logInfo(
"Marking node(s) as REACHABLE [{}]. Node roles [{}]",
newlyDetectedReachableMembers.mkString(", "),
selfRoles.mkString(","))
nonExiting.foreach { node =>
logInfo(ClusterLogMarker.reachable(node.address), "Marking node as REACHABLE [{}].", node)
}
publishMembershipState()
}
@ -1741,6 +1777,7 @@ private[cluster] final class JoinSeedNodeProcess(
context.become(done)
} else {
logError(
ClusterLogMarker.joinFailed,
"Couldn't join seed nodes because of incompatible cluster configuration. " +
"It's recommended to perform a full cluster shutdown in order to deploy this new version." +
"If a cluster shutdown isn't an option, you may want to disable this protection by setting " +
@ -1756,6 +1793,7 @@ private[cluster] final class JoinSeedNodeProcess(
case ReceiveTimeout =>
if (attempt >= 2)
logWarning(
ClusterLogMarker.joinFailed,
"Couldn't join seed nodes after [{}] attempts, will try again. seed-nodes=[{}]",
attempt,
seedNodes.filterNot(_ == selfAddress).mkString(", "))

View file

@ -41,7 +41,8 @@ private[cluster] final class ClusterHeartbeatReceiver(getCluster: () => Cluster)
lazy val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
private lazy val clusterLogger =
new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat)))
new cluster.ClusterLogger(
Logging.withMarker(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat)))
def receive: Receive = {
case hb: Heartbeat =>
@ -108,7 +109,8 @@ private[cluster] class ClusterHeartbeatSender extends Actor {
import context.dispatcher
private val clusterLogger =
new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat)))
new cluster.ClusterLogger(
Logging.withMarker(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat)))
import clusterLogger._
val filterInternalClusterMembers: Member => Boolean =
@ -227,6 +229,7 @@ private[cluster] class ClusterHeartbeatSender extends Actor {
val now = System.nanoTime()
if ((now - tickTimestamp) >= (HeartbeatInterval.toNanos * 2))
logWarning(
ClusterLogMarker.heartbeatStarvation,
"Scheduled sending of heartbeat was delayed. " +
"Previous heartbeat was sent [{}] ms ago, expected interval is [{}] ms. This may cause failure detection " +
"to mark members as unreachable. The reason can be thread starvation, e.g. by running blocking tasks on the " +

View file

@ -0,0 +1,94 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import akka.actor.Address
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.event.LogMarker
/**
* This is public with the purpose to document the used markers and properties of log events.
* No guarantee that it will remain binary compatible, but the marker names and properties
* are considered public API and will not be changed without notice.
*/
@ApiMayChange
object ClusterLogMarker {
/**
* INTERNAL API
*/
@InternalApi private[akka] object Properties {
val MemberStatus = "akkaMemberStatus"
}
/**
* Marker "akkaUnreachable" of log event when a node is marked as unreachable based no failure detector observation.
* @param node The address of the node that is marked as unreachable. Included as property "akkaRemoteAddress".
*/
def unreachable(node: Address): LogMarker =
LogMarker("akkaUnreachable", Map(LogMarker.Properties.RemoteAddress -> node))
/**
* Marker "akkaReachable" of log event when a node is marked as reachable again based no failure detector observation.
* @param node The address of the node that is marked as reachable. Included as property "akkaRemoteAddress".
*/
def reachable(node: Address): LogMarker =
LogMarker("akkaReachable", Map(LogMarker.Properties.RemoteAddress -> node))
/**
* Marker "akkaHeartbeatStarvation" of log event when scheduled heartbeat was delayed.
*/
val heartbeatStarvation: LogMarker =
LogMarker("akkaHeartbeatStarvation")
/**
* Marker "akkaClusterLeaderIncapacitated" of log event when leader can't perform its duties.
* Typically because there are unreachable nodes that have not been downed.
*/
val leaderIncapacitated: LogMarker =
LogMarker("akkaClusterLeaderIncapacitated")
/**
* Marker "akkaClusterLeaderRestored" of log event when leader can perform its duties again.
*/
val leaderRestored: LogMarker =
LogMarker("akkaClusterLeaderRestored")
/**
* Marker "akkaJoinFailed" of log event when node couldn't join seed nodes.
*/
val joinFailed: LogMarker =
LogMarker("akkaJoinFailed")
/**
* Marker "akkaMemberChanged" of log event when a member's status is changed by the leader.
* @param node The address of the node that is changed. Included as property "akkaRemoteAddress"
* and "akkaRemoteAddressUid".
* @param status New member status. Included as property "akkaMemberStatus".
*/
def memberChanged(node: UniqueAddress, status: MemberStatus): LogMarker =
LogMarker(
"akkaMemberChanged",
Map(
LogMarker.Properties.RemoteAddress -> node.address,
LogMarker.Properties.RemoteAddressUid -> node.longUid,
Properties.MemberStatus -> status))
/**
* Marker "akkaClusterSingletonStarted" of log event when Cluster Singleton
* instance has started.
*/
val singletonStarted: LogMarker =
LogMarker("akkaClusterSingletonStarted")
/**
* Marker "akkaClusterSingletonTerminated" of log event when Cluster Singleton
* instance has terminated.
*/
val singletonTerminated: LogMarker =
LogMarker("akkaClusterSingletonTerminated")
}

View file

@ -47,7 +47,8 @@ private[cluster] class CrossDcHeartbeatSender extends Actor {
import context.dispatcher
private val clusterLogger =
new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat)))
new cluster.ClusterLogger(
Logging.withMarker(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat)))
import clusterLogger._
// For inspecting if in active state; allows avoiding "becoming active" when already active

View file

@ -575,6 +575,14 @@ A more advanced (including most Akka added information) example pattern would be
<pattern>%date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
```
Akka is logging some events with markers. Some of these events also include structured MDC properties.
* The "SECURITY" marker is used for highlighting security related events or incidents.
* Akka Actor is using the markers defined in @apidoc[akka.actor.ActorLogMarker].
* Akka Cluster is using the markers defined in @apidoc[akka.cluster.ClusterLogMarker].
* Akka Remoting is using the markers defined in @apidoc[akka.remote.RemoteLogMarker].
* Akka Cluster Sharding is using the markers defined in @apidoc[akka.cluster.sharding.ShardingLogMarker].
#### Using SLF4J's Markers
It is also possible to use the `org.slf4j.Marker` with the `LoggingAdapter` when using slf4j.

View file

@ -433,6 +433,16 @@ With Logback the timestamp is available with `%X{akkaTimestamp}` specifier withi
</encoder>
```
### Markers
Akka is logging some events with markers. Some of these events also include structured MDC properties.
* The "SECURITY" marker is used for highlighting security related events or incidents.
* Akka Actor is using the markers defined in @apidoc[akka.actor.ActorLogMarker].
* Akka Cluster is using the markers defined in @apidoc[akka.cluster.ClusterLogMarker].
* Akka Remoting is using the markers defined in @apidoc[akka.remote.RemoteLogMarker].
* Akka Cluster Sharding is using the markers defined in @apidoc[akka.cluster.sharding.ShardingLogMarker].
### Logger names
It can be useful to enable debug level or other SLF4J backend configuration for certain modules of Akka when

View file

@ -7,12 +7,15 @@ package akka.remote
import akka.event.Logging.Warning
import akka.remote.FailureDetector.Clock
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.collection.immutable
import com.typesafe.config.Config
import akka.event.EventStream
import akka.event.Logging
import akka.util.Helpers.ConfigOps
/**
@ -144,7 +147,9 @@ class PhiAccrualFailureDetector(
Warning(
this.toString,
getClass,
s"heartbeat interval is growing too large for address $address: $interval millis"))
s"heartbeat interval is growing too large for address $address: $interval millis",
Logging.emptyMDC,
RemoteLogMarker.failureDetectorGrowing(address)))
oldState.history :+ interval
} else oldState.history
}

View file

@ -0,0 +1,66 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote
import akka.actor.Address
import akka.annotation.ApiMayChange
import akka.event.LogMarker
/**
* This is public with the purpose to document the used markers and properties of log events.
* No guarantee that it will remain binary compatible, but the marker names and properties
* are considered public API and will not be changed without notice.
*/
@ApiMayChange
object RemoteLogMarker {
/**
* Marker "akkaFailureDetectorGrowing" of log event when failure detector heartbeat interval
* is growing too large.
*
* @param remoteAddress The address of the node that the failure detector is monitoring. Included as property "akkaRemoteAddress".
*/
def failureDetectorGrowing(remoteAddress: String): LogMarker =
LogMarker("akkaFailureDetectorGrowing", Map(LogMarker.Properties.RemoteAddress -> remoteAddress))
/**
* Marker "akkaQuarantine" of log event when a node is quarantined.
*
* @param remoteAddress The address of the node that is quarantined. Included as property "akkaRemoteAddress".
* @param remoteAddressUid The address of the node that is quarantined. Included as property "akkaRemoteAddressUid".
*/
def quarantine(remoteAddress: Address, remoteAddressUid: Option[Long]): LogMarker =
LogMarker(
"akkaQuarantine",
Map(
LogMarker.Properties.RemoteAddress -> remoteAddress,
LogMarker.Properties.RemoteAddressUid -> remoteAddressUid.getOrElse("")))
/**
* Marker "akkaConnect" of log event when outbound connection is attempted.
*
* @param remoteAddress The address of the connected node. Included as property "akkaRemoteAddress".
* @param remoteAddressUid The address of the connected node. Included as property "akkaRemoteAddressUid".
*/
def connect(remoteAddress: Address, remoteAddressUid: Option[Long]): LogMarker =
LogMarker(
"akkaConnect",
Map(
LogMarker.Properties.RemoteAddress -> remoteAddress,
LogMarker.Properties.RemoteAddressUid -> remoteAddressUid.getOrElse("")))
/**
* Marker "akkaDisconnected" of log event when outbound connection is closed.
*
* @param remoteAddress The address of the disconnected node. Included as property "akkaRemoteAddress".
* @param remoteAddressUid The address of the disconnected node. Included as property "akkaRemoteAddressUid".
*/
def disconnected(remoteAddress: Address, remoteAddressUid: Option[Long]): LogMarker =
LogMarker(
"akkaDisconnected",
Map(
LogMarker.Properties.RemoteAddress -> remoteAddress,
LogMarker.Properties.RemoteAddressUid -> remoteAddressUid.getOrElse("")))
}

View file

@ -11,7 +11,7 @@ import akka.{ Done, NotUsed }
import akka.actor.{ Actor, ActorRef, Address, CoordinatedShutdown, Dropped, ExtendedActorSystem, Props }
import akka.annotation.InternalStableApi
import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter }
import akka.event.{ Logging, MarkerLoggingAdapter }
import akka.remote.AddressUidExtension
import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
@ -28,7 +28,6 @@ import akka.stream._
import akka.stream.scaladsl.{ Flow, Keep, Sink }
import akka.util.{ unused, OptionVal, WildcardIndex }
import com.github.ghik.silencer.silent
import scala.annotation.tailrec
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
@ -308,7 +307,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
@volatile private[this] var controlSubject: ControlMessageSubject = _
@volatile private[this] var messageDispatcher: MessageDispatcher = _
override val log: LoggingAdapter = Logging(system, getClass)
override val log: MarkerLoggingAdapter = Logging.withMarker(system, getClass)
/**
* Compression tables must be created once, such that inbound lane restarts don't cause dropping of the tables.

View file

@ -30,6 +30,7 @@ import akka.event.Logging
import akka.remote.DaemonMsgCreate
import akka.remote.PriorityMessage
import akka.remote.RemoteActorRef
import akka.remote.RemoteLogMarker
import akka.remote.UniqueAddress
import akka.remote.artery.ArteryTransport.AeronTerminated
import akka.remote.artery.ArteryTransport.ShuttingDown
@ -142,7 +143,7 @@ private[remote] class Association(
require(remoteAddress.port.nonEmpty)
private val log = Logging(transport.system, getClass)
private val log = Logging.withMarker(transport.system, getClass)
private def flightRecorder = transport.topLevelFlightRecorder
override def settings = transport.settings
@ -493,6 +494,7 @@ private[remote] class Association(
.publish(GracefulShutdownQuarantinedEvent(UniqueAddress(remoteAddress, u), reason))
} else {
log.warning(
RemoteLogMarker.quarantine(remoteAddress, Some(u)),
"Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " +
"messages to this UID will be delivered to dead letters. " +
"Remote ActorSystem must be restarted to recover from this situation. Reason: {}",
@ -516,6 +518,7 @@ private[remote] class Association(
}
case Some(peer) =>
log.info(
RemoteLogMarker.quarantine(remoteAddress, Some(u)),
"Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}",
remoteAddress,
u,
@ -524,12 +527,16 @@ private[remote] class Association(
send(ClearSystemMessageDelivery(current.incarnation - 1), OptionVal.None, OptionVal.None)
case None =>
log.info(
RemoteLogMarker.quarantine(remoteAddress, Some(u)),
"Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}",
remoteAddress,
reason)
}
case None =>
log.warning("Quarantine of [{}] ignored because unknown UID", remoteAddress)
log.warning(
RemoteLogMarker.quarantine(remoteAddress, None),
"Quarantine of [{}] ignored because unknown UID",
remoteAddress)
}
}

View file

@ -26,6 +26,7 @@ import akka.actor.ExtendedActorSystem
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteLogMarker
import akka.remote.RemoteTransportException
import akka.remote.artery.Decoder.InboundCompressionAccess
import akka.remote.artery.compress._
@ -145,6 +146,26 @@ private[remote] class ArteryTcpTransport(
def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = {
val restartCount = new AtomicInteger(0)
def logConnect(): Unit = {
if (log.isDebugEnabled)
log.debug(
RemoteLogMarker.connect(
outboundContext.remoteAddress,
outboundContext.associationState.uniqueRemoteAddressValue().map(_.uid)),
"Outbound connection opened to [{}]",
outboundContext.remoteAddress)
}
def logDisconnected(): Unit = {
if (log.isDebugEnabled)
log.debug(
RemoteLogMarker.disconnected(
outboundContext.remoteAddress,
outboundContext.associationState.uniqueRemoteAddressValue().map(_.uid)),
"Outbound connection closed to [{}]",
outboundContext.remoteAddress)
}
val flowFactory = () => {
val onFailureLogLevel = if (restartCount.incrementAndGet() == 1) Logging.WarningLevel else Logging.DebugLevel
@ -152,6 +173,7 @@ private[remote] class ArteryTcpTransport(
Flow[ByteString]
.via(Flow.lazyFlow(() => {
// only open the actual connection if any new messages are sent
logConnect()
afr.loFreq(
TcpOutbound_Connected,
s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " +
@ -161,6 +183,12 @@ private[remote] class ArteryTcpTransport(
Flow[ByteString].prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))).via(connectionFlow)
}))
.mapError {
case ArteryTransport.ShutdownSignal => ArteryTransport.ShutdownSignal
case e =>
logDisconnected()
e
}
.recoverWithRetries(1, { case ArteryTransport.ShutdownSignal => Source.empty })
.log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream")
.addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = onFailureLogLevel))

View file

@ -107,12 +107,22 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg
@inline
final def withMdc(logSource: String, logEvent: LogEvent)(logStatement: => Unit): Unit = {
logEvent match {
case m: LogEventWithMarker if m.marker ne null =>
val properties = m.marker.properties
if (properties.nonEmpty) {
properties.foreach { case (k, v) => MDC.put(k, String.valueOf(v)) }
}
case _ =>
}
MDC.put(mdcAkkaSourceAttributeName, logSource)
MDC.put(mdcThreadAttributeName, logEvent.thread.getName)
MDC.put(mdcAkkaTimestamp, formatTimestamp(logEvent.timestamp))
MDC.put(mdcActorSystemAttributeName, context.system.name)
MDC.put(mdcAkkaAddressAttributeName, akkaAddress)
logEvent.mdc.foreach { case (k, v) => MDC.put(k, String.valueOf(v)) }
try logStatement
finally {
MDC.clear()
@ -174,7 +184,7 @@ class Slf4jLoggingFilter(@unused settings: ActorSystem.Settings, eventStream: Ev
}
/** Wraps [[org.slf4j.Marker]] */
final class Slf4jLogMarker(val marker: org.slf4j.Marker) extends LogMarker(name = marker.getName)
final class Slf4jLogMarker(val marker: org.slf4j.Marker) extends LogMarker(name = marker.getName, Map.empty)
/** Factory for creating [[LogMarker]] that wraps [[org.slf4j.Marker]] */
object Slf4jLogMarker {

View file

@ -7,7 +7,7 @@
</appender>
<appender name="TEST" class="akka.event.slf4j.Slf4jLoggerSpec$TestAppender">
<encoder>
<pattern>%date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] akkaAddress=[%X{akkaAddress}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
<pattern>%date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] akkaAddress=[%X{akkaAddress}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc} p1: %X{p1} p2: %X{p2}] - msg=[%msg]%n----%n</pattern>
</encoder>
</appender>
<logger name="akka.event.slf4j.Slf4jLoggingFilterSpec$DebugLevelProducer"

View file

@ -122,6 +122,16 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft
s should include("msg=[security-wise interesting message]")
}
"log info with marker and properties" in {
producer ! StringWithMarker("interesting message", LogMarker("testMarker", Map("p1" -> 1, "p2" -> "B")))
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s should include("marker=[testMarker]")
s should include("p1: 1 p2: B")
s should include("msg=[interesting message]")
}
"log info with slf4j marker" in {
val slf4jMarker = MarkerFactory.getMarker("SLF")
slf4jMarker.add(MarkerFactory.getMarker("ADDED")) // slf4j markers can have children
@ -143,7 +153,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s should include("marker=[SLF [ ADDED ]]")
s should include("mdc=[ticket-#3671: Custom MDC Values]")
s should include("mdc=[ticket-#3671: Custom MDC Values")
s should include("msg=[security-wise interesting message]")
}
@ -158,7 +168,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft
s should include("level=[INFO]")
s should include("logger=[akka.event.slf4j.Slf4jLoggerSpec$LogProducer]")
(s should include).regex(sourceThreadRegex)
s should include("mdc=[ticket-#3671: Custom MDC Values]")
s should include("mdc=[ticket-#3671: Custom MDC Values")
s should include("msg=[Message with custom MDC values]")
}
@ -179,7 +189,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft
s should include("level=[INFO]")
s should include("logger=[akka.event.slf4j.Slf4jLoggerSpec$LogProducer]")
(s should include).regex(sourceThreadRegex)
s should include("mdc=[ticket-#3671: null]")
s should include("mdc=[ticket-#3671: null")
s should include("msg=[Message with null custom MDC values]")
}