From 2a0e967be3a05cddef7b946154cfe9077a2cb37b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 11 Dec 2019 08:58:25 +0100 Subject: [PATCH] Supress ActorSelectionMessage with DeadLetterSuppression, #28226 (#28341) * for example the Cluster InitJoin message is marked with DeadLetterSuppression but was anyway logged because sent with actorSelection * for other WrappedMessage than ActorSelectionMessage we shouldn't unwrap and publish the inner in SuppressedDeadLetter because that might loose some information * therefore those are silenced in the DeadLetterListener instead --- .../akka/actor/DeadLetterSupressionSpec.scala | 17 ++++++ .../src/main/scala/akka/actor/ActorRef.scala | 13 ++++- .../scala/akka/event/DeadLetterListener.scala | 58 ++++++++++++------- 3 files changed, 65 insertions(+), 23 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSupressionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSupressionSpec.scala index e7b45d7483..6c6aab5fe3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSupressionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeadLetterSupressionSpec.scala @@ -50,6 +50,15 @@ class DeadLetterSupressionSpec extends AkkaSpec with ImplicitSender { allListener.expectMsg(SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters)) allListener.expectMsg(DeadLetter(NormalMsg, testActor, deadActor)) allListener.expectNoMessage() + + // unwrap for ActorSelection + system.actorSelection(deadActor.path) ! SuppressedMsg + system.actorSelection(deadActor.path) ! NormalMsg + + // the recipient ref isn't the same as deadActor here so only checking the message + deadListener.expectMsgType[DeadLetter].message should ===(NormalMsg) + suppressedListener.expectMsgType[SuppressedDeadLetter].message should ===(SuppressedMsg) + deadListener.expectNoMessage() } s"must suppress message from default dead-letters logging (sent to dead: ${Logging.simpleName(system.deadLetters)})" in { @@ -76,5 +85,13 @@ class DeadLetterSupressionSpec extends AkkaSpec with ImplicitSender { deadListener.expectNoMessage(Duration.Zero) suppressedListener.expectNoMessage(Duration.Zero) allListener.expectNoMessage(Duration.Zero) + + // unwrap for ActorSelection + system.actorSelection(system.deadLetters.path) ! SuppressedMsg + system.actorSelection(system.deadLetters.path) ! NormalMsg + + deadListener.expectMsg(DeadLetter(NormalMsg, testActor, system.deadLetters)) + suppressedListener.expectMsg(SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters)) + deadListener.expectNoMessage() } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 15b3e5fb67..04fefbb5be 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -608,14 +608,23 @@ private[akka] class EmptyLocalActorRef( case Some(identify) => if (!sel.wildcardFanOut) sender ! ActorIdentity(identify.messageId, None) case None => - eventStream.publish(DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this)) + sel.msg match { + case m: DeadLetterSuppression => publishSupressedDeadLetter(m, sender) + case _ => + eventStream.publish( + DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this)) + } } true case m: DeadLetterSuppression => - eventStream.publish(SuppressedDeadLetter(m, if (sender eq Actor.noSender) provider.deadLetters else sender, this)) + publishSupressedDeadLetter(m, sender) true case _ => false } + + private def publishSupressedDeadLetter(msg: DeadLetterSuppression, sender: ActorRef): Unit = { + eventStream.publish(SuppressedDeadLetter(msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this)) + } } /** diff --git a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala index 3f87ecf34e..e0b973d2f0 100644 --- a/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala +++ b/akka-actor/src/main/scala/akka/event/DeadLetterListener.scala @@ -13,6 +13,7 @@ import akka.actor.ActorRef import akka.actor.AllDeadLetters import akka.actor.DeadLetter import akka.actor.DeadLetterActorRef +import akka.actor.DeadLetterSuppression import akka.actor.Dropped import akka.actor.WrappedMessage import akka.event.Logging.Info @@ -59,40 +60,48 @@ class DeadLetterListener extends Actor { private def receiveWithAlwaysLogging: Receive = { case d: AllDeadLetters => - incrementCount() - logDeadLetter(d, doneMsg = "") + if (!isWrappedSuppressed(d)) { + incrementCount() + logDeadLetter(d, doneMsg = "") + } } private def receiveWithMaxCountLogging: Receive = { case d: AllDeadLetters => - incrementCount() - if (count == maxCount) { - logDeadLetter(d, ", no more dead letters will be logged") - context.stop(self) - } else { - logDeadLetter(d, "") + if (!isWrappedSuppressed(d)) { + incrementCount() + if (count == maxCount) { + logDeadLetter(d, ", no more dead letters will be logged") + context.stop(self) + } else { + logDeadLetter(d, "") + } } } private def receiveWithSuspendLogging(suspendDuration: FiniteDuration): Receive = { case d: AllDeadLetters => - incrementCount() - if (count == maxCount) { - val doneMsg = s", no more dead letters will be logged in next [${suspendDuration.pretty}]" - logDeadLetter(d, doneMsg) - context.become(receiveWhenSuspended(suspendDuration, Deadline.now + suspendDuration)) - } else - logDeadLetter(d, "") + if (!isWrappedSuppressed(d)) { + incrementCount() + if (count == maxCount) { + val doneMsg = s", no more dead letters will be logged in next [${suspendDuration.pretty}]" + logDeadLetter(d, doneMsg) + context.become(receiveWhenSuspended(suspendDuration, Deadline.now + suspendDuration)) + } else + logDeadLetter(d, "") + } } private def receiveWhenSuspended(suspendDuration: FiniteDuration, suspendDeadline: Deadline): Receive = { case d: AllDeadLetters => - incrementCount() - if (suspendDeadline.isOverdue()) { - val doneMsg = s", of which ${count - maxCount - 1} were not logged. The counter will be reset now" - logDeadLetter(d, doneMsg) - count = 0 - context.become(receiveWithSuspendLogging(suspendDuration)) + if (!isWrappedSuppressed(d)) { + incrementCount() + if (suspendDeadline.isOverdue()) { + val doneMsg = s", of which ${count - maxCount - 1} were not logged. The counter will be reset now" + logDeadLetter(d, doneMsg) + count = 0 + context.become(receiveWithSuspendLogging(suspendDuration)) + } } } @@ -126,4 +135,11 @@ class DeadLetterListener extends Actor { (snd ne ActorRef.noSender) && (snd ne context.system.deadLetters) && !snd.isInstanceOf[DeadLetterActorRef] } + private def isWrappedSuppressed(d: AllDeadLetters): Boolean = { + d.message match { + case w: WrappedMessage if w.message.isInstanceOf[DeadLetterSuppression] => true + case _ => false + } + } + }