diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala
index 23373a8af6..5d3358dc6f 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala
@@ -14,13 +14,7 @@ import java.util.concurrent.atomic._
object ActorLifeCycleSpec {
-}
-
-@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
- import ActorLifeCycleSpec._
-
- class LifeCycleTestActor(id: String, generationProvider: AtomicInteger) extends Actor {
+ class LifeCycleTestActor(testActor: ActorRef, id: String, generationProvider: AtomicInteger) extends Actor {
def report(msg: Any) = testActor ! message(msg)
def message(msg: Any): Tuple3[Any, String, Int] = (msg, id, currentGen)
val currentGen = generationProvider.getAndIncrement()
@@ -29,6 +23,12 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
def receive = { case "status" ⇒ sender ! message("OK") }
}
+}
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
+ import ActorLifeCycleSpec._
+
"An Actor" must {
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
@@ -36,7 +36,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val id = newUuid().toString
val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
val gen = new AtomicInteger(0)
- val restarterProps = Props(new LifeCycleTestActor(id, gen) {
+ val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
override def postRestart(reason: Throwable) { report("postRestart") }
})
@@ -70,7 +70,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val id = newUuid().toString
val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
val gen = new AtomicInteger(0)
- val restarterProps = Props(new LifeCycleTestActor(id, gen))
+ val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
val restarter = (supervisor ? restarterProps).as[ActorRef].get
expectMsg(("preStart", id, 0))
@@ -100,7 +100,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val id = newUuid().toString
val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
val gen = new AtomicInteger(0)
- val props = Props(new LifeCycleTestActor(id, gen))
+ val props = Props(new LifeCycleTestActor(testActor, id, gen))
val a = (supervisor ? props).as[ActorRef].get
expectMsg(("preStart", id, 0))
a ! "status"
diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala
index 2409d80734..69ed7e37ae 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala
@@ -166,11 +166,12 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true
case Ev("go") ⇒ goto(2)
}
})
+ val name = fsm.toString
filterException[Logging.EventHandlerException] {
system.eventStream.subscribe(testActor, classOf[Logging.Error])
fsm ! "go"
expectMsgPF(1 second, hint = "Next state 2 does not exist") {
- case Logging.Error(_, `fsm`, "Next state 2 does not exist") ⇒ true
+ case Logging.Error(_, `name`, "Next state 2 does not exist") ⇒ true
}
system.eventStream.unsubscribe(testActor)
}
@@ -213,18 +214,19 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true
case StopEvent(r, _, _) ⇒ testActor ! r
}
})
+ val name = fsm.toString
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
- case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true
+ case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true
}
- expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown"))
- expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2"))
+ expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown"))
+ expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
- case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true
+ case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true
}
- expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal)
+ expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal)
expectNoMsg(1 second)
system.eventStream.unsubscribe(testActor)
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala
index 6e243fc020..e915b69ecd 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala
@@ -106,8 +106,8 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender {
}
"notify unhandled messages" taggedAs TimingTest in {
- filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm, occurrences = 1),
- EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm, occurrences = 1)) {
+ filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm.toString, occurrences = 1),
+ EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm.toString, occurrences = 1)) {
fsm ! TestUnhandled
within(1 second) {
fsm ! Tick
diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala
index d4f08e40c2..3765ad5b6c 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala
@@ -42,7 +42,7 @@ object IOActorSpec {
class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO {
- lazy val socket: SocketHandle = connect(ioManager, host, port, reader)
+ lazy val socket: SocketHandle = connect(ioManager, host, port)(reader)
lazy val reader: ActorRef = context.actorOf {
new Actor with IO {
def receiveIO = {
diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala
index 858c0dcdea..207eacb74e 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala
@@ -57,9 +57,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val r: Actor.Receive = {
case null ⇒
}
- val log = Actor.LoggingReceive(this, r)
+ val log = Actor.LoggingReceive("funky", r)
log.isDefinedAt("hallo")
- expectMsg(1 second, Logging.Debug(this, "received unhandled message hallo"))
+ expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo"))
}
}
@@ -73,9 +73,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
case _ ⇒ sender ! "x"
}
})
+ val name = actor.toString
actor ! "buh"
within(1 second) {
- expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh"))
+ expectMsg(Logging.Debug(name, "received handled message buh"))
expectMsg("x")
}
val r: Actor.Receive = {
@@ -86,7 +87,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
within(500 millis) {
actor ! "bah"
expectMsgPF() {
- case Logging.Error(_: UnhandledMessageException, `actor`, _) ⇒ true
+ case Logging.Error(_: UnhandledMessageException, `name`, _) ⇒ true
}
}
}
@@ -103,7 +104,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
})
actor ! "buh"
within(1 second) {
- expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh"))
+ expectMsg(Logging.Debug(actor.toString, "received handled message buh"))
expectMsg("x")
}
}
@@ -121,9 +122,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
case _ ⇒
}
})
+ val name = actor.toString
actor ! PoisonPill
expectMsgPF() {
- case Logging.Debug(`actor`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true
+ case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true
}
awaitCond(actor.isShutdown, 100 millis)
}
@@ -141,20 +143,23 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
system.eventStream.subscribe(testActor, classOf[Logging.Error])
within(3 seconds) {
val lifecycleGuardian = appLifecycle.asInstanceOf[ActorSystemImpl].guardian
+ val lname = lifecycleGuardian.toString
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
+ val sname = supervisor.toString
val supervisorSet = receiveWhile(messages = 2) {
- case Logging.Debug(`lifecycleGuardian`, msg: String) if msg startsWith "now supervising" ⇒ 1
- case Logging.Debug(`supervisor`, msg: String) if msg startsWith "started" ⇒ 2
+ case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" ⇒ 1
+ case Logging.Debug(`sname`, msg: String) if msg startsWith "started" ⇒ 2
}.toSet
expectNoMsg(Duration.Zero)
assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)")
val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none")
+ val aname = actor.toString
val set = receiveWhile(messages = 2) {
- case Logging.Debug(`supervisor`, msg: String) if msg startsWith "now supervising" ⇒ 1
- case Logging.Debug(`actor`, msg: String) if msg startsWith "started" ⇒ 2
+ case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" ⇒ 1
+ case Logging.Debug(`aname`, msg: String) if msg startsWith "started" ⇒ 2
}.toSet
expectNoMsg(Duration.Zero)
assert(set == Set(1, 2), set + " was not Set(1, 2)")
@@ -174,18 +179,18 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
filterException[ActorKilledException] {
actor ! Kill
val set = receiveWhile(messages = 3) {
- case Logging.Error(_: ActorKilledException, `actor`, "Kill") ⇒ 1
- case Logging.Debug(`actor`, "restarting") ⇒ 2
- case Logging.Debug(`actor`, "restarted") ⇒ 3
+ case Logging.Error(_: ActorKilledException, `aname`, "Kill") ⇒ 1
+ case Logging.Debug(`aname`, "restarting") ⇒ 2
+ case Logging.Debug(`aname`, "restarted") ⇒ 3
}.toSet
expectNoMsg(Duration.Zero)
assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)")
}
supervisor.stop()
- expectMsg(Logging.Debug(supervisor, "stopping"))
- expectMsg(Logging.Debug(actor, "stopped"))
- expectMsg(Logging.Debug(supervisor, "stopped"))
+ expectMsg(Logging.Debug(sname, "stopping"))
+ expectMsg(Logging.Debug(aname, "stopped"))
+ expectMsg(Logging.Debug(sname, "stopped"))
}
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
index 33cea49284..450c201a75 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
@@ -147,7 +147,7 @@ object ActorModelSpec {
await(deadline)(stops == dispatcher.stops.get)
} catch {
case e ⇒
- system.eventStream.publish(Error(e, dispatcher, "actual: stops=" + dispatcher.stops.get +
+ system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get +
" required: stops=" + stops))
throw e
}
@@ -204,7 +204,7 @@ object ActorModelSpec {
await(deadline)(stats.restarts.get() == restarts)
} catch {
case e ⇒
- system.eventStream.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
+ system.eventStream.publish(Error(e, dispatcher.toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
throw e
@@ -310,7 +310,7 @@ abstract class ActorModelSpec extends AkkaSpec {
try {
f
} catch {
- case e ⇒ system.eventStream.publish(Error(e, this, "error in spawned thread"))
+ case e ⇒ system.eventStream.publish(Error(e, "spawn", "error in spawned thread"))
}
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala
index d5371af0b9..6314561897 100644
--- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala
@@ -102,7 +102,7 @@ class EventStreamSpec extends AkkaSpec(Configuration(
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
import Logging._
- val allmsg = Seq(Debug(this, "debug"), Info(this, "info"), Warning(this, "warning"), Error(this, "error"))
+ val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error"))
val msg = allmsg filter (_.level <= level)
allmsg foreach bus.publish
msg foreach (x ⇒ expectMsg(x))
diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala
index 83e8b182bf..05319e0772 100644
--- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala
+++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala
@@ -15,7 +15,7 @@ class Report(
compareResultWith: Option[String] = None) {
private def doLog = System.getProperty("benchmark.logResult", "true").toBoolean
- val log = Logging(system, this)
+ val log = Logging(system, "Report")
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 30eebb2f87..6472153d75 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -14,6 +14,7 @@ import akka.cluster.ClusterNode
import akka.japi.{ Creator, Procedure }
import akka.serialization.{ Serializer, Serialization }
import akka.event.Logging.Debug
+import akka.event.LogSource
import akka.experimental
import akka.AkkaException
@@ -167,7 +168,7 @@ object Actor {
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
- system.eventStream.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
+ system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o))
handled
}
def apply(o: Any): Unit = r(o)
diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
index 2685f6ca85..8a5f0eff75 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
@@ -17,7 +17,7 @@ import akka.event.Logging.{ Debug, Warning, Error }
*/
trait ActorContext extends ActorRefFactory with TypedActorFactory {
- def self: ActorRef with ScalaActorRef
+ def self: ActorRef
def hasMessages: Boolean
@@ -174,11 +174,11 @@ private[akka] class ActorCell(
actor = created
created.preStart()
checkReceiveTimeout
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "started (" + actor + ")"))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "started (" + actor + ")"))
} catch {
case e ⇒
try {
- system.eventStream.publish(Error(e, self, "error while creating actor"))
+ system.eventStream.publish(Error(e, self.toString, "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
@@ -188,7 +188,7 @@ private[akka] class ActorCell(
def recreate(cause: Throwable): Unit = try {
val failedActor = actor
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarting"))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "restarting"))
val freshActor = newActor()
if (failedActor ne null) {
val c = currentMessage //One read only plz
@@ -202,14 +202,14 @@ private[akka] class ActorCell(
}
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.postRestart(cause)
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarted"))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "restarted"))
dispatcher.resume(this) //FIXME should this be moved down?
props.faultHandler.handleSupervisorRestarted(cause, self, children)
} catch {
case e ⇒ try {
- system.eventStream.publish(Error(e, self, "error while creating actor"))
+ system.eventStream.publish(Error(e, self.toString, "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
@@ -228,7 +228,7 @@ private[akka] class ActorCell(
val c = children
if (c.isEmpty) doTerminate()
else {
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopping"))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopping"))
for (child ← c) child.stop()
stopping = true
}
@@ -239,8 +239,8 @@ private[akka] class ActorCell(
if (!stats.contains(child)) {
childrenRefs = childrenRefs.updated(child.name, child)
childrenStats = childrenStats.updated(child, ChildRestartStats())
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now supervising " + child))
- } else system.eventStream.publish(Warning(self, "Already supervising " + child))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now supervising " + child))
+ } else system.eventStream.publish(Warning(self.toString, "Already supervising " + child))
}
try {
@@ -255,10 +255,10 @@ private[akka] class ActorCell(
case Recreate(cause) ⇒ recreate(cause)
case Link(subject) ⇒
system.deathWatch.subscribe(self, subject)
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now monitoring " + subject))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now monitoring " + subject))
case Unlink(subject) ⇒
system.deathWatch.unsubscribe(self, subject)
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped monitoring " + subject))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped monitoring " + subject))
case Suspend() ⇒ suspend()
case Resume() ⇒ resume()
case Terminate() ⇒ terminate()
@@ -267,7 +267,7 @@ private[akka] class ActorCell(
}
} catch {
case e ⇒ //Should we really catch everything here?
- system.eventStream.publish(Error(e, self, "error while processing " + message))
+ system.eventStream.publish(Error(e, self.toString, "error while processing " + message))
//TODO FIXME How should problems here be handled?
throw e
}
@@ -294,7 +294,7 @@ private[akka] class ActorCell(
currentMessage = null // reset current message after successful invocation
} catch {
case e ⇒
- system.eventStream.publish(Error(e, self, e.getMessage))
+ system.eventStream.publish(Error(e, self.toString, e.getMessage))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
@@ -314,7 +314,7 @@ private[akka] class ActorCell(
}
} catch {
case e ⇒
- system.eventStream.publish(Error(e, self, e.getMessage))
+ system.eventStream.publish(Error(e, self.toString, e.getMessage))
throw e
}
}
@@ -332,7 +332,7 @@ private[akka] class ActorCell(
}
def autoReceiveMessage(msg: Envelope) {
- if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self, "received AutoReceiveMessage " + msg))
+ if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.toString, "received AutoReceiveMessage " + msg))
if (stopping) msg.message match {
case ChildTerminated ⇒ handleChildTerminated(sender)
@@ -350,7 +350,7 @@ private[akka] class ActorCell(
private def doTerminate() {
if (!system.provider.evict(self.path.toString))
- system.eventStream.publish(Warning(self, "evict of " + self.path.toString + " failed"))
+ system.eventStream.publish(Warning(self.toString, "evict of " + self.path.toString + " failed"))
dispatcher.detach(this)
@@ -361,7 +361,7 @@ private[akka] class ActorCell(
try {
parent.tell(ChildTerminated, self)
system.deathWatch.publish(Terminated(self))
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped"))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped"))
} finally {
currentMessage = null
clearActorFields()
@@ -371,7 +371,7 @@ private[akka] class ActorCell(
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenStats.get(child) match {
case Some(stats) ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenStats)) throw cause
- case None ⇒ system.eventStream.publish(Warning(self, "dropping Failed(" + cause + ") from unknown child"))
+ case None ⇒ system.eventStream.publish(Warning(self.toString, "dropping Failed(" + cause + ") from unknown child"))
}
final def handleChildTerminated(child: ActorRef): Unit = {
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
index 5770a5959f..803410b67c 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
@@ -144,7 +144,7 @@ class LocalActorRefProvider(
val dispatcher: MessageDispatcher,
val scheduler: Scheduler) extends ActorRefProvider {
- val log = Logging(eventStream, this)
+ val log = Logging(eventStream, "LocalActorRefProvider")
// FIXME remove/replave (clustering shall not leak into akka-actor)
val nodename: String = System.getProperty("akka.cluster.nodename") match {
diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
index 6be2325385..b4416ae5fb 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
@@ -188,7 +188,7 @@ class ActorSystemImpl(val name: String, config: Configuration) extends ActorSyst
// this provides basic logging (to stdout) until .start() is called below
val eventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(settings)
- val log = new BusLogging(eventStream, this) // “this” used only for .getClass in tagging messages
+ val log = new BusLogging(eventStream, "ActorSystem") // “this” used only for .getClass in tagging messages
/**
* The root actor path for this application.
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index 4b5d64bde6..7a2ad320b6 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -36,7 +36,7 @@ trait ActorDeployer {
class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, val nodename: String) extends ActorDeployer {
val deploymentConfig = new DeploymentConfig(nodename)
- val log = Logging(eventStream, this)
+ val log = Logging(eventStream, "Deployer")
val instance: ActorDeployer = {
val deployer = new LocalDeployer()
diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala
index 65e325bfe8..aeb4e53573 100644
--- a/akka-actor/src/main/scala/akka/actor/IO.scala
+++ b/akka-actor/src/main/scala/akka/actor/IO.scala
@@ -71,13 +71,11 @@ object IO {
case class ServerHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = new UUID()) extends Handle {
override def asServer = this
- def accept(socketOwner: ActorRef): SocketHandle = {
+ def accept()(implicit socketOwner: ActorRef): SocketHandle = {
val socket = SocketHandle(socketOwner, ioManager)
ioManager ! Accept(socket, this)
socket
}
-
- def accept()(implicit socketOwner: ScalaActorRef): SocketHandle = accept(socketOwner)
}
sealed trait IOMessage
@@ -91,35 +89,23 @@ object IO {
case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage
case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage
- def listen(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): ServerHandle = {
+ def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit owner: ActorRef): ServerHandle = {
val server = ServerHandle(owner, ioManager)
ioManager ! Listen(server, address)
server
}
- def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): ServerHandle =
- listen(ioManager, address, sender)
+ def listen(ioManager: ActorRef, host: String, port: Int)(implicit owner: ActorRef): ServerHandle =
+ listen(ioManager, new InetSocketAddress(host, port))
- def listen(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): ServerHandle =
- listen(ioManager, new InetSocketAddress(host, port), owner)
-
- def listen(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): ServerHandle =
- listen(ioManager, new InetSocketAddress(host, port), sender)
-
- def connect(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): SocketHandle = {
+ def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit owner: ActorRef): SocketHandle = {
val socket = SocketHandle(owner, ioManager)
ioManager ! Connect(socket, address)
socket
}
- def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): SocketHandle =
- connect(ioManager, address, sender)
-
- def connect(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): SocketHandle =
- connect(ioManager, new InetSocketAddress(host, port), owner)
-
- def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): SocketHandle =
- connect(ioManager, new InetSocketAddress(host, port), sender)
+ def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ActorRef): SocketHandle =
+ connect(ioManager, new InetSocketAddress(host, port))
private class HandleState(var readBytes: ByteString, var connected: Boolean) {
def this() = this(ByteString.empty, false)
diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
index 3d66532d31..aa3edd69ac 100644
--- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
@@ -69,7 +69,7 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit,
try {
function()
} catch {
- case e ⇒ eventStream.publish(Error(e, this, e.getMessage))
+ case e ⇒ eventStream.publish(Error(e, "TaskInvocation", e.getMessage))
} finally {
cleanup()
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
index 586586b7d5..e8f799c414 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
@@ -99,7 +99,7 @@ class Dispatcher(
executorService.get() execute invocation
} catch {
case e2: RejectedExecutionException ⇒
- prerequisites.eventStream.publish(Warning(this, e2.toString))
+ prerequisites.eventStream.publish(Warning("Dispatcher", e2.toString))
throw e2
}
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index 2160c406cd..b57ff39512 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -262,7 +262,7 @@ object Future {
result completeWithResult currentValue
} catch {
case e: Exception ⇒
- dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
+ dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage))
result completeWithException e
} finally {
results.clear
@@ -631,7 +631,7 @@ sealed trait Future[+T] extends japi.Future[T] {
Right(f(res))
} catch {
case e: Exception ⇒
- dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
+ dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage))
Left(e)
})
}
@@ -683,7 +683,7 @@ sealed trait Future[+T] extends japi.Future[T] {
future.completeWith(f(r))
} catch {
case e: Exception ⇒
- dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
+ dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
future complete Left(e)
}
}
@@ -716,7 +716,7 @@ sealed trait Future[+T] extends japi.Future[T] {
if (p(res)) r else Left(new MatchError(res))
} catch {
case e: Exception ⇒
- dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
+ dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
Left(e)
})
}
@@ -788,7 +788,7 @@ trait Promise[T] extends Future[T] {
fr completeWith cont(f)
} catch {
case e: Exception ⇒
- dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
+ dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
fr completeWithException e
}
}
@@ -802,7 +802,7 @@ trait Promise[T] extends Future[T] {
fr completeWith cont(f)
} catch {
case e: Exception ⇒
- dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
+ dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
fr completeWithException e
}
}
@@ -994,7 +994,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
} else this
private def notifyCompleted(func: Future[T] ⇒ Unit) {
- try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
+ try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
}
@inline
diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
index b752653f1d..ddbebdf3ef 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala
@@ -187,7 +187,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
}
} catch {
case e ⇒
- actor.system.eventStream.publish(Error(e, actor.self, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
+ actor.system.eventStream.publish(Error(e, actor.self.toString, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
throw e
}
}
diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala
index c2be45d81e..3906d2cb04 100644
--- a/akka-actor/src/main/scala/akka/event/EventStream.scala
+++ b/akka-actor/src/main/scala/akka/event/EventStream.scala
@@ -3,9 +3,16 @@
*/
package akka.event
-import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated }
+import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated, ActorSystem, simpleName }
import akka.util.Subclassification
+object EventStream {
+ implicit def fromActorSystem(system: ActorSystem) = system.eventStream
+}
+
+class A(x: Int = 0) extends Exception("x=" + x)
+class B extends A
+
class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClassification {
type Event = AnyRef
@@ -24,18 +31,18 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas
protected def publish(event: AnyRef, subscriber: ActorRef) = subscriber ! event
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
- if (debug) publish(Logging.Debug(this, "subscribing " + subscriber + " to channel " + channel))
+ if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel))
if (reaper ne null) reaper ! subscriber
super.subscribe(subscriber, channel)
}
override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
- if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from channel " + channel))
+ if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel))
super.unsubscribe(subscriber, channel)
}
override def unsubscribe(subscriber: ActorRef) {
- if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from all channels"))
+ if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels"))
super.unsubscribe(subscriber)
}
diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala
index 62ee06381d..5b3ae4b801 100644
--- a/akka-actor/src/main/scala/akka/event/Logging.scala
+++ b/akka-actor/src/main/scala/akka/event/Logging.scala
@@ -15,6 +15,10 @@ import akka.dispatch.FutureTimeoutException
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
+object LoggingBus {
+ implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
+}
+
/**
* This trait brings log level handling to the EventStream: it reads the log
* levels for the initial logging (StandardOutLogger) and the loggers&level
@@ -68,7 +72,7 @@ trait LoggingBus extends ActorEventBus {
private[akka] def startStdoutLogger(config: Settings) {
val level = levelFor(config.StdoutLogLevel) getOrElse {
- StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + config.StdoutLogLevel))
+ StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + config.StdoutLogLevel))
ErrorLevel
}
AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l)))
@@ -76,12 +80,12 @@ trait LoggingBus extends ActorEventBus {
loggers = Seq(StandardOutLogger)
_logLevel = level
}
- publish(Info(this, "StandardOutLogger started"))
+ publish(Info(simpleName(this), "StandardOutLogger started"))
}
private[akka] def startDefaultLoggers(system: ActorSystemImpl) {
val level = levelFor(system.settings.LogLevel) getOrElse {
- StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + system.settings.LogLevel))
+ StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + system.settings.LogLevel))
ErrorLevel
}
try {
@@ -109,7 +113,7 @@ trait LoggingBus extends ActorEventBus {
loggers = myloggers
_logLevel = level
}
- publish(Info(this, "Default Loggers started"))
+ publish(Info(simpleName(this), "Default Loggers started"))
if (!(defaultLoggers contains StandardOutLoggerName)) {
unsubscribe(StandardOutLogger)
}
@@ -125,7 +129,7 @@ trait LoggingBus extends ActorEventBus {
val level = _logLevel // volatile access before reading loggers
if (!(loggers contains StandardOutLogger)) {
AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l)))
- publish(Info(this, "shutting down: StandardOutLogger started"))
+ publish(Info(simpleName(this), "shutting down: StandardOutLogger started"))
}
for {
logger ← loggers
@@ -135,7 +139,7 @@ trait LoggingBus extends ActorEventBus {
unsubscribe(logger)
logger.stop()
}
- publish(Info(this, "all default loggers stopped"))
+ publish(Info(simpleName(this), "all default loggers stopped"))
}
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = {
@@ -144,17 +148,52 @@ trait LoggingBus extends ActorEventBus {
implicit val timeout = Timeout(3 seconds)
val response = try actor ? InitializeLogger(this) get catch {
case _: FutureTimeoutException ⇒
- publish(Warning(this, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
+ publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
}
if (response != LoggerInitialized)
throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response)
AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l)))
- publish(Info(this, "logger " + name + " started"))
+ publish(Info(simpleName(this), "logger " + name + " started"))
actor
}
}
+trait LogSource[-T] {
+ def genString(t: T): String
+}
+
+object LogSource {
+ implicit val fromString: LogSource[String] = new LogSource[String] {
+ def genString(s: String) = s
+ }
+
+ implicit val fromActor: LogSource[Actor] = new LogSource[Actor] {
+ def genString(a: Actor) = a.self.toString
+ }
+
+ implicit val fromActorRef: LogSource[ActorRef] = new LogSource[ActorRef] {
+ def genString(a: ActorRef) = a.toString
+ }
+
+ // this one unfortunately does not work as implicit, because existential types have some weird behavior
+ val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] {
+ def genString(c: Class[_]) = simpleName(c)
+ }
+ implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]]
+
+ def apply[T: LogSource](o: T) = implicitly[LogSource[T]].genString(o)
+
+ def fromAnyRef(o: AnyRef): String =
+ o match {
+ case c: Class[_] ⇒ fromClass.genString(c)
+ case a: Actor ⇒ fromActor.genString(a)
+ case a: ActorRef ⇒ fromActorRef.genString(a)
+ case s: String ⇒ s
+ case x ⇒ simpleName(x)
+ }
+}
+
/**
* Main entry point for Akka logging: log levels and message types (aka
* channels) defined for the main transport medium, the main event bus. The
@@ -235,24 +274,26 @@ object Logging {
/**
* Obtain LoggingAdapter for the given application and source object. The
- * source object is used to identify the source of this logging channel.
+ * source is used to identify the source of this logging channel and must have
+ * a corresponding LogSource[T] instance in scope; by default these are
+ * provided for Class[_], Actor, ActorRef and String types.
*/
- def apply(system: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(system.eventStream, source)
+ def apply[T: LogSource](eventStream: LoggingBus, logSource: T): LoggingAdapter =
+ new BusLogging(eventStream, implicitly[LogSource[T]].genString(logSource))
+
/**
* Java API: Obtain LoggingAdapter for the given application and source object. The
- * source object is used to identify the source of this logging channel.
+ * source object is used to identify the source of this logging channel; if it is
+ * an Actor or ActorRef, its address is used, in case of a class an approximation of
+ * its simpleName and in all other cases the simpleName of its class.
*/
- def getLogger(system: ActorSystem, source: AnyRef): LoggingAdapter = apply(system, source)
- /**
- * Obtain LoggingAdapter for the given event bus and source object. The
- * source object is used to identify the source of this logging channel.
- */
- def apply(bus: LoggingBus, source: AnyRef): LoggingAdapter = new BusLogging(bus, source)
+ def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system.eventStream, LogSource.fromAnyRef(logSource))
+
/**
* Java API: Obtain LoggingAdapter for the given event bus and source object. The
* source object is used to identify the source of this logging channel.
*/
- def getLogger(bus: LoggingBus, source: AnyRef): LoggingAdapter = apply(bus, source)
+ def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource))
/**
* Artificial exception injected into Error events if no Throwable is
@@ -266,22 +307,22 @@ object Logging {
def level: LogLevel
}
- case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends LogEvent {
+ case class Error(cause: Throwable, logSource: String, message: Any = "") extends LogEvent {
def level = ErrorLevel
}
object Error {
- def apply(instance: AnyRef, message: Any) = new Error(new EventHandlerException, instance, message)
+ def apply(logSource: String, message: Any) = new Error(new EventHandlerException, logSource, message)
}
- case class Warning(instance: AnyRef, message: Any = "") extends LogEvent {
+ case class Warning(logSource: String, message: Any = "") extends LogEvent {
def level = WarningLevel
}
- case class Info(instance: AnyRef, message: Any = "") extends LogEvent {
+ case class Info(logSource: String, message: Any = "") extends LogEvent {
def level = InfoLevel
}
- case class Debug(instance: AnyRef, message: Any = "") extends LogEvent {
+ case class Debug(logSource: String, message: Any = "") extends LogEvent {
def level = DebugLevel
}
@@ -318,7 +359,7 @@ object Logging {
case e: Warning ⇒ warning(e)
case e: Info ⇒ info(e)
case e: Debug ⇒ debug(e)
- case e ⇒ warning(Warning(this, "received unexpected event of class " + e.getClass + ": " + e))
+ case e ⇒ warning(Warning(simpleName(this), "received unexpected event of class " + e.getClass + ": " + e))
}
}
@@ -326,7 +367,7 @@ object Logging {
println(errorFormat.format(
timestamp,
event.thread.getName,
- instanceName(event.instance),
+ event.logSource,
event.message,
stackTraceFor(event.cause)))
@@ -334,21 +375,21 @@ object Logging {
println(warningFormat.format(
timestamp,
event.thread.getName,
- instanceName(event.instance),
+ event.logSource,
event.message))
def info(event: Info) =
println(infoFormat.format(
timestamp,
event.thread.getName,
- instanceName(event.instance),
+ event.logSource,
event.message))
def debug(event: Debug) =
println(debugFormat.format(
timestamp,
event.thread.getName,
- instanceName(event.instance),
+ event.logSource,
event.message))
def instanceName(instance: AnyRef): String = instance match {
@@ -491,7 +532,7 @@ trait LoggingAdapter {
}
}
-class BusLogging(val bus: LoggingBus, val loggingInstance: AnyRef) extends LoggingAdapter {
+class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdapter {
import Logging._
@@ -500,14 +541,14 @@ class BusLogging(val bus: LoggingBus, val loggingInstance: AnyRef) extends Loggi
def isInfoEnabled = bus.logLevel >= InfoLevel
def isDebugEnabled = bus.logLevel >= DebugLevel
- protected def notifyError(message: String) { bus.publish(Error(loggingInstance, message)) }
+ protected def notifyError(message: String) { bus.publish(Error(logSource, message)) }
- protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, loggingInstance, message)) }
+ protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, message)) }
- protected def notifyWarning(message: String) { bus.publish(Warning(loggingInstance, message)) }
+ protected def notifyWarning(message: String) { bus.publish(Warning(logSource, message)) }
- protected def notifyInfo(message: String) { bus.publish(Info(loggingInstance, message)) }
+ protected def notifyInfo(message: String) { bus.publish(Info(logSource, message)) }
- protected def notifyDebug(message: String) { bus.publish(Debug(loggingInstance, message)) }
+ protected def notifyDebug(message: String) { bus.publish(Debug(logSource, message)) }
}
diff --git a/akka-actor/src/main/scala/akka/util/JMX.scala b/akka-actor/src/main/scala/akka/util/JMX.scala
index 2bf8545210..2c87524843 100644
--- a/akka-actor/src/main/scala/akka/util/JMX.scala
+++ b/akka-actor/src/main/scala/akka/util/JMX.scala
@@ -24,7 +24,7 @@ object JMX {
case e: InstanceAlreadyExistsException ⇒
Some(mbeanServer.getObjectInstance(name))
case e: Exception ⇒
- system.eventStream.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean)))
+ system.eventStream.publish(Error(e, "JMX", "Error when registering mbean [%s]".format(mbean)))
None
}
@@ -32,6 +32,6 @@ object JMX {
mbeanServer.unregisterMBean(mbean)
} catch {
case e: InstanceNotFoundException ⇒ {}
- case e: Exception ⇒ system.eventStream.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean)))
+ case e: Exception ⇒ system.eventStream.publish(Error(e, "JMX", "Error while unregistering mbean [%s]".format(mbean)))
}
}
diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala
index fae743b46c..5a0cbeebe7 100644
--- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala
@@ -28,7 +28,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
val messageSubmitTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-submit-timeout", 5), defaultTimeUnit).toSeconds.toInt
val messageTimeToLive = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-time-to-live", 120), defaultTimeUnit).toSeconds.toInt
- val log = Logging(system, this)
+ val log = Logging(system, "BeanstalkBasedMailbox")
private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) }
diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala
index 2ea90fb7d7..869bada6cd 100644
--- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala
@@ -19,7 +19,7 @@ object FileBasedMailbox {
class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
- val log = Logging(system, this)
+ val log = Logging(system, "FileBasedMailbox")
val queuePath = FileBasedMailbox.queuePath(owner.system.settings.config)
diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala
index 9a262fd3b3..def5a1a0c2 100644
--- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala
@@ -38,7 +38,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
val writeTimeout = system.settings.config.getInt(WRITE_TIMEOUT_KEY, 3000)
val readTimeout = system.settings.config.getInt(READ_TIMEOUT_KEY, 3000)
- val log = Logging(system, this)
+ val log = Logging(system, "MongoBasedMailbox")
@volatile
private var mongo = connect()
diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala
index ca765bfd62..65f27b3b8f 100644
--- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala
@@ -20,7 +20,7 @@ class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with
@volatile
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling
- val log = Logging(system, this)
+ val log = Logging(system, "RedisBasedMailbox")
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in redis-based mailbox [%s]".format(envelope))
diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala
index 6fff77d7cc..dd00a4cfdc 100644
--- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala
@@ -30,7 +30,7 @@ class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s"
- val log = Logging(system, this)
+ val log = Logging(system, "ZooKeeperBasedMailbox")
private val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout)
private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), blockingQueue)
diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala
index 56a59b2ae2..636d8a67ec 100644
--- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala
+++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala
@@ -102,7 +102,7 @@ class Gossiper(remote: Remote) {
nodeMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])
private val system = remote.system
- private val log = Logging(system, this)
+ private val log = Logging(system, "Gossiper")
private val failureDetector = remote.failureDetector
private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef])
private val seeds = Set(address) // FIXME read in list of seeds from config
diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala
index cf3b93b311..13d813145b 100644
--- a/akka-remote/src/main/scala/akka/remote/Remote.scala
+++ b/akka-remote/src/main/scala/akka/remote/Remote.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicLong
*/
class Remote(val system: ActorSystemImpl, val nodename: String) {
- val log = Logging(system, this)
+ val log = Logging(system, "Remote")
import system._
import settings._
@@ -264,7 +264,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Exception ⇒
- remote.system.eventStream.publish(Logging.Error(problem, remote, problem.getMessage))
+ remote.system.eventStream.publish(Logging.Error(problem, "RemoteMessage", problem.getMessage))
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
index facbf6cba1..7aa723ebb9 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
@@ -34,7 +34,8 @@ class RemoteActorRefProvider(
val dispatcher: MessageDispatcher,
val scheduler: Scheduler) extends ActorRefProvider {
- val log = Logging(eventStream, this)
+ val log = Logging(eventStream, "RemoteActorRefProvider")
+
val local = new LocalActorRefProvider(settings, rootPath, eventStream, dispatcher, scheduler)
def deathWatch = local.deathWatch
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala
index e128742365..7b739b6199 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala
@@ -25,7 +25,7 @@ class RemoteConnectionManager(
initialConnections: Map[RemoteAddress, ActorRef] = Map.empty[RemoteAddress, ActorRef])
extends ConnectionManager {
- val log = Logging(system, this)
+ val log = Logging(system, "RemoteConnectionManager")
// FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc.
case class State(version: Long, connections: Map[RemoteAddress, ActorRef])
diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
index f6110f62d4..38e03a968f 100644
--- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
+++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
@@ -39,7 +39,7 @@ abstract class RemoteClient private[akka] (
val remoteSupport: NettyRemoteSupport,
val remoteAddress: RemoteAddress) {
- val log = Logging(remoteSupport.system, this)
+ val log = Logging(remoteSupport.system, "RemoteClient")
val name = simpleName(this) + "@" + remoteAddress
@@ -351,7 +351,7 @@ class ActiveRemoteClientHandler(
* Provides the implementation of the Netty remote support
*/
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
- val log = Logging(system, this)
+ val log = Logging(system, "NettyRemoteSupport")
val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit)
val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit)
@@ -481,7 +481,7 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi
}
class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Option[ClassLoader]) {
- val log = Logging(remoteSupport.system, this)
+ val log = Logging(remoteSupport.system, "NettyRemoteServer")
import remoteSupport.serverSettings._
val address = remoteSupport.system.rootPath.remoteAddress
@@ -586,7 +586,7 @@ class RemoteServerHandler(
val applicationLoader: Option[ClassLoader],
val remoteSupport: NettyRemoteSupport) extends SimpleChannelUpstreamHandler {
- val log = Logging(remoteSupport.system, this)
+ val log = Logging(remoteSupport.system, "RemoteServerHandler")
import remoteSupport.serverSettings._
diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
index ff53016a74..fe3b406575 100644
--- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
@@ -110,6 +110,8 @@ class CallingThreadDispatcher(
val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) {
import CallingThreadDispatcher._
+ val log = akka.event.Logging(prerequisites.eventStream, "CallingThreadDispatcher")
+
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor)
private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match {
@@ -215,12 +217,12 @@ class CallingThreadDispatcher(
true
} catch {
case ie: InterruptedException ⇒
- prerequisites.eventStream.publish(Error(this, ie))
+ log.error(ie, "Interrupted during message processing")
Thread.currentThread().interrupt()
intex = ie
true
case e ⇒
- prerequisites.eventStream.publish(Error(this, e))
+ log.error(e, "Error during message processing")
queue.leave
false
}
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala
index 13b3587624..8cf7a8da4a 100644
--- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala
@@ -95,26 +95,20 @@ abstract class EventFilter(occurrences: Int) {
/*
* these default values are just there for easier subclassing
*/
- protected val source: Option[AnyRef] = None
+ protected val source: Option[String] = None
protected val message: Either[String, Regex] = Left("")
protected val complete: Boolean = false
/**
* internal implementation helper, no guaranteed API
*/
- protected def doMatch(src: AnyRef, msg: Any) = {
+ protected def doMatch(src: String, msg: Any) = {
val msgstr = if (msg != null) msg.toString else "null"
- (source.isDefined && sourceMatch(src) || source.isEmpty) &&
+ (source.isDefined && source.get == src || source.isEmpty) &&
(message match {
case Left(s) ⇒ if (complete) msgstr == s else msgstr.startsWith(s)
case Right(p) ⇒ p.findFirstIn(msgstr).isDefined
})
}
- private def sourceMatch(src: AnyRef) = {
- source.get match {
- case c: Class[_] ⇒ c isInstance src
- case s ⇒ src == s
- }
- }
}
/**
@@ -151,7 +145,7 @@ object EventFilter {
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
- def apply[A <: Throwable: Manifest](message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
+ def apply[A <: Throwable: Manifest](message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
ErrorFilter(manifest[A].erasure, Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
@@ -170,7 +164,7 @@ object EventFilter {
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
- def warning(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
+ def warning(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
WarningFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
@@ -189,7 +183,7 @@ object EventFilter {
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
- def info(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
+ def info(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
InfoFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
@@ -208,7 +202,7 @@ object EventFilter {
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
- def debug(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
+ def debug(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
DebugFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
@@ -244,7 +238,7 @@ object EventFilter {
*/
case class ErrorFilter(
throwable: Class[_],
- override val source: Option[AnyRef],
+ override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@@ -272,7 +266,7 @@ case class ErrorFilter(
* @param complete
* whether the event’s message must match the given message string or pattern completely
*/
- def this(throwable: Class[_], source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
+ def this(throwable: Class[_], source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(throwable, Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@@ -295,7 +289,7 @@ case class ErrorFilter(
* If you want to match all Warning events, the most efficient is to use Left("").
*/
case class WarningFilter(
- override val source: Option[AnyRef],
+ override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@@ -321,7 +315,7 @@ case class WarningFilter(
* @param complete
* whether the event’s message must match the given message string or pattern completely
*/
- def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
+ def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@@ -338,7 +332,7 @@ case class WarningFilter(
* If you want to match all Info events, the most efficient is to use Left("").
*/
case class InfoFilter(
- override val source: Option[AnyRef],
+ override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@@ -364,7 +358,7 @@ case class InfoFilter(
* @param complete
* whether the event’s message must match the given message string or pattern completely
*/
- def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
+ def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@@ -381,7 +375,7 @@ case class InfoFilter(
* If you want to match all Debug events, the most efficient is to use Left("").
*/
case class DebugFilter(
- override val source: Option[AnyRef],
+ override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@@ -407,7 +401,7 @@ case class DebugFilter(
* @param complete
* whether the event’s message must match the given message string or pattern completely
*/
- def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
+ def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@@ -452,12 +446,12 @@ class TestEventListener extends Logging.DefaultLogger {
case event: LogEvent ⇒ if (!filter(event)) print(event)
case DeadLetter(msg: SystemMessage, _, rcp) ⇒
if (!msg.isInstanceOf[Terminate]) {
- val event = Warning(rcp, "received dead system message: " + msg)
+ val event = Warning(rcp.toString, "received dead system message: " + msg)
if (!filter(event)) print(event)
}
case DeadLetter(msg, snd, rcp) ⇒
if (!msg.isInstanceOf[Terminated]) {
- val event = Warning(rcp, "received dead letter from " + snd + ": " + msg)
+ val event = Warning(rcp.toString, "received dead letter from " + snd + ": " + msg)
if (!filter(event)) print(event)
}
}
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
index 857d9f22ee..9f225a07cc 100644
--- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
@@ -602,5 +602,5 @@ object TestProbe {
}
trait ImplicitSender { this: TestKit ⇒
- implicit def implicitSenderTestActor = testActor
+ implicit def self = testActor
}
diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
index c365cd43fa..b3e34de2f3 100644
--- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
+++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
@@ -18,7 +18,7 @@ object TimingTest extends Tag("timing")
abstract class AkkaSpec(_application: ActorSystem = ActorSystem())
extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll {
- val log: LoggingAdapter = Logging(system, this)
+ val log: LoggingAdapter = Logging(system, this.getClass)
final override def beforeAll {
atStartup()