Merge pull request #212 from jboner/wip-1621-logging-∂π

fix RemoteDeathWatchSpec and improve logging
This commit is contained in:
Roland Kuhn 2012-01-13 05:59:03 -08:00
commit 312ea1bef1
30 changed files with 489 additions and 185 deletions

View file

@ -33,19 +33,26 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
}
"notify with one Terminated message when an Actor is stopped" in {
val terminal = system.actorOf(Props(context { case _ }))
startWatching(terminal)
testActor ! "ping"
expectMsg("ping")
val terminal = system.actorOf(Props.empty)
startWatching(terminal) ! "hallo"
expectMsg("hallo") // this ensures that the DaemonMsgWatch has been received before we send the PoisonPill
terminal ! PoisonPill
expectTerminationOf(terminal)
}
"notify with one Terminated message when an Actor is already dead" in {
val terminal = system.actorOf(Props.empty)
terminal ! PoisonPill
startWatching(terminal)
expectTerminationOf(terminal)
}
"notify with all monitors with one Terminated message when an Actor is stopped" in {
val terminal = system.actorOf(Props(context { case _ }))
val terminal = system.actorOf(Props.empty)
val monitor1, monitor2, monitor3 = startWatching(terminal)
terminal ! PoisonPill
@ -60,7 +67,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
}
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
val terminal = system.actorOf(Props(context { case _ }))
val terminal = system.actorOf(Props.empty)
val monitor1, monitor3 = startWatching(terminal)
val monitor2 = system.actorOf(Props(new Actor {
context.watch(terminal)

View file

@ -171,7 +171,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
system.eventStream.subscribe(testActor, classOf[Logging.Error])
fsm ! "go"
expectMsgPF(1 second, hint = "Next state 2 does not exist") {
case Logging.Error(_, `name`, "Next state 2 does not exist") true
case Logging.Error(_, `name`, _, "Next state 2 does not exist") true
}
system.eventStream.unsubscribe(testActor)
}
@ -218,18 +218,19 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
}
})
val name = fsm.path.toString
val fsmClass = fsm.underlyingActor.getClass
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
}
expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2"))
expectMsg(1 second, Logging.Debug(name, fsmClass, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, fsmClass, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
}
expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal)
expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), Normal)
expectNoMsg(1 second)
system.eventStream.unsubscribe(testActor)
}

View file

@ -151,7 +151,7 @@ object ActorModelSpec {
await(deadline)(stops == dispatcher.stops.get)
} catch {
case e
system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get +
system.eventStream.publish(Error(e, dispatcher.toString, dispatcher.getClass, "actual: stops=" + dispatcher.stops.get +
" required: stops=" + stops))
throw e
}
@ -208,7 +208,10 @@ object ActorModelSpec {
await(deadline)(stats.restarts.get() == restarts)
} catch {
case e
system.eventStream.publish(Error(e, Option(dispatcher).toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
system.eventStream.publish(Error(e,
Option(dispatcher).toString,
(Option(dispatcher) getOrElse this).getClass,
"actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
throw e
@ -311,7 +314,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
try {
f
} catch {
case e system.eventStream.publish(Error(e, "spawn", "error in spawned thread"))
case e system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread"))
}
}
}

View file

@ -108,7 +108,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
import Logging._
val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error"))
val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error"))
val msg = allmsg filter (_.level <= level)
allmsg foreach bus.publish
msg foreach (x expectMsg(x))

View file

@ -59,7 +59,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
}
val log = LoggingReceive("funky")(r)
log.isDefinedAt("hallo")
expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo"))
expectMsg(1 second, Logging.Debug("funky", classOf[DummyClassForStringSources], "received unhandled message hallo"))
}
}
@ -83,7 +83,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val name = actor.path.toString
actor ! "buh"
within(1 second) {
expectMsg(Logging.Debug(name, "received handled message buh"))
expectMsg(Logging.Debug(name, actor.underlyingActor.getClass, "received handled message buh"))
expectMsg("x")
}
@ -109,7 +109,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
})
actor ! "buh"
within(1 second) {
expectMsg(Logging.Debug(actor.path.toString, "received handled message buh"))
expectMsg(Logging.Debug(actor.path.toString, actor.underlyingActor.getClass, "received handled message buh"))
expectMsg("x")
}
}
@ -130,7 +130,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val name = actor.path.toString
actor ! PoisonPill
expectMsgPF() {
case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" true
case Logging.Debug(`name`, _, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" true
}
awaitCond(actor.isTerminated, 100 millis)
}
@ -142,7 +142,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val sys = impl.systemGuardian.path.toString
ignoreMute(this)
ignoreMsg {
case Logging.Debug(s, _) s.contains("MainBusReaper") || s == sys
case Logging.Debug(`sys`, _, _) true
}
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
system.eventStream.subscribe(testActor, classOf[Logging.Error])
@ -151,51 +151,53 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val lname = lifecycleGuardian.path.toString
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
val sname = supervisor.path.toString
val sclass = classOf[TestLogActor]
val supervisorSet = receiveWhile(messages = 2) {
case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`sname`, msg: String) if msg startsWith "started" 2
case Logging.Debug(`lname`, _, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`sname`, `sclass`, msg: String) if msg startsWith "started" 2
}.toSet
expectNoMsg(Duration.Zero)
assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)")
val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none")
val aname = actor.path.toString
val aclass = classOf[TestLogActor]
val set = receiveWhile(messages = 2) {
case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`aname`, msg: String) if msg startsWith "started" 2
case Logging.Debug(`sname`, _, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`aname`, `aclass`, msg: String) if msg startsWith "started" 2
}.toSet
expectNoMsg(Duration.Zero)
assert(set == Set(1, 2), set + " was not Set(1, 2)")
supervisor watch actor
expectMsgPF(hint = "now monitoring") {
case Logging.Debug(ref, msg: String)
case Logging.Debug(ref, `sclass`, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("now monitoring")
}
supervisor unwatch actor
expectMsgPF(hint = "stopped monitoring") {
case Logging.Debug(ref, msg: String)
case Logging.Debug(ref, `sclass`, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring")
}
EventFilter[ActorKilledException](occurrences = 1) intercept {
actor ! Kill
val set = receiveWhile(messages = 3) {
case Logging.Error(_: ActorKilledException, `aname`, "Kill") 1
case Logging.Debug(`aname`, "restarting") 2
case Logging.Debug(`aname`, "restarted") 3
case Logging.Error(_: ActorKilledException, `aname`, `aclass`, "Kill") 1
case Logging.Debug(`aname`, `aclass`, "restarting") 2
case Logging.Debug(`aname`, `aclass`, "restarted") 3
}.toSet
expectNoMsg(Duration.Zero)
assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)")
}
system.stop(supervisor)
expectMsg(Logging.Debug(sname, "stopping"))
expectMsg(Logging.Debug(aname, "stopped"))
expectMsg(Logging.Debug(sname, "stopped"))
expectMsg(Logging.Debug(sname, `sclass`, "stopping"))
expectMsg(Logging.Debug(aname, `aclass`, "stopped"))
expectMsg(Logging.Debug(sname, `sclass`, "stopped"))
}
}
}

