diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 6d85c6997c..d1cda03f9d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -7,7 +7,7 @@ package akka.actor import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.testkit._ -import TestEvent.{ Mute, UnMuteAll } +import TestEvent.Mute import FSM._ import akka.util.Duration import akka.util.duration._ @@ -61,7 +61,7 @@ object FSMActorSpec { whenUnhandled { case Ev(msg) ⇒ { - app.eventHandler.info(this, "unhandled event " + msg + " in state " + stateName + " with data " + stateData) + log.info("unhandled event " + msg + " in state " + stateName + " with data " + stateData) unhandledLatch.open stay } @@ -132,8 +132,8 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true lockedLatch.await filterEvents(EventFilter.custom { - case EventHandler.Info(_: Lock, _) ⇒ true - case _ ⇒ false + case Logging.Info(_: Lock, _) ⇒ true + case _ ⇒ false }) { lock ! "not_handled" unhandledLatch.await @@ -163,18 +163,13 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true case Ev("go") ⇒ goto(2) } }) - val logger = actorOf(new Actor { - def receive = { - case x ⇒ testActor forward x - } - }) - filterException[EventHandler.EventHandlerException] { - app.eventHandler.addListener(logger) + filterException[Logging.EventHandlerException] { + app.mainbus.subscribe(testActor, classOf[Logging.Error]) fsm ! "go" - expectMsgPF(1 second) { - case EventHandler.Error(_: EventHandler.EventHandlerException, `fsm`, "Next state 2 does not exist") ⇒ true + expectMsgPF(1 second, hint = "Next state 2 does not exist") { + case Logging.Error(_, `fsm`, "Next state 2 does not exist") ⇒ true } - app.eventHandler.removeListener(logger) + app.mainbus.unsubscribe(testActor) } } @@ -198,9 +193,9 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true new TestKit(AkkaApplication("fsm event", AkkaApplication.defaultConfig ++ Configuration("akka.event-handler-level" -> "DEBUG", "akka.actor.debug.fsm" -> true))) { - app.eventHandler.notify(TestEvent.Mute(EventFilter.custom { - case _: EventHandler.Debug ⇒ true - case _ ⇒ false + app.mainbus.publish(TestEvent.Mute(EventFilter.custom { + case _: Logging.Debug ⇒ true + case _ ⇒ false })) val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] { startWith(1, null) @@ -218,25 +213,20 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true case StopEvent(r, _, _) ⇒ testActor ! r } }) - val logger = actorOf(new Actor { - def receive = { - case x ⇒ testActor forward x - } - }) - app.eventHandler.addListener(logger) + app.mainbus.subscribe(testActor, classOf[Logging.Debug]) fsm ! "go" - expectMsgPF(1 second) { - case EventHandler.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") ⇒ true + expectMsgPF(1 second, hint = "processing Event(go,null)") { + case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") ⇒ true } - expectMsg(1 second, EventHandler.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) - expectMsg(1 second, EventHandler.Debug(fsm, "transition 1 -> 2")) + expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) + expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2")) fsm ! "stop" - expectMsgPF(1 second) { - case EventHandler.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[testActor") ⇒ true + expectMsgPF(1 second, hint = "processing Event(stop,null)") { + case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[testActor") ⇒ true } - expectMsgAllOf(1 second, EventHandler.Debug(fsm, "canceling timer 't'"), Normal) + expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal) expectNoMsg(1 second) - app.eventHandler.removeListener(logger) + app.mainbus.unsubscribe(testActor) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 5ea68924d1..e708b4771e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -10,7 +10,7 @@ import org.scalatest.WordSpec import akka.AkkaApplication import akka.AkkaApplication.defaultConfig import akka.config.Configuration -import akka.event.EventHandler +import akka.event.Logging object LoggingReceiveSpec { class TestLogActor extends Actor { @@ -28,13 +28,13 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val appLifecycle = AkkaApplication("lifecycle", config ++ Configuration("akka.actor.debug.lifecycle" -> true)) val filter = TestEvent.Mute(EventFilter.custom { - case _: EventHandler.Debug ⇒ true - case _: EventHandler.Info ⇒ true - case _ ⇒ false + case _: Logging.Debug ⇒ true + case _: Logging.Info ⇒ true + case _ ⇒ false }) - appLogging.eventHandler.notify(filter) - appAuto.eventHandler.notify(filter) - appLifecycle.eventHandler.notify(filter) + appLogging.mainbus.publish(filter) + appAuto.mainbus.publish(filter) + appLifecycle.mainbus.publish(filter) def ignoreMute(t: TestKit) { t.ignoreMsg { @@ -46,20 +46,20 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "decorate a Receive" in { new TestKit(appLogging) { - app.eventHandler.addListener(testActor) + app.mainbus.subscribe(testActor, classOf[Logging.Debug]) val r: Actor.Receive = { case null ⇒ } val log = Actor.LoggingReceive(this, r) log.isDefinedAt("hallo") - expectMsg(1 second, EventHandler.Debug(this, "received unhandled message hallo")) - } + expectMsg(1 second, Logging.Debug(this, "received unhandled message hallo")) + }.app.stop() } "be added on Actor if requested" in { new TestKit(appLogging) with ImplicitSender { ignoreMute(this) - app.eventHandler.addListener(testActor) + app.mainbus.subscribe(testActor, classOf[Logging.Debug]) val actor = TestActorRef(new Actor { def receive = loggable(this) { case _ ⇒ channel ! "x" @@ -67,7 +67,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd }) actor ! "buh" within(1 second) { - expectMsg(EventHandler.Debug(actor.underlyingActor, "received handled message buh")) + expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh")) expectMsg("x") } val r: Actor.Receive = { @@ -78,17 +78,16 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd within(300 millis) { actor ! "bah" expectMsgPF() { - case EventHandler.Error(_: UnhandledMessageException, `actor`, _) ⇒ true + case Logging.Error(_: UnhandledMessageException, `actor`, _) ⇒ true } } } - actor.stop() - } + }.app.stop() } "not duplicate logging" in { new TestKit(appLogging) with ImplicitSender { - app.eventHandler.addListener(testActor) + app.mainbus.subscribe(testActor, classOf[Logging.Debug]) val actor = TestActorRef(new Actor { def receive = loggable(this)(loggable(this) { case _ ⇒ channel ! "x" @@ -96,10 +95,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd }) actor ! "buh" within(1 second) { - expectMsg(EventHandler.Debug(actor.underlyingActor, "received handled message buh")) + expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh")) expectMsg("x") } - } + }.app.stop() } } @@ -108,65 +107,65 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "log AutoReceiveMessages if requested" in { new TestKit(appAuto) { - app.eventHandler.addListener(testActor) + app.mainbus.subscribe(testActor, classOf[Logging.Debug]) val actor = TestActorRef(new Actor { def receive = { case _ ⇒ } }) actor ! PoisonPill - expectMsg(300 millis, EventHandler.Debug(actor.underlyingActor, "received AutoReceiveMessage PoisonPill")) + expectMsg(300 millis, Logging.Debug(actor.underlyingActor, "received AutoReceiveMessage PoisonPill")) awaitCond(actor.isShutdown, 100 millis) - } + }.app.stop() } // TODO remove ignore as soon as logging is working properly during start-up again "log LifeCycle changes if requested" ignore { new TestKit(appLifecycle) { ignoreMute(this) - app.eventHandler.addListener(testActor) + app.mainbus.subscribe(testActor, classOf[Logging.Debug]) within(2 seconds) { val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) - expectMsg(EventHandler.Debug(supervisor, "started")) + expectMsg(Logging.Debug(supervisor, "started")) val actor = new TestActorRef[TestLogActor](app, Props[TestLogActor], supervisor, "none") expectMsgPF() { - case EventHandler.Debug(ref, msg: String) ⇒ ref == supervisor && msg.startsWith("now supervising") + case Logging.Debug(ref, msg: String) ⇒ ref == supervisor && msg.startsWith("now supervising") } - expectMsg(EventHandler.Debug(actor, "started")) + expectMsg(Logging.Debug(actor, "started")) supervisor startsMonitoring actor expectMsgPF(hint = "now monitoring") { - case EventHandler.Debug(ref, msg: String) ⇒ + case Logging.Debug(ref, msg: String) ⇒ ref == supervisor.underlyingActor && msg.startsWith("now monitoring") } supervisor stopsMonitoring actor expectMsgPF(hint = "stopped monitoring") { - case EventHandler.Debug(ref, msg: String) ⇒ + case Logging.Debug(ref, msg: String) ⇒ ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring") } filterException[ActorKilledException] { actor ! Kill expectMsgPF() { - case EventHandler.Error(_: ActorKilledException, `actor`, "Kill") ⇒ true + case Logging.Error(_: ActorKilledException, `actor`, "Kill") ⇒ true } - expectMsg(EventHandler.Debug(actor, "restarting")) + expectMsg(Logging.Debug(actor, "restarting")) } awaitCond(msgAvailable) expectMsgPF(hint = "restarted") { - case EventHandler.Debug(`actor`, "restarted") ⇒ true + case Logging.Debug(`actor`, "restarted") ⇒ true } actor.stop() - expectMsg(EventHandler.Debug(actor, "stopping")) + expectMsg(Logging.Debug(actor, "stopping")) supervisor.stop() } - } + }.app.stop() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 1fa42ac61b..6021ecf8cc 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -15,14 +15,6 @@ import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RestartStrategySpec extends AkkaSpec { - override def atStartup() { - app.eventHandler.notify(Mute(EventFilter[Exception]("Crashing..."))) - } - - override def atTermination() { - app.eventHandler.notify(UnMuteAll) - } - object Ping object Crash diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 9d4f0ebd4e..7ab547cf1f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -17,10 +17,6 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { future } - override def beforeEach { - app.eventHandler.notify(Mute(EventFilter[Exception]("CRASH"))) - } - override def afterEach { while (futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 20b1f92aea..0f90840564 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -118,14 +118,8 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende (pingpong1, pingpong2, pingpong3, topSupervisor) } - override def atStartup() = { - app.eventHandler notify Mute(EventFilter[Exception]("Die"), - EventFilter[IllegalStateException]("Don't wanna!"), - EventFilter[RuntimeException]("Expected")) - } - - override def atTermination() = { - app.eventHandler notify UnMuteAll + override def atStartup() { + app.mainbus.publish(Mute(EventFilter[RuntimeException](ExceptionMessage))) } override def beforeEach() = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 318120a2a4..54a6d3bd0e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -15,6 +15,7 @@ import akka.actor._ import util.control.NoStackTrace import akka.AkkaApplication import akka.util.duration._ +import akka.event.Logging.Error object ActorModelSpec { @@ -154,8 +155,8 @@ object ActorModelSpec { await(deadline)(stops == dispatcher.stops.get) } catch { case e ⇒ - app.eventHandler.error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get + - " required: starts=" + starts + ",stops=" + stops) + app.mainbus.publish(Error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get + + " required: starts=" + starts + ",stops=" + stops)) throw e } } @@ -215,9 +216,9 @@ object ActorModelSpec { await(deadline)(stats.restarts.get() == restarts) } catch { case e ⇒ - app.eventHandler.error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + + app.mainbus.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + - ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts) + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) throw e } } @@ -321,7 +322,7 @@ abstract class ActorModelSpec extends AkkaSpec { try { f } catch { - case e ⇒ app.eventHandler.error(e, this, "error in spawned thread") + case e ⇒ app.mainbus.publish(Error(e, this, "error in spawned thread")) } } } @@ -373,7 +374,7 @@ abstract class ActorModelSpec extends AkkaSpec { } "continue to process messages when a thread gets interrupted" in { - filterEvents(EventFilter[InterruptedException], EventFilter[akka.event.EventHandler.EventHandlerException]) { + filterEvents(EventFilter[InterruptedException], EventFilter[akka.event.Logging.EventHandlerException]) { implicit val dispatcher = newInterceptedDispatcher implicit val timeout = Timeout(5 seconds) val a = newTestActor(dispatcher) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index a2f0a785de..f337c445b6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -2,8 +2,7 @@ package akka.actor.dispatch import java.util.concurrent.{ CountDownLatch, TimeUnit } -import akka.testkit.TestEvent._ -import akka.testkit.EventFilter +import akka.testkit._ import akka.dispatch.{ PinnedDispatcher, Dispatchers } import akka.actor.{ Props, Actor } import akka.testkit.AkkaSpec @@ -24,14 +23,6 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach { private val unit = TimeUnit.MILLISECONDS - override def beforeEach { - app.eventHandler.notify(Mute(EventFilter[RuntimeException]("Failure"))) - } - - override def afterEach { - app.eventHandler.notify(UnMuteAll) - } - "A PinnedActor" must { "support tell" in { @@ -51,13 +42,14 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach { "support ask/exception" in { val actor = actorOf(Props[TestActor].withDispatcher(app.dispatcherFactory.newPinnedDispatcher("test"))) - app.eventHandler.notify(Mute(EventFilter[RuntimeException]("Expected exception; to test fault-tolerance"))) - try { - (actor ? "Failure").get - fail("Should have thrown an exception") - } catch { - case e ⇒ - assert("Expected exception; to test fault-tolerance" === e.getMessage()) + filterException[RuntimeException] { + try { + (actor ? "Failure").get + fail("Should have thrown an exception") + } catch { + case e ⇒ + assert("Expected exception; to test fault-tolerance" === e.getMessage()) + } } actor.stop() } diff --git a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala index 4861dd9ea5..afd255a27c 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventBusSpec.scala @@ -147,7 +147,6 @@ object ActorEventBusSpec { type Classifier = String def classify(event: Event) = event.toString - protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a compareTo b protected def mapSize = 32 def publish(event: Event, subscriber: Subscriber) = subscriber ! event } diff --git a/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala new file mode 100644 index 0000000000..a132ae32d6 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.event + +import akka.testkit.AkkaSpec +import akka.config.Configuration +import akka.util.duration._ + +object MainBusSpec { + case class M(i: Int) +} + +class MainBusSpec extends AkkaSpec(Configuration( + "akka.actor.debug.lifecycle" -> true, + "akka.actor.debug.mainbus" -> true)) { + + import MainBusSpec._ + + "A MainBus" must { + + "allow subscriptions" in { + val bus = new MainBus(true) + bus.start(app) + bus.subscribe(testActor, classOf[M]) + bus.publish(M(42)) + expectMsg(1 second, M(42)) + } + + } + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala index 3bd7ea96d9..0ba7c65be6 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala @@ -6,9 +6,12 @@ import akka.performance.trading.domain._ import akka.performance.trading.common._ import akka.actor.{ Props, ActorRef, Actor, PoisonPill } import akka.AkkaApplication +import akka.event.Logging abstract class AkkaPerformanceTest(val app: AkkaApplication) extends BenchmarkScenarios { + val log = Logging(app, this) + type TS = AkkaTradingSystem val clientDispatcher = app.dispatcherFactory.newDispatcher("client-dispatcher") @@ -70,7 +73,7 @@ abstract class AkkaPerformanceTest(val app: AkkaApplication) extends BenchmarkSc placeOrder(orderReceiver, o) } if (!rsp.status) { - app.eventHandler.error(this, "Invalid rsp") + log.error("Invalid rsp") } delay(delayMs) } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala index 5f2989fa97..f13fd20dba 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala @@ -16,7 +16,7 @@ trait MatchingEngine { } class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) - extends Actor with MatchingEngine { + extends Actor with MatchingEngine with ActorLogging { var standby: Option[ActorRef] = None @@ -26,7 +26,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) case order: Order ⇒ handleOrder(order) case unknown ⇒ - app.eventHandler.warning(this, "Received unknown message: " + unknown) + log.warning("Received unknown message: " + unknown) } def handleOrder(order: Order) { @@ -41,7 +41,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) pendingStandbyReply.foreach(waitForStandby(_)) done(true) case None ⇒ - app.eventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) + log.warning("Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) done(false) } } @@ -55,7 +55,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) pendingStandbyFuture.await } catch { case e: FutureTimeoutException ⇒ - app.eventHandler.error(this, "Standby timeout: " + e) + log.error("Standby timeout: " + e) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala index c1f811b425..f477cd43ca 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala @@ -24,14 +24,14 @@ trait OrderReceiver { } -class AkkaOrderReceiver extends Actor with OrderReceiver { +class AkkaOrderReceiver extends Actor with OrderReceiver with ActorLogging { type ME = ActorRef def receive = { case routing @ MatchingEngineRouting(mapping) ⇒ refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]]) case order: Order ⇒ placeOrder(order) - case unknown ⇒ app.eventHandler.warning(this, "Received unknown message: " + unknown) + case unknown ⇒ log.warning("Received unknown message: " + unknown) } def placeOrder(order: Order) = { @@ -40,7 +40,7 @@ class AkkaOrderReceiver extends Actor with OrderReceiver { case Some(m) ⇒ m.forward(order) case None ⇒ - app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) + log.warning("Unknown orderbook: " + order.orderbookSymbol) channel ! new Rsp(false) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala index 44ce92a92d..20c33551e9 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala @@ -17,7 +17,7 @@ class OneWayMatchingEngine(meId: String, orderbooks: List[Orderbook]) extends Ak orderbook.matchOrders() case None ⇒ - app.eventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) + log.warning("Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala index daeabfb36b..46dd57c111 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala @@ -13,7 +13,7 @@ class OneWayOrderReceiver extends AkkaOrderReceiver { case Some(m) ⇒ m ! order case None ⇒ - app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) + log.warning("Unknown orderbook: " + order.orderbookSymbol) } } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala index bc9d6593f3..5713f91762 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala @@ -13,6 +13,7 @@ import java.text.SimpleDateFormat import java.util.Date import scala.collection.mutable.{ Map ⇒ MutableMap } import akka.AkkaApplication +import akka.event.Logging trait BenchResultRepository { def add(stats: Stats) @@ -43,6 +44,8 @@ class FileBenchResultRepository(val app: AkkaApplication) extends BenchResultRep private def htmlDirExists: Boolean = new File(htmlDir).exists protected val maxHistorical = 7 + val log = Logging(app, this) + case class Key(name: String, load: Int) def add(stats: Stats) { @@ -102,8 +105,7 @@ class FileBenchResultRepository(val app: AkkaApplication) extends BenchResultRep out.writeObject(stats) } catch { case e: Exception ⇒ - app.eventHandler.error(this, "Failed to save [%s] to [%s], due to [%s]". - format(stats, f.getAbsolutePath, e.getMessage)) + log.error("Failed to save [{}] to [{}], due to [{}]", stats, f.getAbsolutePath, e.getMessage) } finally { if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ } } @@ -119,8 +121,7 @@ class FileBenchResultRepository(val app: AkkaApplication) extends BenchResultRep Some(stats) } catch { case e: Throwable ⇒ - app.eventHandler.error(this, "Failed to load from [%s], due to [%s]". - format(f.getAbsolutePath, e.getMessage)) + log.error("Failed to load from [{}], due to [{}]", f.getAbsolutePath, e.getMessage) None } finally { if (in ne null) try { in.close() } catch { case ignore: Exception ⇒ } @@ -143,8 +144,7 @@ class FileBenchResultRepository(val app: AkkaApplication) extends BenchResultRep writer.flush() } catch { case e: Exception ⇒ - app.eventHandler.error(this, "Failed to save report to [%s], due to [%s]". - format(f.getAbsolutePath, e.getMessage)) + log.error("Failed to save report to [{}], due to [{}]", f.getAbsolutePath, e.getMessage) } finally { if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala index d3a5f020d0..0383b04e8c 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -6,12 +6,14 @@ import java.util.Date import scala.collection.JavaConversions.asScalaBuffer import scala.collection.JavaConversions.enumerationAsScalaIterator import akka.AkkaApplication +import akka.event.Logging class Report(app: AkkaApplication, resultRepository: BenchResultRepository, compareResultWith: Option[String] = None) { - private def log = System.getProperty("benchmark.logResult", "true").toBoolean + private def doLog = System.getProperty("benchmark.logResult", "true").toBoolean + val log = Logging(app, this) val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") @@ -51,8 +53,8 @@ class Report(app: AkkaApplication, val reportName = current.name + "--" + timestamp + ".html" resultRepository.saveHtmlReport(sb.toString, reportName) - if (log) { - app.eventHandler.info(this, resultTable + "Charts in html report: " + resultRepository.htmlReportUrl(reportName)) + if (doLog) { + log.info(resultTable + "Charts in html report: " + resultRepository.htmlReportUrl(reportName)) } } diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index b2bcccb0b5..837960229d 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -20,6 +20,8 @@ import java.net.InetSocketAddress object AkkaApplication { + type AkkaConfig = a.AkkaConfig.type forSome { val a: AkkaApplication } + val Version = "2.0-SNAPSHOT" val envHome = System.getenv("AKKA_HOME") match { @@ -92,12 +94,14 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor val ActorTimeoutMillis = ActorTimeout.duration.toMillis val SerializeAllMessages = getBool("akka.actor.serialize-messages", false) - val LogLevel = getString("akka.event-handler-level", "INFO") + val LogLevel = getString("akka.loglevel", "INFO") + val StdoutLogLevel = getString("akka.stdout-loglevel", LogLevel) val EventHandlers = getList("akka.event-handlers") val AddLoggingReceive = getBool("akka.actor.debug.receive", false) val DebugAutoReceive = getBool("akka.actor.debug.autoreceive", false) val DebugLifecycle = getBool("akka.actor.debug.lifecycle", false) val FsmDebugEvent = getBool("akka.actor.debug.fsm", false) + val DebugMainBus = getBool("akka.actor.debug.mainbus", false) val DispatcherThroughput = getInt("akka.actor.throughput", 5) val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout"). @@ -133,6 +137,8 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor val ExpiredHeaderValue = config.getString("akka.http.expired-header-value", "expired") } + private[akka] def systemActorOf(props: Props, address: String): ActorRef = provider.actorOf(props, systemGuardian, address, true) + import AkkaConfig._ if (ConfigVersion != Version) @@ -159,11 +165,14 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor val defaultAddress = new InetSocketAddress(hostname, AkkaConfig.RemoteServerPort) + // this provides basic logging (to stdout) until .start() is called below + val mainbus = new MainBus(DebugMainBus) + mainbus.startStdoutLogger(AkkaConfig) + val log = new MainBusLogging(mainbus, this) + // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(this) - implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher - def terminationFuture: Future[ExitStatus] = provider.terminationFuture // TODO think about memory consistency effects when doing funky stuff inside constructor @@ -175,7 +184,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO make this configurable protected[akka] val guardian: ActorRef = { import akka.actor.FaultHandlingStrategy._ - new LocalActorRef(this, + provider.actorOf( Props(context ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy { case _: ActorKilledException ⇒ Stop case _: ActorInitializationException ⇒ Stop @@ -186,20 +195,31 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor true) } - // TODO think about memory consistency effects when doing funky stuff inside constructor - val eventHandler = new EventHandler(this) - - // TODO think about memory consistency effects when doing funky stuff inside constructor - val log: Logging = new EventHandlerLogging(eventHandler, this) + protected[akka] val systemGuardian: ActorRef = { + import akka.actor.FaultHandlingStrategy._ + provider.actorOf( + Props(context ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy { + case _: ActorKilledException ⇒ Stop + case _: ActorInitializationException ⇒ Stop + case _: Exception ⇒ Restart + }).withDispatcher(dispatcher), + provider.theOneWhoWalksTheBubblesOfSpaceTime, + "SystemSupervisor", + true) + } // TODO think about memory consistency effects when doing funky stuff inside constructor val deadLetters = new DeadLetterActorRef(this) + val deathWatch = provider.createDeathWatch() + + // this starts the reaper actor and the user-configured logging subscribers, which are also actors + mainbus.start(this) + mainbus.startDefaultLoggers(this, AkkaConfig) + // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor val deployer = new Deployer(this) - val deathWatch = provider.createDeathWatch() - // TODO think about memory consistency effects when doing funky stuff inside constructor val typedActor = new TypedActor(this) @@ -212,6 +232,7 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor // TODO shutdown all that other stuff, whatever that may be def stop(): Unit = { guardian.stop() + systemGuardian.stop() } terminationFuture.onComplete(_ ⇒ dispatcher.shutdown()) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d55c330840..f8ad786341 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -13,7 +13,7 @@ import akka.remote.RemoteSupport import akka.cluster.ClusterNode import akka.japi.{ Creator, Procedure } import akka.serialization.{ Serializer, Serialization } -import akka.event.EventHandler +import akka.event.Logging.Debug import akka.experimental import akka.{ AkkaApplication, AkkaException } @@ -153,6 +153,10 @@ object Timeout { implicit def defaultTimeout(implicit app: AkkaApplication) = app.AkkaConfig.ActorTimeout } +trait ActorLogging { this: Actor ⇒ + val log = akka.event.Logging(app.mainbus, context.self) +} + object Actor { type Receive = PartialFunction[Any, Unit] @@ -163,7 +167,7 @@ object Actor { class LoggingReceive(source: AnyRef, r: Receive)(implicit app: AkkaApplication) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - app.eventHandler.debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o) + app.mainbus.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) handled } def apply(o: Any): Unit = r(o) @@ -418,7 +422,7 @@ trait Actor { private[akka] final def apply(msg: Any) = { def autoReceiveMessage(msg: AutoReceivedMessage) { - if (app.AkkaConfig.DebugAutoReceive) app.eventHandler.debug(this, "received AutoReceiveMessage " + msg) + if (app.AkkaConfig.DebugAutoReceive) app.mainbus.publish(Debug(this, "received AutoReceiveMessage " + msg)) msg match { case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 75c7d1f02e..180cbe5df7 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -12,6 +12,7 @@ import scala.collection.JavaConverters import java.util.concurrent.{ ScheduledFuture, TimeUnit } import java.util.{ Collection ⇒ JCollection, Collections ⇒ JCollections } import akka.AkkaApplication +import akka.event.Logging.{ Debug, Warning, Error } /** * The actor context - the view of the actor cell from the actor. @@ -176,11 +177,11 @@ private[akka] class ActorCell( actor = created created.preStart() checkReceiveTimeout - if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "started") + if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "started (" + actor + ")")) } catch { case e ⇒ try { - app.eventHandler.error(e, self, "error while creating actor") + app.mainbus.publish(Error(e, self, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -190,7 +191,7 @@ private[akka] class ActorCell( def recreate(cause: Throwable): Unit = try { val failedActor = actor - if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "restarting") + if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "restarting")) val freshActor = newActor() if (failedActor ne null) { val c = currentMessage //One read only plz @@ -204,14 +205,14 @@ private[akka] class ActorCell( } actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call freshActor.postRestart(cause) - if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "restarted") + if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "restarted")) dispatcher.resume(this) //FIXME should this be moved down? props.faultHandler.handleSupervisorRestarted(cause, self, children) } catch { case e ⇒ try { - app.eventHandler.error(e, self, "error while creating actor") + app.mainbus.publish(Error(e, self, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -232,7 +233,7 @@ private[akka] class ActorCell( try { try { val a = actor - if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "stopping") + if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "stopping")) if (a ne null) a.postStop() } finally { //Stop supervised actors @@ -257,8 +258,8 @@ private[akka] class ActorCell( val links = _children if (!links.contains(child)) { _children = _children.updated(child, ChildRestartStats()) - if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now supervising " + child) - } else app.eventHandler.warning(self, "Already supervising " + child) + if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "now supervising " + child)) + } else app.mainbus.publish(Warning(self, "Already supervising " + child)) } try { @@ -269,10 +270,10 @@ private[akka] class ActorCell( case Recreate(cause) ⇒ recreate(cause) case Link(subject) ⇒ app.deathWatch.subscribe(self, subject) - if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now monitoring " + subject) + if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "now monitoring " + subject)) case Unlink(subject) ⇒ app.deathWatch.unsubscribe(self, subject) - if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "stopped monitoring " + subject) + if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "stopped monitoring " + subject)) case Suspend() ⇒ suspend() case Resume() ⇒ resume() case Terminate() ⇒ terminate() @@ -281,7 +282,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ //Should we really catch everything here? - app.eventHandler.error(e, self, "error while processing " + message) + app.mainbus.publish(Error(e, self, "error while processing " + message)) //TODO FIXME How should problems here be handled? throw e } @@ -300,7 +301,7 @@ private[akka] class ActorCell( currentMessage = null // reset current message after successful invocation } catch { case e ⇒ - app.eventHandler.error(e, self, e.getMessage) + app.mainbus.publish(Error(e, self, e.getMessage)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -322,7 +323,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ - app.eventHandler.error(e, self, e.getMessage) + app.mainbus.publish(Error(e, self, e.getMessage)) throw e } } else { @@ -334,7 +335,7 @@ private[akka] class ActorCell( def handleFailure(fail: Failed): Unit = _children.get(fail.actor) match { case Some(stats) ⇒ if (!props.faultHandler.handleFailure(fail, stats, _children)) throw fail.cause - case None ⇒ app.eventHandler.warning(self, "dropping " + fail + " from unknown child") + case None ⇒ app.mainbus.publish(Warning(self, "dropping " + fail + " from unknown child")) } def handleChildTerminated(child: ActorRef): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 43fb856afb..1df0dc64da 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -383,12 +383,12 @@ class DeadLetterActorRef(app: AkkaApplication) extends MinimalActorRef { override def isShutdown(): Boolean = true - protected[akka] override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = app.eventHandler.notify(DeadLetter(message, channel)) + protected[akka] override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = app.mainbus.publish(DeadLetter(message, channel)) protected[akka] override def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, timeout: Timeout, - channel: UntypedChannel): Future[Any] = { app.eventHandler.notify(DeadLetter(message, channel)); brokenPromise } + channel: UntypedChannel): Future[Any] = { app.mainbus.publish(DeadLetter(message, channel)); brokenPromise } } abstract class AskActorRef(promise: Promise[Any], app: AkkaApplication) extends MinimalActorRef { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index aa84ef2711..f8f701c4d2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -11,7 +11,7 @@ import akka.AkkaApplication import java.util.concurrent.ConcurrentHashMap import com.eaio.uuid.UUID import akka.AkkaException -import akka.event.{ ActorClassification, DeathWatch, EventHandler } +import akka.event.{ ActorClassification, DeathWatch, MainBusLogging } import akka.dispatch._ /** @@ -91,6 +91,7 @@ class ActorRefProviderException(message: String) extends AkkaException(message) class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { val terminationFuture = new DefaultPromise[AkkaApplication.ExitStatus](Timeout.never)(app.dispatcher) + val log = new MainBusLogging(app.mainbus, this) /** * Top-level anchor for the supervision hierarchy of this actor system. Will @@ -105,14 +106,14 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { msg match { case Failed(child, ex) ⇒ child.stop() case ChildTerminated(child) ⇒ terminationFuture.completeWithResult(AkkaApplication.Stopped) - case _ ⇒ app.eventHandler.error(this, this + " received unexpected message " + msg) + case _ ⇒ log.error(this + " received unexpected message " + msg) } } protected[akka] override def sendSystemMessage(message: SystemMessage) { message match { case Supervise(child) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead - case _ ⇒ app.eventHandler.error(this, this + " received unexpected system message " + message) + case _ ⇒ log.error(this + " received unexpected system message " + message) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 82af1d9c56..fecdc801fa 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -8,7 +8,7 @@ import collection.immutable.Seq import java.util.concurrent.ConcurrentHashMap -import akka.event.EventHandler +import akka.event.MainBusLogging import akka.actor.DeploymentConfig._ import akka.{ AkkaException, AkkaApplication } import akka.config.{ Configuration, ConfigurationException } @@ -34,6 +34,7 @@ trait ActorDeployer { class Deployer(val app: AkkaApplication) extends ActorDeployer { val deploymentConfig = new DeploymentConfig(app) + val log = new MainBusLogging(app.mainbus, this) // val defaultAddress = Node(Config.nodename) @@ -81,7 +82,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { lookupInConfig(address) } catch { case e: ConfigurationException ⇒ - app.eventHandler.error(e, this, e.getMessage) //TODO FIXME I do not condone log AND rethrow + log.error(e, e.getMessage) //TODO FIXME I do not condone log AND rethrow throw e } @@ -324,13 +325,13 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = { val e = new DeploymentAlreadyBoundException("Address [" + deployment.address + "] already bound to [" + deployment + "]") - app.eventHandler.error(e, this, e.getMessage) + log.error(e, e.getMessage) throw e } private[akka] def thrownNoDeploymentBoundException(address: String): Nothing = { val e = new NoDeploymentBoundException("Address [" + address + "] is not bound to a deployment") - app.eventHandler.error(e, this, e.getMessage) + log.error(e, e.getMessage) throw e } } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 3ea53364ac..dee625d92c 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -4,7 +4,7 @@ package akka.actor import akka.util._ -import akka.event.EventHandler +import akka.event.MainBusLogging import scala.collection.mutable import java.util.concurrent.ScheduledFuture @@ -190,6 +190,8 @@ trait FSM[S, D] extends ListenerManagement { type Timeout = Option[Duration] type TransitionHandler = PartialFunction[(S, S), Unit] + val log = new MainBusLogging(app.mainbus, context.self) + /** * **************************************** * DSL @@ -421,7 +423,7 @@ trait FSM[S, D] extends ListenerManagement { */ private val handleEventDefault: StateFunction = { case Event(value, stateData) ⇒ - app.eventHandler.warning(context.self, "unhandled event " + value + " in state " + stateName) + log.warning("unhandled event " + value + " in state " + stateName) stay } private var handleEvent: StateFunction = handleEventDefault @@ -534,8 +536,8 @@ trait FSM[S, D] extends ListenerManagement { if (!currentState.stopReason.isDefined) { val reason = nextState.stopReason.get reason match { - case Failure(ex: Throwable) ⇒ app.eventHandler.error(ex, context.self, "terminating due to Failure") - case Failure(msg) ⇒ app.eventHandler.error(context.self, msg) + case Failure(ex: Throwable) ⇒ log.error(ex, "terminating due to Failure") + case Failure(msg: AnyRef) ⇒ log.error(msg.toString) case _ ⇒ } val stopEvent = StopEvent(reason, currentState.stateName, currentState.stateData) @@ -584,13 +586,13 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ protected[akka] abstract override def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = { if (debugEvent) - app.eventHandler.debug(context.self, "setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) + log.debug("setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) super.setTimer(name, msg, timeout, repeat) } protected[akka] abstract override def cancelTimer(name: String) = { if (debugEvent) - app.eventHandler.debug(context.self, "canceling timer '" + name + "'") + log.debug("canceling timer '" + name + "'") super.cancelTimer(name) } @@ -602,7 +604,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ case c: UntypedChannel ⇒ c.toString case _ ⇒ "unknown" } - app.eventHandler.debug(context.self, "processing " + event + " from " + srcstr) + log.debug("processing " + event + " from " + srcstr) } if (logDepth > 0) { @@ -616,7 +618,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ val newState = stateName if (debugEvent && oldState != newState) - app.eventHandler.debug(context.self, "transition " + oldState + " -> " + newState) + log.debug("transition " + oldState + " -> " + newState) } /** diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 600d3a334a..164321ade9 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -5,7 +5,6 @@ package akka.actor import akka.util.ByteString import akka.dispatch.Envelope -import akka.event.EventHandler import java.net.InetSocketAddress import java.io.IOException import java.util.concurrent.atomic.AtomicReference diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index f89451b962..f3d41e138e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -6,7 +6,7 @@ package akka.dispatch import java.util.concurrent._ import java.util.concurrent.atomic.AtomicLong -import akka.event.EventHandler +import akka.event.Logging.Error import akka.config.Configuration import akka.util.{ Duration, Switch, ReentrantGuard } import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy } @@ -66,7 +66,7 @@ final case class TaskInvocation(app: AkkaApplication, function: () ⇒ Unit, cle try { function() } catch { - case e ⇒ app.eventHandler.error(e, this, e.getMessage) + case e ⇒ app.mainbus.publish(Error(e, this, e.getMessage)) } finally { cleanup() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index f9819c3cd3..3e9411d593 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -4,7 +4,7 @@ package akka.dispatch -import akka.event.EventHandler +import akka.event.Logging.Warning import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue } import akka.actor.{ ActorCell, ActorKilledException } @@ -95,7 +95,7 @@ class Dispatcher( executorService.get() execute invocation } catch { case e: RejectedExecutionException ⇒ - app.eventHandler.warning(this, e.toString) + app.mainbus.publish(Warning(this, e.toString)) throw e } } @@ -122,7 +122,7 @@ class Dispatcher( } catch { case e: RejectedExecutionException ⇒ try { - app.eventHandler.warning(this, e.toString) + app.mainbus.publish(Warning(this, e.toString)) } finally { mbox.setAsIdle() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 922aa9cf5c..f232762687 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -6,7 +6,7 @@ package akka.dispatch import akka.AkkaException -import akka.event.EventHandler +import akka.event.Logging.Error import akka.actor.{ UntypedChannel, Timeout, ExceptionChannel } import scala.Option import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } @@ -262,7 +262,7 @@ object Future { result completeWithResult currentValue } catch { case e: Exception ⇒ - dispatcher.app.eventHandler.error(e, this, e.getMessage) + dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) result completeWithException e } finally { results.clear @@ -596,7 +596,7 @@ sealed trait Future[+T] extends japi.Future[T] { Right(f(res)) } catch { case e: Exception ⇒ - dispatcher.app.eventHandler.error(e, this, e.getMessage) + dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) Left(e) }) } @@ -648,7 +648,7 @@ sealed trait Future[+T] extends japi.Future[T] { future.completeWith(f(r)) } catch { case e: Exception ⇒ - dispatcher.app.eventHandler.error(e, this, e.getMessage) + dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) future complete Left(e) } } @@ -681,7 +681,7 @@ sealed trait Future[+T] extends japi.Future[T] { if (p(res)) r else Left(new MatchError(res)) } catch { case e: Exception ⇒ - dispatcher.app.eventHandler.error(e, this, e.getMessage) + dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) Left(e) }) } @@ -781,7 +781,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.app.eventHandler.error(e, this, e.getMessage) + dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) fr completeWithException e } } @@ -795,7 +795,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.app.eventHandler.error(e, this, e.getMessage) + dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) fr completeWithException e } } @@ -985,7 +985,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { - try { func(this) } catch { case e ⇒ dispatcher.app.eventHandler.error(e, this, "Future onComplete-callback raised an exception") } //TODO catch, everything? Really? + try { func(this) } catch { case e ⇒ dispatcher.app.mainbus.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? } @inline diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index d03655489b..4222c6fd77 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -12,6 +12,7 @@ import akka.actor.{ ActorContext, ActorCell } import java.util.concurrent._ import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } import annotation.tailrec +import akka.event.Logging.Error class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -205,7 +206,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } } catch { case e ⇒ - actor.app.eventHandler.error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!") + actor.app.mainbus.publish(Error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index ac529342bd..0d579e6e1e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -9,7 +9,7 @@ import java.util.concurrent._ import atomic.{ AtomicLong, AtomicInteger } import ThreadPoolExecutor.CallerRunsPolicy import akka.util.Duration -import akka.event.EventHandler +import akka.event.Logging.{ Warning, Error } import akka.AkkaApplication object ThreadPoolConfig { @@ -227,10 +227,10 @@ class BoundedExecutorDecorator(val app: AkkaApplication, val executor: ExecutorS }) } catch { case e: RejectedExecutionException ⇒ - app.eventHandler.warning(this, e.toString) + app.mainbus.publish(Warning(this, e.toString)) semaphore.release case e: Throwable ⇒ - app.eventHandler.error(e, this, e.getMessage) + app.mainbus.publish(Error(e, this, e.getMessage)) throw e } } diff --git a/akka-actor/src/main/scala/akka/event/EventBus.scala b/akka-actor/src/main/scala/akka/event/EventBus.scala index 9dd76f5344..c95b91e9ce 100644 --- a/akka-actor/src/main/scala/akka/event/EventBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventBus.scala @@ -48,6 +48,7 @@ trait EventBus { */ trait ActorEventBus extends EventBus { type Subscriber = ActorRef + protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b } /** @@ -254,9 +255,9 @@ trait ActorClassification { self: ActorEventBus with ActorClassifier ⇒ */ protected def mapSize: Int - def publish(event: Event): Unit = mappings.get(classify(event)) match { - case null ⇒ - case raw: Vector[_] ⇒ raw.asInstanceOf[Vector[ActorRef]] foreach { _ ! event } + def publish(event: Event): Unit = { + val receivers = mappings.get(classify(event)) + if (receivers ne null) receivers foreach { _ ! event } } def subscribe(subscriber: Subscriber, to: Classifier): Boolean = associate(to, subscriber) diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 3acdf52a0b..7a4a5fe0ad 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -12,124 +12,6 @@ import akka.serialization._ import akka.AkkaException import akka.AkkaApplication -object EventHandler { - - val ErrorLevel = 1 - val WarningLevel = 2 - val InfoLevel = 3 - val DebugLevel = 4 - - val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern - val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern - val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern - val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern - val genericFormat = "[GENERIC] [%s] [%s]".intern - - class EventHandlerException extends AkkaException - - lazy val StandardOutLogger = new StandardOutLogger {} - - sealed trait Event { - @transient - val thread: Thread = Thread.currentThread - def level: Int - } - - case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends Event { - def level = ErrorLevel - } - - case class Warning(instance: AnyRef, message: Any = "") extends Event { - def level = WarningLevel - } - - case class Info(instance: AnyRef, message: Any = "") extends Event { - def level = InfoLevel - } - - case class Debug(instance: AnyRef, message: Any = "") extends Event { - def level = DebugLevel - } - - trait StandardOutLogger { - import java.text.SimpleDateFormat - import java.util.Date - - val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.S") - - def timestamp = dateFormat.format(new Date) - - def print(event: Any) { - event match { - case e: Error ⇒ error(e) - case e: Warning ⇒ warning(e) - case e: Info ⇒ info(e) - case e: Debug ⇒ debug(e) - case e ⇒ generic(e) - } - } - - def error(event: Error) = - println(errorFormat.format( - timestamp, - event.thread.getName, - instanceName(event.instance), - event.message, - stackTraceFor(event.cause))) - - def warning(event: Warning) = - println(warningFormat.format( - timestamp, - event.thread.getName, - instanceName(event.instance), - event.message)) - - def info(event: Info) = - println(infoFormat.format( - timestamp, - event.thread.getName, - instanceName(event.instance), - event.message)) - - def debug(event: Debug) = - println(debugFormat.format( - timestamp, - event.thread.getName, - instanceName(event.instance), - event.message)) - - def generic(event: Any) = - println(genericFormat.format(timestamp, event.toString)) - - def instanceName(instance: AnyRef): String = instance match { - case null ⇒ "NULL" - case a: ActorRef ⇒ a.address - case _ ⇒ simpleName(instance) - } - } - - class DefaultListener extends Actor with StandardOutLogger { - def receive = { case event ⇒ print(event) } - } - - def stackTraceFor(e: Throwable) = { - import java.io.{ StringWriter, PrintWriter } - val sw = new StringWriter - val pw = new PrintWriter(sw) - e.printStackTrace(pw) - sw.toString - } - - private def levelFor(eventClass: Class[_ <: Event]) = { - if (classOf[Error].isAssignableFrom(eventClass)) ErrorLevel - else if (classOf[Warning].isAssignableFrom(eventClass)) WarningLevel - else if (classOf[Info].isAssignableFrom(eventClass)) InfoLevel - else if (classOf[Debug].isAssignableFrom(eventClass)) DebugLevel - else DebugLevel - } - -} - /** * Event handler. *

@@ -171,126 +53,3 @@ object EventHandler { * * @author Jonas Bonér */ -class EventHandler(app: AkkaApplication) extends ListenerManagement { - - import EventHandler._ - - val synchronousLogging: Boolean = System.getProperty("akka.event.force-sync") match { - case null | "" ⇒ false - case _ ⇒ true - } - - lazy val EventHandlerDispatcher = - app.dispatcherFactory.fromConfig("akka.event-handler-dispatcher", app.dispatcherFactory.newDispatcher("event-handler-dispatcher").setCorePoolSize(2).build) - - implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener] - - @volatile - var level: Int = app.AkkaConfig.LogLevel match { - case "ERROR" | "error" ⇒ ErrorLevel - case "WARNING" | "warning" ⇒ WarningLevel - case "INFO" | "info" ⇒ InfoLevel - case "DEBUG" | "debug" ⇒ DebugLevel - case unknown ⇒ throw new ConfigurationException( - "Configuration option 'akka.event-handler-level' is invalid [" + unknown + "]") - } - - def start() { - try { - val defaultListeners = app.AkkaConfig.EventHandlers match { - case Nil ⇒ "akka.event.EventHandler$DefaultListener" :: Nil - case listeners ⇒ listeners - } - defaultListeners foreach { listenerName ⇒ - try { - ReflectiveAccess.getClassFor[Actor](listenerName) match { - case Right(actorClass) ⇒ addListener(new LocalActorRef(app, Props(actorClass).withDispatcher(EventHandlerDispatcher), app.guardian, Props.randomAddress, systemService = true)) - case Left(exception) ⇒ throw exception - } - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + listenerName + - "] due to [" + e.toString + "]", e) - } - } - info(this, "Starting up EventHandler") - } catch { - case e: Exception ⇒ - System.err.println("error while starting up EventHandler") - e.printStackTrace() - throw new ConfigurationException("Could not start Event Handler due to [" + e.toString + "]") - } - } - - /** - * Shuts down all event handler listeners including the event handle dispatcher. - */ - def shutdown() { - foreachListener { l ⇒ - removeListener(l) - l.stop() - } - } - - def notify(event: Any) { - if (event.isInstanceOf[Event]) { - if (level >= event.asInstanceOf[Event].level) log(event) - } else log(event) - } - - def notify[T <: Event: ClassManifest](event: ⇒ T) { - if (level >= levelFor(classManifest[T].erasure.asInstanceOf[Class[_ <: Event]])) log(event) - } - - def error(cause: Throwable, instance: AnyRef, message: ⇒ String) { - if (level >= ErrorLevel) log(Error(cause, instance, message)) - } - - def error(cause: Throwable, instance: AnyRef, message: Any) { - if (level >= ErrorLevel) log(Error(cause, instance, message)) - } - - def error(instance: AnyRef, message: ⇒ String) { - if (level >= ErrorLevel) log(Error(new EventHandlerException, instance, message)) - } - - def error(instance: AnyRef, message: Any) { - if (level >= ErrorLevel) log(Error(new EventHandlerException, instance, message)) - } - - def warning(instance: AnyRef, message: ⇒ String) { - if (level >= WarningLevel) log(Warning(instance, message)) - } - - def warning(instance: AnyRef, message: Any) { - if (level >= WarningLevel) log(Warning(instance, message)) - } - - def info(instance: AnyRef, message: ⇒ String) { - if (level >= InfoLevel) log(Info(instance, message)) - } - - def info(instance: AnyRef, message: Any) { - if (level >= InfoLevel) log(Info(instance, message)) - } - - def debug(instance: AnyRef, message: ⇒ String) { - if (level >= DebugLevel) log(Debug(instance, message)) - } - - def debug(instance: AnyRef, message: Any) { - if (level >= DebugLevel) log(Debug(instance, message)) - } - - def isInfoEnabled = level >= InfoLevel - - def isDebugEnabled = level >= DebugLevel - - private def log(event: Any) { - if (synchronousLogging) StandardOutLogger.print(event) - else notifyListeners(event) - } - - start() -} diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 12ead94967..59932879f1 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -2,7 +2,258 @@ * Copyright (C) 2009-2011 Typesafe Inc. */ package akka.event -import akka.actor.Actor + +import akka.actor.{ Actor, ActorRef, MinimalActorRef, LocalActorRef, Props, UntypedChannel } +import akka.{ AkkaException, AkkaApplication } +import akka.AkkaApplication.AkkaConfig +import akka.util.ReflectiveAccess +import akka.config.ConfigurationException + +/** + * This trait brings log level handling to the MainBus: it reads the log + * levels for the initial logging (StandardOutLogger) and the loggers&level + * for after-init logging, possibly keeping the StandardOutLogger enabled if + * it is part of the configured loggers. All configured loggers are treated as + * system services and managed by this trait, i.e. subscribed/unsubscribed in + * response to changes of LoggingBus.logLevel. + */ +trait LoggingBus extends ActorEventBus { + + type Event >: Logging.LogEvent + type Classifier >: Class[_] + + import Logging._ + + private var loggers = Seq.empty[ActorRef] + @volatile + private var _logLevel: LogLevel = _ + + def logLevel = _logLevel + + def logLevel_=(level: LogLevel) { + for { l ← AllLogLevels if l > _logLevel && l <= level; log ← loggers } subscribe(log, classFor(l)) + for { l ← AllLogLevels if l <= _logLevel && l > level; log ← loggers } unsubscribe(log, classFor(l)) + _logLevel = level + } + + def startStdoutLogger(config: AkkaConfig) { + val level = levelFor(config.StdoutLogLevel) getOrElse { + StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) + ErrorLevel + } + AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) + loggers = Seq(StandardOutLogger) + _logLevel = level + publish(Info(this, "StandardOutLogger started")) + } + + def startDefaultLoggers(app: AkkaApplication, config: AkkaConfig) { + val level = levelFor(config.LogLevel) getOrElse { + StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + config.LogLevel)) + ErrorLevel + } + try { + val defaultLoggers = config.EventHandlers match { + case Nil ⇒ "akka.event.Logging$DefaultLogger" :: Nil + case loggers ⇒ loggers + } + loggers = for { + loggerName ← defaultLoggers + if loggerName != DefaultLoggerName + } yield { + try { + ReflectiveAccess.getClassFor[Actor](loggerName) match { + case Right(actorClass) ⇒ addLogger(app, actorClass, level) + case Left(exception) ⇒ throw exception + } + } catch { + case e: Exception ⇒ + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + loggerName + + "] due to [" + e.toString + "]", e) + } + } + publish(Info(this, "Default Loggers started")) + if (defaultLoggers contains DefaultLoggerName) { + loggers :+= StandardOutLogger + } else { + unsubscribe(StandardOutLogger) + } + _logLevel = level + } catch { + case e: Exception ⇒ + System.err.println("error while starting up EventHandler") + e.printStackTrace() + throw new ConfigurationException("Could not start Event Handler due to [" + e.toString + "]") + } + } + + private def addLogger(app: AkkaApplication, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { + val actor = app.systemActorOf(Props(clazz), Props.randomAddress) + actor ! InitializeLogger(this) + AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l))) + publish(Info(this, "logger " + clazz.getName + " started")) + actor + } + +} + +object Logging { + + trait LogLevelType + type LogLevel = Int with LogLevelType + final val ErrorLevel = 1.asInstanceOf[Int with LogLevelType] + final val WarningLevel = 2.asInstanceOf[Int with LogLevelType] + final val InfoLevel = 3.asInstanceOf[Int with LogLevelType] + final val DebugLevel = 4.asInstanceOf[Int with LogLevelType] + + def levelFor(s: String): Option[LogLevel] = s match { + case "ERROR" | "error" ⇒ Some(ErrorLevel) + case "WARNING" | "warning" ⇒ Some(WarningLevel) + case "INFO" | "info" ⇒ Some(InfoLevel) + case "DEBUG" | "debug" ⇒ Some(DebugLevel) + case unknown ⇒ None + } + + def levelFor(eventClass: Class[_ <: LogEvent]) = { + if (classOf[Error].isAssignableFrom(eventClass)) ErrorLevel + else if (classOf[Warning].isAssignableFrom(eventClass)) WarningLevel + else if (classOf[Info].isAssignableFrom(eventClass)) InfoLevel + else if (classOf[Debug].isAssignableFrom(eventClass)) DebugLevel + else DebugLevel + } + + def classFor(level: LogLevel): Class[_ <: LogEvent] = level match { + case ErrorLevel ⇒ classOf[Error] + case WarningLevel ⇒ classOf[Warning] + case InfoLevel ⇒ classOf[Info] + case DebugLevel ⇒ classOf[Debug] + } + + val AllLogLevels = Seq(ErrorLevel: AnyRef, WarningLevel, InfoLevel, DebugLevel).asInstanceOf[Seq[LogLevel]] + + val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern + val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern + val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern + val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern + val genericFormat = "[GENERIC] [%s] [%s]".intern + + def apply(app: AkkaApplication, instance: AnyRef): Logging = new MainBusLogging(app.mainbus, instance) + def apply(bus: MainBus, instance: AnyRef): Logging = new MainBusLogging(bus, instance) + + class EventHandlerException extends AkkaException + + sealed trait LogEvent { + @transient + val thread: Thread = Thread.currentThread + def level: LogLevel + } + + case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends LogEvent { + def level = ErrorLevel + } + object Error { + def apply(instance: AnyRef, message: Any) = new Error(new EventHandlerException, instance, message) + } + + case class Warning(instance: AnyRef, message: Any = "") extends LogEvent { + def level = WarningLevel + } + + case class Info(instance: AnyRef, message: Any = "") extends LogEvent { + def level = InfoLevel + } + + case class Debug(instance: AnyRef, message: Any = "") extends LogEvent { + def level = DebugLevel + } + + case class InitializeLogger(bus: LoggingBus) + + trait StdOutLogger { + import java.text.SimpleDateFormat + import java.util.Date + + val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.S") + + def timestamp = dateFormat.format(new Date) + + def print(event: Any) { + event match { + case e: Error ⇒ error(e) + case e: Warning ⇒ warning(e) + case e: Info ⇒ info(e) + case e: Debug ⇒ debug(e) + case e ⇒ generic(e) + } + } + + def error(event: Error) = + println(errorFormat.format( + timestamp, + event.thread.getName, + instanceName(event.instance), + event.message, + stackTraceFor(event.cause))) + + def warning(event: Warning) = + println(warningFormat.format( + timestamp, + event.thread.getName, + instanceName(event.instance), + event.message)) + + def info(event: Info) = + println(infoFormat.format( + timestamp, + event.thread.getName, + instanceName(event.instance), + event.message)) + + def debug(event: Debug) = + println(debugFormat.format( + timestamp, + event.thread.getName, + instanceName(event.instance), + event.message)) + + def generic(event: Any) = + println(genericFormat.format(timestamp, event.toString)) + + def instanceName(instance: AnyRef): String = instance match { + case null ⇒ "NULL" + case a: ActorRef ⇒ a.address + case _ ⇒ instance.getClass.getSimpleName + } + } + + class StandardOutLogger extends MinimalActorRef with StdOutLogger { + override val toString = "StandardOutLogger" + override def postMessageToMailbox(obj: Any, channel: UntypedChannel) { print(obj) } + } + val StandardOutLogger = new StandardOutLogger + val DefaultLoggerName = StandardOutLogger.getClass.getName + + class DefaultLogger extends Actor with StdOutLogger { + def receive = { + case InitializeLogger(_) ⇒ + case event: LogEvent ⇒ print(event) + } + } + + def stackTraceFor(e: Throwable) = { + if (e ne null) { + import java.io.{ StringWriter, PrintWriter } + val sw = new StringWriter + val pw = new PrintWriter(sw) + e.printStackTrace(pw) + sw.toString + } else { + "[NO STACK TRACE]" + } + } + +} /** * Logging wrapper to make nicer and optimize: provide template versions which @@ -69,40 +320,21 @@ trait Logging { } -trait ActorLogging extends Logging { this: Actor ⇒ +class MainBusLogging(val mainbus: MainBus, val loggingInstance: AnyRef) extends Logging { - import EventHandler._ + import Logging._ - def isErrorEnabled = app.eventHandler.level >= ErrorLevel - def isWarningEnabled = app.eventHandler.level >= WarningLevel - def isInfoEnabled = app.eventHandler.level >= InfoLevel - def isDebugEnabled = app.eventHandler.level >= DebugLevel + def isErrorEnabled = mainbus.logLevel >= ErrorLevel + def isWarningEnabled = mainbus.logLevel >= WarningLevel + def isInfoEnabled = mainbus.logLevel >= InfoLevel + def isDebugEnabled = mainbus.logLevel >= DebugLevel - protected def notifyError(cause: Throwable, message: String) { app.eventHandler.notifyListeners(Error(cause, context.self, message)) } + protected def notifyError(cause: Throwable, message: String) { mainbus.publish(Error(cause, loggingInstance, message)) } - protected def notifyWarning(message: String) { app.eventHandler.notifyListeners(Warning(context.self, message)) } + protected def notifyWarning(message: String) { mainbus.publish(Warning(loggingInstance, message)) } - protected def notifyInfo(message: String) { app.eventHandler.notifyListeners(Info(context.self, message)) } + protected def notifyInfo(message: String) { mainbus.publish(Info(loggingInstance, message)) } - protected def notifyDebug(message: String) { app.eventHandler.notifyListeners(Debug(context.self, message)) } - -} - -class EventHandlerLogging(val eventHandler: EventHandler, val loggingInstance: AnyRef) extends Logging { - - import EventHandler._ - - def isErrorEnabled = eventHandler.level >= ErrorLevel - def isWarningEnabled = eventHandler.level >= WarningLevel - def isInfoEnabled = eventHandler.level >= InfoLevel - def isDebugEnabled = eventHandler.level >= DebugLevel - - protected def notifyError(cause: Throwable, message: String) { eventHandler.notifyListeners(Error(cause, loggingInstance, message)) } - - protected def notifyWarning(message: String) { eventHandler.notifyListeners(Warning(loggingInstance, message)) } - - protected def notifyInfo(message: String) { eventHandler.notifyListeners(Info(loggingInstance, message)) } - - protected def notifyDebug(message: String) { eventHandler.notifyListeners(Debug(loggingInstance, message)) } + protected def notifyDebug(message: String) { mainbus.publish(Debug(loggingInstance, message)) } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/event/MainBus.scala b/akka-actor/src/main/scala/akka/event/MainBus.scala new file mode 100644 index 0000000000..ffaa41ca5a --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/MainBus.scala @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.event + +import akka.actor.{ ActorRef, Actor, Props } +import akka.AkkaApplication +import akka.actor.Terminated + +class MainBus(debug: Boolean = false) extends LoggingBus with LookupClassification { + + type Event = AnyRef + type Classifier = Class[_] + + @volatile + private var reaper: ActorRef = _ + + protected def mapSize = 16 + + protected def classify(event: AnyRef): Class[_] = event.getClass + + protected def publish(event: AnyRef, subscriber: ActorRef) = subscriber ! event + + override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { + if (debug) publish(Logging.Debug(this, "subscribing " + subscriber + " to channel " + channel)) + if (reaper ne null) reaper ! subscriber + super.subscribe(subscriber, channel) + } + + override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { + if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from channel " + channel)) + super.unsubscribe(subscriber, channel) + } + + override def unsubscribe(subscriber: ActorRef) { + if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from all channels")) + super.unsubscribe(subscriber) + } + + def start(app: AkkaApplication) { + reaper = app.systemActorOf(Props(new Actor { + def receive = loggable(context.self) { + case ref: ActorRef ⇒ watch(ref) + case Terminated(ref) ⇒ unsubscribe(ref) + } + }), "MainBusReaper") + subscribers.values foreach (reaper ! _) + } + + def printSubscribers: String = { + val sb = new StringBuilder + for (c ← subscribers.keys) sb.append(c + " -> " + subscribers.valueIterator(c).mkString("[", ", ", "]")) + sb.toString + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index c1050c7842..4650f945ba 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -30,7 +30,7 @@ trait RemoteModule { val UUID_PREFIX = "uuid:".intern def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope - protected[akka] def notifyListeners(message: ⇒ Any): Unit + protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit private[akka] def actors: ConcurrentHashMap[String, ActorRef] // FIXME need to invalidate this cache on replication private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] // FIXME remove actorsByUuid map? @@ -193,7 +193,7 @@ abstract class RemoteSupport(val app: AkkaApplication) extends RemoteServerModul clear } - protected[akka] override def notifyListeners(message: ⇒ Any): Unit = app.eventHandler.notify(message) + protected[akka] override def notifyListeners(message: RemoteLifeCycleEvent): Unit = app.mainbus.publish(message) private[akka] val actors = new ConcurrentHashMap[String, ActorRef] private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 77ce3de642..29d30ef489 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -6,7 +6,6 @@ package akka.routing import akka.AkkaException import akka.actor._ -import akka.event.EventHandler import akka.config.ConfigurationException import akka.dispatch.{ Future, MessageDispatcher } import akka.AkkaApplication diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala index afbb7a2c20..f79ea921dd 100644 --- a/akka-actor/src/main/scala/akka/util/Index.scala +++ b/akka-actor/src/main/scala/akka/util/Index.scala @@ -8,6 +8,7 @@ import annotation.tailrec import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } import java.util.{ Comparator, Set ⇒ JSet } +import scala.collection.mutable /** * An implementation of a ConcurrentMultiMap @@ -98,6 +99,24 @@ class Index[K, V](val mapSize: Int, val valueComparator: Comparator[V]) { container.entrySet foreach { e ⇒ e.getValue.foreach(fun(e.getKey, _)) } } + /** + * Returns the value set. + */ + def values = { + import scala.collection.JavaConversions._ + val builder = mutable.Set.empty[V] + for { + entry ← container.entrySet + v ← entry.getValue + } builder += v + builder.toSet + } + + /** + * Returns the key set. + */ + def keys = scala.collection.JavaConversions.asScalaIterable(container.keySet) + /** * Disassociates the value of type V from the key of type K * @return true if the value was disassociated from the key and false if it wasn't previously associated with the key diff --git a/akka-actor/src/main/scala/akka/util/JMX.scala b/akka-actor/src/main/scala/akka/util/JMX.scala index d4e287c2e3..ad735ae39b 100644 --- a/akka-actor/src/main/scala/akka/util/JMX.scala +++ b/akka-actor/src/main/scala/akka/util/JMX.scala @@ -4,7 +4,7 @@ package akka.util -import akka.event.EventHandler +import akka.event.Logging.Error import java.lang.management.ManagementFactory import javax.management.{ ObjectInstance, ObjectName, InstanceAlreadyExistsException, InstanceNotFoundException } import akka.AkkaApplication @@ -24,7 +24,7 @@ object JMX { case e: InstanceAlreadyExistsException ⇒ Some(mbeanServer.getObjectInstance(name)) case e: Exception ⇒ - app.eventHandler.error(e, this, "Error when registering mbean [%s]".format(mbean)) + app.mainbus.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean))) None } @@ -32,6 +32,6 @@ object JMX { mbeanServer.unregisterMBean(mbean) } catch { case e: InstanceNotFoundException ⇒ {} - case e: Exception ⇒ app.eventHandler.error(e, this, "Error while unregistering mbean [%s]".format(mbean)) + case e: Exception ⇒ app.mainbus.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean))) } } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index a1e05cc819..4bacf85636 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -3,12 +3,13 @@ */ package akka.util + import akka.dispatch.Envelope import akka.config.ModuleNotAvailableException import akka.actor._ import DeploymentConfig.ReplicationScheme import akka.config.ModuleNotAvailableException -import akka.event.EventHandler +import akka.event.Logging.Debug import akka.cluster.ClusterNode import akka.remote.{ RemoteSupport, RemoteService } import akka.routing.{ RoutedProps, Router } @@ -167,7 +168,7 @@ class ReflectiveAccess(val app: AkkaApplication) { if (!isEnabled) { val e = new ModuleNotAvailableException( "Can't load the cluster module, make sure it is enabled in the config ('akka.enabled-modules = [\"cluster\"])' and that akka-cluster.jar is on the classpath") - app.eventHandler.debug(this, e.toString) + app.mainbus.publish(Debug(this, e.toString)) throw e } } @@ -175,21 +176,21 @@ class ReflectiveAccess(val app: AkkaApplication) { lazy val clusterInstance: Option[Cluster] = getObjectFor("akka.cluster.Cluster$") match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ - app.eventHandler.debug(this, exception.toString) + app.mainbus.publish(Debug(this, exception.toString)) None } lazy val clusterDeployerInstance: Option[ActorDeployer] = getObjectFor("akka.cluster.ClusterDeployer$") match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ - app.eventHandler.debug(this, exception.toString) + app.mainbus.publish(Debug(this, exception.toString)) None } lazy val transactionLogInstance: Option[TransactionLogObject] = getObjectFor("akka.cluster.TransactionLog$") match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ - app.eventHandler.debug(this, exception.toString) + app.mainbus.publish(Debug(this, exception.toString)) None } diff --git a/akka-docs/scala/code/ActorDocSpec.scala b/akka-docs/scala/code/ActorDocSpec.scala index a8998d7691..72e7adad73 100644 --- a/akka-docs/scala/code/ActorDocSpec.scala +++ b/akka-docs/scala/code/ActorDocSpec.scala @@ -7,15 +7,16 @@ import akka.util.duration._ //#imports import akka.actor.Actor -import akka.event.EventHandler +import akka.event.Logging //#imports //#my-actor class MyActor extends Actor { + val log = Logging(app, this) def receive = { - case "test" ⇒ app.eventHandler.info(this, "received test") - case _ ⇒ app.eventHandler.info(this, "received unknown message") + case "test" ⇒ log.info("received test") + case _ ⇒ log.info("received unknown message") } } //#my-actor @@ -30,23 +31,21 @@ class ActorDocSpec extends AkkaSpec { // testing the actor // TODO: convert docs to AkkaSpec(Configuration(...)) - app.eventHandler.notify(TestEvent.Mute(EventFilter.custom { - case e: EventHandler.Info ⇒ true - case _ ⇒ false - })) - app.eventHandler.addListener(testActor) - val eventLevel = app.eventHandler.level - app.eventHandler.level = EventHandler.InfoLevel + val filter = EventFilter.custom { + case e: Logging.Info ⇒ true + case _ ⇒ false + } + app.mainbus.publish(TestEvent.Mute(filter)) + app.mainbus.subscribe(testActor, classOf[Logging.Info]) myActor ! "test" - expectMsgPF(1 second) { case EventHandler.Info(_, "received test") ⇒ true } + expectMsgPF(1 second) { case Logging.Info(_, "received test") ⇒ true } myActor ! "unknown" - expectMsgPF(1 second) { case EventHandler.Info(_, "received unknown message") ⇒ true } + expectMsgPF(1 second) { case Logging.Info(_, "received unknown message") ⇒ true } - app.eventHandler.level = eventLevel - app.eventHandler.removeListener(testActor) - app.eventHandler.notify(TestEvent.UnMuteAll) + app.mainbus.unsubscribe(testActor) + app.mainbus.publish(TestEvent.UnMute(filter)) myActor.stop() } diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index a079ee8ecd..7f71faa1f8 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -4,7 +4,7 @@ package akka.http -import akka.event.EventHandler +import akka.event.Logging import akka.config.ConfigurationException import javax.servlet.http.{ HttpServletResponse, HttpServletRequest } import javax.servlet.http.HttpServlet @@ -360,7 +360,7 @@ trait RequestMethod { } } catch { case io: Exception ⇒ - app.eventHandler.error(io, this, io.getMessage) + app.mainbus.publish(Logging.Error(io, this, io.getMessage)) false } case None ⇒ false @@ -376,7 +376,7 @@ trait RequestMethod { } } catch { case io: IOException ⇒ - app.eventHandler.error(io, this, io.getMessage) + app.mainbus.publish(Logging.Error(io, this, io.getMessage)) } case None ⇒ {} } diff --git a/akka-http/src/main/scala/akka/http/Servlet30Context.scala b/akka-http/src/main/scala/akka/http/Servlet30Context.scala index fd797b8a5c..e70ce0d360 100644 --- a/akka-http/src/main/scala/akka/http/Servlet30Context.scala +++ b/akka-http/src/main/scala/akka/http/Servlet30Context.scala @@ -7,6 +7,7 @@ package akka.http import javax.servlet.{ AsyncContext, AsyncListener, AsyncEvent } import Types._ import akka.AkkaApplication +import akka.event.Logging /** * @author Garrick Evans @@ -35,7 +36,7 @@ trait Servlet30Context extends AsyncListener { true } catch { case e: IllegalStateException ⇒ - app.eventHandler.error(e, this, e.getMessage) + app.mainbus.publish(Logging.Error(e, this, e.getMessage)) false } } @@ -46,7 +47,7 @@ trait Servlet30Context extends AsyncListener { def onComplete(e: AsyncEvent) {} def onError(e: AsyncEvent) = e.getThrowable match { case null ⇒ - case t ⇒ app.eventHandler.error(t, this, t.getMessage) + case t ⇒ app.mainbus.publish(Logging.Error(t, this, t.getMessage)) } def onStartAsync(e: AsyncEvent) {} def onTimeout(e: AsyncEvent) = { diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 550b1a20c7..e21c4f2ef5 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -6,7 +6,7 @@ package akka.remote import akka.AkkaApplication import akka.actor._ -import akka.event.EventHandler +import akka.event.Logging import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher } import akka.actor.Status._ import akka.util._ @@ -29,6 +29,8 @@ import com.eaio.uuid.UUID */ class Remote(val app: AkkaApplication) extends RemoteService { + val log = Logging(app, this) + import app._ import app.config import app.AkkaConfig._ @@ -75,8 +77,8 @@ class Remote(val app: AkkaApplication) extends RemoteService { val remote = new akka.remote.netty.NettyRemoteSupport(app) remote.start(hostname, port) remote.register(remoteDaemonServiceName, remoteDaemon) - app.eventHandler.addListener(eventStream.channel) - app.eventHandler.addListener(remoteClientLifeCycleHandler) + app.mainbus.subscribe(eventStream.channel, classOf[RemoteLifeCycleEvent]) + app.mainbus.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) // TODO actually register this provider in app in remote mode //provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider) remote @@ -86,7 +88,7 @@ class Remote(val app: AkkaApplication) extends RemoteService { def start() { val triggerLazyServerVal = address.toString - eventHandler.info(this, "Starting remote server on [%s]".format(triggerLazyServerVal)) + log.info("Starting remote server on [{}]", triggerLazyServerVal) } def uuidProtocolToUuid(uuid: UuidProtocol): UUID = new UUID(uuid.getHigh, uuid.getLow) @@ -108,16 +110,14 @@ class Remote(val app: AkkaApplication) extends RemoteService { class RemoteSystemDaemon(remote: Remote) extends Actor { import remote._ - import remote.app._ override def preRestart(reason: Throwable, msg: Option[Any]) { - eventHandler.debug(this, "RemoteSystemDaemon failed due to [%s] - restarting...".format(reason)) + log.debug("RemoteSystemDaemon failed due to [{}] - restarting...", reason) } def receive: Actor.Receive = { case message: RemoteSystemDaemonMessageProtocol ⇒ - eventHandler.debug(this, - "Received command [\n%s] to RemoteSystemDaemon on [%s]".format(message, nodename)) + log.debug("Received command [\n{}] to RemoteSystemDaemon on [{}]", message, app.nodename) message.getMessageType match { case USE ⇒ handleUse(message) @@ -135,7 +135,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { //TODO: should we not deal with unrecognized message types? } - case unknown ⇒ eventHandler.warning(this, "Unknown message to RemoteSystemDaemon [%s]".format(unknown)) + case unknown ⇒ log.warning("Unknown message to RemoteSystemDaemon [{}]", unknown) } def handleUse(message: RemoteSystemDaemonMessageProtocol) { @@ -147,7 +147,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { else message.getPayload.toByteArray val actorFactory = - serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { + app.serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor] } @@ -158,7 +158,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { server.register(actorAddress, newActorRef) } else { - eventHandler.error(this, "Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [%s]".format(message)) + log.error("Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [{}]", message) } channel ! Success(address.toString) @@ -232,7 +232,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { } private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = { - serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { + app.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[T] } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index b87eae50aa..1092bd6875 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -12,7 +12,7 @@ import akka.routing._ import akka.dispatch._ import akka.util.duration._ import akka.config.ConfigurationException -import akka.event.{ DeathWatch, EventHandler } +import akka.event.{ DeathWatch, Logging } import akka.serialization.{ Serialization, Serializer, ActorSerialization, Compression } import akka.serialization.Compression.LZF import akka.remote.RemoteProtocol._ @@ -30,6 +30,8 @@ import com.google.protobuf.ByteString */ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { + val log = Logging(app, this) + import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise @@ -167,7 +169,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider * Using (checking out) actor on a specific node. */ def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () ⇒ Actor) { - app.eventHandler.debug(this, "Instantiating Actor [%s] on node [%s]".format(actorAddress, remoteAddress)) + log.debug("Instantiating Actor [{}] on node [{}]", actorAddress, remoteAddress) val actorFactoryBytes = app.serialization.serialize(actorFactory) match { @@ -198,20 +200,20 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider try { (connection ? (command, remote.remoteSystemDaemonAckTimeout)).as[Status] match { case Some(Success(receiver)) ⇒ - app.eventHandler.debug(this, "Remote system command sent to [%s] successfully received".format(receiver)) + log.debug("Remote system command sent to [{}] successfully received", receiver) case Some(Failure(cause)) ⇒ - app.eventHandler.error(cause, this, cause.toString) + log.error(cause, cause.toString) throw cause case None ⇒ val error = new RemoteException("Remote system command to [%s] timed out".format(connection.address)) - app.eventHandler.error(error, this, error.toString) + log.error(error, error.toString) throw error } } catch { case e: Exception ⇒ - app.eventHandler.error(e, this, "Could not send remote system command to [%s] due to: %s".format(connection.address, e.toString)) + log.error(e, "Could not send remote system command to [{}] due to: {}", connection.address, e.toString) throw e } } else { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index f8837ec4f4..61a4e3172b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -7,6 +7,7 @@ package akka.remote import akka.actor._ import akka.routing._ import akka.AkkaApplication +import akka.event.Logging import scala.collection.immutable.Map import scala.annotation.tailrec @@ -25,6 +26,8 @@ class RemoteConnectionManager( initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef]) extends ConnectionManager { + val log = Logging(app, this) + // FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc. case class State(version: Long, connections: Map[InetSocketAddress, ActorRef]) extends VersionedIterable[ActorRef] { @@ -62,7 +65,7 @@ class RemoteConnectionManager( @tailrec final def failOver(from: InetSocketAddress, to: InetSocketAddress) { - app.eventHandler.debug(this, "Failing over connection from [%s] to [%s]".format(from, to)) + log.debug("Failing over connection from [{}] to [{}]", from, to) val oldState = state.get var changed = false @@ -113,7 +116,7 @@ class RemoteConnectionManager( if (!state.compareAndSet(oldState, newState)) { remove(faultyConnection) // recur } else { - app.eventHandler.debug(this, "Removing connection [%s]".format(faultyAddress)) + log.debug("Removing connection [{}]", faultyAddress) } } } @@ -140,7 +143,7 @@ class RemoteConnectionManager( putIfAbsent(address, newConnectionFactory) // recur } else { // we succeeded - app.eventHandler.debug(this, "Adding connection [%s]".format(address)) + log.debug("Adding connection [{}]", address) newConnection // return new connection actor } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d143b20013..b4194f5926 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -28,6 +28,7 @@ import akka.AkkaException import akka.AkkaApplication import akka.serialization.RemoteActorSerialization import akka.dispatch.{ Terminate, ActorPromise, DefaultPromise, Promise } +import akka.event.Logging class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null); @@ -143,6 +144,8 @@ abstract class RemoteClient private[akka] ( val module: NettyRemoteClientModule, val remoteAddress: InetSocketAddress) { + val log = Logging(app, this) + val name = simpleName(this) + "@" + remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort @@ -155,7 +158,7 @@ abstract class RemoteClient private[akka] ( private[remote] def isRunning = runSwitch.isOn - protected def notifyListeners(msg: ⇒ Any): Unit + protected def notifyListeners(msg: RemoteLifeCycleEvent): Unit protected def currentChannel: Channel @@ -186,7 +189,7 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[Promise[T]]): Option[Promise[T]] = { if (isRunning) { - app.eventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request)) + log.debug("Sending to connection [{}] message [\n{}]", remoteAddress, request) // tell if (request.getOneWay) { @@ -253,7 +256,7 @@ class ActiveRemoteClient private[akka] ( module: NettyRemoteClientModule, remoteAddress: InetSocketAddress, val loader: Option[ClassLoader] = None, - notifyListenersFun: (⇒ Any) ⇒ Unit) + notifyListenersFun: (RemoteLifeCycleEvent) ⇒ Unit) extends RemoteClient(_app, remoteSupport, module, remoteAddress) { val settings = new RemoteClientSettings(app) @@ -271,7 +274,7 @@ class ActiveRemoteClient private[akka] ( @volatile private var reconnectionTimeWindowStart = 0L - def notifyListeners(msg: ⇒ Any): Unit = notifyListenersFun(msg) + def notifyListeners(msg: RemoteLifeCycleEvent): Unit = notifyListenersFun(msg) def currentChannel = connection.getChannel @@ -293,14 +296,14 @@ class ActiveRemoteClient private[akka] ( } def attemptReconnect(): Boolean = { - app.eventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress)) + log.debug("Remote client reconnecting to [{}]", remoteAddress) val connection = bootstrap.connect(remoteAddress) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) - app.eventHandler.error(connection.getCause, this, "Reconnection to [%s] has failed".format(remoteAddress)) + log.error(connection.getCause, "Reconnection to [{}] has failed", remoteAddress) false } else { @@ -318,7 +321,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) - app.eventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress)) + log.debug("Starting remote client connection to [{}]", remoteAddress) connection = bootstrap.connect(remoteAddress) @@ -327,7 +330,7 @@ class ActiveRemoteClient private[akka] ( if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) - app.eventHandler.error(connection.getCause, this, "Remote client connection to [%s] has failed".format(remoteAddress)) + log.error(connection.getCause, "Remote client connection to [{}] has failed", remoteAddress) false } else { @@ -354,7 +357,7 @@ class ActiveRemoteClient private[akka] ( case false if reconnectIfAlreadyConnected ⇒ closeChannel(connection) - app.eventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress)) + log.debug("Remote client reconnecting to [{}]", remoteAddress) attemptReconnect() case false ⇒ false @@ -363,7 +366,7 @@ class ActiveRemoteClient private[akka] ( // Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients def shutdown() = runSwitch switchOff { - app.eventHandler.info(this, "Shutting down remote client [%s]".format(name)) + log.info("Shutting down remote client [{}]", name) notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop() @@ -374,7 +377,7 @@ class ActiveRemoteClient private[akka] ( bootstrap = null connection = null - app.eventHandler.info(this, "[%s] has been shut down".format(name)) + log.info("[{}] has been shut down", name) } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -384,7 +387,7 @@ class ActiveRemoteClient private[akka] ( } else { val timeLeft = (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 if (timeLeft) { - app.eventHandler.info(this, "Will try to reconnect to remote server for another [%s] milliseconds".format(timeLeft)) + log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) } timeLeft } @@ -435,6 +438,8 @@ class ActiveRemoteClientHandler( val client: ActiveRemoteClient) extends SimpleChannelUpstreamHandler { + val log = Logging(app, this) + implicit def _app = app override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { @@ -451,7 +456,7 @@ class ActiveRemoteClientHandler( case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ val reply = arp.getMessage val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) - app.eventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]\nTrying to map back to future [%s]".format(reply, replyUuid)) + log.debug("Remote client received RemoteMessageProtocol[\n{}]\nTrying to map back to future [{}]", reply, replyUuid) futures.remove(replyUuid).asInstanceOf[Promise[Any]] match { case null ⇒ @@ -473,7 +478,7 @@ class ActiveRemoteClientHandler( } } catch { case e: Exception ⇒ - app.eventHandler.error(e, this, e.getMessage) + log.error(e, e.getMessage) client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) } } @@ -496,24 +501,24 @@ class ActiveRemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { try { client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) - app.eventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress)) + log.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress) client.resetReconnectionTimeWindow } catch { case e: Exception ⇒ - app.eventHandler.error(e, this, e.getMessage) + log.error(e, e.getMessage) client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) } } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress)) - app.eventHandler.debug(this, "Remote client disconnected from [%s]".format(ctx.getChannel.getRemoteAddress)) + log.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { val cause = event.getCause if (cause ne null) { - app.eventHandler.error(event.getCause, this, "Unexpected exception [%s] from downstream in remote client [%s]".format(event.getCause, event)) + log.error(event.getCause, "Unexpected exception [{}] from downstream in remote client [{}]", event.getCause, event) cause match { case e: ReadTimeoutException ⇒ @@ -525,7 +530,7 @@ class ActiveRemoteClientHandler( event.getChannel.close //FIXME Is this the correct behavior? } - } else app.eventHandler.error(this, "Unexpected exception from downstream in remote client [%s]".format(event)) + } else log.error("Unexpected exception from downstream in remote client [{}]", event) } private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = { @@ -540,7 +545,7 @@ class ActiveRemoteClientHandler( .newInstance(exception.getMessage).asInstanceOf[Throwable] } catch { case problem: Exception ⇒ - app.eventHandler.error(problem, this, problem.getMessage) + log.error(problem, problem.getMessage) CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage) } } @@ -575,15 +580,15 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with } val remoteInetSocketAddress = new InetSocketAddress(host, port) - app.eventHandler.debug(this, - "Creating RemoteActorRef with address [%s] connected to [%s]" - .format(actorAddress, remoteInetSocketAddress)) + log.debug("Creating RemoteActorRef with address [{}] connected to [{}]", actorAddress, remoteInetSocketAddress) RemoteActorRef(this, remoteInetSocketAddress, actorAddress, loader) } } class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { + val log = Logging(app, this) + val settings = new RemoteServerSettings(app) import settings._ @@ -618,7 +623,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod serverModule.notifyListeners(RemoteServerStarted(serverModule)) def shutdown() { - app.eventHandler.info(this, "Shutting down remote server [%s]".format(name)) + log.info("Shutting down remote server [{}]", name) try { val shutdownSignal = { val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) @@ -634,7 +639,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod serverModule.notifyListeners(RemoteServerShutdown(serverModule)) } catch { case e: Exception ⇒ - app.eventHandler.error(e, this, e.getMessage) + log.error(e, e.getMessage) } } } @@ -642,6 +647,8 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteSupport ⇒ + val log = Logging(app, this) + def app: AkkaApplication def remoteSupport = self @@ -666,13 +673,13 @@ trait NettyRemoteServerModule extends RemoteServerModule { def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard { try { _isRunning switchOn { - app.eventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port)) + log.debug("Starting up remote server on {}:{}", _hostname, _port) currentServer.set(Some(new NettyRemoteServer(app, this, _hostname, _port, loader))) } } catch { case e: Exception ⇒ - app.eventHandler.error(e, this, e.getMessage) + log.error(e, e.getMessage) notifyListeners(RemoteServerError(e, this)) } this @@ -681,7 +688,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { def shutdownServerModule() = guard withGuard { _isRunning switchOff { currentServer.getAndSet(None) foreach { instance ⇒ - app.eventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port)) + log.debug("Shutting down remote server on {}:{}", instance.host, instance.port) instance.shutdown() } } @@ -726,7 +733,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { def unregister(actorRef: ActorRef): Unit = guard withGuard { if (_isRunning.isOn) { - app.eventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid)) + log.debug("Unregister server side remote actor with id [{}]", actorRef.uuid) actors.remove(actorRef.address, actorRef) actorsByUuid.remove(actorRef.uuid.toString, actorRef) @@ -741,7 +748,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { def unregister(id: String): Unit = guard withGuard { if (_isRunning.isOn) { - app.eventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id)) + log.debug("Unregister server side remote actor with id [{}]", id) if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length)) else { @@ -760,7 +767,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { def unregisterPerSession(id: String) { if (_isRunning.isOn) { - app.eventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id)) + log.info("Unregistering server side remote actor with id [{}]", id) actorsFactories.remove(id) } @@ -832,6 +839,8 @@ class RemoteServerHandler( val applicationLoader: Option[ClassLoader], val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { + val log = Logging(app, this) + import settings._ implicit def app = server.app @@ -866,7 +875,7 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - app.eventHandler.debug(this, "Remote client [%s] connected to [%s]".format(clientAddress, server.name)) + log.debug("Remote client [{}] connected to [{}]", clientAddress, server.name) sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) @@ -875,7 +884,7 @@ class RemoteServerHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - app.eventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name)) + log.debug("Remote client [{}] disconnected from [{}]", clientAddress, server.name) // stop all session actors for ( @@ -885,7 +894,7 @@ class RemoteServerHandler( try { actor ! PoisonPill } catch { - case e: Exception ⇒ app.eventHandler.error(e, this, "Couldn't stop %s".format(actor)) + case e: Exception ⇒ log.error(e, "Couldn't stop {}", actor) } } @@ -894,7 +903,7 @@ class RemoteServerHandler( override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - app.eventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name), this) + log.debug("Remote client [{}] channel closed from [{}]", clientAddress, server.name) server.notifyListeners(RemoteServerClientClosed(server, clientAddress)) } @@ -914,7 +923,7 @@ class RemoteServerHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - app.eventHandler.error(event.getCause, this, "Unexpected exception from remote downstream") + log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close server.notifyListeners(RemoteServerError(event.getCause, server)) @@ -927,25 +936,25 @@ class RemoteServerHandler( } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try { - app.eventHandler.debug(this, "Received remote message [%s]".format(request)) + log.debug("Received remote message [{}]", request) dispatchToActor(request, channel) } catch { case e: Exception ⇒ server.notifyListeners(RemoteServerError(e, server)) - app.eventHandler.error(e, this, e.getMessage) + log.error(e, e.getMessage) } private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) { val actorInfo = request.getActorInfo - app.eventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid)) + log.debug("Dispatching to remote actor [{}]", actorInfo.getUuid) val actorRef = try { actorOf(actorInfo, channel) } catch { case e: SecurityException ⇒ - app.eventHandler.error(e, this, e.getMessage) + log.error(e, e.getMessage) write(channel, createErrorReplyMessage(e, request)) server.notifyListeners(RemoteServerError(e, server)) return @@ -1002,9 +1011,7 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid val address = actorInfo.getAddress - app.eventHandler.debug(this, - "Looking up a remotely available actor for address [%s] on node [%s]" - .format(address, app.nodename)) + log.debug("Looking up a remotely available actor for address [{}] on node [{}]", address, app.nodename) val byAddress = server.actors.get(address) // try actor-by-address if (byAddress eq null) { diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index c4bcfce6ab..378255c199 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -8,7 +8,7 @@ import akka.actor._ import akka.actor.DeploymentConfig._ import akka.dispatch.Envelope import akka.util.{ ReflectiveAccess, Duration } -import akka.event.EventHandler +import akka.event.Logging import akka.remote._ import RemoteProtocol._ import akka.AkkaApplication @@ -28,6 +28,8 @@ import com.eaio.uuid.UUID class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default + val log = Logging(app, this) + val remoteActorSerialization = new RemoteActorSerialization(app, remote) def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = @@ -144,7 +146,7 @@ class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { overriddenUuid: Option[UUID], loader: Option[ClassLoader]): ActorRef = { - app.eventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol)) + log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n{}", protocol) // import ReplicationStorageType._ // import ReplicationStrategyType._ @@ -223,6 +225,8 @@ class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { + val log = Logging(app, this) + /** * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. */ @@ -239,7 +243,7 @@ class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - app.eventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) + log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n{}", protocol) val ref = RemoteActorRef( remote, @@ -247,7 +251,7 @@ class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) protocol.getAddress, loader) - app.eventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid)) + log.debug("Newly deserialized RemoteActorRef has uuid: {}", ref.uuid) ref } @@ -266,7 +270,7 @@ class RemoteActorSerialization(val app: AkkaApplication, remote: RemoteSupport) app.defaultAddress } - app.eventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) + log.debug("Register serialized Actor [{}] as remote @ [{}]", actor.uuid, remoteAddress) RemoteActorRefProtocol.newBuilder .setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress))) diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala index 4db40e6c28..0ffd1b8490 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/SLF4J.scala @@ -6,7 +6,7 @@ package akka.event.slf4j import org.slf4j.{ Logger ⇒ SLFLogger, LoggerFactory ⇒ SLFLoggerFactory } -import akka.event.EventHandler +import akka.event.Logging._ import akka.actor._ import Actor._ @@ -32,7 +32,6 @@ object Logger { * @author Jonas Bonér */ class Slf4jEventHandler extends Actor with Logging { - import EventHandler._ def receive = { case event @ Error(cause, instance, message) ⇒ diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index 6eeb1546b5..3a9491ed3f 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -12,7 +12,6 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.dispatch.Future; -import akka.event.EventHandler; import akka.testkit.EventFilter; import akka.testkit.ErrorFilter; import akka.testkit.TestEvent; @@ -77,7 +76,7 @@ public class UntypedCoordinatedIncrementTest { EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - application.eventHandler().notify(new TestEvent.Mute(ignoreExceptions)); + application.mainbus().publish(new TestEvent.Mute(ignoreExceptions)); CountDownLatch incrementLatch = new CountDownLatch(numCounters); List actors = new ArrayList(counters); actors.add(failer); @@ -98,7 +97,7 @@ public class UntypedCoordinatedIncrementTest { } } } - application.eventHandler().notify(new TestEvent.UnMute(ignoreExceptions)); + application.mainbus().publish(new TestEvent.UnMute(ignoreExceptions)); } public Seq seq(A... args) { diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index 8dc2c2beae..6d99e83e51 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -11,7 +11,6 @@ import akka.actor.Props; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.dispatch.Future; -import akka.event.EventHandler; import akka.testkit.EventFilter; import akka.testkit.ErrorFilter; import akka.testkit.TestEvent; @@ -76,7 +75,7 @@ public class UntypedTransactorTest { EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - application.eventHandler().notify(new TestEvent.Mute(ignoreExceptions)); + application.mainbus().publish(new TestEvent.Mute(ignoreExceptions)); CountDownLatch incrementLatch = new CountDownLatch(numCounters); List actors = new ArrayList(counters); actors.add(failer); @@ -97,7 +96,7 @@ public class UntypedTransactorTest { } } } - application.eventHandler().notify(new TestEvent.UnMute(ignoreExceptions)); + application.mainbus().publish(new TestEvent.UnMute(ignoreExceptions)); } public Seq seq(A... args) { diff --git a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala index 284786ef5a..e0f242a61b 100644 --- a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala +++ b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala @@ -7,7 +7,6 @@ import akka.transactor.Coordinated import akka.actor._ import akka.stm.{ Ref, TransactionFactory } import akka.util.duration._ -import akka.event.EventHandler import akka.transactor.CoordinatedTransactionException import akka.testkit._ @@ -86,17 +85,17 @@ class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll { EventFilter[ExpectedFailureException], EventFilter[CoordinatedTransactionException], EventFilter[ActorTimeoutException]) - app.eventHandler.notify(TestEvent.Mute(ignoreExceptions)) - val (counters, failer) = actorOfs - val coordinated = Coordinated() - counters(0) ! Coordinated(Increment(counters.tail :+ failer)) - coordinated.await - for (counter ← counters) { - (counter ? GetCount).as[Int].get must be === 0 + filterEvents(ignoreExceptions) { + val (counters, failer) = actorOfs + val coordinated = Coordinated() + counters(0) ! Coordinated(Increment(counters.tail :+ failer)) + coordinated.await + for (counter ← counters) { + (counter ? GetCount).as[Int].get must be === 0 + } + counters foreach (_.stop()) + failer.stop() } - counters foreach (_.stop()) - failer.stop() - app.eventHandler.notify(TestEvent.UnMute(ignoreExceptions)) } } } diff --git a/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala b/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala index e4b0eed68e..f8d0e8e8b2 100644 --- a/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala +++ b/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala @@ -9,7 +9,6 @@ import akka.transactor.Coordinated import akka.actor._ import akka.stm._ import akka.util.duration._ -import akka.event.EventHandler import akka.transactor.CoordinatedTransactionException import akka.testkit._ @@ -119,18 +118,18 @@ class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { EventFilter[ExpectedFailureException], EventFilter[CoordinatedTransactionException], EventFilter[ActorTimeoutException]) - app.eventHandler.notify(TestEvent.Mute(ignoreExceptions)) - val (counters, coordinator) = actorOfs - val latch = new CountDownLatch(1) - coordinator ! FriendlyIncrement(counters, latch) - latch.await // this could take a while - (coordinator ? GetCount).as[Int].get must be === 1 - for (counter ← counters) { - (counter ? GetCount).as[Int].get must be === 1 + filterEvents(ignoreExceptions) { + val (counters, coordinator) = actorOfs + val latch = new CountDownLatch(1) + coordinator ! FriendlyIncrement(counters, latch) + latch.await // this could take a while + (coordinator ? GetCount).as[Int].get must be === 1 + for (counter ← counters) { + (counter ? GetCount).as[Int].get must be === 1 + } + counters foreach (_.stop()) + coordinator.stop() } - counters foreach (_.stop()) - coordinator.stop() - app.eventHandler.notify(TestEvent.UnMute(ignoreExceptions)) } } } diff --git a/akka-stm/src/test/scala/transactor/TransactorSpec.scala b/akka-stm/src/test/scala/transactor/TransactorSpec.scala index 7c8dda761a..daea5134db 100644 --- a/akka-stm/src/test/scala/transactor/TransactorSpec.scala +++ b/akka-stm/src/test/scala/transactor/TransactorSpec.scala @@ -8,7 +8,6 @@ import akka.transactor.Transactor import akka.actor._ import akka.stm._ import akka.util.duration._ -import akka.event.EventHandler import akka.transactor.CoordinatedTransactionException import akka.testkit._ @@ -109,17 +108,17 @@ class TransactorSpec extends AkkaSpec { EventFilter[ExpectedFailureException], EventFilter[CoordinatedTransactionException], EventFilter[ActorTimeoutException]) - app.eventHandler.notify(TestEvent.Mute(ignoreExceptions)) - val (counters, failer) = createTransactors - val failLatch = TestLatch(numCounters) - counters(0) ! Increment(counters.tail :+ failer, failLatch) - failLatch.await - for (counter ← counters) { - (counter ? GetCount).as[Int].get must be === 0 + filterEvents(ignoreExceptions) { + val (counters, failer) = createTransactors + val failLatch = TestLatch(numCounters) + counters(0) ! Increment(counters.tail :+ failer, failLatch) + failLatch.await + for (counter ← counters) { + (counter ? GetCount).as[Int].get must be === 0 + } + counters foreach (_.stop()) + failer.stop() } - counters foreach (_.stop()) - failer.stop() - app.eventHandler.notify(TestEvent.UnMute(ignoreExceptions)) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index ea38de78a1..089e3bac18 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -3,7 +3,7 @@ */ package akka.testkit -import akka.event.EventHandler +import akka.event.Logging.{ Warning, Error } import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList import java.util.concurrent.RejectedExecutionException @@ -166,14 +166,14 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling val execute = mbox.suspendSwitch.fold { queue.push(handle) if (warnings && handle.channel.isInstanceOf[Promise[_]]) { - app.eventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format receiver) + app.mainbus.publish(Warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format receiver)) } false } { queue.push(handle) if (queue.isActive) { if (warnings && handle.channel.isInstanceOf[Promise[_]]) { - app.eventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format receiver) + app.mainbus.publish(Warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format receiver)) } false } else { @@ -216,18 +216,18 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling mbox.actor.invoke(handle) if (warnings) handle.channel match { case f: ActorPromise if !f.isCompleted ⇒ - app.eventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (mbox.actor, handle.message)) + app.mainbus.publish(Warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (mbox.actor, handle.message))) case _ ⇒ } true } catch { case ie: InterruptedException ⇒ - app.eventHandler.error(this, ie) + app.mainbus.publish(Error(this, ie)) Thread.currentThread().interrupt() intex = ie true case e ⇒ - app.eventHandler.error(this, e) + app.mainbus.publish(Error(this, e)) queue.leave false } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 3402429ddd..c6827a3547 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -6,7 +6,6 @@ package akka.testkit import akka.actor._ import akka.util.ReflectiveAccess -import akka.event.EventHandler import com.eaio.uuid.UUID import akka.actor.Props._ import akka.AkkaApplication diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index c7773bdf80..ce7ef422fe 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -1,7 +1,6 @@ package akka.testkit -import akka.event.EventHandler -import akka.event.EventHandler.{ Event, Error } +import akka.event.Logging.{ LogEvent, Error, InitializeLogger } import akka.actor.Actor sealed trait TestEvent @@ -15,11 +14,10 @@ object TestEvent { def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq) } case class UnMute(filters: Seq[EventFilter]) extends TestEvent - case object UnMuteAll extends TestEvent } trait EventFilter { - def apply(event: Event): Boolean + def apply(event: LogEvent): Boolean } object EventFilter { @@ -36,19 +34,19 @@ object EventFilter { def apply[A <: Throwable: Manifest](source: AnyRef, message: String): EventFilter = ErrorSourceMessageFilter(manifest[A].erasure, source, message) - def custom(test: (Event) ⇒ Boolean): EventFilter = + def custom(test: (LogEvent) ⇒ Boolean): EventFilter = CustomEventFilter(test) } case class ErrorFilter(throwable: Class[_]) extends EventFilter { - def apply(event: Event) = event match { + def apply(event: LogEvent) = event match { case Error(cause, _, _) ⇒ throwable isInstance cause case _ ⇒ false } } case class ErrorMessageFilter(throwable: Class[_], message: String) extends EventFilter { - def apply(event: Event) = event match { + def apply(event: LogEvent) = event match { case Error(cause, _, _) if !(throwable isInstance cause) ⇒ false case Error(cause, _, null) if cause.getMessage eq null ⇒ cause.getStackTrace.length == 0 case Error(cause, _, null) ⇒ cause.getMessage startsWith message @@ -59,14 +57,14 @@ case class ErrorMessageFilter(throwable: Class[_], message: String) extends Even } case class ErrorSourceFilter(throwable: Class[_], source: AnyRef) extends EventFilter { - def apply(event: Event) = event match { + def apply(event: LogEvent) = event match { case Error(cause, instance, _) ⇒ (throwable isInstance cause) && (source eq instance) case _ ⇒ false } } case class ErrorSourceMessageFilter(throwable: Class[_], source: AnyRef, message: String) extends EventFilter { - def apply(event: Event) = event match { + def apply(event: LogEvent) = event match { case Error(cause, instance, _) if !((throwable isInstance cause) && (source eq instance)) ⇒ false case Error(cause, _, null) if cause.getMessage eq null ⇒ cause.getStackTrace.length == 0 case Error(cause, _, null) ⇒ cause.getMessage startsWith message @@ -76,23 +74,23 @@ case class ErrorSourceMessageFilter(throwable: Class[_], source: AnyRef, message } } -case class CustomEventFilter(test: (Event) ⇒ Boolean) extends EventFilter { - def apply(event: Event) = test(event) +case class CustomEventFilter(test: (LogEvent) ⇒ Boolean) extends EventFilter { + def apply(event: LogEvent) = test(event) } -class TestEventListener extends EventHandler.DefaultListener { +class TestEventListener extends akka.event.Logging.DefaultLogger { import TestEvent._ var filters: List[EventFilter] = Nil override def receive: Actor.Receive = ({ - case Mute(filters) ⇒ filters foreach addFilter - case UnMute(filters) ⇒ filters foreach removeFilter - case UnMuteAll ⇒ filters = Nil - case event: Event if filter(event) ⇒ + case InitializeLogger(bus) ⇒ Seq(classOf[Mute], classOf[UnMute]) foreach (bus.subscribe(context.self, _)) + case Mute(filters) ⇒ filters foreach addFilter + case UnMute(filters) ⇒ filters foreach removeFilter + case event: LogEvent if filter(event) ⇒ }: Actor.Receive) orElse super.receive - def filter(event: Event): Boolean = filters exists (f ⇒ try { f(event) } catch { case e: Exception ⇒ false }) + def filter(event: LogEvent): Boolean = filters exists (f ⇒ try { f(event) } catch { case e: Exception ⇒ false }) def addFilter(filter: EventFilter): Unit = filters ::= filter diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index c7fc73bb05..fbf8138bfe 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -99,7 +99,8 @@ class TestKit(_app: AkkaApplication) { * ActorRef of the test actor. Access is provided to enable e.g. * registration as message target. */ - val testActor: ActorRef = new LocalActorRef(app, Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)), app.guardian, "testActor" + TestKit.testActorId.incrementAndGet(), true) + val testActor: ActorRef = app.systemActorOf(Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)), + "testActor" + TestKit.testActorId.incrementAndGet) private var end: Duration = Duration.Inf diff --git a/akka-testkit/src/main/scala/akka/testkit/package.scala b/akka-testkit/src/main/scala/akka/testkit/package.scala index 54c6ed8182..384baa3d11 100644 --- a/akka-testkit/src/main/scala/akka/testkit/package.scala +++ b/akka-testkit/src/main/scala/akka/testkit/package.scala @@ -1,14 +1,12 @@ package akka -import akka.event.EventHandler - package object testkit { def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit app: AkkaApplication): T = { - app.eventHandler.notify(TestEvent.Mute(eventFilters.toSeq)) + app.mainbus.publish(TestEvent.Mute(eventFilters.toSeq)) try { block } finally { - app.eventHandler.notify(TestEvent.UnMute(eventFilters.toSeq)) + app.mainbus.publish(TestEvent.UnMute(eventFilters.toSeq)) } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index b4341e949d..22f114b109 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -9,10 +9,13 @@ import org.scalatest.matchers.MustMatchers import akka.AkkaApplication import akka.actor.{ Actor, ActorRef, Props } import akka.dispatch.MessageDispatcher +import akka.event.{ Logging, MainBusLogging } abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication()) extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll { + val log: Logging = new MainBusLogging(app.mainbus, this) + final override def beforeAll { atStartup() } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 3fb594a91e..6720299337 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -6,7 +6,7 @@ package akka.testkit import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ -import akka.event.EventHandler +import akka.event.Logging.Warning import akka.dispatch.{ Future, Promise } import akka.AkkaApplication @@ -80,7 +80,6 @@ object TestActorRefSpec { } class Logger extends Actor { - import EventHandler._ var count = 0 var msg: String = _ def receive = { diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 930e5c1454..302c461f95 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -4,7 +4,6 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ -import akka.event.EventHandler import akka.dispatch.Future import akka.util.duration._ diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 67841c7a60..09b9246ad6 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -5,7 +5,7 @@ package akka.tutorial.second import akka.actor.Actor._ -import akka.event.EventHandler +import akka.event.Logging import System.{ currentTimeMillis ⇒ now } import akka.routing.Routing.Broadcast import akka.actor.{ Timeout, Channel, Actor, PoisonPill } @@ -15,6 +15,7 @@ import akka.AkkaApplication object Pi extends App { val app = AkkaApplication() + val log = Logging(app, this) calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) @@ -110,9 +111,9 @@ object Pi extends App { master.?(Calculate, Timeout(60000)). await.resultOrException match { //wait for the result, with a 60 seconds timeout case Some(pi) ⇒ - app.eventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start))) + log.info("\n\tPi estimate: \t\t{}\n\tCalculation time: \t{} millis", pi, now - start) case None ⇒ - app.eventHandler.error(this, "Pi calculation did not complete within the timeout.") + log.error("Pi calculation did not complete within the timeout.") } } } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 0a5fcb9fa8..6b11df0560 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -14,8 +14,9 @@ akka { time-unit = "seconds" # Time unit for all timeout properties throughout the config - event-handlers = ["akka.event.EventHandler$DefaultListener"] # Event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) - event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG + event-handlers = ["akka.event.Logging$DefaultLogger"] # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT) + loglevel = "INFO" # Options: ERROR, WARNING, INFO, DEBUG + stdout-loglevel = "WARNING" # Loglevel for the very basic logger activated during AkkaApplication startup event-handler-dispatcher { type = "Dispatcher" # Must be one of the following diff --git a/config/akka.test.conf b/config/akka.test.conf index 4755e1f11f..8e21a7d184 100644 --- a/config/akka.test.conf +++ b/config/akka.test.conf @@ -6,7 +6,7 @@ include "akka-reference.conf" akka { event-handlers = ["akka.testkit.TestEventListener"] - event-handler-level = "WARNING" + loglevel = "WARNING" actor { default-dispatcher { core-pool-size = 4