Merge pull request #408 from akka/wip-1999-log-unhandled-∂π
add option to log UnhandledMessage, see #1999
This commit is contained in:
commit
65d9b412bb
7 changed files with 91 additions and 23 deletions
|
|
@ -5,10 +5,11 @@ package akka.event
|
|||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.util.duration._
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystemImpl }
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.Logging.InitializeLogger
|
||||
import akka.pattern.gracefulStop
|
||||
|
||||
object EventStreamSpec {
|
||||
|
||||
|
|
@ -20,6 +21,14 @@ object EventStreamSpec {
|
|||
}
|
||||
""".format(Logging.StandardOutLoggerName))
|
||||
|
||||
val configUnhandled = ConfigFactory.parseString("""
|
||||
akka {
|
||||
stdout-loglevel = WARNING
|
||||
loglevel = DEBUG
|
||||
actor.debug.unhandled = on
|
||||
}
|
||||
""")
|
||||
|
||||
case class M(i: Int)
|
||||
|
||||
case class SetTarget(ref: ActorRef)
|
||||
|
|
@ -27,9 +36,13 @@ object EventStreamSpec {
|
|||
class MyLog extends Actor {
|
||||
var dst: ActorRef = context.system.deadLetters
|
||||
def receive = {
|
||||
case Logging.InitializeLogger(bus) ⇒ bus.subscribe(context.self, classOf[SetTarget]); sender ! Logging.LoggerInitialized
|
||||
case SetTarget(ref) ⇒ dst = ref; dst ! "OK"
|
||||
case e: Logging.LogEvent ⇒ dst ! e
|
||||
case Logging.InitializeLogger(bus) ⇒
|
||||
bus.subscribe(context.self, classOf[SetTarget])
|
||||
bus.subscribe(context.self, classOf[UnhandledMessage])
|
||||
sender ! Logging.LoggerInitialized
|
||||
case SetTarget(ref) ⇒ dst = ref; dst ! "OK"
|
||||
case e: Logging.LogEvent ⇒ dst ! e
|
||||
case u: UnhandledMessage ⇒ dst ! u
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -61,6 +74,19 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
|
|||
}
|
||||
}
|
||||
|
||||
"be able to log unhandled messages" in {
|
||||
val sys = ActorSystem("EventStreamSpecUnhandled", configUnhandled)
|
||||
try {
|
||||
sys.eventStream.subscribe(testActor, classOf[AnyRef])
|
||||
val m = UnhandledMessage(42, sys.deadLetters, sys.deadLetters)
|
||||
sys.eventStream.publish(m)
|
||||
expectMsgAllOf(m, Logging.Debug(sys.deadLetters.path.toString, sys.deadLetters.getClass, "unhandled message from " + sys.deadLetters + ": 42"))
|
||||
sys.eventStream.unsubscribe(testActor)
|
||||
} finally {
|
||||
sys.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
"manage log levels" in {
|
||||
val bus = new EventStream(false)
|
||||
bus.startDefaultLoggers(impl)
|
||||
|
|
|
|||
|
|
@ -14,6 +14,11 @@ akka {
|
|||
|
||||
# Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT)
|
||||
event-handlers = ["akka.event.Logging$DefaultLogger"]
|
||||
|
||||
# Event handlers are created and registered synchronously during ActorSystem
|
||||
# start-up, and since they are actors, this timeout is used to bound the
|
||||
# waiting time
|
||||
event-handler-startup-timeout = 5s
|
||||
|
||||
# Log level used by the configured loggers (see "event-handlers") as soon
|
||||
# as they have been started; before that, see "stdout-loglevel"
|
||||
|
|
@ -275,6 +280,9 @@ akka {
|
|||
|
||||
# enable DEBUG logging of subscription changes on the eventStream
|
||||
event-stream = off
|
||||
|
||||
# enable DEBUG logging of unhandled messages
|
||||
unhandled = off
|
||||
}
|
||||
|
||||
# Entries for pluggable serializers and their bindings.
|
||||
|
|
|
|||
|
|
@ -125,12 +125,15 @@ object ActorSystem {
|
|||
final val LogLevel = getString("akka.loglevel")
|
||||
final val StdoutLogLevel = getString("akka.stdout-loglevel")
|
||||
final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala
|
||||
final val EventHandlerStartTimeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS))
|
||||
final val LogConfigOnStart = config.getBoolean("akka.log-config-on-start")
|
||||
|
||||
final val AddLoggingReceive = getBoolean("akka.actor.debug.receive")
|
||||
final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive")
|
||||
final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle")
|
||||
final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm")
|
||||
final val DebugEventStream = getBoolean("akka.actor.debug.event-stream")
|
||||
final val DebugUnhandledMessage = getBoolean("akka.actor.debug.unhandled")
|
||||
|
||||
final val Home = config.getString("akka.home") match {
|
||||
case "" ⇒ None
|
||||
|
|
|
|||
|
|
@ -96,26 +96,40 @@ trait LoggingBus extends ActorEventBus {
|
|||
case Nil ⇒ "akka.event.Logging$DefaultLogger" :: Nil
|
||||
case loggers ⇒ loggers
|
||||
}
|
||||
val myloggers = for {
|
||||
loggerName ← defaultLoggers
|
||||
if loggerName != StandardOutLoggerName
|
||||
} yield {
|
||||
try {
|
||||
system.dynamicAccess.getClassFor[Actor](loggerName) match {
|
||||
case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName)
|
||||
case Left(exception) ⇒ throw exception
|
||||
val myloggers =
|
||||
for {
|
||||
loggerName ← defaultLoggers
|
||||
if loggerName != StandardOutLoggerName
|
||||
} yield {
|
||||
try {
|
||||
system.dynamicAccess.getClassFor[Actor](loggerName) match {
|
||||
case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName)
|
||||
case Left(exception) ⇒ throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
throw new ConfigurationException(
|
||||
"Event Handler specified in config can't be loaded [" + loggerName +
|
||||
"] due to [" + e.toString + "]", e)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
throw new ConfigurationException(
|
||||
"Event Handler specified in config can't be loaded [" + loggerName +
|
||||
"] due to [" + e.toString + "]", e)
|
||||
}
|
||||
}
|
||||
guard.withGuard {
|
||||
loggers = myloggers
|
||||
_logLevel = level
|
||||
}
|
||||
try {
|
||||
if (system.settings.DebugUnhandledMessage)
|
||||
subscribe(system.systemActorOf(Props(new Actor {
|
||||
println("started" + self)
|
||||
def receive = {
|
||||
case UnhandledMessage(msg, sender, rcp) ⇒
|
||||
println("got it")
|
||||
publish(Debug(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg))
|
||||
}
|
||||
}), "UnhandledMessageForwarder"), classOf[UnhandledMessage])
|
||||
} catch {
|
||||
case _: InvalidActorNameException ⇒ // ignore if it is already running
|
||||
}
|
||||
publish(Debug(logName, this.getClass, "Default Loggers started"))
|
||||
if (!(defaultLoggers contains StandardOutLoggerName)) {
|
||||
unsubscribe(StandardOutLogger)
|
||||
|
|
@ -154,7 +168,7 @@ trait LoggingBus extends ActorEventBus {
|
|||
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = {
|
||||
val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
|
||||
val actor = system.systemActorOf(Props(clazz), name)
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
implicit def timeout = system.settings.EventHandlerStartTimeout
|
||||
import akka.pattern.ask
|
||||
val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch {
|
||||
case _: TimeoutException ⇒
|
||||
|
|
|
|||
|
|
@ -150,7 +150,9 @@ The :class:`Actor` trait defines only one abstract method, the above mentioned
|
|||
If the current actor behavior does not match a received message,
|
||||
:meth:`unhandled` is called, which by default publishes an
|
||||
``akka.actor.UnhandledMessage(message, sender, recipient)`` on the actor
|
||||
system’s event stream.
|
||||
system’s event stream (set configuration item
|
||||
``akka.event-handler-startup-timeout`` to ``true`` to have them converted into
|
||||
actual Debug messages)
|
||||
|
||||
In addition, it offers:
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.testkit
|
||||
|
||||
import scala.util.matching.Regex
|
||||
import akka.actor.{ DeadLetter, ActorSystem, Terminated }
|
||||
import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage }
|
||||
import akka.dispatch.{ SystemMessage, Terminate }
|
||||
import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized }
|
||||
import akka.event.Logging
|
||||
|
|
@ -447,7 +447,7 @@ class TestEventListener extends Logging.DefaultLogger {
|
|||
|
||||
override def receive = {
|
||||
case InitializeLogger(bus) ⇒
|
||||
Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _))
|
||||
Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter], classOf[UnhandledMessage]) foreach (bus.subscribe(context.self, _))
|
||||
sender ! LoggerInitialized
|
||||
case Mute(filters) ⇒ filters foreach addFilter
|
||||
case UnMute(filters) ⇒ filters foreach removeFilter
|
||||
|
|
@ -462,6 +462,9 @@ class TestEventListener extends Logging.DefaultLogger {
|
|||
val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg)
|
||||
if (!filter(event)) print(event)
|
||||
}
|
||||
case UnhandledMessage(msg, sender, rcp) ⇒
|
||||
val event = Warning(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg)
|
||||
if (!filter(event)) print(event)
|
||||
case m ⇒ print(Debug(context.system.name, this.getClass, m))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -92,6 +92,18 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
|
|||
|
||||
"An AkkaSpec" must {
|
||||
|
||||
"warn about unhandled messages" in {
|
||||
implicit val system = ActorSystem("AkkaSpec0", AkkaSpec.testConf)
|
||||
try {
|
||||
val a = system.actorOf(Props.empty)
|
||||
EventFilter.warning(start = "unhandled message", occurrences = 1) intercept {
|
||||
a ! 42
|
||||
}
|
||||
} finally {
|
||||
system.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
"terminate all actors" in {
|
||||
// verbose config just for demonstration purposes, please leave in in case of debugging
|
||||
import scala.collection.JavaConverters._
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue