Include host and port in mdc if clustered or remote #28073
This commit is contained in:
parent
38c938124e
commit
4588a3904f
13 changed files with 199 additions and 40 deletions
|
|
@ -7,6 +7,9 @@ package akka.actor.typed.scaladsl
|
|||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.actor.ActorPath
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.testkit.typed.LoggingEvent
|
||||
import akka.actor.testkit.typed.TestException
|
||||
import akka.actor.testkit.typed.scaladsl.ActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
|
||||
|
|
@ -263,11 +266,11 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
|
|||
}
|
||||
}
|
||||
val actor =
|
||||
LoggingTestKit.info("Starting up").withMdc(Map(ActorMdc.TagsKey -> "tag1,tag2")).expect {
|
||||
LoggingTestKit.info("Starting up").withMdc(Map(ActorMdc.AkkaTagsKey -> "tag1,tag2")).expect {
|
||||
spawn(behavior, ActorTags("tag1", "tag2"))
|
||||
}
|
||||
|
||||
LoggingTestKit.info("Got message").withMdc(Map(ActorMdc.TagsKey -> "tag1,tag2")).expect {
|
||||
LoggingTestKit.info("Got message").withMdc(Map(ActorMdc.AkkaTagsKey -> "tag1,tag2")).expect {
|
||||
actor ! "ping"
|
||||
}
|
||||
}
|
||||
|
|
@ -342,7 +345,8 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
|
|||
val ref = LoggingTestKit
|
||||
.info("Starting")
|
||||
// not counting for example "akkaSource", but it shouldn't have any other entries
|
||||
.withCustom(logEvent => logEvent.mdc.keysIterator.forall(_.startsWith("akka")))
|
||||
.withCustom(logEvent =>
|
||||
logEvent.mdc.keysIterator.forall(entry => entry.startsWith("akka") || entry == "sourceActorSystem"))
|
||||
.expect {
|
||||
spawn(behaviors)
|
||||
}
|
||||
|
|
@ -463,10 +467,10 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
|
|||
|
||||
"always include some MDC values in the log" in {
|
||||
// need AtomicReference because LoggingFilter defined before actor is created and ActorTestKit names are dynamic
|
||||
val actorPathStr = new AtomicReference[String]
|
||||
val actorPath = new AtomicReference[ActorPath]
|
||||
val behavior =
|
||||
Behaviors.setup[Message] { context =>
|
||||
actorPathStr.set(context.self.path.toString)
|
||||
actorPath.set(context.self.path)
|
||||
context.log.info("Starting")
|
||||
Behaviors.receiveMessage { _ =>
|
||||
if (MDC.get("logSource") != null)
|
||||
|
|
@ -476,15 +480,32 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
|
|||
}
|
||||
}
|
||||
|
||||
def assertExpectedMdc(event: LoggingEvent) = {
|
||||
try {
|
||||
event.mdc should contain allElementsOf (
|
||||
Map(
|
||||
ActorMdc.AkkaAddressKey -> system.classicSystem.asInstanceOf[ExtendedActorSystem].provider.addressString,
|
||||
ActorMdc.AkkaSourceKey -> actorPath.get.toString,
|
||||
ActorMdc.SourceActorSystemKey -> system.name)
|
||||
)
|
||||
true
|
||||
} catch {
|
||||
case ex: Throwable =>
|
||||
// give us some info about what was missing thanks
|
||||
ex.printStackTrace()
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
// log from setup
|
||||
// can't use LoggingEventFilter.withMdc here because the actorPathStr isn't know yet
|
||||
val ref =
|
||||
LoggingTestKit.info("Starting").withCustom(event => event.mdc("akkaSource") == actorPathStr.get).expect {
|
||||
LoggingTestKit.info("Starting").withCustom(assertExpectedMdc).expect {
|
||||
spawn(behavior)
|
||||
}
|
||||
|
||||
// on message
|
||||
LoggingTestKit.info("Got message!").withMdc(Map("akkaSource" -> actorPathStr.get)).withOccurrences(10).expect {
|
||||
LoggingTestKit.info("Got message!").withCustom(assertExpectedMdc).withOccurrences(10).expect {
|
||||
(1 to 10).foreach { n =>
|
||||
ref ! Message(n, s"msg-$n")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,6 @@
|
|||
# internal changes
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.ActorMdc.SourceKey")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.ActorMdc.TagsKey")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.ActorMdc.TagsKey")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.ActorMdc.SourceKey")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.ActorMdc.setMdc")
|
||||
|
|
@ -10,14 +10,18 @@ import java.util.ArrayList
|
|||
import java.util.Optional
|
||||
import java.util.concurrent.CompletionStage
|
||||
|
||||
import akka.actor.Address
|
||||
import akka.actor.typed.internal.adapter.ActorSystemAdapter
|
||||
|
||||
import scala.concurrent.{ ExecutionContextExecutor, Future }
|
||||
import scala.reflect.ClassTag
|
||||
import scala.util.Try
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.util.{ BoxedType, OptionVal, Timeout }
|
||||
import akka.util.{ BoxedType, Timeout }
|
||||
import akka.util.Timeout
|
||||
import akka.util.JavaDurationConverters._
|
||||
import akka.util.OptionVal
|
||||
import com.github.ghik.silencer.silent
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
|
@ -28,21 +32,44 @@ import org.slf4j.LoggerFactory
|
|||
@InternalApi private[akka] object ActorContextImpl {
|
||||
|
||||
// single context for logging as there are a few things that are initialized
|
||||
// together
|
||||
final class LoggingContext(val logger: Logger, tags: Set[String], val hasCustomName: Boolean) {
|
||||
// together that we can cache as long as the actor is alive
|
||||
object LoggingContext {
|
||||
def apply(logger: Logger, tags: Set[String], ctx: ActorContextImpl[_]): LoggingContext = {
|
||||
val tagsString =
|
||||
// "" means no tags
|
||||
if (tags.isEmpty) ""
|
||||
else
|
||||
// mdc can only contain string values, and we don't want to render that string
|
||||
// on each log entry or message, so do that up front here
|
||||
tags.mkString(",")
|
||||
|
||||
val akkaSource = ctx.self.path.toString
|
||||
|
||||
val akkaAddress =
|
||||
ctx.system match {
|
||||
case adapter: ActorSystemAdapter[_] => adapter.provider.addressString
|
||||
case _ => Address("akka", ctx.system.name).toString
|
||||
}
|
||||
|
||||
val sourceActorSystem = ctx.system.name
|
||||
|
||||
new LoggingContext(logger, tagsString, akkaSource, sourceActorSystem, akkaAddress, hasCustomName = false)
|
||||
}
|
||||
}
|
||||
|
||||
final case class LoggingContext(
|
||||
logger: Logger,
|
||||
tagsString: String,
|
||||
akkaSource: String,
|
||||
sourceActorSystem: String,
|
||||
akkaAddress: String,
|
||||
hasCustomName: Boolean) {
|
||||
// toggled once per message if logging is used to avoid having to
|
||||
// touch the mdc thread local for cleanup in the end
|
||||
var mdcUsed = false
|
||||
val tagsString =
|
||||
// "" means no tags
|
||||
if (tags.isEmpty) ""
|
||||
else
|
||||
// mdc can only contain string values, and we don't want to render that string
|
||||
// on each log entry or message, so do that up front here
|
||||
tags.mkString(",")
|
||||
|
||||
def withLogger(logger: Logger): LoggingContext = {
|
||||
val l = new LoggingContext(logger, tags, hasCustomName = true)
|
||||
val l = copy(logger = logger, hasCustomName = true)
|
||||
l.mdcUsed = mdcUsed
|
||||
l
|
||||
}
|
||||
|
|
@ -117,7 +144,7 @@ import org.slf4j.LoggerFactory
|
|||
case OptionVal.None =>
|
||||
val logClass = LoggerClass.detectLoggerClassFromStack(classOf[Behavior[_]])
|
||||
val logger = LoggerFactory.getLogger(logClass.getName)
|
||||
val l = new LoggingContext(logger, classicActorContext.props.deploy.tags, hasCustomName = false)
|
||||
val l = LoggingContext(logger, classicActorContext.props.deploy.tags, this)
|
||||
_logging = OptionVal.Some(l)
|
||||
l
|
||||
}
|
||||
|
|
@ -125,9 +152,7 @@ import org.slf4j.LoggerFactory
|
|||
|
||||
override def log: Logger = {
|
||||
val logging = loggingContext()
|
||||
// avoid access to MDC ThreadLocal if not needed, see details in LoggingContext
|
||||
logging.mdcUsed = true
|
||||
ActorMdc.setMdc(self.path.toString, logging.tagsString)
|
||||
ActorMdc.setMdc(logging)
|
||||
logging.logger
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,16 +11,20 @@ import org.slf4j.MDC
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object ActorMdc {
|
||||
val SourceKey = "akkaSource"
|
||||
val TagsKey = "akkaTags"
|
||||
val SourceActorSystemKey = "sourceActorSystem"
|
||||
val AkkaSourceKey = "akkaSource"
|
||||
val AkkaTagsKey = "akkaTags"
|
||||
val AkkaAddressKey = "akkaAddress"
|
||||
|
||||
/**
|
||||
* @param tags empty string for no tags, a single tag or a comma separated list of tags
|
||||
*/
|
||||
def setMdc(source: String, tags: String): Unit = {
|
||||
MDC.put(SourceKey, source)
|
||||
if (tags.nonEmpty)
|
||||
MDC.put(TagsKey, tags)
|
||||
def setMdc(context: ActorContextImpl.LoggingContext): Unit = {
|
||||
// avoid access to MDC ThreadLocal if not needed, see details in LoggingContext
|
||||
context.mdcUsed = true
|
||||
MDC.put(AkkaSourceKey, context.akkaSource)
|
||||
MDC.put(SourceActorSystemKey, context.sourceActorSystem)
|
||||
MDC.put(AkkaAddressKey, context.akkaAddress)
|
||||
// empty string for no tags, a single tag or a comma separated list of tags
|
||||
if (context.tagsString.nonEmpty)
|
||||
MDC.put(AkkaTagsKey, context.tagsString)
|
||||
}
|
||||
|
||||
// MDC is cleared (if used) from aroundReceive in ActorAdapter after processing each message,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
|
|||
# new method/field on provider
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorRefProvider.addressString")
|
||||
|
|
@ -153,6 +153,13 @@ import akka.util.OptionVal
|
|||
|
||||
/** INTERNAL API */
|
||||
@InternalApi private[akka] def serializationInformation: Serialization.Information
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] def addressString: String
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -729,4 +736,17 @@ private[akka] class LocalActorRefProvider private[akka] (
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// lazily initialized with fallback since it can depend on transport which is not initialized up front
|
||||
// worth caching since if it is used once in a system it will very likely be used many times
|
||||
@volatile private var _addressString: OptionVal[String] = OptionVal.None
|
||||
override private[akka] def addressString: String = {
|
||||
_addressString match {
|
||||
case OptionVal.Some(addr) => addr
|
||||
case OptionVal.None =>
|
||||
val addr = getDefaultAddress.toString
|
||||
_addressString = OptionVal.Some(addr)
|
||||
addr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.typed
|
||||
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.internal.ActorMdc
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
object ClusterActorLoggingSpec {
|
||||
def config = ConfigFactory.parseString("""
|
||||
akka.actor.provider = cluster
|
||||
akka.remote.classic.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.remote.artery.canonical.hostname = 127.0.0.1
|
||||
""")
|
||||
}
|
||||
|
||||
class ClusterActorLoggingSpec
|
||||
extends ScalaTestWithActorTestKit(ClusterActorLoggingSpec.config)
|
||||
with WordSpecLike
|
||||
with Matchers
|
||||
with LogCapturing {
|
||||
|
||||
"Logging from an actor in a cluster" must {
|
||||
|
||||
"include host and port in sourceActorSystem mdc entry" in {
|
||||
|
||||
def addressString = system.classicSystem.asInstanceOf[ExtendedActorSystem].provider.addressString
|
||||
|
||||
val behavior =
|
||||
Behaviors.setup[String] { context =>
|
||||
context.log.info("Starting")
|
||||
Behaviors.empty
|
||||
}
|
||||
|
||||
LoggingTestKit
|
||||
.info("Starting")
|
||||
.withCustom { event =>
|
||||
event.mdc(ActorMdc.AkkaAddressKey) == addressString
|
||||
}
|
||||
.expect {
|
||||
spawn(behavior)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -401,7 +401,7 @@ With Logback the actor path is available with `%X{akkaSource}` specifier within
|
|||
</encoder>
|
||||
```
|
||||
|
||||
The actor system in which the logging was performed is available in the MDC with attribute name `sourceActorSystem`,
|
||||
The actor system name in which the logging was performed is available in the MDC with attribute name `sourceActorSystem`,
|
||||
but that is typically also included in the `akkaSource` attribute.
|
||||
With Logback the ActorSystem name is available with `%X{sourceActorSystem}` specifier within the pattern layout configuration:
|
||||
|
||||
|
|
@ -411,9 +411,20 @@ With Logback the ActorSystem name is available with `%X{sourceActorSystem}` spec
|
|||
</encoder>
|
||||
```
|
||||
|
||||
Akka's internal logging is asynchronous which means that the timestamp of a log entry is taken from
|
||||
The address of the actor system, containing host and port if the system is using cluster, is available through `akkaAddress`:
|
||||
|
||||
```
|
||||
<encoder>
|
||||
<pattern>%date{ISO8601} %-5level %logger{36} %X{akkaAddress} - %msg%n</pattern>
|
||||
</encoder>
|
||||
```
|
||||
|
||||
|
||||
For typed actors the log event timestamp is taken when the log call was made but for
|
||||
Akka's _internal_ logging as well as the classic actor logging is asynchronous which means that the timestamp of a log entry is taken from
|
||||
when the underlying logger implementation is called, which can be surprising at first.
|
||||
If you want to more accurately output the timestamp, use the MDC attribute `akkaTimestamp`.
|
||||
If you want to more accurately output the timestamp for such loggers, use the MDC attribute `akkaTimestamp`. Note that
|
||||
the MDC key will not have any value for a typed actor.
|
||||
With Logback the timestamp is available with `%X{akkaTimestamp}` specifier within the pattern layout configuration:
|
||||
|
||||
```
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ public class LoggerSourceTest extends JUnitSuite {
|
|||
return testKit.spawn(behavior);
|
||||
});
|
||||
|
||||
// MDC persistenceId ajd persistencePhase for the "command-received" not included in the
|
||||
// MDC persistenceId and persistencePhase for the "command-received" not included in the
|
||||
// "command-received" logging, because that is via ActorContext.log directly and
|
||||
// EventSourcedBehaviorImpl
|
||||
// isn't involved.
|
||||
|
|
|
|||
|
|
@ -246,7 +246,6 @@ private[akka] class RemoteActorRefProvider(
|
|||
case ArterySettings.Tcp => new ArteryTcpTransport(system, this, tlsEnabled = false)
|
||||
case ArterySettings.TlsTcp => new ArteryTcpTransport(system, this, tlsEnabled = true)
|
||||
} else new Remoting(system, this))
|
||||
|
||||
_internals = internals
|
||||
remotingTerminator ! internals
|
||||
|
||||
|
|
@ -258,6 +257,7 @@ private[akka] class RemoteActorRefProvider(
|
|||
// this enables reception of remote requests
|
||||
transport.start()
|
||||
|
||||
_addressString = OptionVal.Some(_internals.transport.defaultAddress.toString)
|
||||
_remoteWatcher = createOrNone[ActorRef](createRemoteWatcher(system))
|
||||
remoteDeploymentWatcher = createOrNone[ActorRef](createRemoteDeploymentWatcher(system))
|
||||
}
|
||||
|
|
@ -623,6 +623,17 @@ private[akka] class RemoteActorRefProvider(
|
|||
def quarantine(address: Address, uid: Option[Long], reason: String): Unit =
|
||||
transport.quarantine(address, uid, reason)
|
||||
|
||||
// lazily initialized with fallback since it can depend on transport which is not initialized up front
|
||||
// worth caching since if it is used once in a system it will very likely be used many times
|
||||
@volatile private var _addressString: OptionVal[String] = OptionVal.None
|
||||
override private[akka] def addressString: String = {
|
||||
_addressString match {
|
||||
case OptionVal.Some(addr) => addr
|
||||
case OptionVal.None =>
|
||||
// not initialized yet, fallback
|
||||
local.addressString
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] trait RemoteRef extends ActorRefScope {
|
||||
|
|
|
|||
|
|
@ -58,6 +58,9 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg
|
|||
val mdcActorSystemAttributeName = "sourceActorSystem"
|
||||
val mdcAkkaSourceAttributeName = "akkaSource"
|
||||
val mdcAkkaTimestamp = "akkaTimestamp"
|
||||
val mdcAkkaAddressAttributeName = "akkaAddress"
|
||||
|
||||
private def akkaAddress = context.system.asInstanceOf[ExtendedActorSystem].provider.addressString
|
||||
|
||||
def receive = {
|
||||
|
||||
|
|
@ -107,7 +110,8 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg
|
|||
MDC.put(mdcAkkaSourceAttributeName, logSource)
|
||||
MDC.put(mdcThreadAttributeName, logEvent.thread.getName)
|
||||
MDC.put(mdcAkkaTimestamp, formatTimestamp(logEvent.timestamp))
|
||||
MDC.put(mdcActorSystemAttributeName, actorSystemName)
|
||||
MDC.put(mdcActorSystemAttributeName, context.system.name)
|
||||
MDC.put(mdcAkkaAddressAttributeName, akkaAddress)
|
||||
logEvent.mdc.foreach { case (k, v) => MDC.put(k, String.valueOf(v)) }
|
||||
try logStatement
|
||||
finally {
|
||||
|
|
@ -134,7 +138,6 @@ class Slf4jLogger extends Actor with SLF4JLogging with RequiresMessageQueue[Logg
|
|||
protected def formatTimestamp(timestamp: Long): String =
|
||||
Helpers.currentTimeMillisToUTCString(timestamp)
|
||||
|
||||
private val actorSystemName = context.system.name
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
</appender>
|
||||
<appender name="TEST" class="akka.event.slf4j.Slf4jLoggerSpec$TestAppender">
|
||||
<encoder>
|
||||
<pattern>%date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
|
||||
<pattern>%date{ISO8601} level=[%level] marker=[%marker] logger=[%logger] akkaSource=[%X{akkaSource}] akkaAddress=[%X{akkaAddress}] sourceActorSystem=[%X{sourceActorSystem}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<logger name="akka.event.slf4j.Slf4jLoggingFilterSpec$DebugLevelProducer"
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft
|
|||
awaitCond(outputString.contains("----"), 5 seconds)
|
||||
val s = outputString
|
||||
s should include("akkaSource=[akka://Slf4jLoggerSpec/user/logProducer]")
|
||||
s should include("akkaAddress=[akka://Slf4jLoggerSpec]")
|
||||
s should include("level=[ERROR]")
|
||||
s should include("logger=[akka.event.slf4j.Slf4jLoggerSpec$LogProducer]")
|
||||
(s should include).regex(sourceThreadRegex)
|
||||
|
|
@ -105,6 +106,7 @@ class Slf4jLoggerSpec extends AkkaSpec(Slf4jLoggerSpec.config) with BeforeAndAft
|
|||
awaitCond(outputString.contains("----"), 5 seconds)
|
||||
val s = outputString
|
||||
s should include("akkaSource=[akka://Slf4jLoggerSpec/user/logProducer]")
|
||||
s should include("akkaAddress=[akka://Slf4jLoggerSpec]")
|
||||
s should include("level=[INFO]")
|
||||
s should include("logger=[akka.event.slf4j.Slf4jLoggerSpec$LogProducer]")
|
||||
(s should include).regex(sourceThreadRegex)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue