diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala new file mode 100644 index 0000000000..d39feab4e6 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSuspensionSpec.scala @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.actor + +import akka.testkit.AkkaSpec +import akka.testkit.EventFilter +import akka.testkit.ImplicitSender +import akka.testkit.TestActors + +class DeadLetterSuspensionSpec extends AkkaSpec(""" + akka.loglevel = INFO + akka.log-dead-letters = 3 + akka.log-dead-letters-suspend-duration = 2s + """) with ImplicitSender { + + val deadActor = system.actorOf(TestActors.echoActorProps) + watch(deadActor) + deadActor ! PoisonPill + expectTerminated(deadActor) + + private def expectedDeadLettersLogMessage(count: Int): String = + s"Message [java.lang.Integer] from $testActor to $deadActor was not delivered. [$count] dead letters encountered" + + "must suspend dead-letters logging when reaching 'akka.log-dead-letters', and then re-enable" in { + + EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept { + deadActor ! 1 + } + EventFilter.info(start = expectedDeadLettersLogMessage(2), occurrences = 1).intercept { + deadActor ! 2 + } + EventFilter + .info(start = expectedDeadLettersLogMessage(3) + ", no more dead letters will be logged in next", occurrences = 1) + .intercept { + deadActor ! 3 + } + deadActor ! 4 + deadActor ! 5 + + // let suspend-duration elapse + Thread.sleep(2050) + + // re-enabled + EventFilter + .info(start = expectedDeadLettersLogMessage(6) + ", of which 2 were not logged", occurrences = 1) + .intercept { + deadActor ! 6 + } + + // reset count + EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept { + deadActor ! 7 + } + + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index f9cf6fee43..1434d3cac5 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -65,8 +65,11 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getInt("akka.log-dead-letters") should ===(10) settings.LogDeadLetters should ===(10) - getBoolean("akka.log-dead-letters-during-shutdown") should ===(true) - settings.LogDeadLettersDuringShutdown should ===(true) + getBoolean("akka.log-dead-letters-during-shutdown") should ===(false) + settings.LogDeadLettersDuringShutdown should ===(false) + + getDuration("akka.log-dead-letters-suspend-duration", TimeUnit.MILLISECONDS) should ===(5 * 60 * 1000L) + settings.LogDeadLettersSuspendDuration should ===(5.minutes) getBoolean("akka.coordinated-shutdown.terminate-actor-system") should ===(true) settings.CoordinatedShutdownTerminateActorSystem should ===(true) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 3afa277baf..e803cc8991 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -57,7 +57,12 @@ akka { # Possibility to turn off logging of dead letters while the actor system # is shutting down. Logging is only done when enabled by 'log-dead-letters' # setting. - log-dead-letters-during-shutdown = on + log-dead-letters-during-shutdown = off + + # When log-dead-letters is enabled, this will re-enable the logging after configured duration. + # infinite: suspend the logging forever; + # or a duration (eg: 5 minutes), after which the logging will be re-enabled. + log-dead-letters-suspend-duration = 5 minutes # List FQCN of extensions which shall be loaded at actor system startup. # Library extensions are regular extensions that are loaded at startup and are diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 3db297f0b4..f7e24e834d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -5,28 +5,28 @@ package akka.actor import java.io.Closeable +import java.util.Optional import java.util.concurrent._ import java.util.concurrent.atomic.AtomicReference -import com.typesafe.config.{ Config, ConfigFactory } -import akka.ConfigurationException -import akka.event._ -import akka.dispatch._ -import akka.japi.Util.immutableSeq import akka.actor.dungeon.ChildrenContainer -import akka.util._ -import akka.util.Helpers.toRootLowerCase -import scala.annotation.tailrec -import scala.collection.immutable -import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise } -import scala.util.{ Failure, Success, Try } -import scala.util.control.{ ControlThrowable, NonFatal } -import java.util.Optional - import akka.actor.setup.{ ActorSystemSetup, Setup } import akka.annotation.InternalApi +import akka.ConfigurationException +import akka.dispatch._ +import akka.event._ +import akka.japi.Util.immutableSeq +import akka.util.Helpers.toRootLowerCase +import akka.util._ +import com.typesafe.config.{ Config, ConfigFactory } +import scala.annotation.tailrec +import scala.collection.immutable import scala.compat.java8.FutureConverters import scala.compat.java8.OptionConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise } +import scala.util.control.{ ControlThrowable, NonFatal } +import scala.util.{ Failure, Success, Try } object BootstrapSetup { @@ -386,6 +386,13 @@ object ActorSystem { case _ => config.getInt("akka.log-dead-letters") } final val LogDeadLettersDuringShutdown: Boolean = config.getBoolean("akka.log-dead-letters-during-shutdown") + final val LogDeadLettersSuspendDuration: Duration = { + val key = "akka.log-dead-letters-suspend-duration" + toRootLowerCase(config.getString(key)) match { + case "infinite" => Duration.Inf + case _ => config.getMillisDuration(key) + } + } final val AddLoggingReceive: Boolean = getBoolean("akka.actor.debug.receive") final val DebugAutoReceive: Boolean = getBoolean("akka.actor.debug.autoreceive") diff --git a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala index 104c216ecf..270e156c05 100644 --- a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala +++ b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala @@ -4,15 +4,21 @@ package akka.event +import scala.concurrent.duration.Deadline +import scala.concurrent.duration.FiniteDuration + import akka.actor.Actor +import akka.actor.ActorRef import akka.actor.DeadLetter import akka.event.Logging.Info +import akka.util.PrettyDuration._ class DeadLetterListener extends Actor { - val eventStream = context.system.eventStream - val maxCount = context.system.settings.LogDeadLetters - var count = 0 + val eventStream: EventStream = context.system.eventStream + protected val maxCount: Int = context.system.settings.LogDeadLetters + private val isAlwaysLoggingDeadLetters = maxCount == Int.MaxValue + protected var count: Int = 0 override def preStart(): Unit = eventStream.subscribe(self, classOf[DeadLetter]) @@ -26,21 +32,73 @@ class DeadLetterListener extends Actor { override def postStop(): Unit = eventStream.unsubscribe(self) - def receive = { - case DeadLetter(message, snd, rcp) => + private def incrementCount(): Unit = { + // `count` is public API (for unknown reason) so for backwards compatibility reasons we + // can't change it to Long + if (count == Int.MaxValue) { + Logging.getLogger(this).info("Resetting DeadLetterListener counter after reaching Int.MaxValue.") + count = 1 + } else count += 1 - val origin = if (snd eq context.system.deadLetters) "without sender" else s"from $snd" - val done = maxCount != Int.MaxValue && count >= maxCount - val doneMsg = if (done) ", no more dead letters will be logged" else "" - eventStream.publish( - Info( - rcp.path.toString, - rcp.getClass, - s"Message [${message.getClass.getName}] $origin to $rcp was not delivered. [$count] dead letters encountered$doneMsg. " + - s"If this is not an expected behavior, then [$rcp] may have terminated unexpectedly, " + - "This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " + - "and 'akka.log-dead-letters-during-shutdown'.")) - if (done) context.stop(self) + } + + def receive: Receive = + if (isAlwaysLoggingDeadLetters) receiveWithAlwaysLogging + else + context.system.settings.LogDeadLettersSuspendDuration match { + case suspendDuration: FiniteDuration => receiveWithSuspendLogging(suspendDuration) + case _ => receiveWithMaxCountLogging + } + + private def receiveWithAlwaysLogging: Receive = { + case DeadLetter(message, snd, recipient) => + incrementCount() + logDeadLetter(message, snd, recipient, doneMsg = "") + } + + private def receiveWithMaxCountLogging: Receive = { + case DeadLetter(message, snd, recipient) => + incrementCount() + if (count == maxCount) { + logDeadLetter(message, snd, recipient, ", no more dead letters will be logged") + context.stop(self) + } else { + logDeadLetter(message, snd, recipient, "") + } + } + + private def receiveWithSuspendLogging(suspendDuration: FiniteDuration): Receive = { + case DeadLetter(message, snd, recipient) => + incrementCount() + if (count == maxCount) { + val doneMsg = s", no more dead letters will be logged in next [${suspendDuration.pretty}]" + logDeadLetter(message, snd, recipient, doneMsg) + context.become(receiveWhenSuspended(suspendDuration, Deadline.now + suspendDuration)) + } else + logDeadLetter(message, snd, recipient, "") + } + + private def receiveWhenSuspended(suspendDuration: FiniteDuration, suspendDeadline: Deadline): Receive = { + case DeadLetter(message, snd, recipient) => + incrementCount() + if (suspendDeadline.isOverdue()) { + val doneMsg = s", of which ${count - maxCount - 1} were not logged. The counter will be reset now" + logDeadLetter(message, snd, recipient, doneMsg) + count = 0 + context.become(receiveWithSuspendLogging(suspendDuration)) + } + } + + private def logDeadLetter(message: Any, snd: ActorRef, recipient: ActorRef, doneMsg: String): Unit = { + val origin = if (snd eq context.system.deadLetters) "without sender" else s"from $snd" + eventStream.publish( + Info( + recipient.path.toString, + recipient.getClass, + s"Message [${message.getClass.getName}] $origin to $recipient was not delivered. [$count] dead letters encountered$doneMsg. " + + s"If this is not an expected behavior then $recipient may have terminated unexpectedly. " + + "This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " + + "and 'akka.log-dead-letters-during-shutdown'.")) } } diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 6ddf8a5d12..83727e9231 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -220,7 +220,7 @@ This reinstantiates the behavior from previous Akka versions but also removes th user and Akka internals. So, use at your own risk! Several `use-dispatcher` configuration settings that previously accepted an empty value to fall back to the default -dispatcher has now gotten an explicit value of `akka.actor.internal-dispatcher` and no longer accept an empty +dispatcher has now gotten an explicit value of `akka.actor.internal-dispatcher` and no longer accept an empty string as value. If such an empty value is used in your `application.conf` the same result is achieved by simply removing that entry completely and having the default apply. @@ -272,6 +272,13 @@ akka.coordinated-shutdown.run-by-actor-system-terminate = off Previously, Akka contained a shaded copy of the ForkJoinPool. In benchmarks, we could not find significant benefits of keeping our own copy, so from Akka 2.6 on, the default FJP from the JDK will be used. The Akka FJP copy was removed. +### Logging of dead letters + +When the number of dead letters have reached configured `akka.log-dead-letters` value it didn't log +more dead letters in Akka 2.5. In Akka 2.6 the count is reset after configured `akka.log-dead-letters-suspend-duration`. + +`akka.log-dead-letters-during-shutdown` default configuration changed from `on` to `off`. + ## Source incompatibilities ### StreamRefs @@ -279,6 +286,8 @@ keeping our own copy, so from Akka 2.6 on, the default FJP from the JDK will be The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no longer wrapped in `Future`/`CompletionStage`. It can be sent as reply to `sender()` immediately without using the `pipe` pattern. +`StreamRefs` was marked as [may change](../common/may-change.md). + ## Akka Typed ### Receptionist has moved @@ -304,7 +313,8 @@ it will work properly again. ### Akka Typed API changes -Akka Typed APIs are still marked as [may change](../common/may-change.md) and therefore its API can still change without deprecation period. The following is a list of API changes since the latest release: +Akka Typed APIs are still marked as [may change](../common/may-change.md) and a few changes were +made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible changes are: * Factory method `Entity.ofPersistentEntity` is renamed to `Entity.ofEventSourcedEntity` in the Java API for Akka Cluster Sharding Typed. * New abstract class `EventSourcedEntityWithEnforcedReplies` in Java API for Akka Cluster Sharding Typed and corresponding factory method `Entity.ofEventSourcedEntityWithEnforcedReplies` to ease the creation of `EventSourcedBehavior` with enforced replies. @@ -315,7 +325,7 @@ Akka Typed APIs are still marked as [may change](../common/may-change.md) and th * `TimerScheduler.startPeriodicTimer`, replaced by `startTimerWithFixedDelay` or `startTimerAtFixedRate` * `Routers.pool` now take a factory function rather than a `Behavior` to protect against accidentally sharing same behavior instance and state across routees. -### Akka Typed Stream API changes +#### Akka Typed Stream API changes * `ActorSoruce.actorRef` relying on `PartialFunction` has been replaced in the Java API with a variant more suitable to be called by Java.