From d0498eb32efeed165eeda9c5be10e3147316ca8b Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 11 Jan 2012 14:14:08 +0100 Subject: [PATCH] add Class[_] to LogEvent - it is customary to use class name for categorizing logs, hence we should support it; class is taken from logSource.getClass - update SLF4J module to use logClass as category and set logSource in MDC "akkaSource" - add docs --- .../test/scala/akka/actor/FSMActorSpec.scala | 12 +- .../akka/actor/dispatch/ActorModelSpec.scala | 12 +- .../scala/akka/event/EventStreamSpec.scala | 2 +- .../scala/akka/event/LoggingReceiveSpec.scala | 36 ++-- .../src/main/scala/akka/actor/ActorCell.scala | 43 ++-- .../main/scala/akka/actor/ActorSystem.scala | 2 +- .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../main/scala/akka/dispatch/Dispatcher.scala | 2 +- .../scala/akka/dispatch/Dispatchers.scala | 2 +- .../src/main/scala/akka/dispatch/Future.scala | 4 +- .../main/scala/akka/dispatch/Mailbox.scala | 2 +- .../main/scala/akka/event/EventStream.scala | 6 +- .../src/main/scala/akka/event/Logging.scala | 189 ++++++++++++++---- .../scala/akka/event/LoggingReceive.scala | 2 +- akka-actor/src/main/scala/akka/util/JMX.scala | 4 +- akka-docs/java/logging.rst | 17 +- .../code/akka/docs/actor/ActorDocSpec.scala | 4 +- .../code/akka/docs/event/LoggingDocSpec.scala | 10 +- akka-docs/scala/logging.rst | 17 +- .../src/main/scala/akka/remote/Remote.scala | 2 +- .../remote/netty/NettyRemoteSupport.scala | 2 +- .../akka/remote/RemoteDeathWatchSpec.scala | 2 + .../main/scala/akka/event/slf4j/SLF4J.scala | 36 ++-- .../akka/testkit/TestEventListener.scala | 20 +- .../scala/akka/testkit/TestActorRefSpec.scala | 2 +- 25 files changed, 295 insertions(+), 137 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index a6d6a7df98..c5ed765aab 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -171,7 +171,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im system.eventStream.subscribe(testActor, classOf[Logging.Error]) fsm ! "go" expectMsgPF(1 second, hint = "Next state 2 does not exist") { - case Logging.Error(_, `name`, "Next state 2 does not exist") ⇒ true + case Logging.Error(_, `name`, _, "Next state 2 does not exist") ⇒ true } system.eventStream.unsubscribe(testActor) } @@ -221,15 +221,15 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im system.eventStream.subscribe(testActor, classOf[Logging.Debug]) fsm ! "go" expectMsgPF(1 second, hint = "processing Event(go,null)") { - case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true + case Logging.Debug(`name`, _, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true } - expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown")) - expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2")) + expectMsg(1 second, Logging.Debug(name, fsm.underlyingActor.getClass, "setting timer 't'/1500 milliseconds: Shutdown")) + expectMsg(1 second, Logging.Debug(name, fsm.underlyingActor.getClass, "transition 1 -> 2")) fsm ! "stop" expectMsgPF(1 second, hint = "processing Event(stop,null)") { - case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true + case Logging.Debug(`name`, _, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true } - expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal) + expectMsgAllOf(1 second, Logging.Debug(name, fsm.underlyingActor.getClass, "canceling timer 't'"), Normal) expectNoMsg(1 second) system.eventStream.unsubscribe(testActor) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 9debbd053c..b0d831dc77 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -151,7 +151,7 @@ object ActorModelSpec { await(deadline)(stops == dispatcher.stops.get) } catch { case e ⇒ - system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get + + system.eventStream.publish(Error(e, dispatcher.toString, dispatcher.getClass, "actual: stops=" + dispatcher.stops.get + " required: stops=" + stops)) throw e } @@ -208,9 +208,11 @@ object ActorModelSpec { await(deadline)(stats.restarts.get() == restarts) } catch { case e ⇒ - system.eventStream.publish(Error(e, Option(dispatcher).toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + - ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + - ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) + system.eventStream.publish(Error(e, Option(dispatcher).toString, + if (dispatcher ne null) dispatcher.getClass else this.getClass, + "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) throw e } } @@ -311,7 +313,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa try { f } catch { - case e ⇒ system.eventStream.publish(Error(e, "spawn", "error in spawned thread")) + case e ⇒ system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index f6e5b92201..9a41c80f6d 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -108,7 +108,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { import Logging._ - val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error")) + val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error")) val msg = allmsg filter (_.level <= level) allmsg foreach bus.publish msg foreach (x ⇒ expectMsg(x)) diff --git a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala index 15f8646d4b..6d524729dd 100644 --- a/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala @@ -59,7 +59,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd } val log = LoggingReceive("funky")(r) log.isDefinedAt("hallo") - expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo")) + expectMsg(1 second, Logging.Debug("funky", classOf[String], "received unhandled message hallo")) } } @@ -83,7 +83,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val name = actor.path.toString actor ! "buh" within(1 second) { - expectMsg(Logging.Debug(name, "received handled message buh")) + expectMsg(Logging.Debug(name, actor.underlyingActor.getClass, "received handled message buh")) expectMsg("x") } @@ -109,7 +109,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd }) actor ! "buh" within(1 second) { - expectMsg(Logging.Debug(actor.path.toString, "received handled message buh")) + expectMsg(Logging.Debug(actor.path.toString, actor.underlyingActor.getClass, "received handled message buh")) expectMsg("x") } } @@ -130,7 +130,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val name = actor.path.toString actor ! PoisonPill expectMsgPF() { - case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true + case Logging.Debug(`name`, _, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true } awaitCond(actor.isTerminated, 100 millis) } @@ -142,7 +142,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val sys = impl.systemGuardian.path.toString ignoreMute(this) ignoreMsg { - case Logging.Debug(s, _) ⇒ s.contains("MainBusReaper") || s == sys + case Logging.Debug(`sys`, _, _) ⇒ true } system.eventStream.subscribe(testActor, classOf[Logging.Debug]) system.eventStream.subscribe(testActor, classOf[Logging.Error]) @@ -151,51 +151,53 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val lname = lifecycleGuardian.path.toString val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) val sname = supervisor.path.toString + val sclass = classOf[TestLogActor] val supervisorSet = receiveWhile(messages = 2) { - case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" ⇒ 1 - case Logging.Debug(`sname`, msg: String) if msg startsWith "started" ⇒ 2 + case Logging.Debug(`lname`, _, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`sname`, `sclass`, msg: String) if msg startsWith "started" ⇒ 2 }.toSet expectNoMsg(Duration.Zero) assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)") val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none") val aname = actor.path.toString + val aclass = classOf[TestLogActor] val set = receiveWhile(messages = 2) { - case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" ⇒ 1 - case Logging.Debug(`aname`, msg: String) if msg startsWith "started" ⇒ 2 + case Logging.Debug(`sname`, _, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`aname`, `aclass`, msg: String) if msg startsWith "started" ⇒ 2 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2), set + " was not Set(1, 2)") supervisor watch actor expectMsgPF(hint = "now monitoring") { - case Logging.Debug(ref, msg: String) ⇒ + case Logging.Debug(ref, `sclass`, msg: String) ⇒ ref == supervisor.underlyingActor && msg.startsWith("now monitoring") } supervisor unwatch actor expectMsgPF(hint = "stopped monitoring") { - case Logging.Debug(ref, msg: String) ⇒ + case Logging.Debug(ref, `sclass`, msg: String) ⇒ ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring") } EventFilter[ActorKilledException](occurrences = 1) intercept { actor ! Kill val set = receiveWhile(messages = 3) { - case Logging.Error(_: ActorKilledException, `aname`, "Kill") ⇒ 1 - case Logging.Debug(`aname`, "restarting") ⇒ 2 - case Logging.Debug(`aname`, "restarted") ⇒ 3 + case Logging.Error(_: ActorKilledException, `aname`, `aclass`, "Kill") ⇒ 1 + case Logging.Debug(`aname`, `aclass`, "restarting") ⇒ 2 + case Logging.Debug(`aname`, `aclass`, "restarted") ⇒ 3 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") } system.stop(supervisor) - expectMsg(Logging.Debug(sname, "stopping")) - expectMsg(Logging.Debug(aname, "stopped")) - expectMsg(Logging.Debug(sname, "stopped")) + expectMsg(Logging.Debug(sname, `sclass`, "stopping")) + expectMsg(Logging.Debug(aname, `aclass`, "stopped")) + expectMsg(Logging.Debug(sname, `sclass`, "stopped")) } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index eaad8d0610..5454d54d23 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -358,12 +358,12 @@ private[akka] class ActorCell( actor = created created.preStart() checkReceiveTimeout - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "started (" + actor + ")")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) } catch { // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ try { - system.eventStream.publish(Error(e, self.path.toString, "error while creating actor")) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -373,7 +373,7 @@ private[akka] class ActorCell( def recreate(cause: Throwable): Unit = try { val failedActor = actor - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarting")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting")) val freshActor = newActor() if (failedActor ne null) { val c = currentMessage //One read only plz @@ -388,7 +388,7 @@ private[akka] class ActorCell( actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call hotswap = Props.noHotSwap // Reset the behavior freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarted")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) dispatcher.resume(this) //FIXME should this be moved down? @@ -396,7 +396,7 @@ private[akka] class ActorCell( } catch { // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ try { - system.eventStream.publish(Error(e, self.path.toString, "error while creating actor")) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -417,7 +417,7 @@ private[akka] class ActorCell( else { // do not process normal messages while waiting for all children to terminate dispatcher suspend this - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) // do not use stop(child) because that would dissociate the children from us, but we still want to wait for them for (child ← c) child.asInstanceOf[InternalActorRef].stop() stopping = true @@ -428,12 +428,12 @@ private[akka] class ActorCell( childrenRefs.get(child.path.name) match { case None ⇒ childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child)) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) case Some(ChildRestartStats(`child`, _, _)) ⇒ // this is the nominal case where we created the child and entered it in actorCreated() above - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) case Some(ChildRestartStats(c, _, _)) ⇒ - system.eventStream.publish(Warning(self.path.toString, "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child)) + system.eventStream.publish(Warning(self.path.toString, clazz(actor), "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child)) } } @@ -448,10 +448,10 @@ private[akka] class ActorCell( case Recreate(cause) ⇒ recreate(cause) case Link(subject) ⇒ system.deathWatch.subscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now monitoring " + subject)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) case Unlink(subject) ⇒ system.deathWatch.unsubscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped monitoring " + subject)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) case Suspend() ⇒ suspend() case Resume() ⇒ resume() case Terminate() ⇒ terminate() @@ -460,7 +460,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ //Should we really catch everything here? - system.eventStream.publish(Error(e, self.path.toString, "error while processing " + message)) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message)) //TODO FIXME How should problems here be handled??? throw e } @@ -480,7 +480,7 @@ private[akka] class ActorCell( currentMessage = null // reset current message after successful invocation } catch { case e ⇒ - system.eventStream.publish(Error(e, self.path.toString, e.getMessage)) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -500,7 +500,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ - system.eventStream.publish(Error(e, self.path.toString, e.getMessage)) + system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage)) throw e } } @@ -530,7 +530,8 @@ private[akka] class ActorCell( } def autoReceiveMessage(msg: Envelope) { - if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.path.toString, "received AutoReceiveMessage " + msg)) + if (system.settings.DebugAutoReceive) + system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { case Failed(cause) ⇒ handleFailure(sender, cause) @@ -554,7 +555,8 @@ private[akka] class ActorCell( try { parent.sendSystemMessage(ChildTerminated(self)) system.deathWatch.publish(Terminated(self)) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped")) + if (system.settings.DebugLifecycle) + system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) // FIXME: can actor be null? } finally { currentMessage = null clearActorFields() @@ -565,8 +567,8 @@ private[akka] class ActorCell( final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match { case Some(stats) if stats.child == child ⇒ if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause - case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) - case None ⇒ system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child)) + case Some(stats) ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child)) + case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) } final def handleChildTerminated(child: ActorRef): Unit = { @@ -625,4 +627,9 @@ private[akka] class ActorCell( lookupAndSetField(a.getClass, a, "self", self) } } + + private def clazz(o: AnyRef): Class[_] = { + if (o eq null) this.getClass + else o.getClass + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index aafbe1f0e3..a4f1c2c37c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -330,7 +330,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor // this provides basic logging (to stdout) until .start() is called below val eventStream = new EventStream(DebugEventStream) eventStream.startStdoutLogger(settings) - val log = new BusLogging(eventStream, "ActorSystem") // “this” used only for .getClass in tagging messages + val log = new BusLogging(eventStream, "ActorSystem", this.getClass) val scheduler = createScheduler() diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 52f35fd952..d9b45ea7c8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -80,7 +80,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl runnable.run() } catch { // FIXME catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ eventStream.publish(Error(e, "TaskInvocation", e.getMessage)) + case e ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) } finally { cleanup() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 79331e0397..2511dbc8e2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -59,7 +59,7 @@ class Dispatcher( executorService.get() execute invocation } catch { case e2: RejectedExecutionException ⇒ - prerequisites.eventStream.publish(Warning("Dispatcher", e2.toString)) + prerequisites.eventStream.publish(Warning("Dispatcher", this.getClass, e2.toString)) throw e2 } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 491d79a63b..942bd25a65 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -77,7 +77,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc } else { // Note that the configurator of the default dispatcher will be registered for this id, // so this will only be logged once, which is crucial. - prerequisites.eventStream.publish(Warning("Dispatchers", + prerequisites.eventStream.publish(Warning("Dispatchers", this.getClass, "Dispatcher [%s] not configured, using default-dispatcher".format(id))) lookupConfigurator(DefaultDispatcherId) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 933a263732..0675f1c9f2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -325,7 +325,7 @@ object Future { // FIXME catching all and continue isn't good for OOME, ticket #1418 executor match { case m: MessageDispatcher ⇒ - m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", e.getMessage)) + m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", this.getClass, e.getMessage)) case other ⇒ e.printStackTrace() } @@ -566,7 +566,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { protected def logError(msg: String, problem: Throwable): Unit = { executor match { - case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage)) + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, this.getClass, problem.getMessage)) case other ⇒ problem.printStackTrace() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 3abd961d0f..0da0bf13af 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -214,7 +214,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue } } catch { case e ⇒ - actor.system.eventStream.publish(Error(e, actor.self.path.toString, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) + actor.system.eventStream.publish(Error(e, actor.self.path.toString, actor.actor.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } } diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index eea9deff35..6ad2d0fbdf 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -38,19 +38,19 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su } override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { - if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel)) + if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel)) super.subscribe(subscriber, channel) } override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { val ret = super.unsubscribe(subscriber, channel) - if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel)) + if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel)) ret } override def unsubscribe(subscriber: ActorRef) { super.unsubscribe(subscriber) - if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels")) + if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels")) } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index bfd0f2a184..83bff79617 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -16,10 +16,6 @@ import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException import akka.dispatch.Await -object LoggingBus { - implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream -} - /** * This trait brings log level handling to the EventStream: it reads the log * levels for the initial logging (StandardOutLogger) and the loggers & level @@ -75,7 +71,7 @@ trait LoggingBus extends ActorEventBus { */ private[akka] def startStdoutLogger(config: Settings) { val level = levelFor(config.StdoutLogLevel) getOrElse { - StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) + StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), this.getClass, "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) ErrorLevel } AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) @@ -83,7 +79,7 @@ trait LoggingBus extends ActorEventBus { loggers = Seq(StandardOutLogger) _logLevel = level } - publish(Debug(simpleName(this), "StandardOutLogger started")) + publish(Debug(simpleName(this), this.getClass, "StandardOutLogger started")) } /** @@ -91,7 +87,7 @@ trait LoggingBus extends ActorEventBus { */ private[akka] def startDefaultLoggers(system: ActorSystemImpl) { val level = levelFor(system.settings.LogLevel) getOrElse { - StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + system.settings.LogLevel)) + StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), this.getClass, "unknown akka.stdout-loglevel " + system.settings.LogLevel)) ErrorLevel } try { @@ -119,7 +115,7 @@ trait LoggingBus extends ActorEventBus { loggers = myloggers _logLevel = level } - publish(Debug(simpleName(this), "Default Loggers started")) + publish(Debug(simpleName(this), this.getClass, "Default Loggers started")) if (!(defaultLoggers contains StandardOutLoggerName)) { unsubscribe(StandardOutLogger) } @@ -138,7 +134,7 @@ trait LoggingBus extends ActorEventBus { val level = _logLevel // volatile access before reading loggers if (!(loggers contains StandardOutLogger)) { AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) - publish(Debug(simpleName(this), "shutting down: StandardOutLogger started")) + publish(Debug(simpleName(this), this.getClass, "shutting down: StandardOutLogger started")) } for { logger ← loggers @@ -151,7 +147,7 @@ trait LoggingBus extends ActorEventBus { case _ ⇒ } } - publish(Debug(simpleName(this), "all default loggers stopped")) + publish(Debug(simpleName(this), this.getClass, "all default loggers stopped")) } private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { @@ -160,12 +156,12 @@ trait LoggingBus extends ActorEventBus { implicit val timeout = Timeout(3 seconds) val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ - publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) + publish(Warning(simpleName(this), this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) } if (response != LoggerInitialized) throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response) AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l))) - publish(Debug(simpleName(this), "logger " + name + " started")) + publish(Debug(simpleName(this), this.getClass, "logger " + name + " started")) actor } @@ -217,6 +213,11 @@ object LogSource { * ... * log.info("hello world!") * + * + * The source object is used in two fashions: its `Class[_]` will be part of + * all log events produced by this logger, plus a string representation is + * generated which may contain per-instance information, see `apply` or `create` + * below. * * Loggers are attached to the level-specific channels Error, * Warning, Info and Debug as @@ -305,13 +306,11 @@ object Logging { val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern /** - * Obtain LoggingAdapter for the given event stream (system) and source object. - * Note that there is an implicit conversion from [[akka.actor.ActorSystem]] - * to [[akka.event.LoggingBus]]. + * Obtain LoggingAdapter for the given logging bus and source object. * * The source is used to identify the source of this logging channel and must have - * a corresponding LogSource[T] instance in scope; by default these are - * provided for Class[_], Actor, ActorRef and String types. The source + * a corresponding implicit LogSource[T] instance in scope; by default these are + * provided for Class[_], Actor, ActorRef and String types. By these, the source * object is translated to a String according to the following rules: * + * + * You can add your own rules quite easily: + * + * {{{ + * trait MyType { // as an example + * def name: String + * } + * + * implicit val myLogSourceType: LogSource[MyType] = new LogSource { + * def genString(a: MyType) = a.name + * } + * + * class MyClass extends MyType { + * val log = Logging(eventStream, this) // will use "hallo" as logSource + * def name = "hallo" + * } + * }}} */ - def apply[T: LogSource](eventStream: LoggingBus, logSource: T): LoggingAdapter = - new BusLogging(eventStream, implicitly[LogSource[T]].genString(logSource)) + def apply[T: LogSource](bus: LoggingBus, logSource: T): LoggingAdapter = + new BusLogging(bus, implicitly[LogSource[T]].genString(logSource), logSource.getClass) /** - * Java API: Obtain LoggingAdapter for the given system and source object. The - * source object is used to identify the source of this logging channel. The source + * Obtain LoggingAdapter for the given actor system and source object. This + * will use the system’s event stream. + * + * The source is used to identify the source of this logging channel and must have + * a corresponding implicit LogSource[T] instance in scope; by default these are + * provided for Class[_], Actor, ActorRef and String types. By these, the source * object is translated to a String according to the following rules: * + * + * You can add your own rules quite easily: + * + * {{{ + * trait MyType { // as an example + * def name: String + * } + * + * implicit val myLogSourceType: LogSource[MyType] = new LogSource { + * def genString(a: MyType) = a.name + * } + * + * class MyClass extends MyType { + * val log = Logging(eventStream, this) // will use "hallo" as logSource + * def name = "hallo" + * } + * }}} */ - def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system.eventStream, LogSource.fromAnyRef(logSource)) + def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = + new BusLogging(system.eventStream, implicitly[LogSource[T]].genString(logSource), logSource.getClass) /** - * Java API: Obtain LoggingAdapter for the given event bus and source object. The - * source object is used to identify the source of this logging channel. + * Obtain LoggingAdapter for the given actor system and source object. This + * will use the system’s event stream. + * + * The source is used to identify the source of this logging channel and must have + * a corresponding implicit LogSource[T] instance in scope; by default these are + * provided for Class[_], Actor, ActorRef and String types. By these, the source + * object is translated to a String according to the following rules: + * + * + * You can add your own rules quite easily: + * + * {{{ + * trait MyType { // as an example + * def name: String + * } + * + * implicit val myLogSourceType: LogSource[MyType] = new LogSource { + * def genString(a: MyType) = a.name + * } + * + * class MyClass extends MyType { + * val log = Logging(eventStream, this) // will use "hallo" as logSource + * def name = "hallo" + * } + * }}} + */ + def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system, LogSource.fromAnyRef(logSource)) + + /** + * Obtain LoggingAdapter for the given logging bus and source object. This + * will use the system’s event stream. + * + * The source is used to identify the source of this logging channel and must have + * a corresponding implicit LogSource[T] instance in scope; by default these are + * provided for Class[_], Actor, ActorRef and String types. By these, the source + * object is translated to a String according to the following rules: + * + * + * You can add your own rules quite easily: + * + * {{{ + * trait MyType { // as an example + * def name: String + * } + * + * implicit val myLogSourceType: LogSource[MyType] = new LogSource { + * def genString(a: MyType) = a.name + * } + * + * class MyClass extends MyType { + * val log = Logging(eventStream, this) // will use "hallo" as logSource + * def name = "hallo" + * } + * }}} */ def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource)) @@ -362,19 +462,34 @@ object Logging { * The LogLevel of this LogEvent */ def level: LogLevel + + /** + * The source of this event + */ + def logSource: String + + /** + * The class of the source of this event + */ + def logClass: Class[_] + + /** + * The message, may be any object or null. + */ + def message: Any } /** * For ERROR Logging */ - case class Error(cause: Throwable, logSource: String, message: Any = "") extends LogEvent { - def this(logSource: String, message: Any) = this(Error.NoCause, logSource, message) + case class Error(cause: Throwable, logSource: String, logClass: Class[_], message: Any = "") extends LogEvent { + def this(logSource: String, logClass: Class[_], message: Any) = this(Error.NoCause, logSource, logClass, message) override def level = ErrorLevel } object Error { - def apply(logSource: String, message: Any) = new Error(NoCause, logSource, message) + def apply(logSource: String, logClass: Class[_], message: Any) = new Error(NoCause, logSource, logClass, message) /** Null Object used for errors without cause Throwable */ object NoCause extends NoStackTrace @@ -383,21 +498,21 @@ object Logging { /** * For WARNING Logging */ - case class Warning(logSource: String, message: Any = "") extends LogEvent { + case class Warning(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent { override def level = WarningLevel } /** * For INFO Logging */ - case class Info(logSource: String, message: Any = "") extends LogEvent { + case class Info(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent { override def level = InfoLevel } /** * For DEBUG Logging */ - case class Debug(logSource: String, message: Any = "") extends LogEvent { + case class Debug(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent { override def level = DebugLevel } @@ -439,7 +554,7 @@ object Logging { case e: Warning ⇒ warning(e) case e: Info ⇒ info(e) case e: Debug ⇒ debug(e) - case e ⇒ warning(Warning(simpleName(this), "received unexpected event of class " + e.getClass + ": " + e)) + case e ⇒ warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e)) } } @@ -626,7 +741,7 @@ trait LoggingAdapter { } } -class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdapter { +class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter { import Logging._ @@ -635,14 +750,14 @@ class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdap def isInfoEnabled = bus.logLevel >= InfoLevel def isDebugEnabled = bus.logLevel >= DebugLevel - protected def notifyError(message: String) { bus.publish(Error(logSource, message)) } + protected def notifyError(message: String) { bus.publish(Error(logSource, logClass, message)) } - protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, message)) } + protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, logClass, message)) } - protected def notifyWarning(message: String) { bus.publish(Warning(logSource, message)) } + protected def notifyWarning(message: String) { bus.publish(Warning(logSource, logClass, message)) } - protected def notifyInfo(message: String) { bus.publish(Info(logSource, message)) } + protected def notifyInfo(message: String) { bus.publish(Info(logSource, logClass, message)) } - protected def notifyDebug(message: String) { bus.publish(Debug(logSource, message)) } + protected def notifyDebug(message: String) { bus.publish(Debug(logSource, logClass, message)) } } diff --git a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala index 250af89812..bb5a282856 100644 --- a/akka-actor/src/main/scala/akka/event/LoggingReceive.scala +++ b/akka-actor/src/main/scala/akka/event/LoggingReceive.scala @@ -36,7 +36,7 @@ object LoggingReceive { class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o)) + system.eventStream.publish(Debug(LogSource.fromAnyRef(source), source.getClass, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) handled } def apply(o: Any): Unit = r(o) diff --git a/akka-actor/src/main/scala/akka/util/JMX.scala b/akka-actor/src/main/scala/akka/util/JMX.scala index bcfd5d2477..9a9f0530fb 100644 --- a/akka-actor/src/main/scala/akka/util/JMX.scala +++ b/akka-actor/src/main/scala/akka/util/JMX.scala @@ -21,7 +21,7 @@ object JMX { case e: InstanceAlreadyExistsException ⇒ Some(mbeanServer.getObjectInstance(name)) case e: Exception ⇒ - system.eventStream.publish(Error(e, "JMX", "Error when registering mbean [%s]".format(mbean))) + system.eventStream.publish(Error(e, "JMX", this.getClass, "Error when registering mbean [%s]".format(mbean))) None } @@ -29,6 +29,6 @@ object JMX { mbeanServer.unregisterMBean(mbean) } catch { case e: InstanceNotFoundException ⇒ {} - case e: Exception ⇒ system.eventStream.publish(Error(e, "JMX", "Error while unregistering mbean [%s]".format(mbean))) + case e: Exception ⇒ system.eventStream.publish(Error(e, "JMX", this.getClass, "Error while unregistering mbean [%s]".format(mbean))) } } diff --git a/akka-docs/java/logging.rst b/akka-docs/java/logging.rst index 20920d940b..aee644c175 100644 --- a/akka-docs/java/logging.rst +++ b/akka-docs/java/logging.rst @@ -83,8 +83,8 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger. loglevel = "DEBUG" } -Logging thread in MDC ---------------------- +Logging Thread and Akka Source in MDC +------------------------------------- Since the logging is done asynchronously the thread in which the logging was performed is captured in Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``. @@ -96,3 +96,16 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi +Another helpful facility is that Akka captures the actor’s address when +instantiating a logger within it, meaning that the full instance identification +is available for associating log messages e.g. with members of a router. This +information is available in the MDC with attribute name ``akkaSource``:: + + + + %date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n + + + +For more details on what this attribute contains—also for non-actors—please see +`How to Log`_. diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 20ac33480b..dd25d9d820 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -162,10 +162,10 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { system.eventStream.subscribe(testActor, classOf[Logging.Info]) myActor ! "test" - expectMsgPF(1 second) { case Logging.Info(_, "received test") ⇒ true } + expectMsgPF(1 second) { case Logging.Info(_, _, "received test") ⇒ true } myActor ! "unknown" - expectMsgPF(1 second) { case Logging.Info(_, "received unknown message") ⇒ true } + expectMsgPF(1 second) { case Logging.Info(_, _, "received unknown message") ⇒ true } system.eventStream.unsubscribe(testActor) system.eventStream.publish(TestEvent.UnMute(filter)) diff --git a/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala b/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala index ffa56a3064..c3c070d374 100644 --- a/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/event/LoggingDocSpec.scala @@ -38,11 +38,11 @@ object LoggingDocSpec { class MyEventListener extends Actor { def receive = { - case InitializeLogger(_) ⇒ sender ! LoggerInitialized - case Error(cause, logSource, message) ⇒ // ... - case Warning(logSource, message) ⇒ // ... - case Info(logSource, message) ⇒ // ... - case Debug(logSource, message) ⇒ // ... + case InitializeLogger(_) ⇒ sender ! LoggerInitialized + case Error(cause, logSource, logClass, message) ⇒ // ... + case Warning(logSource, logClass, message) ⇒ // ... + case Info(logSource, logClass, message) ⇒ // ... + case Debug(logSource, logClass, message) ⇒ // ... } } //#my-event-listener diff --git a/akka-docs/scala/logging.rst b/akka-docs/scala/logging.rst index 35f4e838ff..f4272c5da0 100644 --- a/akka-docs/scala/logging.rst +++ b/akka-docs/scala/logging.rst @@ -85,8 +85,8 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger. loglevel = "DEBUG" } -Logging thread in MDC ---------------------- +Logging Thread and Akka Source in MDC +------------------------------------- Since the logging is done asynchronously the thread in which the logging was performed is captured in Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``. @@ -98,3 +98,16 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi +Another helpful facility is that Akka captures the actor’s address when +instantiating a logger within it, meaning that the full instance identification +is available for associating log messages e.g. with members of a router. This +information is available in the MDC with attribute name ``akkaSource``:: + + + + %date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n + + + +For more details on what this attribute contains—also for non-actors—please see +`How to Log`_. diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 1af4802552..6efa542e0e 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -153,7 +153,7 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { case message: DaemonMsg ⇒ - log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName) + log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName) message match { case DaemonMsgCreate(factory, path, supervisor) ⇒ import remote.remoteAddress diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 4624c9dc73..719261a5b6 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -60,7 +60,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - log.debug("Sending message: {}", message) + log.debug("Sending message {} from {} to {}", message, senderOption, recipient) send((message, senderOption, recipient)) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index b51720aa01..e585ade6d7 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -15,8 +15,10 @@ akka { deployment { /watchers.remote = "akka://other@127.0.0.1:2666" } + debug.lifecycle = on } cluster.nodename = buh + loglevel = DEBUG remote.server { hostname = "127.0.0.1" port = 2665 diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala index 91f3123634..4831d78270 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -19,6 +19,7 @@ trait SLF4JLogging { object Logger { def apply(logger: String): SLFLogger = SLFLoggerFactory getLogger logger + def apply(logClass: Class[_]): SLFLogger = SLFLoggerFactory getLogger logClass def root: SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME) } @@ -31,30 +32,31 @@ object Logger { class Slf4jEventHandler extends Actor with SLF4JLogging { val mdcThreadAttributeName = "sourceThread" + val mdcAkkaSourceAttributeName = "akkaSource" def receive = { - case event @ Error(cause, logSource, message) ⇒ - withMdc(mdcThreadAttributeName, event.thread.getName) { + case event @ Error(cause, logSource, logClass, message) ⇒ + withMdc(logSource, event.thread.getName) { cause match { - case Error.NoCause ⇒ Logger(logSource).error(message.toString) - case _ ⇒ Logger(logSource).error(message.toString, cause) + case Error.NoCause ⇒ Logger(logClass).error(message.toString) + case _ ⇒ Logger(logClass).error(message.toString, cause) } } - case event @ Warning(logSource, message) ⇒ - withMdc(mdcThreadAttributeName, event.thread.getName) { - Logger(logSource).warn("{}", message.asInstanceOf[AnyRef]) + case event @ Warning(logSource, logClass, message) ⇒ + withMdc(logSource, event.thread.getName) { + Logger(logClass).warn("{}", message.asInstanceOf[AnyRef]) } - case event @ Info(logSource, message) ⇒ - withMdc(mdcThreadAttributeName, event.thread.getName) { - Logger(logSource).info("{}", message.asInstanceOf[AnyRef]) + case event @ Info(logSource, logClass, message) ⇒ + withMdc(logSource, event.thread.getName) { + Logger(logClass).info("{}", message.asInstanceOf[AnyRef]) } - case event @ Debug(logSource, message) ⇒ - withMdc(mdcThreadAttributeName, event.thread.getName) { - Logger(logSource).debug("{}", message.asInstanceOf[AnyRef]) + case event @ Debug(logSource, logClass, message) ⇒ + withMdc(logSource, event.thread.getName) { + Logger(logClass).debug("{}", message.asInstanceOf[AnyRef]) } case InitializeLogger(_) ⇒ @@ -63,12 +65,14 @@ class Slf4jEventHandler extends Actor with SLF4JLogging { } @inline - final def withMdc(name: String, value: String)(logStatement: ⇒ Unit) { - MDC.put(name, value) + final def withMdc(logSource: String, thread: String)(logStatement: ⇒ Unit) { + MDC.put(mdcAkkaSourceAttributeName, logSource) + MDC.put(mdcThreadAttributeName, thread) try { logStatement } finally { - MDC.remove(name) + MDC.remove(mdcAkkaSourceAttributeName) + MDC.remove(mdcThreadAttributeName) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 7da8d84eba..3bee246e11 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -254,7 +254,7 @@ case class ErrorFilter( def matches(event: LogEvent) = { event match { - case Error(cause, src, msg) if throwable isInstance cause ⇒ + case Error(cause, src, _, msg) if throwable isInstance cause ⇒ (msg == null && cause.getMessage == null && cause.getStackTrace.length == 0) || doMatch(src, msg) || doMatch(src, cause.getMessage) case _ ⇒ false @@ -305,8 +305,8 @@ case class WarningFilter( def matches(event: LogEvent) = { event match { - case Warning(src, msg) ⇒ doMatch(src, msg) - case _ ⇒ false + case Warning(src, _, msg) ⇒ doMatch(src, msg) + case _ ⇒ false } } @@ -348,8 +348,8 @@ case class InfoFilter( def matches(event: LogEvent) = { event match { - case Info(src, msg) ⇒ doMatch(src, msg) - case _ ⇒ false + case Info(src, _, msg) ⇒ doMatch(src, msg) + case _ ⇒ false } } @@ -391,8 +391,8 @@ case class DebugFilter( def matches(event: LogEvent) = { event match { - case Debug(src, msg) ⇒ doMatch(src, msg) - case _ ⇒ false + case Debug(src, _, msg) ⇒ doMatch(src, msg) + case _ ⇒ false } } @@ -456,15 +456,15 @@ class TestEventListener extends Logging.DefaultLogger { case event: LogEvent ⇒ if (!filter(event)) print(event) case DeadLetter(msg: SystemMessage, _, rcp) ⇒ if (!msg.isInstanceOf[Terminate]) { - val event = Warning(rcp.path.toString, "received dead system message: " + msg) + val event = Warning(rcp.path.toString, rcp.getClass, "received dead system message: " + msg) if (!filter(event)) print(event) } case DeadLetter(msg, snd, rcp) ⇒ if (!msg.isInstanceOf[Terminated]) { - val event = Warning(rcp.path.toString, "received dead letter from " + snd + ": " + msg) + val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg) if (!filter(event)) print(event) } - case m ⇒ print(Debug(context.system.name, m)) + case m ⇒ print(Debug(context.system.name, this.getClass, m)) } def filter(event: LogEvent): Boolean = filters exists (f ⇒ try { f(event) } catch { case e: Exception ⇒ false }) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 747a9c90e9..d5c9b1a151 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -81,7 +81,7 @@ object TestActorRefSpec { var count = 0 var msg: String = _ def receive = { - case Warning(_, m: String) ⇒ count += 1; msg = m + case Warning(_, _, m: String) ⇒ count += 1; msg = m } }