View file

@ -112,7 +112,7 @@ object Status {
}
trait ActorLogging { this: Actor
val log = akka.event.Logging(context.system.eventStream, context.self)
val log = akka.event.Logging(context.system, context.self)
}
object Actor {

View file

@ -358,12 +358,12 @@ private[akka] class ActorCell(
actor = created
created.preStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "started (" + actor + ")"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e
try {
system.eventStream.publish(Error(e, self.path.toString, "error while creating actor"))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
@ -373,7 +373,7 @@ private[akka] class ActorCell(
def recreate(cause: Throwable): Unit = try {
val failedActor = actor
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarting"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
val freshActor = newActor()
if (failedActor ne null) {
val c = currentMessage //One read only plz
@ -388,7 +388,7 @@ private[akka] class ActorCell(
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
hotswap = Props.noHotSwap // Reset the behavior
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarted"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
dispatcher.resume(this) //FIXME should this be moved down?
@ -396,7 +396,7 @@ private[akka] class ActorCell(
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e try {
system.eventStream.publish(Error(e, self.path.toString, "error while creating actor"))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
@ -417,7 +417,7 @@ private[akka] class ActorCell(
else {
// do not process normal messages while waiting for all children to terminate
dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
// do not use stop(child) because that would dissociate the children from us, but we still want to wait for them
for (child c) child.asInstanceOf[InternalActorRef].stop()
stopping = true
@ -428,12 +428,12 @@ private[akka] class ActorCell(
childrenRefs.get(child.path.name) match {
case None
childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case Some(ChildRestartStats(`child`, _, _))
// this is the nominal case where we created the child and entered it in actorCreated() above
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case Some(ChildRestartStats(c, _, _))
system.eventStream.publish(Warning(self.path.toString, "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child))
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child))
}
}
@ -448,10 +448,10 @@ private[akka] class ActorCell(
case Recreate(cause) recreate(cause)
case Link(subject)
system.deathWatch.subscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now monitoring " + subject))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject))
case Unlink(subject)
system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped monitoring " + subject))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject))
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
@ -460,7 +460,7 @@ private[akka] class ActorCell(
}
} catch {
case e //Should we really catch everything here?
system.eventStream.publish(Error(e, self.path.toString, "error while processing " + message))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message))
//TODO FIXME How should problems here be handled???
throw e
}
@ -480,7 +480,7 @@ private[akka] class ActorCell(
currentMessage = null // reset current message after successful invocation
} catch {
case e
system.eventStream.publish(Error(e, self.path.toString, e.getMessage))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
@ -500,7 +500,7 @@ private[akka] class ActorCell(
}
} catch {
case e
system.eventStream.publish(Error(e, self.path.toString, e.getMessage))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
throw e
}
}
@ -530,7 +530,8 @@ private[akka] class ActorCell(
}
def autoReceiveMessage(msg: Envelope) {
if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.path.toString, "received AutoReceiveMessage " + msg))
if (system.settings.DebugAutoReceive)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause) handleFailure(sender, cause)
@ -554,7 +555,8 @@ private[akka] class ActorCell(
try {
parent.sendSystemMessage(ChildTerminated(self))
system.deathWatch.publish(Terminated(self))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped"))
if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) // FIXME: can actor be null?
} finally {
currentMessage = null
clearActorFields()
@ -565,8 +567,8 @@ private[akka] class ActorCell(
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match {
case Some(stats) if stats.child == child if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause
case Some(stats) system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child))
case None system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child))
case Some(stats) system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child))
case None system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
final def handleChildTerminated(child: ActorRef): Unit = {
@ -625,4 +627,9 @@ private[akka] class ActorCell(
lookupAndSetField(a.getClass, a, "self", self)
}
}
private def clazz(o: AnyRef): Class[_] = {
if (o eq null) this.getClass
else o.getClass
}
}

View file

@ -449,7 +449,10 @@ object DeadLetterActorRef {
val serialized = new SerializedDeadLetterActorRef
}
class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
trait DeadLetterActorRefLike extends MinimalActorRef {
def eventStream: EventStream
@volatile
private var brokenPromise: Future[Any] = _
@volatile
@ -477,7 +480,9 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
assert(brokenPromise != null)
brokenPromise
}
}
class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike {
@throws(classOf[java.io.ObjectStreamException])
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
@ -486,8 +491,8 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
* This special dead letter reference has a name: it is that which is returned
* by a local look-up which is unsuccessful.
*/
class EmptyLocalActorRef(_eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath)
extends DeadLetterActorRef(_eventStream) {
class EmptyLocalActorRef(val eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath)
extends DeadLetterActorRefLike {
init(_dispatcher, _path)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case d: DeadLetter // do NOT form endless loops

View file

@ -296,7 +296,7 @@ class LocalActorRefProvider(
val nodename: String = "local"
val clustername: String = "local"
val log = Logging(eventStream, "LocalActorRefProvider")
val log = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")")
/*
* generate name for temporary actor refs

View file

@ -330,7 +330,11 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
// this provides basic logging (to stdout) until .start() is called below
val eventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(settings)
val log = new BusLogging(eventStream, "ActorSystem") // this used only for .getClass in tagging messages
// unfortunately we need logging before we know the rootpath address, which wants to be inserted here
@volatile
private var _log = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass)
def log = _log
val scheduler = createScheduler()
@ -383,6 +387,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
private lazy val _start: this.type = {
// the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this)
_log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass)
deadLetters.init(dispatcher, lookupRoot.path / "deadLetters")
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
registerOnTermination(stopScheduler())
@ -498,4 +503,6 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
}
}
override def toString = lookupRoot.path.root.address.toString
}

View file

@ -190,7 +190,7 @@ trait FSM[S, D] extends Listeners {
type Timeout = Option[Duration]
type TransitionHandler = PartialFunction[(S, S), Unit]
val log = Logging(context.system, context.self)
val log = Logging(context.system, this)
/**
* ****************************************

View file

@ -80,7 +80,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl
runnable.run()
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e eventStream.publish(Error(e, "TaskInvocation", e.getMessage))
case e eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
} finally {
cleanup()
}

View file

@ -59,7 +59,7 @@ class Dispatcher(
executorService.get() execute invocation
} catch {
case e2: RejectedExecutionException
prerequisites.eventStream.publish(Warning("Dispatcher", e2.toString))
prerequisites.eventStream.publish(Warning("Dispatcher", this.getClass, e2.toString))
throw e2
}
}

View file

@ -77,7 +77,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
} else {
// Note that the configurator of the default dispatcher will be registered for this id,
// so this will only be logged once, which is crucial.
prerequisites.eventStream.publish(Warning("Dispatchers",
prerequisites.eventStream.publish(Warning("Dispatchers", this.getClass,
"Dispatcher [%s] not configured, using default-dispatcher".format(id)))
lookupConfigurator(DefaultDispatcherId)
}

View file

@ -325,7 +325,7 @@ object Future {
// FIXME catching all and continue isn't good for OOME, ticket #1418
executor match {
case m: MessageDispatcher
m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", e.getMessage))
m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", this.getClass, e.getMessage))
case other
e.printStackTrace()
}
@ -566,7 +566,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
protected def logError(msg: String, problem: Throwable): Unit = {
executor match {
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage))
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, msg, this.getClass, problem.getMessage))
case other problem.printStackTrace()
}
}

View file

@ -214,7 +214,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
}
} catch {
case e
actor.system.eventStream.publish(Error(e, actor.self.path.toString, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
actor.system.eventStream.publish(Error(e, actor.self.path.toString, actor.actor.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
throw e
}
}

View file

@ -38,19 +38,19 @@ class EventStream(private val debug: Boolean = false) extends LoggingBus with Su
}
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel))
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel))
super.subscribe(subscriber, channel)
}
override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
val ret = super.unsubscribe(subscriber, channel)
if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel))
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel))
ret
}
override def unsubscribe(subscriber: ActorRef) {
super.unsubscribe(subscriber)
if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels"))
if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels"))
}
}

View file

@ -16,10 +16,6 @@ import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException
import akka.dispatch.Await
object LoggingBus {
implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
}
/**
* This trait brings log level handling to the EventStream: it reads the log
* levels for the initial logging (StandardOutLogger) and the loggers & level
@ -75,7 +71,7 @@ trait LoggingBus extends ActorEventBus {
*/
private[akka] def startStdoutLogger(config: Settings) {
val level = levelFor(config.StdoutLogLevel) getOrElse {
StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + config.StdoutLogLevel))
StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), this.getClass, "unknown akka.stdout-loglevel " + config.StdoutLogLevel))
ErrorLevel
}
AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l)))
@ -83,15 +79,16 @@ trait LoggingBus extends ActorEventBus {
loggers = Seq(StandardOutLogger)
_logLevel = level
}
publish(Debug(simpleName(this), "StandardOutLogger started"))
publish(Debug(simpleName(this), this.getClass, "StandardOutLogger started"))
}
/**
* Internal Akka use only
*/
private[akka] def startDefaultLoggers(system: ActorSystemImpl) {
val logName = simpleName(this) + "(" + system + ")"
val level = levelFor(system.settings.LogLevel) getOrElse {
StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + system.settings.LogLevel))
StandardOutLogger.print(Error(new EventHandlerException, logName, this.getClass, "unknown akka.stdout-loglevel " + system.settings.LogLevel))
ErrorLevel
}
try {
@ -105,7 +102,7 @@ trait LoggingBus extends ActorEventBus {
} yield {
try {
ReflectiveAccess.getClassFor[Actor](loggerName) match {
case Right(actorClass) addLogger(system, actorClass, level)
case Right(actorClass) addLogger(system, actorClass, level, logName)
case Left(exception) throw exception
}
} catch {
@ -119,7 +116,7 @@ trait LoggingBus extends ActorEventBus {
loggers = myloggers
_logLevel = level
}
publish(Debug(simpleName(this), "Default Loggers started"))
publish(Debug(logName, this.getClass, "Default Loggers started"))
if (!(defaultLoggers contains StandardOutLoggerName)) {
unsubscribe(StandardOutLogger)
}
@ -138,7 +135,7 @@ trait LoggingBus extends ActorEventBus {
val level = _logLevel // volatile access before reading loggers
if (!(loggers contains StandardOutLogger)) {
AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l)))
publish(Debug(simpleName(this), "shutting down: StandardOutLogger started"))
publish(Debug(simpleName(this), this.getClass, "shutting down: StandardOutLogger started"))
}
for {
logger loggers
@ -151,33 +148,105 @@ trait LoggingBus extends ActorEventBus {
case _
}
}
publish(Debug(simpleName(this), "all default loggers stopped"))
publish(Debug(simpleName(this), this.getClass, "all default loggers stopped"))
}
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = {
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = {
val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
val actor = system.systemActorOf(Props(clazz), name)
implicit val timeout = Timeout(3 seconds)
val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch {
case _: TimeoutException
publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
publish(Warning(logName, this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
}
if (response != LoggerInitialized)
throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response)
AllLogLevels filter (level >= _) foreach (l subscribe(actor, classFor(l)))
publish(Debug(simpleName(this), "logger " + name + " started"))
publish(Debug(logName, this.getClass, "logger " + name + " started"))
actor
}
}
/**
* This trait defines the interface to be provided by a log source formatting
* rule as used by [[akka.event.Logging]]s `apply`/`create` method.
*
* See the companion object for default implementations.
*
* Example:
* {{{
* trait MyType { // as an example
* def name: String
* }
*
* implicit val myLogSourceType: LogSource[MyType] = new LogSource {
* def genString(a: MyType) = a.name
* }
*
* class MyClass extends MyType {
* val log = Logging(eventStream, this) // will use "hallo" as logSource
* def name = "hallo"
* }
* }}}
*
* The second variant is used for including the actor systems address:
* {{{
* trait MyType { // as an example
* def name: String
* }
*
* implicit val myLogSourceType: LogSource[MyType] = new LogSource {
* def genString(a: MyType) = a.name
* def genString(a: MyType, s: ActorSystem) = a.name + "," + s
* }
*
* class MyClass extends MyType {
* val sys = ActorSyste("sys")
* val log = Logging(sys, this) // will use "hallo,akka://sys" as logSource
* def name = "hallo"
* }
* }}}
*
* The default implementation of the second variant will just call the first.
*/
trait LogSource[-T] {
def genString(t: T): String
def genString(t: T, system: ActorSystem): String = genString(t)
def getClazz(t: T): Class[_] = t.getClass
}
/**
* This is a marker class which is inserted as originator class into
* [[akka.event.LogEvent]] when the string representation was supplied
* directly.
*/
class DummyClassForStringSources
/**
* This object holds predefined formatting rules for log sources.
*
* In case an [[akka.actor.ActorSystem]] is provided, the following apply:
* <ul>
* <li>[[akka.actor.Actor]] and [[akka.actor.ActorRef]] will be represented by their absolute physical path</li>
* <li>providing a `String` as source will append "(<system address>)" and use the result</li>
* <li>providing a `Class` will extract its simple name, append "(<system address>)" and use the result</li>
* <li>anything else gives compile error unless implicit [[akka.event.LogSource]] is in scope for it</li>
* </ul>
*
* In case a [[akka.event.LoggingBus]] is provided, the following apply:
* <ul>
* <li>[[akka.actor.Actor]] and [[akka.actor.ActorRef]] will be represented by their absolute physical path</li>
* <li>providing a `String` as source will be used as is</li>
* <li>providing a `Class` will extract its simple name</li>
* <li>anything else gives compile error unless implicit [[akka.event.LogSource]] is in scope for it</li>
* </ul>
*/
object LogSource {
implicit val fromString: LogSource[String] = new LogSource[String] {
def genString(s: String) = s
override def genString(s: String, system: ActorSystem) = s + "(" + system + ")"
override def getClazz(s: String) = classOf[DummyClassForStringSources]
}
implicit val fromActor: LogSource[Actor] = new LogSource[Actor] {
@ -191,18 +260,55 @@ object LogSource {
// this one unfortunately does not work as implicit, because existential types have some weird behavior
val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] {
def genString(c: Class[_]) = simpleName(c)
override def genString(c: Class[_], system: ActorSystem) = simpleName(c) + "(" + system + ")"
override def getClazz(c: Class[_]) = c
}
implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]]
def apply[T: LogSource](o: T) = implicitly[LogSource[T]].genString(o)
/**
* Convenience converter access: given an implicit `LogSource`, generate the
* string representation and originating class.
*/
def apply[T: LogSource](o: T): (String, Class[_]) = {
val ls = implicitly[LogSource[T]]
(ls.genString(o), ls.getClazz(o))
}
def fromAnyRef(o: AnyRef): String =
/**
* Convenience converter access: given an implicit `LogSource` and
* [[akka.actor.ActorSystem]], generate the string representation and
* originating class.
*/
def apply[T: LogSource](o: T, system: ActorSystem): (String, Class[_]) = {
val ls = implicitly[LogSource[T]]
(ls.genString(o, system), ls.getClazz(o))
}
/**
* construct string representation for any object according to
* rules above with fallback to its `Class`s simple name.
*/
def fromAnyRef(o: AnyRef): (String, Class[_]) =
o match {
case c: Class[_] fromClass.genString(c)
case a: Actor fromActor.genString(a)
case a: ActorRef fromActorRef.genString(a)
case s: String s
case x simpleName(x)
case c: Class[_] apply(c)
case a: Actor apply(a)
case a: ActorRef apply(a)
case s: String apply(s)
case x (simpleName(x), x.getClass)
}
/**
* construct string representation for any object according to
* rules above (including the actor systems address) with fallback to its
* `Class`s simple name.
*/
def fromAnyRef(o: AnyRef, system: ActorSystem): (String, Class[_]) =
o match {
case c: Class[_] apply(c)
case a: Actor apply(a)
case a: ActorRef apply(a)
case s: String apply(s)
case x (simpleName(x) + "(" + system + ")", x.getClass)
}
}
@ -218,6 +324,11 @@ object LogSource {
* log.info("hello world!")
* </code></pre>
*
* The source object is used in two fashions: its `Class[_]` will be part of
* all log events produced by this logger, plus a string representation is
* generated which may contain per-instance information, see `apply` or `create`
* below.
*
* Loggers are attached to the level-specific channels <code>Error</code>,
* <code>Warning</code>, <code>Info</code> and <code>Debug</code> as
* appropriate for the configured (or set) log level. If you want to implement
@ -305,42 +416,80 @@ object Logging {
val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern
/**
* Obtain LoggingAdapter for the given event stream (system) and source object.
* Note that there is an implicit conversion from [[akka.actor.ActorSystem]]
* to [[akka.event.LoggingBus]].
* Obtain LoggingAdapter for the given actor system and source object. This
* will use the systems event stream and include the systems address in the
* log source string.
*
* The source is used to identify the source of this logging channel and must have
* a corresponding LogSource[T] instance in scope; by default these are
* provided for Class[_], Actor, ActorRef and String types. The source
* object is translated to a String according to the following rules:
* <ul>
* <li>if it is an Actor or ActorRef, its path is used</li>
* <li>in case of a String it is used as is</li>
* <li>in case of a class an approximation of its simpleName
* <li>and in all other cases the simpleName of its class</li>
* </ul>
* <b>Do not use this if you want to supply a log category string (like
* com.example.app.whatever) unaltered,</b> supply `system.eventStream` in this
* case or use
*
* {{{
* Logging(system, this.getClass)
* }}}
*
* The source is used to identify the source of this logging channel and
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*
* You can add your own rules quite easily, see [[akka.event.LogSource]].
*/
def apply[T: LogSource](eventStream: LoggingBus, logSource: T): LoggingAdapter =
new BusLogging(eventStream, implicitly[LogSource[T]].genString(logSource))
def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = {
val (str, clazz) = LogSource(logSource, system)
new BusLogging(system.eventStream, str, clazz)
}
/**
* Java API: Obtain LoggingAdapter for the given system and source object. The
* source object is used to identify the source of this logging channel. The source
* object is translated to a String according to the following rules:
* <ul>
* <li>if it is an Actor or ActorRef, its path is used</li>
* <li>in case of a String it is used as is</li>
* <li>in case of a class an approximation of its simpleName
* <li>and in all other cases the simpleName of its class</li>
* </ul>
* Obtain LoggingAdapter for the given logging bus and source object.
*
* The source is used to identify the source of this logging channel and
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*
* You can add your own rules quite easily, see [[akka.event.LogSource]].
*/
def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system.eventStream, LogSource.fromAnyRef(logSource))
def apply[T: LogSource](bus: LoggingBus, logSource: T): LoggingAdapter = {
val (str, clazz) = LogSource(logSource)
new BusLogging(bus, str, clazz)
}
/**
* Java API: Obtain LoggingAdapter for the given event bus and source object. The
* source object is used to identify the source of this logging channel.
* Obtain LoggingAdapter for the given actor system and source object. This
* will use the systems event stream and include the systems address in the
* log source string.
*
* <b>Do not use this if you want to supply a log category string (like
* com.example.app.whatever) unaltered,</b> supply `system.eventStream` in this
* case or use
*
* {{{
* Logging.getLogger(system, this.getClass());
* }}}
*
* The source is used to identify the source of this logging channel and
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*/
def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource))
def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = {
val (str, clazz) = LogSource.fromAnyRef(logSource, system)
new BusLogging(system.eventStream, str, clazz)
}
/**
* Obtain LoggingAdapter for the given logging bus and source object.
*
* The source is used to identify the source of this logging channel and
* must have a corresponding implicit LogSource[T] instance in scope; by
* default these are provided for Class[_], Actor, ActorRef and String types.
* See the companion object of [[akka.event.LogSource]] for details.
*/
def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = {
val (str, clazz) = LogSource.fromAnyRef(logSource)
new BusLogging(bus, str, clazz)
}
/**
* Artificial exception injected into Error events if no Throwable is
@ -362,19 +511,34 @@ object Logging {
* The LogLevel of this LogEvent
*/
def level: LogLevel
/**
* The source of this event
*/
def logSource: String
/**
* The class of the source of this event
*/
def logClass: Class[_]
/**
* The message, may be any object or null.
*/
def message: Any
}
/**
* For ERROR Logging
*/
case class Error(cause: Throwable, logSource: String, message: Any = "") extends LogEvent {
def this(logSource: String, message: Any) = this(Error.NoCause, logSource, message)
case class Error(cause: Throwable, logSource: String, logClass: Class[_], message: Any = "") extends LogEvent {
def this(logSource: String, logClass: Class[_], message: Any) = this(Error.NoCause, logSource, logClass, message)
override def level = ErrorLevel
}
object Error {
def apply(logSource: String, message: Any) = new Error(NoCause, logSource, message)
def apply(logSource: String, logClass: Class[_], message: Any) = new Error(NoCause, logSource, logClass, message)
/** Null Object used for errors without cause Throwable */
object NoCause extends NoStackTrace
@ -383,21 +547,21 @@ object Logging {
/**
* For WARNING Logging
*/
case class Warning(logSource: String, message: Any = "") extends LogEvent {
case class Warning(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent {
override def level = WarningLevel
}
/**
* For INFO Logging
*/
case class Info(logSource: String, message: Any = "") extends LogEvent {
case class Info(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent {
override def level = InfoLevel
}
/**
* For DEBUG Logging
*/
case class Debug(logSource: String, message: Any = "") extends LogEvent {
case class Debug(logSource: String, logClass: Class[_], message: Any = "") extends LogEvent {
override def level = DebugLevel
}
@ -439,7 +603,7 @@ object Logging {
case e: Warning warning(e)
case e: Info info(e)
case e: Debug debug(e)
case e warning(Warning(simpleName(this), "received unexpected event of class " + e.getClass + ": " + e))
case e warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e))
}
}
@ -626,7 +790,7 @@ trait LoggingAdapter {
}
}
class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdapter {
class BusLogging(val bus: LoggingBus, val logSource: String, val logClass: Class[_]) extends LoggingAdapter {
import Logging._
@ -635,14 +799,14 @@ class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdap
def isInfoEnabled = bus.logLevel >= InfoLevel
def isDebugEnabled = bus.logLevel >= DebugLevel
protected def notifyError(message: String) { bus.publish(Error(logSource, message)) }
protected def notifyError(message: String) { bus.publish(Error(logSource, logClass, message)) }
protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, message)) }
protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, logClass, message)) }
protected def notifyWarning(message: String) { bus.publish(Warning(logSource, message)) }
protected def notifyWarning(message: String) { bus.publish(Warning(logSource, logClass, message)) }
protected def notifyInfo(message: String) { bus.publish(Info(logSource, message)) }
protected def notifyInfo(message: String) { bus.publish(Info(logSource, logClass, message)) }
protected def notifyDebug(message: String) { bus.publish(Debug(logSource, message)) }
protected def notifyDebug(message: String) { bus.publish(Debug(logSource, logClass, message)) }
}

