* for example the Cluster InitJoin message is marked with DeadLetterSuppression but was anyway logged because sent with actorSelection * for other WrappedMessage than ActorSelectionMessage we shouldn't unwrap and publish the inner in SuppressedDeadLetter because that might loose some information * therefore those are silenced in the DeadLetterListener instead
This commit is contained in:
parent
87b94b65fd
commit
2a0e967be3
3 changed files with 65 additions and 23 deletions
|
|
@ -50,6 +50,15 @@ class DeadLetterSupressionSpec extends AkkaSpec with ImplicitSender {
|
||||||
allListener.expectMsg(SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters))
|
allListener.expectMsg(SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters))
|
||||||
allListener.expectMsg(DeadLetter(NormalMsg, testActor, deadActor))
|
allListener.expectMsg(DeadLetter(NormalMsg, testActor, deadActor))
|
||||||
allListener.expectNoMessage()
|
allListener.expectNoMessage()
|
||||||
|
|
||||||
|
// unwrap for ActorSelection
|
||||||
|
system.actorSelection(deadActor.path) ! SuppressedMsg
|
||||||
|
system.actorSelection(deadActor.path) ! NormalMsg
|
||||||
|
|
||||||
|
// the recipient ref isn't the same as deadActor here so only checking the message
|
||||||
|
deadListener.expectMsgType[DeadLetter].message should ===(NormalMsg)
|
||||||
|
suppressedListener.expectMsgType[SuppressedDeadLetter].message should ===(SuppressedMsg)
|
||||||
|
deadListener.expectNoMessage()
|
||||||
}
|
}
|
||||||
|
|
||||||
s"must suppress message from default dead-letters logging (sent to dead: ${Logging.simpleName(system.deadLetters)})" in {
|
s"must suppress message from default dead-letters logging (sent to dead: ${Logging.simpleName(system.deadLetters)})" in {
|
||||||
|
|
@ -76,5 +85,13 @@ class DeadLetterSupressionSpec extends AkkaSpec with ImplicitSender {
|
||||||
deadListener.expectNoMessage(Duration.Zero)
|
deadListener.expectNoMessage(Duration.Zero)
|
||||||
suppressedListener.expectNoMessage(Duration.Zero)
|
suppressedListener.expectNoMessage(Duration.Zero)
|
||||||
allListener.expectNoMessage(Duration.Zero)
|
allListener.expectNoMessage(Duration.Zero)
|
||||||
|
|
||||||
|
// unwrap for ActorSelection
|
||||||
|
system.actorSelection(system.deadLetters.path) ! SuppressedMsg
|
||||||
|
system.actorSelection(system.deadLetters.path) ! NormalMsg
|
||||||
|
|
||||||
|
deadListener.expectMsg(DeadLetter(NormalMsg, testActor, system.deadLetters))
|
||||||
|
suppressedListener.expectMsg(SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters))
|
||||||
|
deadListener.expectNoMessage()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -608,14 +608,23 @@ private[akka] class EmptyLocalActorRef(
|
||||||
case Some(identify) =>
|
case Some(identify) =>
|
||||||
if (!sel.wildcardFanOut) sender ! ActorIdentity(identify.messageId, None)
|
if (!sel.wildcardFanOut) sender ! ActorIdentity(identify.messageId, None)
|
||||||
case None =>
|
case None =>
|
||||||
eventStream.publish(DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
|
sel.msg match {
|
||||||
|
case m: DeadLetterSuppression => publishSupressedDeadLetter(m, sender)
|
||||||
|
case _ =>
|
||||||
|
eventStream.publish(
|
||||||
|
DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
true
|
true
|
||||||
case m: DeadLetterSuppression =>
|
case m: DeadLetterSuppression =>
|
||||||
eventStream.publish(SuppressedDeadLetter(m, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
|
publishSupressedDeadLetter(m, sender)
|
||||||
true
|
true
|
||||||
case _ => false
|
case _ => false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def publishSupressedDeadLetter(msg: DeadLetterSuppression, sender: ActorRef): Unit = {
|
||||||
|
eventStream.publish(SuppressedDeadLetter(msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import akka.actor.ActorRef
|
||||||
import akka.actor.AllDeadLetters
|
import akka.actor.AllDeadLetters
|
||||||
import akka.actor.DeadLetter
|
import akka.actor.DeadLetter
|
||||||
import akka.actor.DeadLetterActorRef
|
import akka.actor.DeadLetterActorRef
|
||||||
|
import akka.actor.DeadLetterSuppression
|
||||||
import akka.actor.Dropped
|
import akka.actor.Dropped
|
||||||
import akka.actor.WrappedMessage
|
import akka.actor.WrappedMessage
|
||||||
import akka.event.Logging.Info
|
import akka.event.Logging.Info
|
||||||
|
|
@ -59,40 +60,48 @@ class DeadLetterListener extends Actor {
|
||||||
|
|
||||||
private def receiveWithAlwaysLogging: Receive = {
|
private def receiveWithAlwaysLogging: Receive = {
|
||||||
case d: AllDeadLetters =>
|
case d: AllDeadLetters =>
|
||||||
incrementCount()
|
if (!isWrappedSuppressed(d)) {
|
||||||
logDeadLetter(d, doneMsg = "")
|
incrementCount()
|
||||||
|
logDeadLetter(d, doneMsg = "")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def receiveWithMaxCountLogging: Receive = {
|
private def receiveWithMaxCountLogging: Receive = {
|
||||||
case d: AllDeadLetters =>
|
case d: AllDeadLetters =>
|
||||||
incrementCount()
|
if (!isWrappedSuppressed(d)) {
|
||||||
if (count == maxCount) {
|
incrementCount()
|
||||||
logDeadLetter(d, ", no more dead letters will be logged")
|
if (count == maxCount) {
|
||||||
context.stop(self)
|
logDeadLetter(d, ", no more dead letters will be logged")
|
||||||
} else {
|
context.stop(self)
|
||||||
logDeadLetter(d, "")
|
} else {
|
||||||
|
logDeadLetter(d, "")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def receiveWithSuspendLogging(suspendDuration: FiniteDuration): Receive = {
|
private def receiveWithSuspendLogging(suspendDuration: FiniteDuration): Receive = {
|
||||||
case d: AllDeadLetters =>
|
case d: AllDeadLetters =>
|
||||||
incrementCount()
|
if (!isWrappedSuppressed(d)) {
|
||||||
if (count == maxCount) {
|
incrementCount()
|
||||||
val doneMsg = s", no more dead letters will be logged in next [${suspendDuration.pretty}]"
|
if (count == maxCount) {
|
||||||
logDeadLetter(d, doneMsg)
|
val doneMsg = s", no more dead letters will be logged in next [${suspendDuration.pretty}]"
|
||||||
context.become(receiveWhenSuspended(suspendDuration, Deadline.now + suspendDuration))
|
logDeadLetter(d, doneMsg)
|
||||||
} else
|
context.become(receiveWhenSuspended(suspendDuration, Deadline.now + suspendDuration))
|
||||||
logDeadLetter(d, "")
|
} else
|
||||||
|
logDeadLetter(d, "")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def receiveWhenSuspended(suspendDuration: FiniteDuration, suspendDeadline: Deadline): Receive = {
|
private def receiveWhenSuspended(suspendDuration: FiniteDuration, suspendDeadline: Deadline): Receive = {
|
||||||
case d: AllDeadLetters =>
|
case d: AllDeadLetters =>
|
||||||
incrementCount()
|
if (!isWrappedSuppressed(d)) {
|
||||||
if (suspendDeadline.isOverdue()) {
|
incrementCount()
|
||||||
val doneMsg = s", of which ${count - maxCount - 1} were not logged. The counter will be reset now"
|
if (suspendDeadline.isOverdue()) {
|
||||||
logDeadLetter(d, doneMsg)
|
val doneMsg = s", of which ${count - maxCount - 1} were not logged. The counter will be reset now"
|
||||||
count = 0
|
logDeadLetter(d, doneMsg)
|
||||||
context.become(receiveWithSuspendLogging(suspendDuration))
|
count = 0
|
||||||
|
context.become(receiveWithSuspendLogging(suspendDuration))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -126,4 +135,11 @@ class DeadLetterListener extends Actor {
|
||||||
(snd ne ActorRef.noSender) && (snd ne context.system.deadLetters) && !snd.isInstanceOf[DeadLetterActorRef]
|
(snd ne ActorRef.noSender) && (snd ne context.system.deadLetters) && !snd.isInstanceOf[DeadLetterActorRef]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def isWrappedSuppressed(d: AllDeadLetters): Boolean = {
|
||||||
|
d.message match {
|
||||||
|
case w: WrappedMessage if w.message.isInstanceOf[DeadLetterSuppression] => true
|
||||||
|
case _ => false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue