diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index 99b039164b..22ecb6ac73 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -8,7 +8,7 @@ import akka.config.Configuration import akka.util.duration._ import akka.actor.{ Actor, ActorRef } -object MainBusSpec { +object EventStreamSpec { case class M(i: Int) case class SetTarget(ref: ActorRef) @@ -16,7 +16,7 @@ object MainBusSpec { class MyLog extends Actor { var dst: ActorRef = app.deadLetters def receive = { - case Logging.InitializeLogger(bus) ⇒ bus.subscribe(context.self, classOf[SetTarget]) + case Logging.InitializeLogger(bus) ⇒ bus.subscribe(context.self, classOf[SetTarget]); sender ! Logging.LoggerInitialized case SetTarget(ref) ⇒ dst = ref; dst ! "OK" case e: Logging.LogEvent ⇒ dst ! e } @@ -32,9 +32,9 @@ object MainBusSpec { class EventStreamSpec extends AkkaSpec(Configuration( "akka.stdout-loglevel" -> "WARNING", "akka.loglevel" -> "INFO", - "akka.event-handlers" -> Seq("akka.event.MainBusSpec$MyLog", Logging.StandardOutLoggerName))) { + "akka.event-handlers" -> Seq("akka.event.EventStreamSpec$MyLog", Logging.StandardOutLoggerName))) { - import MainBusSpec._ + import EventStreamSpec._ "An EventStream" must { @@ -55,10 +55,8 @@ class EventStreamSpec extends AkkaSpec(Configuration( val bus = new EventStream(false) bus.start(app) bus.startDefaultLoggers(app, app.AkkaConfig) - awaitCond({ - bus.publish(SetTarget(testActor)) - receiveOne(0.5 seconds) == "OK" - }, 5 seconds) + bus.publish(SetTarget(testActor)) + expectMsg("OK") within(2 seconds) { import Logging._ verifyLevel(bus, InfoLevel) diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala index 49d4a2d660..a3ffb5ae97 100644 --- a/akka-actor/src/main/scala/akka/actor/package.scala +++ b/akka-actor/src/main/scala/akka/actor/package.scala @@ -23,7 +23,13 @@ package object actor { def simpleName(obj: AnyRef): String = { val n = obj.getClass.getName val i = n.lastIndexOf('.') - n.substring(i + 1).replaceAll("\\$+", ".") + n.substring(i + 1) + } + + def simpleName(clazz: Class[_]): String = { + val n = clazz.getName + val i = n.lastIndexOf('.') + n.substring(i + 1) } implicit def future2actor[T](f: akka.dispatch.Future[T]) = new { diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index e1c8412df4..966b7a534e 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -3,12 +3,16 @@ */ package akka.event -import akka.actor.{ Actor, ActorPath, ActorRef, MinimalActorRef, LocalActorRef, Props, ActorSystem } +import akka.actor.{ Actor, ActorPath, ActorRef, MinimalActorRef, LocalActorRef, Props, ActorSystem, simpleName } import akka.AkkaException import akka.actor.ActorSystem.AkkaConfig import akka.util.ReflectiveAccess import akka.config.ConfigurationException import akka.util.ReentrantGuard +import akka.util.duration._ +import akka.actor.Timeout +import akka.dispatch.FutureTimeoutException +import java.util.concurrent.atomic.AtomicInteger /** * This trait brings log level handling to the EventStream: it reads the log @@ -28,6 +32,7 @@ trait LoggingBus extends ActorEventBus { private val guard = new ReentrantGuard private var loggers = Seq.empty[ActorRef] private var _logLevel: LogLevel = _ + private val loggerId = new AtomicInteger /** * Query currently set log level. See object Logging for more information. @@ -133,10 +138,17 @@ trait LoggingBus extends ActorEventBus { } private def addLogger(app: ActorSystem, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { - val actor = app.systemActorOf(Props(clazz), Props.randomName) - actor ! InitializeLogger(this) + val name = "log" + loggerId.incrementAndGet + "-" + simpleName(clazz) + val actor = app.systemActorOf(Props(clazz), name) + implicit val timeout = Timeout(3 seconds) + val response = try actor ? InitializeLogger(this) get catch { + case _: FutureTimeoutException ⇒ + publish(Warning(this, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) + } + if (response != LoggerInitialized) + throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response) AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l))) - publish(Info(this, "logger " + clazz.getName + " started")) + publish(Info(this, "logger " + name + " started")) actor } @@ -275,10 +287,22 @@ object Logging { /** * Message which is sent to each default logger (i.e. from configuration file) * after its creation but before attaching it to the logging bus. The logger - * actor should handle this message, e.g. to register for more channels. + * actor must handle this message, it can be used e.g. to register for more + * channels. When done, the logger must respond with a LoggerInitialized + * message. This is necessary to ensure that additional subscriptions are in + * effect when the logging system finished starting. */ case class InitializeLogger(bus: LoggingBus) + /** + * Response message each logger must send within 1 second after receiving the + * InitializeLogger request. If initialization takes longer, send the reply + * as soon as subscriptions are set-up. + */ + case object LoggerInitialized + + class LoggerInitializationException(msg: String) extends AkkaException(msg) + trait StdOutLogger { import java.text.SimpleDateFormat import java.util.Date @@ -329,7 +353,7 @@ object Logging { def instanceName(instance: AnyRef): String = instance match { case null ⇒ "NULL" case a: ActorRef ⇒ a.address - case _ ⇒ instance.getClass.getSimpleName + case _ ⇒ simpleName(instance) } } @@ -357,7 +381,7 @@ object Logging { */ class DefaultLogger extends Actor with StdOutLogger { def receive = { - case InitializeLogger(_) ⇒ + case InitializeLogger(_) ⇒ sender ! LoggerInitialized case event: LogEvent ⇒ print(event) } } diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala index 8253dcd248..80d913425a 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -49,7 +49,7 @@ class Slf4jEventHandler extends Actor with SLF4JLogging { logger(instance).debug("[{}] [{}]", event.thread.getName, message.asInstanceOf[AnyRef]) - case InitializeLogger(_) ⇒ log.info("Slf4jEventHandler started") + case InitializeLogger(_) ⇒ log.info("Slf4jEventHandler started"); sender ! LoggerInitialized } def logger(instance: AnyRef): SLFLogger = instance match { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 0e6ea66de5..9d5671c3af 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -8,7 +8,7 @@ import scala.util.matching.Regex import akka.actor.{ DeadLetter, ActorSystem } import akka.dispatch.SystemMessage -import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug } +import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized } import akka.event.Logging import akka.testkit.TestEvent.{ UnMute, Mute } import akka.util.Duration @@ -445,10 +445,12 @@ class TestEventListener extends Logging.DefaultLogger { var filters: List[EventFilter] = Nil override def receive = { - case InitializeLogger(bus) ⇒ Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _)) - case Mute(filters) ⇒ filters foreach addFilter - case UnMute(filters) ⇒ filters foreach removeFilter - case event: LogEvent ⇒ if (!filter(event)) print(event) + case InitializeLogger(bus) ⇒ + Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _)) + sender ! LoggerInitialized + case Mute(filters) ⇒ filters foreach addFilter + case UnMute(filters) ⇒ filters foreach removeFilter + case event: LogEvent ⇒ if (!filter(event)) print(event) case DeadLetter(msg: SystemMessage, _, rcp) ⇒ val event = Warning(rcp, "received dead system message: " + msg) if (!filter(event)) print(event)