View file

@ -36,7 +36,8 @@ object LoggingReceive {
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o))
val (str, clazz) = LogSource.fromAnyRef(source)
system.eventStream.publish(Debug(str, clazz, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
handled
}
def apply(o: Any): Unit = r(o)

View file

@ -21,7 +21,7 @@ object JMX {
case e: InstanceAlreadyExistsException
Some(mbeanServer.getObjectInstance(name))
case e: Exception
system.eventStream.publish(Error(e, "JMX", "Error when registering mbean [%s]".format(mbean)))
system.eventStream.publish(Error(e, "JMX", this.getClass, "Error when registering mbean [%s]".format(mbean)))
None
}
@ -29,6 +29,6 @@ object JMX {
mbeanServer.unregisterMBean(mbean)
} catch {
case e: InstanceNotFoundException {}
case e: Exception system.eventStream.publish(Error(e, "JMX", "Error while unregistering mbean [%s]".format(mbean)))
case e: Exception system.eventStream.publish(Error(e, "JMX", this.getClass, "Error while unregistering mbean [%s]".format(mbean)))
}
}

View file

@ -17,8 +17,13 @@ as illustrated in this example:
.. includecode:: code/akka/docs/event/LoggingDocTestBase.java
:include: imports,my-actor
The second parameter to the ``Logging.getLogger`` is the source of this logging channel.
The source object is translated to a String according to the following rules:
The first parameter to ``Logging.getLogger`` could also be any
:class:`LoggingBus`, specifically ``system.eventStream()``; in the demonstrated
case, the actor systems address is included in the ``akkaSource``
representation of the log source (see `Logging Thread and Akka Source in MDC`_)
while in the second case this is not automatically done. The second parameter
to ``Logging.getLogger`` is the source of this logging channel. The source
object is translated to a String according to the following rules:
* if it is an Actor or ActorRef, its path is used
* in case of a String it is used as is
@ -28,6 +33,13 @@ The source object is translated to a String according to the following rules:
The log message may contain argument placeholders ``{}``, which will be substituted if the log level
is enabled.
The Java :class:`Class` of the log source is also included in the generated
:class:`LogEvent`. In case of a simple string this is replaced with a “marker”
class :class:`akka.event.DummyClassForStringSources` in order to allow special
treatment of this case, e.g. in the SLF4J event listener which will then use
the string instead of the class name for looking up the logger instance to
use.
Event Handler
=============
@ -83,8 +95,8 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger.
loglevel = "DEBUG"
}
Logging thread in MDC
---------------------
Logging Thread and Akka Source in MDC
-------------------------------------
Since the logging is done asynchronously the thread in which the logging was performed is captured in
Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``.
@ -96,3 +108,22 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi
</layout>
</appender>
.. note::
It will probably be a good idea to use the ``sourceThread`` MDC value also in
non-Akka parts of the application in order to have this property consistently
available in the logs.
Another helpful facility is that Akka captures the actors address when
instantiating a logger within it, meaning that the full instance identification
is available for associating log messages e.g. with members of a router. This
information is available in the MDC with attribute name ``akkaSource``::
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout>
<pattern>%date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
</layout>
</appender>
For more details on what this attribute contains—also for non-actors—please see
`How to Log`_.

View file

@ -162,10 +162,10 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
system.eventStream.subscribe(testActor, classOf[Logging.Info])
myActor ! "test"
expectMsgPF(1 second) { case Logging.Info(_, "received test") true }
expectMsgPF(1 second) { case Logging.Info(_, _, "received test") true }
myActor ! "unknown"
expectMsgPF(1 second) { case Logging.Info(_, "received unknown message") true }
expectMsgPF(1 second) { case Logging.Info(_, _, "received unknown message") true }
system.eventStream.unsubscribe(testActor)
system.eventStream.publish(TestEvent.UnMute(filter))

View file

@ -39,14 +39,32 @@ object LoggingDocSpec {
class MyEventListener extends Actor {
def receive = {
case InitializeLogger(_) sender ! LoggerInitialized
case Error(cause, logSource, message) // ...
case Warning(logSource, message) // ...
case Info(logSource, message) // ...
case Debug(logSource, message) // ...
case Error(cause, logSource, logClass, message) // ...
case Warning(logSource, logClass, message) // ...
case Info(logSource, logClass, message) // ...
case Debug(logSource, logClass, message) // ...
}
}
//#my-event-listener
//#my-source
import akka.event.LogSource
import akka.actor.ActorSystem
object MyType {
implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
}
}
class MyType(system: ActorSystem) {
import MyType._
import akka.event.Logging
val log = Logging(system, this)
}
//#my-source
}
class LoggingDocSpec extends AkkaSpec {

View file

@ -22,6 +22,8 @@ For convenience you can mixin the ``log`` member into actors, instead of definin
.. code-block:: scala
class MyActor extends Actor with akka.actor.ActorLogging {
...
}
The second parameter to the ``Logging`` is the source of this logging channel.
The source object is translated to a String according to the following rules:
@ -29,17 +31,46 @@ The source object is translated to a String according to the following rules:
* if it is an Actor or ActorRef, its path is used
* in case of a String it is used as is
* in case of a class an approximation of its simpleName
* and in all other cases the simpleName of its class
* and in all other cases a compile error occurs unless and implicit
:class:`LogSource[T]` is in scope for the type in question.
The log message may contain argument placeholders ``{}``, which will be substituted if the log level
is enabled.
Translating Log Source to String and Class
------------------------------------------
The rules for translating the source object to the source string and class
which are inserted into the :class:`LogEvent` during runtime are implemented
using implicit parameters and thus fully customizable: simply create your own
instance of :class:`LogSource[T]` and have it in scope when creating the
logger.
.. includecode:: code/akka/docs/event/LoggingDocSpec.scala#my-source
This example creates a log source which mimics traditional usage of Java
loggers, which are based upon the originating objects class name as log
category. The override of :meth:`getClazz` is only included for demonstration
purposes as it contains exactly the default behavior.
.. note::
You may also create the string representation up front and pass that in as
the log source, but be aware that then the :class:`Class[_]` which will be
put in the :class:`LogEvent` is
:class:`akka.event.DummyClassForStringSources`.
The SLF4J event listener treats this case specially (using the actual string
to look up the logger instance to use instead of the class name), and you
might want to do this also in case you implement your own loggin adapter.
Event Handler
=============
Logging is performed asynchronously through an event bus. You can configure which event handlers that should
subscribe to the logging events. That is done using the 'event-handlers' element in the :ref:`configuration`.
Here you can also define the log level.
Logging is performed asynchronously through an event bus. You can configure
which event handlers that should subscribe to the logging events. That is done
using the ``event-handlers`` element in the :ref:`configuration`. Here you can
also define the log level.
.. code-block:: ruby
@ -50,7 +81,8 @@ Here you can also define the log level.
loglevel = "DEBUG"
}
The default one logs to STDOUT and is registered by default. It is not intended to be used for production. There is also an :ref:`slf4j-scala`
The default one logs to STDOUT and is registered by default. It is not intended
to be used for production. There is also an :ref:`slf4j-scala`
event handler available in the 'akka-slf4j' module.
Example of creating a listener:
@ -58,7 +90,6 @@ Example of creating a listener:
.. includecode:: code/akka/docs/event/LoggingDocSpec.scala
:include: my-event-listener
.. _slf4j-scala:
SLF4J
@ -85,8 +116,8 @@ creating the ``LoggingAdapter`` correspond to the name of the SL4FJ logger.
loglevel = "DEBUG"
}
Logging thread in MDC
---------------------
Logging Thread and Akka Source in MDC
-------------------------------------
Since the logging is done asynchronously the thread in which the logging was performed is captured in
Mapped Diagnostic Context (MDC) with attribute name ``sourceThread``.
@ -98,3 +129,22 @@ With Logback the thread name is available with ``%X{sourceThread}`` specifier wi
</layout>
</appender>
.. note::
It will probably be a good idea to use the ``sourceThread`` MDC value also in
non-Akka parts of the application in order to have this property consistently
available in the logs.
Another helpful facility is that Akka captures the actors address when
instantiating a logger within it, meaning that the full instance identification
is available for associating log messages e.g. with members of a router. This
information is available in the MDC with attribute name ``akkaSource``::
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout>
<pattern>%date{ISO8601} %-5level %logger{36} %X{akkaSource} - %msg%n</pattern>
</layout>
</appender>
For more details on what this attribute contains—also for non-actors—please see
`How to Log`_.

View file

@ -153,7 +153,7 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa
override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match {
case message: DaemonMsg
log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName)
log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, remote.remoteSettings.NodeName)
message match {
case DaemonMsgCreate(factory, path, supervisor)
import remote.remoteAddress

View file

@ -27,8 +27,6 @@ class RemoteActorRefProvider(
val scheduler: Scheduler,
_deadLetters: InternalActorRef) extends ActorRefProvider {
val log = Logging(eventStream, "RemoteActorRefProvider")
val remoteSettings = new RemoteSettings(settings.config, systemName)
def rootGuardian = local.rootGuardian
@ -44,6 +42,8 @@ class RemoteActorRefProvider(
val remote = new Remote(settings, remoteSettings)
implicit val transports = remote.transports
val log = Logging(eventStream, "RemoteActorRefProvider(" + remote.remoteAddress + ")")
val rootPath: ActorPath = RootActorPath(remote.remoteAddress)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer)

View file

@ -60,7 +60,7 @@ abstract class RemoteClient private[akka] (
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
log.debug("Sending message: {}", message)
log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
send((message, senderOption, recipient))
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)

View file

@ -8,6 +8,7 @@ import org.slf4j.{ Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory }
import org.slf4j.MDC
import akka.event.Logging._
import akka.actor._
import akka.event.DummyClassForStringSources
/**
* Base trait for all classes that wants to be able use the SLF4J logging infrastructure.
@ -19,6 +20,10 @@ trait SLF4JLogging {
object Logger {
def apply(logger: String): SLFLogger = SLFLoggerFactory getLogger logger
def apply(logClass: Class[_], logSource: String): SLFLogger = logClass match {
case c if c == classOf[DummyClassForStringSources] apply(logSource)
case _ SLFLoggerFactory getLogger logClass
}
def root: SLFLogger = apply(SLFLogger.ROOT_LOGGER_NAME)
}
@ -31,30 +36,31 @@ object Logger {
class Slf4jEventHandler extends Actor with SLF4JLogging {
val mdcThreadAttributeName = "sourceThread"
val mdcAkkaSourceAttributeName = "akkaSource"
def receive = {
case event @ Error(cause, logSource, message)
withMdc(mdcThreadAttributeName, event.thread.getName) {
case event @ Error(cause, logSource, logClass, message)
withMdc(logSource, event.thread.getName) {
cause match {
case Error.NoCause Logger(logSource).error(message.toString)
case _ Logger(logSource).error(message.toString, cause)
case Error.NoCause Logger(logClass, logSource).error(message.toString)
case _ Logger(logClass, logSource).error(message.toString, cause)
}
}
case event @ Warning(logSource, message)
withMdc(mdcThreadAttributeName, event.thread.getName) {
Logger(logSource).warn("{}", message.asInstanceOf[AnyRef])
case event @ Warning(logSource, logClass, message)
withMdc(logSource, event.thread.getName) {
Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef])
}
case event @ Info(logSource, message)
withMdc(mdcThreadAttributeName, event.thread.getName) {
Logger(logSource).info("{}", message.asInstanceOf[AnyRef])
case event @ Info(logSource, logClass, message)
withMdc(logSource, event.thread.getName) {
Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef])
}
case event @ Debug(logSource, message)
withMdc(mdcThreadAttributeName, event.thread.getName) {
Logger(logSource).debug("{}", message.asInstanceOf[AnyRef])
case event @ Debug(logSource, logClass, message)
withMdc(logSource, event.thread.getName) {
Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef])
}
case InitializeLogger(_)
@ -63,12 +69,14 @@ class Slf4jEventHandler extends Actor with SLF4JLogging {
}
@inline
final def withMdc(name: String, value: String)(logStatement: Unit) {
MDC.put(name, value)
final def withMdc(logSource: String, thread: String)(logStatement: Unit) {
MDC.put(mdcAkkaSourceAttributeName, logSource)
MDC.put(mdcThreadAttributeName, thread)
try {
logStatement
} finally {
MDC.remove(name)
MDC.remove(mdcAkkaSourceAttributeName)
MDC.remove(mdcThreadAttributeName)
}
}

View file

@ -254,7 +254,7 @@ case class ErrorFilter(
def matches(event: LogEvent) = {
event match {
case Error(cause, src, msg) if throwable isInstance cause
case Error(cause, src, _, msg) if throwable isInstance cause
(msg == null && cause.getMessage == null && cause.getStackTrace.length == 0) ||
doMatch(src, msg) || doMatch(src, cause.getMessage)
case _ false
@ -305,7 +305,7 @@ case class WarningFilter(
def matches(event: LogEvent) = {
event match {
case Warning(src, msg) doMatch(src, msg)
case Warning(src, _, msg) doMatch(src, msg)
case _ false
}
}
@ -348,7 +348,7 @@ case class InfoFilter(
def matches(event: LogEvent) = {
event match {
case Info(src, msg) doMatch(src, msg)
case Info(src, _, msg) doMatch(src, msg)
case _ false
}
}
@ -391,7 +391,7 @@ case class DebugFilter(
def matches(event: LogEvent) = {
event match {
case Debug(src, msg) doMatch(src, msg)
case Debug(src, _, msg) doMatch(src, msg)
case _ false
}
}
@ -456,15 +456,15 @@ class TestEventListener extends Logging.DefaultLogger {
case event: LogEvent if (!filter(event)) print(event)
case DeadLetter(msg: SystemMessage, _, rcp)
if (!msg.isInstanceOf[Terminate]) {
val event = Warning(rcp.path.toString, "received dead system message: " + msg)
val event = Warning(rcp.path.toString, rcp.getClass, "received dead system message: " + msg)
if (!filter(event)) print(event)
}
case DeadLetter(msg, snd, rcp)
if (!msg.isInstanceOf[Terminated]) {
val event = Warning(rcp.path.toString, "received dead letter from " + snd + ": " + msg)
val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg)
if (!filter(event)) print(event)
}
case m print(Debug(context.system.name, m))
case m print(Debug(context.system.name, this.getClass, m))
}
def filter(event: LogEvent): Boolean = filters exists (f try { f(event) } catch { case e: Exception false })

View file

@ -81,7 +81,7 @@ object TestActorRefSpec {
var count = 0
var msg: String = _
def receive = {
case Warning(_, m: String) count += 1; msg = m
case Warning(_, _, m: String) count += 1; msg = m
}
}