From 20466772ceead08103c0b7dc78025da6d27e7881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 21 Apr 2018 05:32:18 +0800 Subject: [PATCH 1/2] =act re-enable dead letter logging after specified duration (cherry picked from commit 0caa4f3c22f62e1a65d04dc5b5507f375d7ee794) --- akka-actor/src/main/resources/reference.conf | 5 ++ .../main/scala/akka/actor/ActorSystem.scala | 35 ++++++----- .../scala/akka/event/DeadLetterListener.scala | 63 ++++++++++++++++++- 3 files changed, 86 insertions(+), 17 deletions(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 3afa277baf..832d1d6c8a 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -54,6 +54,11 @@ akka { # n: positive integer, number of dead letters that will be logged log-dead-letters = 10 + # When log-dead-letters is on, this will renble the logging after configured duration. + # infinite: suspend the logging forever; + # or a duration (eg: 5 minutes), after which the logging will be re-enabled. + log-dead-letters-suspend-duration = infinite + # Possibility to turn off logging of dead letters while the actor system # is shutting down. Logging is only done when enabled by 'log-dead-letters' # setting. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 3db297f0b4..d06bc90931 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -5,28 +5,28 @@ package akka.actor import java.io.Closeable +import java.util.Optional import java.util.concurrent._ import java.util.concurrent.atomic.AtomicReference -import com.typesafe.config.{ Config, ConfigFactory } -import akka.ConfigurationException -import akka.event._ -import akka.dispatch._ -import akka.japi.Util.immutableSeq import akka.actor.dungeon.ChildrenContainer -import akka.util._ -import akka.util.Helpers.toRootLowerCase -import scala.annotation.tailrec -import scala.collection.immutable -import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise } -import scala.util.{ Failure, Success, Try } -import scala.util.control.{ ControlThrowable, NonFatal } -import java.util.Optional - import akka.actor.setup.{ ActorSystemSetup, Setup } import akka.annotation.InternalApi +import akka.dispatch._ +import akka.event._ +import akka.japi.Util.immutableSeq +import akka.util.Helpers.toRootLowerCase +import akka.util._ +import com.typesafe.config.{ Config, ConfigFactory } + +import scala.annotation.tailrec +import scala.collection.immutable import scala.compat.java8.FutureConverters import scala.compat.java8.OptionConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise } +import scala.util.control.{ ControlThrowable, NonFatal } +import scala.util.{ Failure, Success, Try } object BootstrapSetup { @@ -385,6 +385,13 @@ object ActorSystem { case "on" | "true" => Int.MaxValue case _ => config.getInt("akka.log-dead-letters") } + final val LogDeadLettersSuspendDuration: Duration = + toRootLowerCase(config.getString("akka.log-dead-letters-suspend-duration")) match { + case "infinite" ⇒ Duration.Inf + case _ ⇒ + import JavaDurationConverters._ + config.getDuration("akka.log-dead-letters-suspend-duration").asScala + } final val LogDeadLettersDuringShutdown: Boolean = config.getBoolean("akka.log-dead-letters-during-shutdown") final val AddLoggingReceive: Boolean = getBoolean("akka.actor.debug.receive") diff --git a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala index 104c216ecf..f55ed5fa28 100644 --- a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala +++ b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala @@ -10,9 +10,12 @@ import akka.event.Logging.Info class DeadLetterListener extends Actor { - val eventStream = context.system.eventStream - val maxCount = context.system.settings.LogDeadLetters - var count = 0 + val eventStream: EventStream = context.system.eventStream + protected val maxCount: Int = context.system.settings.LogDeadLetters + private val isAlwaysLoggingDeadLetters = maxCount == Int.MaxValue + private val suspendDuration: Duration = context.system.settings.LogDeadLettersSuspendDuration + protected var count = 0 + private var suspendDeadline: Deadline = Deadline.now override def preStart(): Unit = eventStream.subscribe(self, classOf[DeadLetter]) @@ -43,4 +46,58 @@ class DeadLetterListener extends Actor { if (done) context.stop(self) } + def receive: Receive = { + + case DeadLetter(message, sender, receipt) ⇒ + if (isAlwaysLoggingDeadLetters) { + logDeadLetter(message, sender, receipt, doneMsg = "") + } else { + suspendDuration match { + case duration: FiniteDuration ⇒ + if (count == maxCount && suspendDeadline.isOverdue()) { + count = 1 // reset, and start logging again + } + if (count < maxCount) { + val willDone = count + 1 == maxCount + if (willDone) { + logDeadLetter( + message, + sender, + receipt, + s", no more dead letters will be logged in next :[$suspendDuration]") + // after the dead letters were logged maxCount times, + // dead letters it will suspend logging for suspendDuration. + suspendDeadline = Deadline.now + duration + } else { + logDeadLetter(message, sender, receipt, "") + } + count += 1 + } + case _ ⇒ + //When no suspending + val willDone = count + 1 == maxCount + if (willDone) { + logDeadLetter(message, sender, receipt, ", no more dead letters will be logged") + context.stop(self) + } else { + logDeadLetter(message, sender, receipt, "") + count += 1 + } + } + } + } + + private def logDeadLetter(message: Any, sender: ActorRef, receipt: ActorRef, doneMsg: String): Unit = { + val origin = if (sender eq context.system.deadLetters) "without sender" else s"from $sender" + + eventStream.publish( + Info( + receipt.path.toString, + receipt.getClass, + s"Message [${message.getClass.getName}] $origin to $receipt was not delivered. [$count] dead letters encountered$doneMsg. " + + s"If this is not an expected behavior, the receiver may have terminated unexpectedly, " + + "This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " + + "and 'akka.log-dead-letters-during-shutdown'.")) + } + } From 9a0778435c5fde11dd6925ee3ea7927a21f61aa0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 4 Jun 2019 14:59:28 +0200 Subject: [PATCH 2/2] refactoring, cleanup, and count non-logged, #24874 --- .../akka/actor/DeadLetterSuspensionSpec.scala | 59 +++++++++ .../test/scala/akka/config/ConfigSpec.scala | 7 +- akka-actor/src/main/resources/reference.conf | 12 +- .../main/scala/akka/actor/ActorSystem.scala | 16 +-- .../scala/akka/event/DeadLetterListener.scala | 119 +++++++++--------- .../project/migration-guide-2.5.x-2.6.x.md | 16 ++- 6 files changed, 151 insertions(+), 78 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala new file mode 100644 index 0000000000..d39feab4e6 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor + +import akka.testkit.AkkaSpec +import akka.testkit.EventFilter +import akka.testkit.ImplicitSender +import akka.testkit.TestActors + +class DeadLetterSuspensionSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.log-dead-letters = 3 + akka.log-dead-letters-suspend-duration = 2s + """) with ImplicitSender { + + val deadActor = system.actorOf(TestActors.echoActorProps) + watch(deadActor) + deadActor ! PoisonPill + expectTerminated(deadActor) + + private def expectedDeadLettersLogMessage(count: Int): String = + s"Message [java.lang.Integer] from $testActor to $deadActor was not delivered. [$count] dead letters encountered" + + "must suspend dead-letters logging when reaching 'akka.log-dead-letters', and then re-enable" in { + + EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept { + deadActor ! 1 + } + EventFilter.info(start = expectedDeadLettersLogMessage(2), occurrences = 1).intercept { + deadActor ! 2 + } + EventFilter + .info(start = expectedDeadLettersLogMessage(3) + ", no more dead letters will be logged in next", occurrences = 1) + .intercept { + deadActor ! 3 + } + deadActor ! 4 + deadActor ! 5 + + // let suspend-duration elapse + Thread.sleep(2050) + + // re-enabled + EventFilter + .info(start = expectedDeadLettersLogMessage(6) + ", of which 2 were not logged", occurrences = 1) + .intercept { + deadActor ! 6 + } + + // reset count + EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept { + deadActor ! 7 + } + + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index f9cf6fee43..1434d3cac5 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -65,8 +65,11 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getInt("akka.log-dead-letters") should ===(10) settings.LogDeadLetters should ===(10) - getBoolean("akka.log-dead-letters-during-shutdown") should ===(true) - settings.LogDeadLettersDuringShutdown should ===(true) + getBoolean("akka.log-dead-letters-during-shutdown") should ===(false) + settings.LogDeadLettersDuringShutdown should ===(false) + + getDuration("akka.log-dead-letters-suspend-duration", TimeUnit.MILLISECONDS) should ===(5 * 60 * 1000L) + settings.LogDeadLettersSuspendDuration should ===(5.minutes) getBoolean("akka.coordinated-shutdown.terminate-actor-system") should ===(true) settings.CoordinatedShutdownTerminateActorSystem should ===(true) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 832d1d6c8a..e803cc8991 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -54,15 +54,15 @@ akka { # n: positive integer, number of dead letters that will be logged log-dead-letters = 10 - # When log-dead-letters is on, this will renble the logging after configured duration. - # infinite: suspend the logging forever; - # or a duration (eg: 5 minutes), after which the logging will be re-enabled. - log-dead-letters-suspend-duration = infinite - # Possibility to turn off logging of dead letters while the actor system # is shutting down. Logging is only done when enabled by 'log-dead-letters' # setting. - log-dead-letters-during-shutdown = on + log-dead-letters-during-shutdown = off + + # When log-dead-letters is enabled, this will re-enable the logging after configured duration. + # infinite: suspend the logging forever; + # or a duration (eg: 5 minutes), after which the logging will be re-enabled. + log-dead-letters-suspend-duration = 5 minutes # List FQCN of extensions which shall be loaded at actor system startup. # Library extensions are regular extensions that are loaded at startup and are diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index d06bc90931..f7e24e834d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -12,13 +12,13 @@ import java.util.concurrent.atomic.AtomicReference import akka.actor.dungeon.ChildrenContainer import akka.actor.setup.{ ActorSystemSetup, Setup } import akka.annotation.InternalApi +import akka.ConfigurationException import akka.dispatch._ import akka.event._ import akka.japi.Util.immutableSeq import akka.util.Helpers.toRootLowerCase import akka.util._ import com.typesafe.config.{ Config, ConfigFactory } - import scala.annotation.tailrec import scala.collection.immutable import scala.compat.java8.FutureConverters @@ -385,14 +385,14 @@ object ActorSystem { case "on" | "true" => Int.MaxValue case _ => config.getInt("akka.log-dead-letters") } - final val LogDeadLettersSuspendDuration: Duration = - toRootLowerCase(config.getString("akka.log-dead-letters-suspend-duration")) match { - case "infinite" ⇒ Duration.Inf - case _ ⇒ - import JavaDurationConverters._ - config.getDuration("akka.log-dead-letters-suspend-duration").asScala - } final val LogDeadLettersDuringShutdown: Boolean = config.getBoolean("akka.log-dead-letters-during-shutdown") + final val LogDeadLettersSuspendDuration: Duration = { + val key = "akka.log-dead-letters-suspend-duration" + toRootLowerCase(config.getString(key)) match { + case "infinite" => Duration.Inf + case _ => config.getMillisDuration(key) + } + } final val AddLoggingReceive: Boolean = getBoolean("akka.actor.debug.receive") final val DebugAutoReceive: Boolean = getBoolean("akka.actor.debug.autoreceive") diff --git a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala index f55ed5fa28..270e156c05 100644 --- a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala +++ b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala @@ -4,18 +4,21 @@ package akka.event +import scala.concurrent.duration.Deadline +import scala.concurrent.duration.FiniteDuration + import akka.actor.Actor +import akka.actor.ActorRef import akka.actor.DeadLetter import akka.event.Logging.Info +import akka.util.PrettyDuration._ class DeadLetterListener extends Actor { val eventStream: EventStream = context.system.eventStream protected val maxCount: Int = context.system.settings.LogDeadLetters private val isAlwaysLoggingDeadLetters = maxCount == Int.MaxValue - private val suspendDuration: Duration = context.system.settings.LogDeadLettersSuspendDuration - protected var count = 0 - private var suspendDeadline: Deadline = Deadline.now + protected var count: Int = 0 override def preStart(): Unit = eventStream.subscribe(self, classOf[DeadLetter]) @@ -29,73 +32,71 @@ class DeadLetterListener extends Actor { override def postStop(): Unit = eventStream.unsubscribe(self) - def receive = { - case DeadLetter(message, snd, rcp) => + private def incrementCount(): Unit = { + // `count` is public API (for unknown reason) so for backwards compatibility reasons we + // can't change it to Long + if (count == Int.MaxValue) { + Logging.getLogger(this).info("Resetting DeadLetterListener counter after reaching Int.MaxValue.") + count = 1 + } else count += 1 - val origin = if (snd eq context.system.deadLetters) "without sender" else s"from $snd" - val done = maxCount != Int.MaxValue && count >= maxCount - val doneMsg = if (done) ", no more dead letters will be logged" else "" - eventStream.publish( - Info( - rcp.path.toString, - rcp.getClass, - s"Message [${message.getClass.getName}] $origin to $rcp was not delivered. [$count] dead letters encountered$doneMsg. " + - s"If this is not an expected behavior, then [$rcp] may have terminated unexpectedly, " + - "This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " + - "and 'akka.log-dead-letters-during-shutdown'.")) - if (done) context.stop(self) } - def receive: Receive = { + def receive: Receive = + if (isAlwaysLoggingDeadLetters) receiveWithAlwaysLogging + else + context.system.settings.LogDeadLettersSuspendDuration match { + case suspendDuration: FiniteDuration => receiveWithSuspendLogging(suspendDuration) + case _ => receiveWithMaxCountLogging + } - case DeadLetter(message, sender, receipt) ⇒ - if (isAlwaysLoggingDeadLetters) { - logDeadLetter(message, sender, receipt, doneMsg = "") + private def receiveWithAlwaysLogging: Receive = { + case DeadLetter(message, snd, recipient) => + incrementCount() + logDeadLetter(message, snd, recipient, doneMsg = "") + } + + private def receiveWithMaxCountLogging: Receive = { + case DeadLetter(message, snd, recipient) => + incrementCount() + if (count == maxCount) { + logDeadLetter(message, snd, recipient, ", no more dead letters will be logged") + context.stop(self) } else { - suspendDuration match { - case duration: FiniteDuration ⇒ - if (count == maxCount && suspendDeadline.isOverdue()) { - count = 1 // reset, and start logging again - } - if (count < maxCount) { - val willDone = count + 1 == maxCount - if (willDone) { - logDeadLetter( - message, - sender, - receipt, - s", no more dead letters will be logged in next :[$suspendDuration]") - // after the dead letters were logged maxCount times, - // dead letters it will suspend logging for suspendDuration. - suspendDeadline = Deadline.now + duration - } else { - logDeadLetter(message, sender, receipt, "") - } - count += 1 - } - case _ ⇒ - //When no suspending - val willDone = count + 1 == maxCount - if (willDone) { - logDeadLetter(message, sender, receipt, ", no more dead letters will be logged") - context.stop(self) - } else { - logDeadLetter(message, sender, receipt, "") - count += 1 - } - } + logDeadLetter(message, snd, recipient, "") } } - private def logDeadLetter(message: Any, sender: ActorRef, receipt: ActorRef, doneMsg: String): Unit = { - val origin = if (sender eq context.system.deadLetters) "without sender" else s"from $sender" + private def receiveWithSuspendLogging(suspendDuration: FiniteDuration): Receive = { + case DeadLetter(message, snd, recipient) => + incrementCount() + if (count == maxCount) { + val doneMsg = s", no more dead letters will be logged in next [${suspendDuration.pretty}]" + logDeadLetter(message, snd, recipient, doneMsg) + context.become(receiveWhenSuspended(suspendDuration, Deadline.now + suspendDuration)) + } else + logDeadLetter(message, snd, recipient, "") + } + private def receiveWhenSuspended(suspendDuration: FiniteDuration, suspendDeadline: Deadline): Receive = { + case DeadLetter(message, snd, recipient) => + incrementCount() + if (suspendDeadline.isOverdue()) { + val doneMsg = s", of which ${count - maxCount - 1} were not logged. The counter will be reset now" + logDeadLetter(message, snd, recipient, doneMsg) + count = 0 + context.become(receiveWithSuspendLogging(suspendDuration)) + } + } + + private def logDeadLetter(message: Any, snd: ActorRef, recipient: ActorRef, doneMsg: String): Unit = { + val origin = if (snd eq context.system.deadLetters) "without sender" else s"from $snd" eventStream.publish( Info( - receipt.path.toString, - receipt.getClass, - s"Message [${message.getClass.getName}] $origin to $receipt was not delivered. [$count] dead letters encountered$doneMsg. " + - s"If this is not an expected behavior, the receiver may have terminated unexpectedly, " + + recipient.path.toString, + recipient.getClass, + s"Message [${message.getClass.getName}] $origin to $recipient was not delivered. [$count] dead letters encountered$doneMsg. " + + s"If this is not an expected behavior then $recipient may have terminated unexpectedly. " + "This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " + "and 'akka.log-dead-letters-during-shutdown'.")) } diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 6ddf8a5d12..83727e9231 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -220,7 +220,7 @@ This reinstantiates the behavior from previous Akka versions but also removes th user and Akka internals. So, use at your own risk! Several `use-dispatcher` configuration settings that previously accepted an empty value to fall back to the default -dispatcher has now gotten an explicit value of `akka.actor.internal-dispatcher` and no longer accept an empty +dispatcher has now gotten an explicit value of `akka.actor.internal-dispatcher` and no longer accept an empty string as value. If such an empty value is used in your `application.conf` the same result is achieved by simply removing that entry completely and having the default apply. @@ -272,6 +272,13 @@ akka.coordinated-shutdown.run-by-actor-system-terminate = off Previously, Akka contained a shaded copy of the ForkJoinPool. In benchmarks, we could not find significant benefits of keeping our own copy, so from Akka 2.6 on, the default FJP from the JDK will be used. The Akka FJP copy was removed. +### Logging of dead letters + +When the number of dead letters have reached configured `akka.log-dead-letters` value it didn't log +more dead letters in Akka 2.5. In Akka 2.6 the count is reset after configured `akka.log-dead-letters-suspend-duration`. + +`akka.log-dead-letters-during-shutdown` default configuration changed from `on` to `off`. + ## Source incompatibilities ### StreamRefs @@ -279,6 +286,8 @@ keeping our own copy, so from Akka 2.6 on, the default FJP from the JDK will be The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no longer wrapped in `Future`/`CompletionStage`. It can be sent as reply to `sender()` immediately without using the `pipe` pattern. +`StreamRefs` was marked as [may change](../common/may-change.md). + ## Akka Typed ### Receptionist has moved @@ -304,7 +313,8 @@ it will work properly again. ### Akka Typed API changes -Akka Typed APIs are still marked as [may change](../common/may-change.md) and therefore its API can still change without deprecation period. The following is a list of API changes since the latest release: +Akka Typed APIs are still marked as [may change](../common/may-change.md) and a few changes were +made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible changes are: * Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed. * New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies. @@ -315,7 +325,7 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and th * `TimerScheduler.startPeriodicTimer`, replaced by `startTimerWithFixedDelay` or `startTimerAtFixedRate` * `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees. -### Akka Typed Stream API changes +#### Akka Typed Stream API changes * `ActorSoruce.actorRef` relying on `PartialFunction` has been replaced in the Java API with a variant more suitable to be called by Java.