From 6a8e516b6ce13e0e5d161f1c57986f120ead9bdf Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 18 Nov 2011 11:59:43 +0100 Subject: [PATCH] change source tag in log events from AnyRef to String MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ensure that no “complex” things are attached to a LogEvent (think serialization) - ensure no escaping the “this” reference via LoggingBus during constructors (e.g. ActorSystem) - change it so that + Actor/ActorRef are represented by their address + Class[_] by simpleName + String by itself - this means that people need to think a little more while deciding how “this” should look like in logging (which I think is a good thing) --- .../scala/akka/actor/ActorLifeCycleSpec.scala | 20 ++-- .../test/scala/akka/actor/FSMActorSpec.scala | 14 ++- .../test/scala/akka/actor/FSMTimingSpec.scala | 4 +- .../src/test/scala/akka/actor/IOActor.scala | 2 +- .../scala/akka/actor/LoggingReceiveSpec.scala | 37 +++--- .../akka/actor/dispatch/ActorModelSpec.scala | 6 +- .../scala/akka/event/EventStreamSpec.scala | 2 +- .../akka/performance/workbench/Report.scala | 2 +- .../src/main/scala/akka/actor/Actor.scala | 3 +- .../src/main/scala/akka/actor/ActorCell.scala | 36 +++--- .../scala/akka/actor/ActorRefProvider.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 2 +- .../src/main/scala/akka/actor/Deployer.scala | 2 +- akka-actor/src/main/scala/akka/actor/IO.scala | 28 ++--- .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../main/scala/akka/dispatch/Dispatcher.scala | 2 +- .../src/main/scala/akka/dispatch/Future.scala | 14 +-- .../main/scala/akka/dispatch/Mailbox.scala | 2 +- .../main/scala/akka/event/EventStream.scala | 15 ++- .../src/main/scala/akka/event/Logging.scala | 109 ++++++++++++------ akka-actor/src/main/scala/akka/util/JMX.scala | 4 +- .../actor/mailbox/BeanstalkBasedMailbox.scala | 2 +- .../actor/mailbox/FiledBasedMailbox.scala | 2 +- .../actor/mailbox/MongoBasedMailbox.scala | 2 +- .../actor/mailbox/RedisBasedMailbox.scala | 2 +- .../actor/mailbox/ZooKeeperBasedMailbox.scala | 2 +- .../src/main/scala/akka/remote/Gossiper.scala | 2 +- .../src/main/scala/akka/remote/Remote.scala | 4 +- .../akka/remote/RemoteActorRefProvider.scala | 3 +- .../akka/remote/RemoteConnectionManager.scala | 2 +- .../remote/netty/NettyRemoteSupport.scala | 8 +- .../testkit/CallingThreadDispatcher.scala | 6 +- .../akka/testkit/TestEventListener.scala | 40 +++---- .../src/main/scala/akka/testkit/TestKit.scala | 2 +- .../test/scala/akka/testkit/AkkaSpec.scala | 2 +- 35 files changed, 213 insertions(+), 174 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 23373a8af6..5d3358dc6f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -14,13 +14,7 @@ import java.util.concurrent.atomic._ object ActorLifeCycleSpec { -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { - import ActorLifeCycleSpec._ - - class LifeCycleTestActor(id: String, generationProvider: AtomicInteger) extends Actor { + class LifeCycleTestActor(testActor: ActorRef, id: String, generationProvider: AtomicInteger) extends Actor { def report(msg: Any) = testActor ! message(msg) def message(msg: Any): Tuple3[Any, String, Int] = (msg, id, currentGen) val currentGen = generationProvider.getAndIncrement() @@ -29,6 +23,12 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS def receive = { case "status" ⇒ sender ! message("OK") } } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { + import ActorLifeCycleSpec._ + "An Actor" must { "invoke preRestart, preStart, postRestart when using OneForOneStrategy" in { @@ -36,7 +36,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val id = newUuid().toString val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val restarterProps = Props(new LifeCycleTestActor(id, gen) { + val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } override def postRestart(reason: Throwable) { report("postRestart") } }) @@ -70,7 +70,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val id = newUuid().toString val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val restarterProps = Props(new LifeCycleTestActor(id, gen)) + val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen)) val restarter = (supervisor ? restarterProps).as[ActorRef].get expectMsg(("preStart", id, 0)) @@ -100,7 +100,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val id = newUuid().toString val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val props = Props(new LifeCycleTestActor(id, gen)) + val props = Props(new LifeCycleTestActor(testActor, id, gen)) val a = (supervisor ? props).as[ActorRef].get expectMsg(("preStart", id, 0)) a ! "status" diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 2409d80734..69ed7e37ae 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -166,11 +166,12 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true case Ev("go") ⇒ goto(2) } }) + val name = fsm.toString filterException[Logging.EventHandlerException] { system.eventStream.subscribe(testActor, classOf[Logging.Error]) fsm ! "go" expectMsgPF(1 second, hint = "Next state 2 does not exist") { - case Logging.Error(_, `fsm`, "Next state 2 does not exist") ⇒ true + case Logging.Error(_, `name`, "Next state 2 does not exist") ⇒ true } system.eventStream.unsubscribe(testActor) } @@ -213,18 +214,19 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true case StopEvent(r, _, _) ⇒ testActor ! r } }) + val name = fsm.toString system.eventStream.subscribe(testActor, classOf[Logging.Debug]) fsm ! "go" expectMsgPF(1 second, hint = "processing Event(go,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true + case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true } - expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) - expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2")) + expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown")) + expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2")) fsm ! "stop" expectMsgPF(1 second, hint = "processing Event(stop,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true + case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true } - expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal) + expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal) expectNoMsg(1 second) system.eventStream.unsubscribe(testActor) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 6e243fc020..e915b69ecd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -106,8 +106,8 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { } "notify unhandled messages" taggedAs TimingTest in { - filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm, occurrences = 1), - EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm, occurrences = 1)) { + filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm.toString, occurrences = 1), + EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm.toString, occurrences = 1)) { fsm ! TestUnhandled within(1 second) { fsm ! Tick diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index d4f08e40c2..3765ad5b6c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -42,7 +42,7 @@ object IOActorSpec { class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO { - lazy val socket: SocketHandle = connect(ioManager, host, port, reader) + lazy val socket: SocketHandle = connect(ioManager, host, port)(reader) lazy val reader: ActorRef = context.actorOf { new Actor with IO { def receiveIO = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 858c0dcdea..207eacb74e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -57,9 +57,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val r: Actor.Receive = { case null ⇒ } - val log = Actor.LoggingReceive(this, r) + val log = Actor.LoggingReceive("funky", r) log.isDefinedAt("hallo") - expectMsg(1 second, Logging.Debug(this, "received unhandled message hallo")) + expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo")) } } @@ -73,9 +73,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd case _ ⇒ sender ! "x" } }) + val name = actor.toString actor ! "buh" within(1 second) { - expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh")) + expectMsg(Logging.Debug(name, "received handled message buh")) expectMsg("x") } val r: Actor.Receive = { @@ -86,7 +87,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd within(500 millis) { actor ! "bah" expectMsgPF() { - case Logging.Error(_: UnhandledMessageException, `actor`, _) ⇒ true + case Logging.Error(_: UnhandledMessageException, `name`, _) ⇒ true } } } @@ -103,7 +104,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd }) actor ! "buh" within(1 second) { - expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh")) + expectMsg(Logging.Debug(actor.toString, "received handled message buh")) expectMsg("x") } } @@ -121,9 +122,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd case _ ⇒ } }) + val name = actor.toString actor ! PoisonPill expectMsgPF() { - case Logging.Debug(`actor`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true + case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true } awaitCond(actor.isShutdown, 100 millis) } @@ -141,20 +143,23 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd system.eventStream.subscribe(testActor, classOf[Logging.Error]) within(3 seconds) { val lifecycleGuardian = appLifecycle.asInstanceOf[ActorSystemImpl].guardian + val lname = lifecycleGuardian.toString val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) + val sname = supervisor.toString val supervisorSet = receiveWhile(messages = 2) { - case Logging.Debug(`lifecycleGuardian`, msg: String) if msg startsWith "now supervising" ⇒ 1 - case Logging.Debug(`supervisor`, msg: String) if msg startsWith "started" ⇒ 2 + case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`sname`, msg: String) if msg startsWith "started" ⇒ 2 }.toSet expectNoMsg(Duration.Zero) assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)") val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none") + val aname = actor.toString val set = receiveWhile(messages = 2) { - case Logging.Debug(`supervisor`, msg: String) if msg startsWith "now supervising" ⇒ 1 - case Logging.Debug(`actor`, msg: String) if msg startsWith "started" ⇒ 2 + case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`aname`, msg: String) if msg startsWith "started" ⇒ 2 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2), set + " was not Set(1, 2)") @@ -174,18 +179,18 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd filterException[ActorKilledException] { actor ! Kill val set = receiveWhile(messages = 3) { - case Logging.Error(_: ActorKilledException, `actor`, "Kill") ⇒ 1 - case Logging.Debug(`actor`, "restarting") ⇒ 2 - case Logging.Debug(`actor`, "restarted") ⇒ 3 + case Logging.Error(_: ActorKilledException, `aname`, "Kill") ⇒ 1 + case Logging.Debug(`aname`, "restarting") ⇒ 2 + case Logging.Debug(`aname`, "restarted") ⇒ 3 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") } supervisor.stop() - expectMsg(Logging.Debug(supervisor, "stopping")) - expectMsg(Logging.Debug(actor, "stopped")) - expectMsg(Logging.Debug(supervisor, "stopped")) + expectMsg(Logging.Debug(sname, "stopping")) + expectMsg(Logging.Debug(aname, "stopped")) + expectMsg(Logging.Debug(sname, "stopped")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 33cea49284..450c201a75 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -147,7 +147,7 @@ object ActorModelSpec { await(deadline)(stops == dispatcher.stops.get) } catch { case e ⇒ - system.eventStream.publish(Error(e, dispatcher, "actual: stops=" + dispatcher.stops.get + + system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get + " required: stops=" + stops)) throw e } @@ -204,7 +204,7 @@ object ActorModelSpec { await(deadline)(stats.restarts.get() == restarts) } catch { case e ⇒ - system.eventStream.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + + system.eventStream.publish(Error(e, dispatcher.toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) throw e @@ -310,7 +310,7 @@ abstract class ActorModelSpec extends AkkaSpec { try { f } catch { - case e ⇒ system.eventStream.publish(Error(e, this, "error in spawned thread")) + case e ⇒ system.eventStream.publish(Error(e, "spawn", "error in spawned thread")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index d5371af0b9..6314561897 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -102,7 +102,7 @@ class EventStreamSpec extends AkkaSpec(Configuration( private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { import Logging._ - val allmsg = Seq(Debug(this, "debug"), Info(this, "info"), Warning(this, "warning"), Error(this, "error")) + val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error")) val msg = allmsg filter (_.level <= level) allmsg foreach bus.publish msg foreach (x ⇒ expectMsg(x)) diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala index 83e8b182bf..05319e0772 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -15,7 +15,7 @@ class Report( compareResultWith: Option[String] = None) { private def doLog = System.getProperty("benchmark.logResult", "true").toBoolean - val log = Logging(system, this) + val log = Logging(system, "Report") val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 30eebb2f87..6472153d75 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -14,6 +14,7 @@ import akka.cluster.ClusterNode import akka.japi.{ Creator, Procedure } import akka.serialization.{ Serializer, Serialization } import akka.event.Logging.Debug +import akka.event.LogSource import akka.experimental import akka.AkkaException @@ -167,7 +168,7 @@ object Actor { class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - system.eventStream.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) + system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o)) handled } def apply(o: Any): Unit = r(o) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 2685f6ca85..8a5f0eff75 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -17,7 +17,7 @@ import akka.event.Logging.{ Debug, Warning, Error } */ trait ActorContext extends ActorRefFactory with TypedActorFactory { - def self: ActorRef with ScalaActorRef + def self: ActorRef def hasMessages: Boolean @@ -174,11 +174,11 @@ private[akka] class ActorCell( actor = created created.preStart() checkReceiveTimeout - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "started (" + actor + ")")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "started (" + actor + ")")) } catch { case e ⇒ try { - system.eventStream.publish(Error(e, self, "error while creating actor")) + system.eventStream.publish(Error(e, self.toString, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -188,7 +188,7 @@ private[akka] class ActorCell( def recreate(cause: Throwable): Unit = try { val failedActor = actor - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarting")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "restarting")) val freshActor = newActor() if (failedActor ne null) { val c = currentMessage //One read only plz @@ -202,14 +202,14 @@ private[akka] class ActorCell( } actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarted")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "restarted")) dispatcher.resume(this) //FIXME should this be moved down? props.faultHandler.handleSupervisorRestarted(cause, self, children) } catch { case e ⇒ try { - system.eventStream.publish(Error(e, self, "error while creating actor")) + system.eventStream.publish(Error(e, self.toString, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -228,7 +228,7 @@ private[akka] class ActorCell( val c = children if (c.isEmpty) doTerminate() else { - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopping")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopping")) for (child ← c) child.stop() stopping = true } @@ -239,8 +239,8 @@ private[akka] class ActorCell( if (!stats.contains(child)) { childrenRefs = childrenRefs.updated(child.name, child) childrenStats = childrenStats.updated(child, ChildRestartStats()) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now supervising " + child)) - } else system.eventStream.publish(Warning(self, "Already supervising " + child)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now supervising " + child)) + } else system.eventStream.publish(Warning(self.toString, "Already supervising " + child)) } try { @@ -255,10 +255,10 @@ private[akka] class ActorCell( case Recreate(cause) ⇒ recreate(cause) case Link(subject) ⇒ system.deathWatch.subscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now monitoring " + subject)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now monitoring " + subject)) case Unlink(subject) ⇒ system.deathWatch.unsubscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped monitoring " + subject)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped monitoring " + subject)) case Suspend() ⇒ suspend() case Resume() ⇒ resume() case Terminate() ⇒ terminate() @@ -267,7 +267,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ //Should we really catch everything here? - system.eventStream.publish(Error(e, self, "error while processing " + message)) + system.eventStream.publish(Error(e, self.toString, "error while processing " + message)) //TODO FIXME How should problems here be handled? throw e } @@ -294,7 +294,7 @@ private[akka] class ActorCell( currentMessage = null // reset current message after successful invocation } catch { case e ⇒ - system.eventStream.publish(Error(e, self, e.getMessage)) + system.eventStream.publish(Error(e, self.toString, e.getMessage)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -314,7 +314,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ - system.eventStream.publish(Error(e, self, e.getMessage)) + system.eventStream.publish(Error(e, self.toString, e.getMessage)) throw e } } @@ -332,7 +332,7 @@ private[akka] class ActorCell( } def autoReceiveMessage(msg: Envelope) { - if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self, "received AutoReceiveMessage " + msg)) + if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.toString, "received AutoReceiveMessage " + msg)) if (stopping) msg.message match { case ChildTerminated ⇒ handleChildTerminated(sender) @@ -350,7 +350,7 @@ private[akka] class ActorCell( private def doTerminate() { if (!system.provider.evict(self.path.toString)) - system.eventStream.publish(Warning(self, "evict of " + self.path.toString + " failed")) + system.eventStream.publish(Warning(self.toString, "evict of " + self.path.toString + " failed")) dispatcher.detach(this) @@ -361,7 +361,7 @@ private[akka] class ActorCell( try { parent.tell(ChildTerminated, self) system.deathWatch.publish(Terminated(self)) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped")) } finally { currentMessage = null clearActorFields() @@ -371,7 +371,7 @@ private[akka] class ActorCell( final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenStats.get(child) match { case Some(stats) ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenStats)) throw cause - case None ⇒ system.eventStream.publish(Warning(self, "dropping Failed(" + cause + ") from unknown child")) + case None ⇒ system.eventStream.publish(Warning(self.toString, "dropping Failed(" + cause + ") from unknown child")) } final def handleChildTerminated(child: ActorRef): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 5770a5959f..803410b67c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -144,7 +144,7 @@ class LocalActorRefProvider( val dispatcher: MessageDispatcher, val scheduler: Scheduler) extends ActorRefProvider { - val log = Logging(eventStream, this) + val log = Logging(eventStream, "LocalActorRefProvider") // FIXME remove/replave (clustering shall not leak into akka-actor) val nodename: String = System.getProperty("akka.cluster.nodename") match { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 6be2325385..b4416ae5fb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -188,7 +188,7 @@ class ActorSystemImpl(val name: String, config: Configuration) extends ActorSyst // this provides basic logging (to stdout) until .start() is called below val eventStream = new EventStream(DebugEventStream) eventStream.startStdoutLogger(settings) - val log = new BusLogging(eventStream, this) // “this” used only for .getClass in tagging messages + val log = new BusLogging(eventStream, "ActorSystem") // “this” used only for .getClass in tagging messages /** * The root actor path for this application. diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 4b5d64bde6..7a2ad320b6 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -36,7 +36,7 @@ trait ActorDeployer { class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, val nodename: String) extends ActorDeployer { val deploymentConfig = new DeploymentConfig(nodename) - val log = Logging(eventStream, this) + val log = Logging(eventStream, "Deployer") val instance: ActorDeployer = { val deployer = new LocalDeployer() diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 65e325bfe8..aeb4e53573 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -71,13 +71,11 @@ object IO { case class ServerHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = new UUID()) extends Handle { override def asServer = this - def accept(socketOwner: ActorRef): SocketHandle = { + def accept()(implicit socketOwner: ActorRef): SocketHandle = { val socket = SocketHandle(socketOwner, ioManager) ioManager ! Accept(socket, this) socket } - - def accept()(implicit socketOwner: ScalaActorRef): SocketHandle = accept(socketOwner) } sealed trait IOMessage @@ -91,35 +89,23 @@ object IO { case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage - def listen(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): ServerHandle = { + def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit owner: ActorRef): ServerHandle = { val server = ServerHandle(owner, ioManager) ioManager ! Listen(server, address) server } - def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): ServerHandle = - listen(ioManager, address, sender) + def listen(ioManager: ActorRef, host: String, port: Int)(implicit owner: ActorRef): ServerHandle = + listen(ioManager, new InetSocketAddress(host, port)) - def listen(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): ServerHandle = - listen(ioManager, new InetSocketAddress(host, port), owner) - - def listen(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): ServerHandle = - listen(ioManager, new InetSocketAddress(host, port), sender) - - def connect(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): SocketHandle = { + def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit owner: ActorRef): SocketHandle = { val socket = SocketHandle(owner, ioManager) ioManager ! Connect(socket, address) socket } - def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): SocketHandle = - connect(ioManager, address, sender) - - def connect(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): SocketHandle = - connect(ioManager, new InetSocketAddress(host, port), owner) - - def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): SocketHandle = - connect(ioManager, new InetSocketAddress(host, port), sender) + def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ActorRef): SocketHandle = + connect(ioManager, new InetSocketAddress(host, port)) private class HandleState(var readBytes: ByteString, var connected: Boolean) { def this() = this(ByteString.empty, false) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 3d66532d31..aa3edd69ac 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -69,7 +69,7 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, try { function() } catch { - case e ⇒ eventStream.publish(Error(e, this, e.getMessage)) + case e ⇒ eventStream.publish(Error(e, "TaskInvocation", e.getMessage)) } finally { cleanup() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 586586b7d5..e8f799c414 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -99,7 +99,7 @@ class Dispatcher( executorService.get() execute invocation } catch { case e2: RejectedExecutionException ⇒ - prerequisites.eventStream.publish(Warning(this, e2.toString)) + prerequisites.eventStream.publish(Warning("Dispatcher", e2.toString)) throw e2 } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 2160c406cd..b57ff39512 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -262,7 +262,7 @@ object Future { result completeWithResult currentValue } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage)) result completeWithException e } finally { results.clear @@ -631,7 +631,7 @@ sealed trait Future[+T] extends japi.Future[T] { Right(f(res)) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage)) Left(e) }) } @@ -683,7 +683,7 @@ sealed trait Future[+T] extends japi.Future[T] { future.completeWith(f(r)) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage)) future complete Left(e) } } @@ -716,7 +716,7 @@ sealed trait Future[+T] extends japi.Future[T] { if (p(res)) r else Left(new MatchError(res)) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage)) Left(e) }) } @@ -788,7 +788,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) fr completeWithException e } } @@ -802,7 +802,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) fr completeWithException e } } @@ -994,7 +994,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { - try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? + try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? } @inline diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index b752653f1d..ddbebdf3ef 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -187,7 +187,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } } catch { case e ⇒ - actor.system.eventStream.publish(Error(e, actor.self, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) + actor.system.eventStream.publish(Error(e, actor.self.toString, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } } diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index c2be45d81e..3906d2cb04 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -3,9 +3,16 @@ */ package akka.event -import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated } +import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated, ActorSystem, simpleName } import akka.util.Subclassification +object EventStream { + implicit def fromActorSystem(system: ActorSystem) = system.eventStream +} + +class A(x: Int = 0) extends Exception("x=" + x) +class B extends A + class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClassification { type Event = AnyRef @@ -24,18 +31,18 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas protected def publish(event: AnyRef, subscriber: ActorRef) = subscriber ! event override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { - if (debug) publish(Logging.Debug(this, "subscribing " + subscriber + " to channel " + channel)) + if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel)) if (reaper ne null) reaper ! subscriber super.subscribe(subscriber, channel) } override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { - if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from channel " + channel)) + if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel)) super.unsubscribe(subscriber, channel) } override def unsubscribe(subscriber: ActorRef) { - if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from all channels")) + if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels")) super.unsubscribe(subscriber) } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 62ee06381d..5b3ae4b801 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -15,6 +15,10 @@ import akka.dispatch.FutureTimeoutException import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorRefProvider +object LoggingBus { + implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream +} + /** * This trait brings log level handling to the EventStream: it reads the log * levels for the initial logging (StandardOutLogger) and the loggers&level @@ -68,7 +72,7 @@ trait LoggingBus extends ActorEventBus { private[akka] def startStdoutLogger(config: Settings) { val level = levelFor(config.StdoutLogLevel) getOrElse { - StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) + StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) ErrorLevel } AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) @@ -76,12 +80,12 @@ trait LoggingBus extends ActorEventBus { loggers = Seq(StandardOutLogger) _logLevel = level } - publish(Info(this, "StandardOutLogger started")) + publish(Info(simpleName(this), "StandardOutLogger started")) } private[akka] def startDefaultLoggers(system: ActorSystemImpl) { val level = levelFor(system.settings.LogLevel) getOrElse { - StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + system.settings.LogLevel)) + StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + system.settings.LogLevel)) ErrorLevel } try { @@ -109,7 +113,7 @@ trait LoggingBus extends ActorEventBus { loggers = myloggers _logLevel = level } - publish(Info(this, "Default Loggers started")) + publish(Info(simpleName(this), "Default Loggers started")) if (!(defaultLoggers contains StandardOutLoggerName)) { unsubscribe(StandardOutLogger) } @@ -125,7 +129,7 @@ trait LoggingBus extends ActorEventBus { val level = _logLevel // volatile access before reading loggers if (!(loggers contains StandardOutLogger)) { AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) - publish(Info(this, "shutting down: StandardOutLogger started")) + publish(Info(simpleName(this), "shutting down: StandardOutLogger started")) } for { logger ← loggers @@ -135,7 +139,7 @@ trait LoggingBus extends ActorEventBus { unsubscribe(logger) logger.stop() } - publish(Info(this, "all default loggers stopped")) + publish(Info(simpleName(this), "all default loggers stopped")) } private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { @@ -144,17 +148,52 @@ trait LoggingBus extends ActorEventBus { implicit val timeout = Timeout(3 seconds) val response = try actor ? InitializeLogger(this) get catch { case _: FutureTimeoutException ⇒ - publish(Warning(this, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) + publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) } if (response != LoggerInitialized) throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response) AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l))) - publish(Info(this, "logger " + name + " started")) + publish(Info(simpleName(this), "logger " + name + " started")) actor } } +trait LogSource[-T] { + def genString(t: T): String +} + +object LogSource { + implicit val fromString: LogSource[String] = new LogSource[String] { + def genString(s: String) = s + } + + implicit val fromActor: LogSource[Actor] = new LogSource[Actor] { + def genString(a: Actor) = a.self.toString + } + + implicit val fromActorRef: LogSource[ActorRef] = new LogSource[ActorRef] { + def genString(a: ActorRef) = a.toString + } + + // this one unfortunately does not work as implicit, because existential types have some weird behavior + val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] { + def genString(c: Class[_]) = simpleName(c) + } + implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]] + + def apply[T: LogSource](o: T) = implicitly[LogSource[T]].genString(o) + + def fromAnyRef(o: AnyRef): String = + o match { + case c: Class[_] ⇒ fromClass.genString(c) + case a: Actor ⇒ fromActor.genString(a) + case a: ActorRef ⇒ fromActorRef.genString(a) + case s: String ⇒ s + case x ⇒ simpleName(x) + } +} + /** * Main entry point for Akka logging: log levels and message types (aka * channels) defined for the main transport medium, the main event bus. The @@ -235,24 +274,26 @@ object Logging { /** * Obtain LoggingAdapter for the given application and source object. The - * source object is used to identify the source of this logging channel. + * source is used to identify the source of this logging channel and must have + * a corresponding LogSource[T] instance in scope; by default these are + * provided for Class[_], Actor, ActorRef and String types. */ - def apply(system: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(system.eventStream, source) + def apply[T: LogSource](eventStream: LoggingBus, logSource: T): LoggingAdapter = + new BusLogging(eventStream, implicitly[LogSource[T]].genString(logSource)) + /** * Java API: Obtain LoggingAdapter for the given application and source object. The - * source object is used to identify the source of this logging channel. + * source object is used to identify the source of this logging channel; if it is + * an Actor or ActorRef, its address is used, in case of a class an approximation of + * its simpleName and in all other cases the simpleName of its class. */ - def getLogger(system: ActorSystem, source: AnyRef): LoggingAdapter = apply(system, source) - /** - * Obtain LoggingAdapter for the given event bus and source object. The - * source object is used to identify the source of this logging channel. - */ - def apply(bus: LoggingBus, source: AnyRef): LoggingAdapter = new BusLogging(bus, source) + def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system.eventStream, LogSource.fromAnyRef(logSource)) + /** * Java API: Obtain LoggingAdapter for the given event bus and source object. The * source object is used to identify the source of this logging channel. */ - def getLogger(bus: LoggingBus, source: AnyRef): LoggingAdapter = apply(bus, source) + def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource)) /** * Artificial exception injected into Error events if no Throwable is @@ -266,22 +307,22 @@ object Logging { def level: LogLevel } - case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends LogEvent { + case class Error(cause: Throwable, logSource: String, message: Any = "") extends LogEvent { def level = ErrorLevel } object Error { - def apply(instance: AnyRef, message: Any) = new Error(new EventHandlerException, instance, message) + def apply(logSource: String, message: Any) = new Error(new EventHandlerException, logSource, message) } - case class Warning(instance: AnyRef, message: Any = "") extends LogEvent { + case class Warning(logSource: String, message: Any = "") extends LogEvent { def level = WarningLevel } - case class Info(instance: AnyRef, message: Any = "") extends LogEvent { + case class Info(logSource: String, message: Any = "") extends LogEvent { def level = InfoLevel } - case class Debug(instance: AnyRef, message: Any = "") extends LogEvent { + case class Debug(logSource: String, message: Any = "") extends LogEvent { def level = DebugLevel } @@ -318,7 +359,7 @@ object Logging { case e: Warning ⇒ warning(e) case e: Info ⇒ info(e) case e: Debug ⇒ debug(e) - case e ⇒ warning(Warning(this, "received unexpected event of class " + e.getClass + ": " + e)) + case e ⇒ warning(Warning(simpleName(this), "received unexpected event of class " + e.getClass + ": " + e)) } } @@ -326,7 +367,7 @@ object Logging { println(errorFormat.format( timestamp, event.thread.getName, - instanceName(event.instance), + event.logSource, event.message, stackTraceFor(event.cause))) @@ -334,21 +375,21 @@ object Logging { println(warningFormat.format( timestamp, event.thread.getName, - instanceName(event.instance), + event.logSource, event.message)) def info(event: Info) = println(infoFormat.format( timestamp, event.thread.getName, - instanceName(event.instance), + event.logSource, event.message)) def debug(event: Debug) = println(debugFormat.format( timestamp, event.thread.getName, - instanceName(event.instance), + event.logSource, event.message)) def instanceName(instance: AnyRef): String = instance match { @@ -491,7 +532,7 @@ trait LoggingAdapter { } } -class BusLogging(val bus: LoggingBus, val loggingInstance: AnyRef) extends LoggingAdapter { +class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdapter { import Logging._ @@ -500,14 +541,14 @@ class BusLogging(val bus: LoggingBus, val loggingInstance: AnyRef) extends Loggi def isInfoEnabled = bus.logLevel >= InfoLevel def isDebugEnabled = bus.logLevel >= DebugLevel - protected def notifyError(message: String) { bus.publish(Error(loggingInstance, message)) } + protected def notifyError(message: String) { bus.publish(Error(logSource, message)) } - protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, loggingInstance, message)) } + protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, message)) } - protected def notifyWarning(message: String) { bus.publish(Warning(loggingInstance, message)) } + protected def notifyWarning(message: String) { bus.publish(Warning(logSource, message)) } - protected def notifyInfo(message: String) { bus.publish(Info(loggingInstance, message)) } + protected def notifyInfo(message: String) { bus.publish(Info(logSource, message)) } - protected def notifyDebug(message: String) { bus.publish(Debug(loggingInstance, message)) } + protected def notifyDebug(message: String) { bus.publish(Debug(logSource, message)) } } diff --git a/akka-actor/src/main/scala/akka/util/JMX.scala b/akka-actor/src/main/scala/akka/util/JMX.scala index 2bf8545210..2c87524843 100644 --- a/akka-actor/src/main/scala/akka/util/JMX.scala +++ b/akka-actor/src/main/scala/akka/util/JMX.scala @@ -24,7 +24,7 @@ object JMX { case e: InstanceAlreadyExistsException ⇒ Some(mbeanServer.getObjectInstance(name)) case e: Exception ⇒ - system.eventStream.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean))) + system.eventStream.publish(Error(e, "JMX", "Error when registering mbean [%s]".format(mbean))) None } @@ -32,6 +32,6 @@ object JMX { mbeanServer.unregisterMBean(mbean) } catch { case e: InstanceNotFoundException ⇒ {} - case e: Exception ⇒ system.eventStream.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean))) + case e: Exception ⇒ system.eventStream.publish(Error(e, "JMX", "Error while unregistering mbean [%s]".format(mbean))) } } diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index fae743b46c..5a0cbeebe7 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -28,7 +28,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) val messageSubmitTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-submit-timeout", 5), defaultTimeUnit).toSeconds.toInt val messageTimeToLive = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-time-to-live", 120), defaultTimeUnit).toSeconds.toInt - val log = Logging(system, this) + val log = Logging(system, "BeanstalkBasedMailbox") private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 2ea90fb7d7..869bada6cd 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -19,7 +19,7 @@ object FileBasedMailbox { class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { - val log = Logging(system, this) + val log = Logging(system, "FileBasedMailbox") val queuePath = FileBasedMailbox.queuePath(owner.system.settings.config) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 9a262fd3b3..def5a1a0c2 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -38,7 +38,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { val writeTimeout = system.settings.config.getInt(WRITE_TIMEOUT_KEY, 3000) val readTimeout = system.settings.config.getInt(READ_TIMEOUT_KEY, 3000) - val log = Logging(system, this) + val log = Logging(system, "MongoBasedMailbox") @volatile private var mongo = connect() diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index ca765bfd62..65f27b3b8f 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -20,7 +20,7 @@ class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with @volatile private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling - val log = Logging(system, this) + val log = Logging(system, "RedisBasedMailbox") def enqueue(receiver: ActorRef, envelope: Envelope) { log.debug("ENQUEUING message in redis-based mailbox [%s]".format(envelope)) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 6fff77d7cc..dd00a4cfdc 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -30,7 +30,7 @@ class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) val queueNode = "/queues" val queuePathTemplate = queueNode + "/%s" - val log = Logging(system, this) + val log = Logging(system, "ZooKeeperBasedMailbox") private val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout) private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), blockingQueue) diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 56a59b2ae2..636d8a67ec 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -102,7 +102,7 @@ class Gossiper(remote: Remote) { nodeMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener]) private val system = remote.system - private val log = Logging(system, this) + private val log = Logging(system, "Gossiper") private val failureDetector = remote.failureDetector private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef]) private val seeds = Set(address) // FIXME read in list of seeds from config diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index cf3b93b311..13d813145b 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicLong */ class Remote(val system: ActorSystemImpl, val nodename: String) { - val log = Logging(system, this) + val log = Logging(system, "Remote") import system._ import settings._ @@ -264,7 +264,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo .newInstance(exception.getMessage).asInstanceOf[Throwable] } catch { case problem: Exception ⇒ - remote.system.eventStream.publish(Logging.Error(problem, remote, problem.getMessage)) + remote.system.eventStream.publish(Logging.Error(problem, "RemoteMessage", problem.getMessage)) CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index facbf6cba1..7aa723ebb9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -34,7 +34,8 @@ class RemoteActorRefProvider( val dispatcher: MessageDispatcher, val scheduler: Scheduler) extends ActorRefProvider { - val log = Logging(eventStream, this) + val log = Logging(eventStream, "RemoteActorRefProvider") + val local = new LocalActorRefProvider(settings, rootPath, eventStream, dispatcher, scheduler) def deathWatch = local.deathWatch diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index e128742365..7b739b6199 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -25,7 +25,7 @@ class RemoteConnectionManager( initialConnections: Map[RemoteAddress, ActorRef] = Map.empty[RemoteAddress, ActorRef]) extends ConnectionManager { - val log = Logging(system, this) + val log = Logging(system, "RemoteConnectionManager") // FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc. case class State(version: Long, connections: Map[RemoteAddress, ActorRef]) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index f6110f62d4..38e03a968f 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -39,7 +39,7 @@ abstract class RemoteClient private[akka] ( val remoteSupport: NettyRemoteSupport, val remoteAddress: RemoteAddress) { - val log = Logging(remoteSupport.system, this) + val log = Logging(remoteSupport.system, "RemoteClient") val name = simpleName(this) + "@" + remoteAddress @@ -351,7 +351,7 @@ class ActiveRemoteClientHandler( * Provides the implementation of the Netty remote support */ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps { - val log = Logging(system, this) + val log = Logging(system, "NettyRemoteSupport") val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit) val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit) @@ -481,7 +481,7 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi } class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Option[ClassLoader]) { - val log = Logging(remoteSupport.system, this) + val log = Logging(remoteSupport.system, "NettyRemoteServer") import remoteSupport.serverSettings._ val address = remoteSupport.system.rootPath.remoteAddress @@ -586,7 +586,7 @@ class RemoteServerHandler( val applicationLoader: Option[ClassLoader], val remoteSupport: NettyRemoteSupport) extends SimpleChannelUpstreamHandler { - val log = Logging(remoteSupport.system, this) + val log = Logging(remoteSupport.system, "RemoteServerHandler") import remoteSupport.serverSettings._ diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index ff53016a74..fe3b406575 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -110,6 +110,8 @@ class CallingThreadDispatcher( val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) { import CallingThreadDispatcher._ + val log = akka.event.Logging(prerequisites.eventStream, "CallingThreadDispatcher") + protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor) private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match { @@ -215,12 +217,12 @@ class CallingThreadDispatcher( true } catch { case ie: InterruptedException ⇒ - prerequisites.eventStream.publish(Error(this, ie)) + log.error(ie, "Interrupted during message processing") Thread.currentThread().interrupt() intex = ie true case e ⇒ - prerequisites.eventStream.publish(Error(this, e)) + log.error(e, "Error during message processing") queue.leave false } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 13b3587624..8cf7a8da4a 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -95,26 +95,20 @@ abstract class EventFilter(occurrences: Int) { /* * these default values are just there for easier subclassing */ - protected val source: Option[AnyRef] = None + protected val source: Option[String] = None protected val message: Either[String, Regex] = Left("") protected val complete: Boolean = false /** * internal implementation helper, no guaranteed API */ - protected def doMatch(src: AnyRef, msg: Any) = { + protected def doMatch(src: String, msg: Any) = { val msgstr = if (msg != null) msg.toString else "null" - (source.isDefined && sourceMatch(src) || source.isEmpty) && + (source.isDefined && source.get == src || source.isEmpty) && (message match { case Left(s) ⇒ if (complete) msgstr == s else msgstr.startsWith(s) case Right(p) ⇒ p.findFirstIn(msgstr).isDefined }) } - private def sourceMatch(src: AnyRef) = { - source.get match { - case c: Class[_] ⇒ c isInstance src - case s ⇒ src == s - } - } } /** @@ -151,7 +145,7 @@ object EventFilter { * `null` does NOT work (passing `null` disables the * source filter).'' */ - def apply[A <: Throwable: Manifest](message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter = + def apply[A <: Throwable: Manifest](message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter = ErrorFilter(manifest[A].erasure, Option(source), if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start, message ne null)(occurrences) @@ -170,7 +164,7 @@ object EventFilter { * `null` does NOT work (passing `null` disables the * source filter).'' */ - def warning(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter = + def warning(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter = WarningFilter(Option(source), if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start, message ne null)(occurrences) @@ -189,7 +183,7 @@ object EventFilter { * `null` does NOT work (passing `null` disables the * source filter).'' */ - def info(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter = + def info(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter = InfoFilter(Option(source), if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start, message ne null)(occurrences) @@ -208,7 +202,7 @@ object EventFilter { * `null` does NOT work (passing `null` disables the * source filter).'' */ - def debug(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter = + def debug(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter = DebugFilter(Option(source), if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start, message ne null)(occurrences) @@ -244,7 +238,7 @@ object EventFilter { */ case class ErrorFilter( throwable: Class[_], - override val source: Option[AnyRef], + override val source: Option[String], override val message: Either[String, Regex], override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) { @@ -272,7 +266,7 @@ case class ErrorFilter( * @param complete * whether the event’s message must match the given message string or pattern completely */ - def this(throwable: Class[_], source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) = + def this(throwable: Class[_], source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) = this(throwable, Option(source), if (message eq null) Left("") else if (pattern) Right(new Regex(message)) @@ -295,7 +289,7 @@ case class ErrorFilter( * If you want to match all Warning events, the most efficient is to use Left(""). */ case class WarningFilter( - override val source: Option[AnyRef], + override val source: Option[String], override val message: Either[String, Regex], override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) { @@ -321,7 +315,7 @@ case class WarningFilter( * @param complete * whether the event’s message must match the given message string or pattern completely */ - def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) = + def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) = this(Option(source), if (message eq null) Left("") else if (pattern) Right(new Regex(message)) @@ -338,7 +332,7 @@ case class WarningFilter( * If you want to match all Info events, the most efficient is to use Left(""). */ case class InfoFilter( - override val source: Option[AnyRef], + override val source: Option[String], override val message: Either[String, Regex], override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) { @@ -364,7 +358,7 @@ case class InfoFilter( * @param complete * whether the event’s message must match the given message string or pattern completely */ - def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) = + def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) = this(Option(source), if (message eq null) Left("") else if (pattern) Right(new Regex(message)) @@ -381,7 +375,7 @@ case class InfoFilter( * If you want to match all Debug events, the most efficient is to use Left(""). */ case class DebugFilter( - override val source: Option[AnyRef], + override val source: Option[String], override val message: Either[String, Regex], override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) { @@ -407,7 +401,7 @@ case class DebugFilter( * @param complete * whether the event’s message must match the given message string or pattern completely */ - def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) = + def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) = this(Option(source), if (message eq null) Left("") else if (pattern) Right(new Regex(message)) @@ -452,12 +446,12 @@ class TestEventListener extends Logging.DefaultLogger { case event: LogEvent ⇒ if (!filter(event)) print(event) case DeadLetter(msg: SystemMessage, _, rcp) ⇒ if (!msg.isInstanceOf[Terminate]) { - val event = Warning(rcp, "received dead system message: " + msg) + val event = Warning(rcp.toString, "received dead system message: " + msg) if (!filter(event)) print(event) } case DeadLetter(msg, snd, rcp) ⇒ if (!msg.isInstanceOf[Terminated]) { - val event = Warning(rcp, "received dead letter from " + snd + ": " + msg) + val event = Warning(rcp.toString, "received dead letter from " + snd + ": " + msg) if (!filter(event)) print(event) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 857d9f22ee..9f225a07cc 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -602,5 +602,5 @@ object TestProbe { } trait ImplicitSender { this: TestKit ⇒ - implicit def implicitSenderTestActor = testActor + implicit def self = testActor } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index c365cd43fa..b3e34de2f3 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -18,7 +18,7 @@ object TimingTest extends Tag("timing") abstract class AkkaSpec(_application: ActorSystem = ActorSystem()) extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll { - val log: LoggingAdapter = Logging(system, this) + val log: LoggingAdapter = Logging(system, this.getClass) final override def beforeAll { atStartup()