Log dead letters, see #3453
This commit is contained in:
parent
981bce5dd0
commit
cd2b77157c
9 changed files with 138 additions and 11 deletions
|
|
@ -155,6 +155,17 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
|
||||||
system.extension(TestExtension).system must be === system
|
system.extension(TestExtension).system must be === system
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"log dead letters" in {
|
||||||
|
val sys = ActorSystem("LogDeadLetters", ConfigFactory.parseString("akka.loglevel=INFO").withFallback(AkkaSpec.testConf))
|
||||||
|
try {
|
||||||
|
val a = sys.actorOf(Props[ActorSystemSpec.Terminater])
|
||||||
|
EventFilter.info(pattern = "not delivered", occurrences = 1).intercept {
|
||||||
|
a ! "run"
|
||||||
|
a ! "boom"
|
||||||
|
}(sys)
|
||||||
|
} finally shutdown(sys)
|
||||||
|
}
|
||||||
|
|
||||||
"run termination callbacks in order" in {
|
"run termination callbacks in order" in {
|
||||||
val system2 = ActorSystem("TerminationCallbacks", AkkaSpec.testConf)
|
val system2 = ActorSystem("TerminationCallbacks", AkkaSpec.testConf)
|
||||||
val result = new ConcurrentLinkedQueue[Int]
|
val result = new ConcurrentLinkedQueue[Int]
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,12 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
|
||||||
|
|
||||||
getMilliseconds("akka.logger-startup-timeout") must be(5.seconds.toMillis)
|
getMilliseconds("akka.logger-startup-timeout") must be(5.seconds.toMillis)
|
||||||
settings.LoggerStartTimeout.duration must be(5.seconds)
|
settings.LoggerStartTimeout.duration must be(5.seconds)
|
||||||
|
|
||||||
|
getInt("akka.log-dead-letters") must be(10)
|
||||||
|
settings.LogDeadLetters must be(10)
|
||||||
|
|
||||||
|
getBoolean("akka.log-dead-letters-during-shutdown") must be(true)
|
||||||
|
settings.LogDeadLettersDuringShutdown must be(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,18 @@ akka {
|
||||||
# This is useful when you are uncertain of what configuration is used.
|
# This is useful when you are uncertain of what configuration is used.
|
||||||
log-config-on-start = off
|
log-config-on-start = off
|
||||||
|
|
||||||
|
# Log at info level when messages are sent to dead letters.
|
||||||
|
# Possible values:
|
||||||
|
# on: all dead letters are logged
|
||||||
|
# off: no logging of dead letters
|
||||||
|
# n: positive integer, number of dead letters that will be logged
|
||||||
|
log-dead-letters = 10
|
||||||
|
|
||||||
|
# Possibility to turn off logging of dead letters while the actor system
|
||||||
|
# is shutting down. Logging is only done when enabled by 'log-dead-letters'
|
||||||
|
# setting.
|
||||||
|
log-dead-letters-during-shutdown = on
|
||||||
|
|
||||||
# List FQCN of extensions which shall be loaded at actor system startup.
|
# List FQCN of extensions which shall be loaded at actor system startup.
|
||||||
# Should be on the format: 'extensions = ["foo", "bar"]' etc.
|
# Should be on the format: 'extensions = ["foo", "bar"]' etc.
|
||||||
# See the Akka Documentation for more info about Extensions
|
# See the Akka Documentation for more info about Extensions
|
||||||
|
|
@ -285,16 +297,16 @@ akka {
|
||||||
# schedule idle actors using the same dispatcher when a message comes in,
|
# schedule idle actors using the same dispatcher when a message comes in,
|
||||||
# and the dispatchers ExecutorService is not fully busy already.
|
# and the dispatchers ExecutorService is not fully busy already.
|
||||||
attempt-teamwork = on
|
attempt-teamwork = on
|
||||||
|
|
||||||
# If this dispatcher requires a specific type of mailbox, specify the
|
# If this dispatcher requires a specific type of mailbox, specify the
|
||||||
# fully-qualified class name here; the actually created mailbox will
|
# fully-qualified class name here; the actually created mailbox will
|
||||||
# be a subtype of this type. The empty string signifies no requirement.
|
# be a subtype of this type. The empty string signifies no requirement.
|
||||||
mailbox-requirement = ""
|
mailbox-requirement = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
default-mailbox {
|
default-mailbox {
|
||||||
# FQCN of the MailboxType. The Class of the FQCN must have a public
|
# FQCN of the MailboxType. The Class of the FQCN must have a public
|
||||||
# constructor with
|
# constructor with
|
||||||
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
|
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
|
||||||
mailbox-type = "akka.dispatch.UnboundedMailbox"
|
mailbox-type = "akka.dispatch.UnboundedMailbox"
|
||||||
|
|
||||||
|
|
@ -305,7 +317,7 @@ akka {
|
||||||
# this is no longer the case, the type must explicitly be a bounded mailbox.
|
# this is no longer the case, the type must explicitly be a bounded mailbox.
|
||||||
mailbox-capacity = 1000
|
mailbox-capacity = 1000
|
||||||
|
|
||||||
# If the mailbox is bounded then this is the timeout for enqueueing
|
# If the mailbox is bounded then this is the timeout for enqueueing
|
||||||
# in case the mailbox is full. Negative values signify infinite
|
# in case the mailbox is full. Negative values signify infinite
|
||||||
# timeout, which should be avoided as it bears the risk of dead-lock.
|
# timeout, which should be avoided as it bears the risk of dead-lock.
|
||||||
mailbox-push-timeout-time = 10s
|
mailbox-push-timeout-time = 10s
|
||||||
|
|
@ -359,7 +371,7 @@ akka {
|
||||||
# com.typesafe.config.Config) parameters.
|
# com.typesafe.config.Config) parameters.
|
||||||
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
|
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
|
||||||
}
|
}
|
||||||
|
|
||||||
bounded-deque-based {
|
bounded-deque-based {
|
||||||
# FQCN of the MailboxType, The Class of the FQCN must have a public
|
# FQCN of the MailboxType, The Class of the FQCN must have a public
|
||||||
# constructor with (akka.actor.ActorSystem.Settings,
|
# constructor with (akka.actor.ActorSystem.Settings,
|
||||||
|
|
|
||||||
|
|
@ -151,6 +151,12 @@ object ActorSystem {
|
||||||
@deprecated("use LoggerStartTimeout)", "2.2")
|
@deprecated("use LoggerStartTimeout)", "2.2")
|
||||||
final val EventHandlerStartTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS))
|
final val EventHandlerStartTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS))
|
||||||
final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start")
|
final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start")
|
||||||
|
final val LogDeadLetters: Int = config.getString("akka.log-dead-letters").toLowerCase match {
|
||||||
|
case "off" | "false" ⇒ 0
|
||||||
|
case "on" | "true" ⇒ Int.MaxValue
|
||||||
|
case _ ⇒ config.getInt("akka.log-dead-letters")
|
||||||
|
}
|
||||||
|
final val LogDeadLettersDuringShutdown: Boolean = config.getBoolean("akka.log-dead-letters-during-shutdown")
|
||||||
|
|
||||||
final val AddLoggingReceive: Boolean = getBoolean("akka.actor.debug.receive")
|
final val AddLoggingReceive: Boolean = getBoolean("akka.actor.debug.receive")
|
||||||
final val DebugAutoReceive: Boolean = getBoolean("akka.actor.debug.autoreceive")
|
final val DebugAutoReceive: Boolean = getBoolean("akka.actor.debug.autoreceive")
|
||||||
|
|
@ -178,6 +184,7 @@ object ActorSystem {
|
||||||
* Returns the String representation of the Config that this Settings is backed by
|
* Returns the String representation of the Config that this Settings is backed by
|
||||||
*/
|
*/
|
||||||
override def toString: String = config.root.render
|
override def toString: String = config.root.render
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -459,6 +466,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
|
|
||||||
import ActorSystem._
|
import ActorSystem._
|
||||||
|
|
||||||
|
@volatile private var logDeadLetterListener: Option[ActorRef] = None
|
||||||
final val settings: Settings = new Settings(classLoader, applicationConfig, name)
|
final val settings: Settings = new Settings(classLoader, applicationConfig, name)
|
||||||
|
|
||||||
protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
|
protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
|
||||||
|
|
@ -569,6 +577,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
private lazy val _start: this.type = {
|
private lazy val _start: this.type = {
|
||||||
// the provider is expected to start default loggers, LocalActorRefProvider does this
|
// the provider is expected to start default loggers, LocalActorRefProvider does this
|
||||||
provider.init(this)
|
provider.init(this)
|
||||||
|
if (settings.LogDeadLetters > 0)
|
||||||
|
logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
|
||||||
registerOnTermination(stopScheduler())
|
registerOnTermination(stopScheduler())
|
||||||
loadExtensions()
|
loadExtensions()
|
||||||
if (LogConfigOnStart) logConfiguration()
|
if (LogConfigOnStart) logConfiguration()
|
||||||
|
|
@ -589,7 +599,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
def awaitTermination() = awaitTermination(Duration.Inf)
|
def awaitTermination() = awaitTermination(Duration.Inf)
|
||||||
def isTerminated = terminationCallbacks.isTerminated
|
def isTerminated = terminationCallbacks.isTerminated
|
||||||
|
|
||||||
def shutdown(): Unit = guardian.stop()
|
def shutdown(): Unit = {
|
||||||
|
if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener foreach stop
|
||||||
|
guardian.stop()
|
||||||
|
}
|
||||||
|
|
||||||
//#create-scheduler
|
//#create-scheduler
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -39,8 +39,10 @@ private[akka] class Mailboxes(
|
||||||
import Mailboxes._
|
import Mailboxes._
|
||||||
|
|
||||||
val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue {
|
val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue {
|
||||||
def enqueue(receiver: ActorRef, envelope: Envelope): Unit =
|
def enqueue(receiver: ActorRef, envelope: Envelope): Unit = envelope.message match {
|
||||||
deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender)
|
case _: DeadLetter ⇒ // actor subscribing to DeadLetter, drop it
|
||||||
|
case msg ⇒ deadLetters.tell(DeadLetter(msg, envelope.sender, receiver), envelope.sender)
|
||||||
|
}
|
||||||
def dequeue() = null
|
def dequeue() = null
|
||||||
def hasMessages = false
|
def hasMessages = false
|
||||||
def numberOfMessages = 0
|
def numberOfMessages = 0
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,40 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.event
|
||||||
|
|
||||||
|
import akka.actor.Actor
|
||||||
|
import akka.actor.DeadLetter
|
||||||
|
import akka.event.Logging.Info
|
||||||
|
|
||||||
|
class DeadLetterListener extends Actor {
|
||||||
|
|
||||||
|
val eventStream = context.system.eventStream
|
||||||
|
val maxCount = context.system.settings.LogDeadLetters
|
||||||
|
var count = 0
|
||||||
|
|
||||||
|
override def preStart(): Unit =
|
||||||
|
eventStream.subscribe(self, classOf[DeadLetter])
|
||||||
|
|
||||||
|
// don't re-subscribe, skip call to preStart
|
||||||
|
override def postRestart(reason: Throwable): Unit = ()
|
||||||
|
|
||||||
|
// don't remove subscription, skip call to postStop, no children to stop
|
||||||
|
override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()
|
||||||
|
|
||||||
|
override def postStop(): Unit =
|
||||||
|
eventStream.unsubscribe(self)
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case DeadLetter(message, snd, rcp) ⇒
|
||||||
|
count += 1
|
||||||
|
val done = maxCount != Int.MaxValue && count >= maxCount
|
||||||
|
val doneMsg = if (done) ", no more dead letters will be logged" else ""
|
||||||
|
eventStream.publish(Info(rcp.path.toString, rcp.getClass,
|
||||||
|
s"Message [${message.getClass.getName}] from $snd to $rcp was not delivered. [$count] dead letters encountered$doneMsg. " +
|
||||||
|
"This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " +
|
||||||
|
"and 'akka.log-dead-letters-during-shutdown'."))
|
||||||
|
if (done) context.stop(self)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -44,6 +44,7 @@ object MultiNodeClusterSpec {
|
||||||
failure-detector.heartbeat-interval = 400 ms
|
failure-detector.heartbeat-interval = 400 ms
|
||||||
}
|
}
|
||||||
akka.loglevel = INFO
|
akka.loglevel = INFO
|
||||||
|
akka.log-dead-letters-during-shutdown = off
|
||||||
akka.remote.log-remote-lifecycle-events = off
|
akka.remote.log-remote-lifecycle-events = off
|
||||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||||
akka.test {
|
akka.test {
|
||||||
|
|
|
||||||
|
|
@ -51,6 +51,27 @@ treatment of this case, e.g. in the SLF4J event listener which will then use
|
||||||
the string instead of the class’ name for looking up the logger instance to
|
the string instead of the class’ name for looking up the logger instance to
|
||||||
use.
|
use.
|
||||||
|
|
||||||
|
Logging of Dead Letters
|
||||||
|
-----------------------
|
||||||
|
|
||||||
|
By default messages sent to dead letters are logged at info level. Existence of dead letters
|
||||||
|
does not necessarily indicate a problem, but it might be, and therefore they are logged by default.
|
||||||
|
After a few messages this logging is turned off, to avoid flooding the logs.
|
||||||
|
You can disable this logging completely or adjust how many dead letters that are
|
||||||
|
logged. During system shutdown it is likely that you see dead letters, since pending
|
||||||
|
messages in the actor mailboxes are sent to dead letters. You can also disable logging
|
||||||
|
of dead letters during shutdown.
|
||||||
|
|
||||||
|
.. code-block:: ruby
|
||||||
|
|
||||||
|
akka {
|
||||||
|
log-dead-letters = 10
|
||||||
|
log-dead-letters-during-shutdown = on
|
||||||
|
}
|
||||||
|
|
||||||
|
To customize the logging further or take other actions for dead letters you can subscribe
|
||||||
|
to the :ref:`event-stream-java`.
|
||||||
|
|
||||||
Auxiliary logging options
|
Auxiliary logging options
|
||||||
-------------------------
|
-------------------------
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,27 @@ treatment of this case, e.g. in the SLF4J event listener which will then use
|
||||||
the string instead of the class’ name for looking up the logger instance to
|
the string instead of the class’ name for looking up the logger instance to
|
||||||
use.
|
use.
|
||||||
|
|
||||||
|
Logging of Dead Letters
|
||||||
|
-----------------------
|
||||||
|
|
||||||
|
By default messages sent to dead letters are logged at info level. Existence of dead letters
|
||||||
|
does not necessarily indicate a problem, but it might be, and therefore they are logged by default.
|
||||||
|
After a few messages this logging is turned off, to avoid flooding the logs.
|
||||||
|
You can disable this logging completely or adjust how many dead letters that are
|
||||||
|
logged. During system shutdown it is likely that you see dead letters, since pending
|
||||||
|
messages in the actor mailboxes are sent to dead letters. You can also disable logging
|
||||||
|
of dead letters during shutdown.
|
||||||
|
|
||||||
|
.. code-block:: ruby
|
||||||
|
|
||||||
|
akka {
|
||||||
|
log-dead-letters = 10
|
||||||
|
log-dead-letters-during-shutdown = on
|
||||||
|
}
|
||||||
|
|
||||||
|
To customize the logging further or take other actions for dead letters you can subscribe
|
||||||
|
to the :ref:`event-stream-scala`.
|
||||||
|
|
||||||
Auxiliary logging options
|
Auxiliary logging options
|
||||||
-------------------------
|
-------------------------
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue