diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index add8173085..d2497c4a69 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -5,10 +5,11 @@ package akka.event import akka.testkit.AkkaSpec import akka.util.duration._ -import akka.actor.{ Actor, ActorRef, ActorSystemImpl } +import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage } import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ -import akka.actor.ActorSystem +import akka.event.Logging.InitializeLogger +import akka.pattern.gracefulStop object EventStreamSpec { @@ -20,6 +21,14 @@ object EventStreamSpec { } """.format(Logging.StandardOutLoggerName)) + val configUnhandled = ConfigFactory.parseString(""" + akka { + stdout-loglevel = WARNING + loglevel = DEBUG + actor.debug.unhandled = on + } + """) + case class M(i: Int) case class SetTarget(ref: ActorRef) @@ -27,9 +36,13 @@ object EventStreamSpec { class MyLog extends Actor { var dst: ActorRef = context.system.deadLetters def receive = { - case Logging.InitializeLogger(bus) ⇒ bus.subscribe(context.self, classOf[SetTarget]); sender ! Logging.LoggerInitialized - case SetTarget(ref) ⇒ dst = ref; dst ! "OK" - case e: Logging.LogEvent ⇒ dst ! e + case Logging.InitializeLogger(bus) ⇒ + bus.subscribe(context.self, classOf[SetTarget]) + bus.subscribe(context.self, classOf[UnhandledMessage]) + sender ! Logging.LoggerInitialized + case SetTarget(ref) ⇒ dst = ref; dst ! "OK" + case e: Logging.LogEvent ⇒ dst ! e + case u: UnhandledMessage ⇒ dst ! u } } @@ -61,6 +74,19 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } } + "be able to log unhandled messages" in { + val sys = ActorSystem("EventStreamSpecUnhandled", configUnhandled) + try { + sys.eventStream.subscribe(testActor, classOf[AnyRef]) + val m = UnhandledMessage(42, sys.deadLetters, sys.deadLetters) + sys.eventStream.publish(m) + expectMsgAllOf(m, Logging.Debug(sys.deadLetters.path.toString, sys.deadLetters.getClass, "unhandled message from " + sys.deadLetters + ": 42")) + sys.eventStream.unsubscribe(testActor) + } finally { + sys.shutdown() + } + } + "manage log levels" in { val bus = new EventStream(false) bus.startDefaultLoggers(impl) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 3f38cfeca0..742b5e86d2 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -14,6 +14,11 @@ akka { # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT) event-handlers = ["akka.event.Logging$DefaultLogger"] + + # Event handlers are created and registered synchronously during ActorSystem + # start-up, and since they are actors, this timeout is used to bound the + # waiting time + event-handler-startup-timeout = 5s # Log level used by the configured loggers (see "event-handlers") as soon # as they have been started; before that, see "stdout-loglevel" @@ -275,6 +280,9 @@ akka { # enable DEBUG logging of subscription changes on the eventStream event-stream = off + + # enable DEBUG logging of unhandled messages + unhandled = off } # Entries for pluggable serializers and their bindings. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4112905711..3520093c7e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -125,12 +125,15 @@ object ActorSystem { final val LogLevel = getString("akka.loglevel") final val StdoutLogLevel = getString("akka.stdout-loglevel") final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala + final val EventHandlerStartTimeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS)) final val LogConfigOnStart = config.getBoolean("akka.log-config-on-start") + final val AddLoggingReceive = getBoolean("akka.actor.debug.receive") final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive") final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle") final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm") final val DebugEventStream = getBoolean("akka.actor.debug.event-stream") + final val DebugUnhandledMessage = getBoolean("akka.actor.debug.unhandled") final val Home = config.getString("akka.home") match { case "" ⇒ None diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 1a2cb2c520..bf4fc7996d 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -96,26 +96,40 @@ trait LoggingBus extends ActorEventBus { case Nil ⇒ "akka.event.Logging$DefaultLogger" :: Nil case loggers ⇒ loggers } - val myloggers = for { - loggerName ← defaultLoggers - if loggerName != StandardOutLoggerName - } yield { - try { - system.dynamicAccess.getClassFor[Actor](loggerName) match { - case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) - case Left(exception) ⇒ throw exception + val myloggers = + for { + loggerName ← defaultLoggers + if loggerName != StandardOutLoggerName + } yield { + try { + system.dynamicAccess.getClassFor[Actor](loggerName) match { + case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) + case Left(exception) ⇒ throw exception + } + } catch { + case e: Exception ⇒ + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + loggerName + + "] due to [" + e.toString + "]", e) } - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + loggerName + - "] due to [" + e.toString + "]", e) } - } guard.withGuard { loggers = myloggers _logLevel = level } + try { + if (system.settings.DebugUnhandledMessage) + subscribe(system.systemActorOf(Props(new Actor { + println("started" + self) + def receive = { + case UnhandledMessage(msg, sender, rcp) ⇒ + println("got it") + publish(Debug(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg)) + } + }), "UnhandledMessageForwarder"), classOf[UnhandledMessage]) + } catch { + case _: InvalidActorNameException ⇒ // ignore if it is already running + } publish(Debug(logName, this.getClass, "Default Loggers started")) if (!(defaultLoggers contains StandardOutLoggerName)) { unsubscribe(StandardOutLogger) @@ -154,7 +168,7 @@ trait LoggingBus extends ActorEventBus { private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) - implicit val timeout = Timeout(5 seconds) + implicit def timeout = system.settings.EventHandlerStartTimeout import akka.pattern.ask val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 0c98960db1..fae84c080f 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -150,7 +150,9 @@ The :class:`Actor` trait defines only one abstract method, the above mentioned If the current actor behavior does not match a received message, :meth:`unhandled` is called, which by default publishes an ``akka.actor.UnhandledMessage(message, sender, recipient)`` on the actor -system’s event stream. +system’s event stream (set configuration item +``akka.event-handler-startup-timeout`` to ``true`` to have them converted into +actual Debug messages) In addition, it offers: diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 56ced17370..97fe6e99aa 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -4,7 +4,7 @@ package akka.testkit import scala.util.matching.Regex -import akka.actor.{ DeadLetter, ActorSystem, Terminated } +import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage } import akka.dispatch.{ SystemMessage, Terminate } import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized } import akka.event.Logging @@ -447,7 +447,7 @@ class TestEventListener extends Logging.DefaultLogger { override def receive = { case InitializeLogger(bus) ⇒ - Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _)) + Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter], classOf[UnhandledMessage]) foreach (bus.subscribe(context.self, _)) sender ! LoggerInitialized case Mute(filters) ⇒ filters foreach addFilter case UnMute(filters) ⇒ filters foreach removeFilter @@ -462,6 +462,9 @@ class TestEventListener extends Logging.DefaultLogger { val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg) if (!filter(event)) print(event) } + case UnhandledMessage(msg, sender, rcp) ⇒ + val event = Warning(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg) + if (!filter(event)) print(event) case m ⇒ print(Debug(context.system.name, this.getClass, m)) } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 3a0f02c79a..fd763e6bad 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -92,6 +92,18 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { "An AkkaSpec" must { + "warn about unhandled messages" in { + implicit val system = ActorSystem("AkkaSpec0", AkkaSpec.testConf) + try { + val a = system.actorOf(Props.empty) + EventFilter.warning(start = "unhandled message", occurrences = 1) intercept { + a ! 42 + } + } finally { + system.shutdown() + } + } + "terminate all actors" in { // verbose config just for demonstration purposes, please leave in in case of debugging import scala.collection.JavaConverters._