Merge pull request #27085 from akka/wip-24874-deadlog-patriknw

re-enable dead letter logging after specified duration, #24874
This commit is contained in:
Patrik Nordwall 2019-06-07 17:58:32 +02:00 committed by GitHub
commit 047d620c07
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 179 additions and 37 deletions

View file

@ -0,0 +1,59 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter
import akka.testkit.ImplicitSender
import akka.testkit.TestActors
class DeadLetterSuspensionSpec extends AkkaSpec("""
akka.loglevel = INFO
akka.log-dead-letters = 3
akka.log-dead-letters-suspend-duration = 2s
""") with ImplicitSender {
val deadActor = system.actorOf(TestActors.echoActorProps)
watch(deadActor)
deadActor ! PoisonPill
expectTerminated(deadActor)
private def expectedDeadLettersLogMessage(count: Int): String =
s"Message [java.lang.Integer] from $testActor to $deadActor was not delivered. [$count] dead letters encountered"
"must suspend dead-letters logging when reaching 'akka.log-dead-letters', and then re-enable" in {
EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept {
deadActor ! 1
}
EventFilter.info(start = expectedDeadLettersLogMessage(2), occurrences = 1).intercept {
deadActor ! 2
}
EventFilter
.info(start = expectedDeadLettersLogMessage(3) + ", no more dead letters will be logged in next", occurrences = 1)
.intercept {
deadActor ! 3
}
deadActor ! 4
deadActor ! 5
// let suspend-duration elapse
Thread.sleep(2050)
// re-enabled
EventFilter
.info(start = expectedDeadLettersLogMessage(6) + ", of which 2 were not logged", occurrences = 1)
.intercept {
deadActor ! 6
}
// reset count
EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept {
deadActor ! 7
}
}
}

View file

@ -65,8 +65,11 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
getInt("akka.log-dead-letters") should ===(10) getInt("akka.log-dead-letters") should ===(10)
settings.LogDeadLetters should ===(10) settings.LogDeadLetters should ===(10)
getBoolean("akka.log-dead-letters-during-shutdown") should ===(true) getBoolean("akka.log-dead-letters-during-shutdown") should ===(false)
settings.LogDeadLettersDuringShutdown should ===(true) settings.LogDeadLettersDuringShutdown should ===(false)
getDuration("akka.log-dead-letters-suspend-duration", TimeUnit.MILLISECONDS) should ===(5 * 60 * 1000L)
settings.LogDeadLettersSuspendDuration should ===(5.minutes)
getBoolean("akka.coordinated-shutdown.terminate-actor-system") should ===(true) getBoolean("akka.coordinated-shutdown.terminate-actor-system") should ===(true)
settings.CoordinatedShutdownTerminateActorSystem should ===(true) settings.CoordinatedShutdownTerminateActorSystem should ===(true)

View file

@ -57,7 +57,12 @@ akka {
# Possibility to turn off logging of dead letters while the actor system # Possibility to turn off logging of dead letters while the actor system
# is shutting down. Logging is only done when enabled by 'log-dead-letters' # is shutting down. Logging is only done when enabled by 'log-dead-letters'
# setting. # setting.
log-dead-letters-during-shutdown = on log-dead-letters-during-shutdown = off
# When log-dead-letters is enabled, this will re-enable the logging after configured duration.
# infinite: suspend the logging forever;
# or a duration (eg: 5 minutes), after which the logging will be re-enabled.
log-dead-letters-suspend-duration = 5 minutes
# List FQCN of extensions which shall be loaded at actor system startup. # List FQCN of extensions which shall be loaded at actor system startup.
# Library extensions are regular extensions that are loaded at startup and are # Library extensions are regular extensions that are loaded at startup and are

View file

@ -5,28 +5,28 @@
package akka.actor package akka.actor
import java.io.Closeable import java.io.Closeable
import java.util.Optional
import java.util.concurrent._ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.{ Config, ConfigFactory }
import akka.ConfigurationException
import akka.event._
import akka.dispatch._
import akka.japi.Util.immutableSeq
import akka.actor.dungeon.ChildrenContainer import akka.actor.dungeon.ChildrenContainer
import akka.util._
import akka.util.Helpers.toRootLowerCase
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise }
import scala.util.{ Failure, Success, Try }
import scala.util.control.{ ControlThrowable, NonFatal }
import java.util.Optional
import akka.actor.setup.{ ActorSystemSetup, Setup } import akka.actor.setup.{ ActorSystemSetup, Setup }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.ConfigurationException
import akka.dispatch._
import akka.event._
import akka.japi.Util.immutableSeq
import akka.util.Helpers.toRootLowerCase
import akka.util._
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.compat.java8.FutureConverters import scala.compat.java8.FutureConverters
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise }
import scala.util.control.{ ControlThrowable, NonFatal }
import scala.util.{ Failure, Success, Try }
object BootstrapSetup { object BootstrapSetup {
@ -386,6 +386,13 @@ object ActorSystem {
case _ => config.getInt("akka.log-dead-letters") case _ => config.getInt("akka.log-dead-letters")
} }
final val LogDeadLettersDuringShutdown: Boolean = config.getBoolean("akka.log-dead-letters-during-shutdown") final val LogDeadLettersDuringShutdown: Boolean = config.getBoolean("akka.log-dead-letters-during-shutdown")
final val LogDeadLettersSuspendDuration: Duration = {
val key = "akka.log-dead-letters-suspend-duration"
toRootLowerCase(config.getString(key)) match {
case "infinite" => Duration.Inf
case _ => config.getMillisDuration(key)
}
}
final val AddLoggingReceive: Boolean = getBoolean("akka.actor.debug.receive") final val AddLoggingReceive: Boolean = getBoolean("akka.actor.debug.receive")
final val DebugAutoReceive: Boolean = getBoolean("akka.actor.debug.autoreceive") final val DebugAutoReceive: Boolean = getBoolean("akka.actor.debug.autoreceive")

View file

@ -4,15 +4,21 @@
package akka.event package akka.event
import scala.concurrent.duration.Deadline
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.DeadLetter import akka.actor.DeadLetter
import akka.event.Logging.Info import akka.event.Logging.Info
import akka.util.PrettyDuration._
class DeadLetterListener extends Actor { class DeadLetterListener extends Actor {
val eventStream = context.system.eventStream val eventStream: EventStream = context.system.eventStream
val maxCount = context.system.settings.LogDeadLetters protected val maxCount: Int = context.system.settings.LogDeadLetters
var count = 0 private val isAlwaysLoggingDeadLetters = maxCount == Int.MaxValue
protected var count: Int = 0
override def preStart(): Unit = override def preStart(): Unit =
eventStream.subscribe(self, classOf[DeadLetter]) eventStream.subscribe(self, classOf[DeadLetter])
@ -26,21 +32,73 @@ class DeadLetterListener extends Actor {
override def postStop(): Unit = override def postStop(): Unit =
eventStream.unsubscribe(self) eventStream.unsubscribe(self)
def receive = { private def incrementCount(): Unit = {
case DeadLetter(message, snd, rcp) => // `count` is public API (for unknown reason) so for backwards compatibility reasons we
// can't change it to Long
if (count == Int.MaxValue) {
Logging.getLogger(this).info("Resetting DeadLetterListener counter after reaching Int.MaxValue.")
count = 1
} else
count += 1 count += 1
val origin = if (snd eq context.system.deadLetters) "without sender" else s"from $snd" }
val done = maxCount != Int.MaxValue && count >= maxCount
val doneMsg = if (done) ", no more dead letters will be logged" else "" def receive: Receive =
eventStream.publish( if (isAlwaysLoggingDeadLetters) receiveWithAlwaysLogging
Info( else
rcp.path.toString, context.system.settings.LogDeadLettersSuspendDuration match {
rcp.getClass, case suspendDuration: FiniteDuration => receiveWithSuspendLogging(suspendDuration)
s"Message [${message.getClass.getName}] $origin to $rcp was not delivered. [$count] dead letters encountered$doneMsg. " + case _ => receiveWithMaxCountLogging
s"If this is not an expected behavior, then [$rcp] may have terminated unexpectedly, " + }
"This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " +
"and 'akka.log-dead-letters-during-shutdown'.")) private def receiveWithAlwaysLogging: Receive = {
if (done) context.stop(self) case DeadLetter(message, snd, recipient) =>
incrementCount()
logDeadLetter(message, snd, recipient, doneMsg = "")
}
private def receiveWithMaxCountLogging: Receive = {
case DeadLetter(message, snd, recipient) =>
incrementCount()
if (count == maxCount) {
logDeadLetter(message, snd, recipient, ", no more dead letters will be logged")
context.stop(self)
} else {
logDeadLetter(message, snd, recipient, "")
}
}
private def receiveWithSuspendLogging(suspendDuration: FiniteDuration): Receive = {
case DeadLetter(message, snd, recipient) =>
incrementCount()
if (count == maxCount) {
val doneMsg = s", no more dead letters will be logged in next [${suspendDuration.pretty}]"
logDeadLetter(message, snd, recipient, doneMsg)
context.become(receiveWhenSuspended(suspendDuration, Deadline.now + suspendDuration))
} else
logDeadLetter(message, snd, recipient, "")
}
private def receiveWhenSuspended(suspendDuration: FiniteDuration, suspendDeadline: Deadline): Receive = {
case DeadLetter(message, snd, recipient) =>
incrementCount()
if (suspendDeadline.isOverdue()) {
val doneMsg = s", of which ${count - maxCount - 1} were not logged. The counter will be reset now"
logDeadLetter(message, snd, recipient, doneMsg)
count = 0
context.become(receiveWithSuspendLogging(suspendDuration))
}
}
private def logDeadLetter(message: Any, snd: ActorRef, recipient: ActorRef, doneMsg: String): Unit = {
val origin = if (snd eq context.system.deadLetters) "without sender" else s"from $snd"
eventStream.publish(
Info(
recipient.path.toString,
recipient.getClass,
s"Message [${message.getClass.getName}] $origin to $recipient was not delivered. [$count] dead letters encountered$doneMsg. " +
s"If this is not an expected behavior then $recipient may have terminated unexpectedly. " +
"This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " +
"and 'akka.log-dead-letters-during-shutdown'."))
} }
} }

View file

@ -220,7 +220,7 @@ This reinstantiates the behavior from previous Akka versions but also removes th
user and Akka internals. So, use at your own risk! user and Akka internals. So, use at your own risk!
Several `use-dispatcher` configuration settings that previously accepted an empty value to fall back to the default Several `use-dispatcher` configuration settings that previously accepted an empty value to fall back to the default
dispatcher has now gotten an explicit value of `akka.actor.internal-dispatcher` and no longer accept an empty dispatcher has now gotten an explicit value of `akka.actor.internal-dispatcher` and no longer accept an empty
string as value. If such an empty value is used in your `application.conf` the same result is achieved by simply removing string as value. If such an empty value is used in your `application.conf` the same result is achieved by simply removing
that entry completely and having the default apply. that entry completely and having the default apply.
@ -272,6 +272,13 @@ akka.coordinated-shutdown.run-by-actor-system-terminate = off
Previously, Akka contained a shaded copy of the ForkJoinPool. In benchmarks, we could not find significant benefits of Previously, Akka contained a shaded copy of the ForkJoinPool. In benchmarks, we could not find significant benefits of
keeping our own copy, so from Akka 2.6 on, the default FJP from the JDK will be used. The Akka FJP copy was removed. keeping our own copy, so from Akka 2.6 on, the default FJP from the JDK will be used. The Akka FJP copy was removed.
### Logging of dead letters
When the number of dead letters have reached configured `akka.log-dead-letters` value it didn't log
more dead letters in Akka 2.5. In Akka 2.6 the count is reset after configured `akka.log-dead-letters-suspend-duration`.
`akka.log-dead-letters-during-shutdown` default configuration changed from `on` to `off`.
## Source incompatibilities ## Source incompatibilities
### StreamRefs ### StreamRefs
@ -279,6 +286,8 @@ keeping our own copy, so from Akka 2.6 on, the default FJP from the JDK will be
The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no longer wrapped in The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no longer wrapped in
`Future`/`CompletionStage`. It can be sent as reply to `sender()` immediately without using the `pipe` pattern. `Future`/`CompletionStage`. It can be sent as reply to `sender()` immediately without using the `pipe` pattern.
`StreamRefs` was marked as [may change](../common/may-change.md).
## Akka Typed ## Akka Typed
### Receptionist has moved ### Receptionist has moved
@ -304,7 +313,8 @@ it will work properly again.
### Akka Typed API changes ### Akka Typed API changes
Akka Typed APIs are still marked as [may change](../common/may-change.md) and therefore its API can still change without deprecation period. The following is a list of API changes since the latest release: Akka Typed APIs are still marked as [may change](../common/may-change.md) and a few changes were
made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible changes are:
* Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed. * Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed.
* New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies. * New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies.
@ -315,7 +325,7 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and th
* `TimerScheduler.startPeriodicTimer`, replaced by `startTimerWithFixedDelay` or `startTimerAtFixedRate` * `TimerScheduler.startPeriodicTimer`, replaced by `startTimerWithFixedDelay` or `startTimerAtFixedRate`
* `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees. * `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees.
### Akka Typed Stream API changes #### Akka Typed Stream API changes
* `ActorSoruce.actorRef` relying on `PartialFunction` has been replaced in the Java API with a variant more suitable to be called by Java. * `ActorSoruce.actorRef` relying on `PartialFunction` has been replaced in the Java API with a variant more suitable to be called by Java.