+act,slf #11715 Add configurable LoggingFilter

* The filter is used by the LoggingAdapter before publishing
  to the event bus
* Slf4jLoggingFilter uses backend log level configuration
  (e.g. logback.xml)
This commit is contained in:
Patrik Nordwall 2014-06-19 11:33:08 +02:00
parent 51062ff494
commit 813543e8f8
13 changed files with 285 additions and 46 deletions

View file

@ -12,6 +12,7 @@ import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.event.Logging.DefaultLogger
import java.util.concurrent.TimeUnit
import akka.event.DefaultLoggingFilter
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.findClassLoader())) {
@ -58,6 +59,8 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
getDuration("akka.logger-startup-timeout", TimeUnit.MILLISECONDS) should be(5.seconds.toMillis)
settings.LoggerStartTimeout.duration should be(5.seconds)
getString("akka.logging-filter") should be(classOf[DefaultLoggingFilter].getName)
getInt("akka.log-dead-letters") should be(10)
settings.LogDeadLetters should be(10)

View file

@ -16,6 +16,15 @@ akka {
# to STDOUT)
loggers = ["akka.event.Logging$DefaultLogger"]
# Filter of log events that is used by the LoggingAdapter before
# publishing log events to the eventStream. It can perform
# fine grained filtering based on the log source. The default
# implementation filters on the `loglevel`.
# FQCN of the LoggingFilter. The Class of the FQCN must implement
# akka.event.LoggingFilter and have a public constructor with
# (akka.actor.ActorSystem.Settings, akka.event.EventStream) parameters.
logging-filter = "akka.event.DefaultLoggingFilter"
# Loggers are created and registered synchronously during ActorSystem
# start-up, and since they are actors, this timeout is used to bound the
# waiting time

View file

@ -177,6 +177,7 @@ object ActorSystem {
final val LogLevel: String = getString("akka.loglevel")
final val StdoutLogLevel: String = getString("akka.stdout-loglevel")
final val Loggers: immutable.Seq[String] = immutableSeq(getStringList("akka.loggers"))
final val LoggingFilter: String = getString("akka.logging-filter")
final val LoggerStartTimeout: Timeout = Timeout(config.getMillisDuration("akka.logger-startup-timeout"))
final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start")
final val LogDeadLetters: Int = config.getString("akka.log-dead-letters").toLowerCase match {
@ -483,12 +484,19 @@ abstract class ExtendedActorSystem extends ActorSystem {
*/
def dynamicAccess: DynamicAccess
/**
* Filter of log events that is used by the LoggingAdapter before
* publishing log events to the eventStream
*/
def logFilter: LoggingFilter
/**
* For debugging: traverse actor hierarchy and make string representation.
* Careful, this may OOM on large actor systems, and it is only meant for
* helping debugging in case something already went terminally wrong.
*/
private[akka] def printTree: String
}
private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classLoader: ClassLoader, defaultExecutionContext: Option[ExecutionContext]) extends ExtendedActorSystem {
@ -570,7 +578,12 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
val eventStream = new EventStream(this, DebugEventStream)
eventStream.startStdoutLogger(settings)
val log: LoggingAdapter = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass)
val logFilter: LoggingFilter = {
val arguments = Vector(classOf[Settings] -> settings, classOf[EventStream] -> eventStream)
dynamicAccess.createInstanceFor[LoggingFilter](LoggingFilter, arguments).get
}
val log: LoggingAdapter = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass, logFilter)
val scheduler: Scheduler = createScheduler()

View file

@ -8,7 +8,7 @@ import java.util.concurrent._
import akka.event.Logging.{ Debug, Error, LogEventException }
import akka.actor._
import akka.dispatch.sysmsg._
import akka.event.{ BusLogging, EventStream }
import akka.event.EventStream
import com.typesafe.config.{ ConfigFactory, Config }
import akka.util.{ Unsafe, Index }
import scala.annotation.tailrec

View file

@ -496,7 +496,7 @@ object Logging {
*/
def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = {
val (str, clazz) = LogSource(logSource, system)
new BusLogging(system.eventStream, str, clazz)
new BusLogging(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter)
}
/**
@ -508,6 +508,10 @@ object Logging {
* See the companion object of [[akka.event.LogSource]] for details.
*
* You can add your own rules quite easily, see [[akka.event.LogSource]].
*
* Note that this `LoggingAdapter` will use the [[akka.event.DefaultLoggingFilter]],
* and not the [[akka.event.LoggingFilter]] configured for the system
* (if different from `DefaultLoggingFilter`).
*/
def apply[T: LogSource](bus: LoggingBus, logSource: T): LoggingAdapter = {
val (str, clazz) = LogSource(logSource)
@ -520,7 +524,8 @@ object Logging {
*/
def apply(logSource: Actor): DiagnosticLoggingAdapter = {
val (str, clazz) = LogSource(logSource)
new BusLogging(logSource.context.system.eventStream, str, clazz) with DiagnosticLoggingAdapter
val system = logSource.context.system.asInstanceOf[ExtendedActorSystem]
new BusLogging(system.eventStream, str, clazz, system.logFilter) with DiagnosticLoggingAdapter
}
/**
@ -543,7 +548,7 @@ object Logging {
*/
def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = {
val (str, clazz) = LogSource.fromAnyRef(logSource, system)
new BusLogging(system.eventStream, str, clazz)
new BusLogging(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter)
}
/**
@ -553,6 +558,10 @@ object Logging {
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*
* Note that this `LoggingAdapter` will use the [[akka.event.DefaultLoggingFilter]],
* and not the [[akka.event.LoggingFilter]] configured for the system
* (if different from `DefaultLoggingFilter`).
*/
def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = {
val (str, clazz) = LogSource.fromAnyRef(logSource)
@ -565,7 +574,8 @@ object Logging {
*/
def getLogger(logSource: UntypedActor): DiagnosticLoggingAdapter = {
val (str, clazz) = LogSource.fromAnyRef(logSource)
new BusLogging(logSource.getContext().system.eventStream, str, clazz) with DiagnosticLoggingAdapter
val system = logSource.getContext().system.asInstanceOf[ExtendedActorSystem]
new BusLogging(system.eventStream, str, clazz, system.logFilter) with DiagnosticLoggingAdapter
}
/**
@ -1073,6 +1083,39 @@ trait LoggingAdapter {
}
}
/**
* Filter of log events that is used by the `LoggingAdapter` before
* publishing log events to the `eventStream`. It can perform
* fine grained filtering based on the log source.
*
* Note that the [[EventStream]] will only subscribe `loggers` to the events
* corresponding to the `logLevel` of the `EventStream`. Therefore it is good
* practice that the `LoggingFilter` implementation first filters using the
* `logLevel` of the `EventStream` before applying more fine grained filters.
*/
trait LoggingFilter {
def isErrorEnabled(logClass: Class[_], logSource: String): Boolean
def isWarningEnabled(logClass: Class[_], logSource: String): Boolean
def isInfoEnabled(logClass: Class[_], logSource: String): Boolean
def isDebugEnabled(logClass: Class[_], logSource: String): Boolean
}
/**
* Default [[LoggingFilter]] that uses the logLevel of the `eventStream`, which
* initial value is defined in configuration. The logLevel `eventStream` can be
* changed while the system is running.
*/
class DefaultLoggingFilter(logLevel: () Logging.LogLevel) extends LoggingFilter {
def this(settings: Settings, eventStream: EventStream) = this(() eventStream.logLevel)
import Logging._
def isErrorEnabled(logClass: Class[_], logSource: String) = logLevel() >= ErrorLevel
def isWarningEnabled(logClass: Class[_], logSource: String) = logLevel() >= WarningLevel
def isInfoEnabled(logClass: Class[_], logSource: String) = logLevel() >= InfoLevel
def isDebugEnabled(logClass: Class[_], logSource: String) = logLevel() >= DebugLevel
}
/**
* LoggingAdapter extension which adds MDC support.
* Only recommended to be used within Actors as it isn't thread safe.
@ -1139,14 +1182,20 @@ trait DiagnosticLoggingAdapter extends LoggingAdapter {
/**
* [[akka.event.LoggingAdapter]] that publishes [[akka.event.Logging.LogEvent]] to event stream.
*/
class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter {
class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_], loggingFilter: LoggingFilter)
extends LoggingAdapter {
// For backwards compatibility, and when LoggingAdapter is created without direct
// association to an ActorSystem
def this(bus: LoggingBus, logSource: String, logClass: Class[_]) =
this(bus, logSource, logClass, new DefaultLoggingFilter(() bus.logLevel))
import Logging._
def isErrorEnabled = bus.logLevel >= ErrorLevel
def isWarningEnabled = bus.logLevel >= WarningLevel
def isInfoEnabled = bus.logLevel >= InfoLevel
def isDebugEnabled = bus.logLevel >= DebugLevel
def isErrorEnabled = loggingFilter.isErrorEnabled(logClass, logSource)
def isWarningEnabled = loggingFilter.isWarningEnabled(logClass, logSource)
def isInfoEnabled = loggingFilter.isInfoEnabled(logClass, logSource)
def isDebugEnabled = loggingFilter.isDebugEnabled(logClass, logSource)
protected def notifyError(message: String): Unit = bus.publish(Error(logSource, logClass, message, mdc))
protected def notifyError(cause: Throwable, message: String): Unit = bus.publish(Error(cause, logSource, logClass, message, mdc))

View file

@ -148,6 +148,10 @@ A custom ``application.conf`` might look like this::
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = "DEBUG"
# Filter of log events that is used by the LoggingAdapter before
# publishing log events to the eventStream.
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"

View file

@ -222,7 +222,9 @@ and it will receive the log events in the same order as they were emitted.
You can configure which event handlers are created at system start-up and listen to logging events. That is done using the
``loggers`` element in the :ref:`configuration`.
Here you can also define the log level.
Here you can also define the log level. More fine grained filtering based on the log source
can be implemented in a custom ``LoggingFilter``, which can be defined in the ``logging-filter``
configuration property.
.. code-block:: ruby
@ -245,8 +247,6 @@ Example of creating a listener:
.. includecode:: code/docs/event/LoggingDocTest.java
:include: my-event-listener
.. _slf4j-java:
Logging to stdout during startup and shutdown
=============================================
@ -255,6 +255,8 @@ Instead log messages are printed to stdout (System.out). The default log level f
stdout logger is ``WARNING`` and it can be silenced completely by setting
``akka.stdout-loglevel=OFF``.
.. _slf4j-java:
SLF4J
=====
@ -269,16 +271,19 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4
<version>1.0.13</version>
</dependency>
You need to enable the Slf4jLogger in the 'loggers' element in
You need to enable the Slf4jLogger in the ``loggers`` element in
the :ref:`configuration`. Here you can also define the log level of the event bus.
More fine grained log levels can be defined in the configuration of the SLF4J backend
(e.g. logback.xml).
(e.g. logback.xml). You should also define ``akka.event.slf4j.Slf4jLoggingFilter`` in
the ``logging-filter`` configuration property. It will filter the log events using the backend
configuration (e.g. logback.xml) before they are published to the event bus.
.. code-block:: ruby
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}
One gotcha is that the timestamp is attributed in the event handler, not when actually doing the logging.

View file

@ -94,3 +94,12 @@ The following, previously deprecated, features have been removed:
* Java API TestKit.dilated, moved to JavaTestKit.dilated
Slf4j logging filter
====================
If you use ``Slf4jLogger`` you should add the following configuration::
akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
It will filter the log events using the backend configuration (e.g. logback.xml) before
they are published to the event bus.

View file

@ -263,7 +263,9 @@ and it will receive the log events in the same order as they were emitted.
You can configure which event handlers are created at system start-up and listen to logging events. That is done using the
``loggers`` element in the :ref:`configuration`.
Here you can also define the log level.
Here you can also define the log level. More fine grained filtering based on the log source
can be implemented in a custom ``LoggingFilter``, which can be defined in the ``logging-filter``
configuration property.
.. code-block:: ruby
@ -284,8 +286,6 @@ Example of creating a listener:
.. includecode:: code/docs/event/LoggingDocSpec.scala
:include: my-event-listener
.. _slf4j-scala:
Logging to stdout during startup and shutdown
=============================================
@ -294,6 +294,8 @@ Instead log messages are printed to stdout (System.out). The default log level f
stdout logger is ``WARNING`` and it can be silenced completely by setting
``akka.stdout-loglevel=OFF``.
.. _slf4j-scala:
SLF4J
=====
@ -302,19 +304,22 @@ It has one single dependency; the slf4j-api jar. In runtime you also need a SLF4
.. code-block:: scala
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.0.13"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.0.13"
You need to enable the Slf4jLogger in the 'loggers' element in
You need to enable the Slf4jLogger in the ``loggers`` element in
the :ref:`configuration`. Here you can also define the log level of the event bus.
More fine grained log levels can be defined in the configuration of the SLF4J backend
(e.g. logback.xml).
(e.g. logback.xml). You should also define ``akka.event.slf4j.Slf4jLoggingFilter`` in
the ``logging-filter`` configuration property. It will filter the log events using the backend
configuration (e.g. logback.xml) before they are published to the event bus.
.. code-block:: ruby
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}
One gotcha is that the timestamp is attributed in the event handler, not when actually doing the logging.

View file

@ -1,6 +1,7 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger", "akka.event.Logging$DefaultLogger"]
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.cluster.ClusterActorRefProvider"

View file

@ -10,6 +10,8 @@ import akka.event.Logging._
import akka.actor._
import akka.event.DummyClassForStringSources
import akka.util.Helpers
import akka.event.LoggingFilter
import akka.event.EventStream
/**
* Base trait for all classes that wants to be able use the SLF4J logging infrastructure.
@ -104,3 +106,18 @@ class Slf4jLogger extends Actor with SLF4JLogging {
Helpers.currentTimeMillisToUTCString(timestamp)
}
/**
* [[akka.event.LoggingFilter]] that uses the log level defined in in the SLF4J
* backend configuration (e.g. logback.xml) to filter log events before publishing
* the log events to the `eventStream`.
*/
class Slf4jLoggingFilter(settings: ActorSystem.Settings, eventStream: EventStream) extends LoggingFilter {
def isErrorEnabled(logClass: Class[_], logSource: String) =
(eventStream.logLevel >= ErrorLevel) && Logger(logClass, logSource).isErrorEnabled
def isWarningEnabled(logClass: Class[_], logSource: String) =
(eventStream.logLevel >= WarningLevel) && Logger(logClass, logSource).isWarnEnabled
def isInfoEnabled(logClass: Class[_], logSource: String) =
(eventStream.logLevel >= InfoLevel) && Logger(logClass, logSource).isInfoEnabled
def isDebugEnabled(logClass: Class[_], logSource: String) =
(eventStream.logLevel >= DebugLevel) && Logger(logClass, logSource).isDebugEnabled
}

View file

@ -1,26 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %logger %X{akkaSource} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<appender name="TEST"
class="akka.event.slf4j.Slf4jLoggerSpec$TestAppender">
<appender name="TEST" class="akka.event.slf4j.Slf4jLoggerSpec$TestAppender">
<encoder>
<pattern>%date{ISO8601} level=[%level] logger=[%logger] akkaSource=[%X{akkaSource}] sourceThread=[%X{sourceThread}] mdc=[ticket-#%X{ticketNumber}: %X{ticketDesc}] - msg=[%msg]%n----%n</pattern>
</encoder>
</appender>
<logger name="akka.event.slf4j.Slf4jLoggerSpec" level="info" additivity="false">
<logger name="akka.event.slf4j.Slf4jLoggingFilterSpec$DebugLevelProducer"
level="debug" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="akka.event.slf4j.Slf4jLoggingFilterSpec$WarningLevelProducer"
level="warn" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="akka.event.slf4j.Slf4jLoggerSpec" level="info"
additivity="false">
<appender-ref ref="TEST" />
</logger>
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View file

@ -0,0 +1,122 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event.slf4j
import language.postfixOps
import akka.testkit.AkkaSpec
import akka.actor.{ DiagnosticActorLogging, Actor, ActorLogging, Props }
import scala.concurrent.duration._
import akka.event.Logging
import ch.qos.logback.core.OutputStreamAppender
import java.io.StringWriter
import java.io.ByteArrayOutputStream
import org.scalatest.BeforeAndAfterEach
import akka.actor.ActorRef
import akka.event.Logging.InitializeLogger
import akka.event.Logging.LoggerInitialized
import akka.event.Logging.LogEvent
import akka.testkit.TestProbe
import akka.event.Logging.Warning
import akka.event.Logging.Info
import akka.event.Logging.Debug
import akka.event.LoggingAdapter
import akka.event.LogSource
object Slf4jLoggingFilterSpec {
// This test depends on logback configuration in src/test/resources/logback-test.xml
val config = """
akka {
loglevel = DEBUG
loggers = ["akka.event.slf4j.Slf4jLoggingFilterSpec$TestLogger"]
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}
"""
final case class SetTarget(ref: ActorRef)
class TestLogger extends Actor {
var target: Option[ActorRef] = None
override def receive: Receive = {
case InitializeLogger(bus)
bus.subscribe(context.self, classOf[SetTarget])
sender() ! LoggerInitialized
case SetTarget(ref)
target = Some(ref)
ref ! ("OK")
case event: LogEvent
println("# event: " + event)
target foreach { _ ! event }
}
}
class DebugLevelProducer extends Actor with ActorLogging {
def receive = {
case s: String
log.warning(s)
log.info(s)
println("# DebugLevelProducer: " + log.isDebugEnabled)
log.debug(s)
}
}
class WarningLevelProducer extends Actor with ActorLogging {
def receive = {
case s: String
log.warning(s)
log.info(s)
log.debug(s)
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Slf4jLoggingFilterSpec extends AkkaSpec(Slf4jLoggingFilterSpec.config) with BeforeAndAfterEach {
import Slf4jLoggingFilterSpec._
"Slf4jLoggingFilter" must {
"use configured LoggingFilter at debug log level in logback conf" in {
import LogSource.fromClass
val log1 = Logging(system, classOf[DebugLevelProducer])
log1.isDebugEnabled should be(true)
log1.isInfoEnabled should be(true)
log1.isWarningEnabled should be(true)
log1.isErrorEnabled should be(true)
}
"use configured LoggingFilter at warning log level in logback conf" in {
import LogSource.fromClass
val log1 = Logging(system, classOf[WarningLevelProducer])
log1.isDebugEnabled should be(false)
log1.isInfoEnabled should be(false)
log1.isWarningEnabled should be(true)
log1.isErrorEnabled should be(true)
}
"filter ActorLogging at debug log level with logback conf" in {
val probe = TestProbe()
system.eventStream.publish(SetTarget(probe.ref))
probe.expectMsg("OK")
val debugLevelProducer = system.actorOf(Props[DebugLevelProducer], name = "debugLevelProducer")
debugLevelProducer ! "test1"
probe.expectMsgType[Warning].message should be("test1")
probe.expectMsgType[Info].message should be("test1")
probe.expectMsgType[Debug].message should be("test1")
}
"filter ActorLogging at warning log level with logback conf" in {
val probe = TestProbe()
system.eventStream.publish(SetTarget(probe.ref))
probe.expectMsg("OK")
val debugLevelProducer = system.actorOf(Props[WarningLevelProducer], name = "warningLevelProducer")
debugLevelProducer ! "test2"
probe.expectMsgType[Warning].message should be("test2")
probe.expectNoMsg(500.millis)
}
}
}