* Logging of UnhandledMessage, #28260 * make use of the existing logging of dead letter also for UnhandledMessage * Akka Typed publishes UnhandledMessage in same way so those will also be logged * fix test failure in EventStreamSpec * feedback
This commit is contained in:
parent
44ad322a56
commit
aae7604dce
7 changed files with 43 additions and 11 deletions
|
|
@ -21,11 +21,21 @@ object DeadLetterSuspensionSpec {
|
|||
context.system.eventStream.publish(Dropped(n, "Don't like numbers", self))
|
||||
}
|
||||
}
|
||||
|
||||
object Unandled {
|
||||
def props(): Props = Props(new Unandled)
|
||||
}
|
||||
|
||||
class Unandled extends Actor {
|
||||
override def receive: Receive = {
|
||||
case n: Int => unhandled(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DeadLetterSuspensionSpec extends AkkaSpec("""
|
||||
akka.loglevel = INFO
|
||||
akka.log-dead-letters = 3
|
||||
akka.log-dead-letters = 4
|
||||
akka.log-dead-letters-suspend-duration = 2s
|
||||
""") with ImplicitSender {
|
||||
import DeadLetterSuspensionSpec._
|
||||
|
|
@ -36,6 +46,7 @@ class DeadLetterSuspensionSpec extends AkkaSpec("""
|
|||
expectTerminated(deadActor)
|
||||
|
||||
private val droppingActor = system.actorOf(Dropping.props(), "droppingActor")
|
||||
private val unhandledActor = system.actorOf(Unandled.props(), "unhandledActor")
|
||||
|
||||
private def expectedDeadLettersLogMessage(count: Int): String =
|
||||
s"Message [java.lang.Integer] from $testActor to $deadActor was not delivered. [$count] dead letters encountered"
|
||||
|
|
@ -43,6 +54,9 @@ class DeadLetterSuspensionSpec extends AkkaSpec("""
|
|||
private def expectedDroppedLogMessage(count: Int): String =
|
||||
s"Message [java.lang.Integer] to $droppingActor was dropped. Don't like numbers. [$count] dead letters encountered"
|
||||
|
||||
private def expectedUnhandledLogMessage(count: Int): String =
|
||||
s"Message [java.lang.Integer] from $testActor to $unhandledActor was unhandled. [$count] dead letters encountered"
|
||||
|
||||
"must suspend dead-letters logging when reaching 'akka.log-dead-letters', and then re-enable" in {
|
||||
EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept {
|
||||
deadActor ! 1
|
||||
|
|
@ -50,27 +64,30 @@ class DeadLetterSuspensionSpec extends AkkaSpec("""
|
|||
EventFilter.info(start = expectedDroppedLogMessage(2), occurrences = 1).intercept {
|
||||
droppingActor ! 2
|
||||
}
|
||||
EventFilter.info(start = expectedUnhandledLogMessage(3), occurrences = 1).intercept {
|
||||
unhandledActor ! 3
|
||||
}
|
||||
EventFilter
|
||||
.info(start = expectedDeadLettersLogMessage(3) + ", no more dead letters will be logged in next", occurrences = 1)
|
||||
.info(start = expectedDeadLettersLogMessage(4) + ", no more dead letters will be logged in next", occurrences = 1)
|
||||
.intercept {
|
||||
deadActor ! 3
|
||||
deadActor ! 4
|
||||
}
|
||||
deadActor ! 4
|
||||
droppingActor ! 5
|
||||
deadActor ! 5
|
||||
droppingActor ! 6
|
||||
|
||||
// let suspend-duration elapse
|
||||
Thread.sleep(2050)
|
||||
|
||||
// re-enabled
|
||||
EventFilter
|
||||
.info(start = expectedDeadLettersLogMessage(6) + ", of which 2 were not logged", occurrences = 1)
|
||||
.info(start = expectedDeadLettersLogMessage(7) + ", of which 2 were not logged", occurrences = 1)
|
||||
.intercept {
|
||||
deadActor ! 6
|
||||
deadActor ! 7
|
||||
}
|
||||
|
||||
// reset count
|
||||
EventFilter.info(start = expectedDeadLettersLogMessage(1), occurrences = 1).intercept {
|
||||
deadActor ! 7
|
||||
deadActor ! 8
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ object EventStreamSpec {
|
|||
stdout-loglevel = WARNING
|
||||
loglevel = WARNING
|
||||
actor.debug.unhandled = on
|
||||
log-dead-letters = off
|
||||
}
|
||||
""")
|
||||
|
||||
|
|
|
|||
|
|
@ -47,7 +47,8 @@ akka {
|
|||
# This is useful when you are uncertain of what configuration is used.
|
||||
log-config-on-start = off
|
||||
|
||||
# Log at info level when messages are sent to dead letters.
|
||||
# Log at info level when messages are sent to dead letters, or published to
|
||||
# eventStream as `DeadLetter`, `Dropped` or `UnhandledMessage`.
|
||||
# Possible values:
|
||||
# on: all dead letters are logged
|
||||
# off: no logging of dead letters
|
||||
|
|
|
|||
|
|
@ -289,6 +289,7 @@ final case class UnhandledMessage(
|
|||
@BeanProperty recipient: ActorRef)
|
||||
extends NoSerializationVerificationNeeded
|
||||
with WrappedMessage
|
||||
with AllDeadLetters
|
||||
|
||||
/**
|
||||
* Classes for passing status back to the sender.
|
||||
|
|
|
|||
|
|
@ -7,10 +7,11 @@ package akka.actor
|
|||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.dispatch._
|
||||
import akka.dispatch.sysmsg._
|
||||
import akka.event.AddressTerminatedTopic
|
||||
|
|
@ -478,8 +479,11 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
|
|||
/**
|
||||
* Subscribe to this class to be notified about all [[DeadLetter]] (also the suppressed ones)
|
||||
* and [[Dropped]].
|
||||
*
|
||||
* Not for user extension
|
||||
*/
|
||||
sealed trait AllDeadLetters extends WrappedMessage {
|
||||
@DoNotInherit
|
||||
trait AllDeadLetters extends WrappedMessage {
|
||||
def message: Any
|
||||
def sender: ActorRef
|
||||
def recipient: ActorRef
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import akka.actor.DeadLetter
|
|||
import akka.actor.DeadLetterActorRef
|
||||
import akka.actor.DeadLetterSuppression
|
||||
import akka.actor.Dropped
|
||||
import akka.actor.UnhandledMessage
|
||||
import akka.actor.WrappedMessage
|
||||
import akka.event.Logging.Info
|
||||
import akka.util.PrettyDuration._
|
||||
|
|
@ -29,6 +30,7 @@ class DeadLetterListener extends Actor {
|
|||
override def preStart(): Unit = {
|
||||
eventStream.subscribe(self, classOf[DeadLetter])
|
||||
eventStream.subscribe(self, classOf[Dropped])
|
||||
eventStream.subscribe(self, classOf[UnhandledMessage])
|
||||
}
|
||||
|
||||
// don't re-subscribe, skip call to preStart
|
||||
|
|
@ -115,6 +117,10 @@ class DeadLetterListener extends Actor {
|
|||
val destination = if (isReal(d.recipient)) s" to ${d.recipient}" else ""
|
||||
s"Message [$messageStr]$wrappedIn$origin$destination was dropped. ${dropped.reason}. " +
|
||||
s"[$count] dead letters encountered$doneMsg. "
|
||||
case _: UnhandledMessage =>
|
||||
val destination = if (isReal(d.recipient)) s" to ${d.recipient}" else ""
|
||||
s"Message [$messageStr]$wrappedIn$origin$destination was unhandled. " +
|
||||
s"[$count] dead letters encountered$doneMsg. "
|
||||
case _ =>
|
||||
s"Message [$messageStr]$wrappedIn$origin to ${d.recipient} was not delivered. " +
|
||||
s"[$count] dead letters encountered$doneMsg. " +
|
||||
|
|
|
|||
|
|
@ -1637,6 +1637,8 @@ class LogMarker(val name: String, val properties: Map[String, Any]) {
|
|||
import akka.util.ccompat.JavaConverters._
|
||||
properties.map { case (k, v) => (k, v.asInstanceOf[AnyRef]) }.asJava
|
||||
}
|
||||
|
||||
override def toString: String = s"LogMarker($name,$properties)"
|
||||
}
|
||||
|
||||
object LogMarker {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue