From 53353d70310be9a41392a2511a4d744062f6b00f Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 10 Nov 2011 20:48:50 +0100 Subject: [PATCH] rename MainBus to EventStream (incl. field in ActorSystem) --- .../test/scala/akka/actor/FSMActorSpec.scala | 8 +++--- .../scala/akka/actor/LoggingReceiveSpec.scala | 20 ++++++------- .../akka/actor/RestartStrategySpec.scala | 2 +- .../scala/akka/actor/SupervisorSpec.scala | 2 +- .../akka/actor/dispatch/ActorModelSpec.scala | 6 ++-- ...ainBusSpec.scala => EventStreamSpec.scala} | 8 +++--- .../src/main/scala/akka/actor/Actor.scala | 6 ++-- .../src/main/scala/akka/actor/ActorCell.scala | 28 +++++++++---------- .../src/main/scala/akka/actor/ActorRef.scala | 4 +-- .../scala/akka/actor/ActorRefProvider.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 14 +++++----- .../src/main/scala/akka/actor/Deployer.scala | 2 +- .../src/main/scala/akka/actor/FSM.scala | 2 +- .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../main/scala/akka/dispatch/Dispatcher.scala | 4 +-- .../src/main/scala/akka/dispatch/Future.scala | 14 +++++----- .../main/scala/akka/dispatch/Mailbox.scala | 2 +- .../akka/dispatch/ThreadPoolBuilder.scala | 4 +-- .../{MainBus.scala => EventStream.scala} | 2 +- .../src/main/scala/akka/event/Logging.scala | 2 +- .../scala/akka/remote/RemoteInterface.scala | 2 +- akka-actor/src/main/scala/akka/util/JMX.scala | 4 +-- akka-docs/scala/code/ActorDocSpec.scala | 8 +++--- .../src/main/scala/akka/remote/Remote.scala | 6 ++-- .../test/UntypedCoordinatedIncrementTest.java | 4 +-- .../test/UntypedTransactorTest.java | 4 +-- .../testkit/CallingThreadDispatcher.scala | 4 +-- .../akka/testkit/TestEventListener.scala | 4 +-- .../src/main/scala/akka/testkit/package.scala | 4 +-- .../test/scala/akka/testkit/AkkaSpec.scala | 2 +- 30 files changed, 88 insertions(+), 88 deletions(-) rename akka-actor-tests/src/test/scala/akka/event/{MainBusSpec.scala => EventStreamSpec.scala} (93%) rename akka-actor/src/main/scala/akka/event/{MainBus.scala => EventStream.scala} (94%) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index ebd1925a2f..2ba83a9971 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -167,12 +167,12 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true } }) filterException[Logging.EventHandlerException] { - app.mainbus.subscribe(testActor, classOf[Logging.Error]) + app.eventStream.subscribe(testActor, classOf[Logging.Error]) fsm ! "go" expectMsgPF(1 second, hint = "Next state 2 does not exist") { case Logging.Error(_, `fsm`, "Next state 2 does not exist") ⇒ true } - app.mainbus.unsubscribe(testActor) + app.eventStream.unsubscribe(testActor) } } @@ -213,7 +213,7 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true case StopEvent(r, _, _) ⇒ testActor ! r } }) - app.mainbus.subscribe(testActor, classOf[Logging.Debug]) + app.eventStream.subscribe(testActor, classOf[Logging.Debug]) fsm ! "go" expectMsgPF(1 second, hint = "processing Event(go,null)") { case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[" + app.address + "/sys/testActor") ⇒ true @@ -226,7 +226,7 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true } expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal) expectNoMsg(1 second) - app.mainbus.unsubscribe(testActor) + app.eventStream.unsubscribe(testActor) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 6ae806aafc..0da39b9ddf 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -33,9 +33,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd case _: Logging.Info ⇒ true case _ ⇒ false }) - appLogging.mainbus.publish(filter) - appAuto.mainbus.publish(filter) - appLifecycle.mainbus.publish(filter) + appLogging.eventStream.publish(filter) + appAuto.eventStream.publish(filter) + appLifecycle.eventStream.publish(filter) def ignoreMute(t: TestKit) { t.ignoreMsg { @@ -53,7 +53,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "decorate a Receive" in { new TestKit(appLogging) { - app.mainbus.subscribe(testActor, classOf[Logging.Debug]) + app.eventStream.subscribe(testActor, classOf[Logging.Debug]) val r: Actor.Receive = { case null ⇒ } @@ -66,8 +66,8 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "be added on Actor if requested" in { new TestKit(appLogging) with ImplicitSender { ignoreMute(this) - app.mainbus.subscribe(testActor, classOf[Logging.Debug]) - app.mainbus.subscribe(testActor, classOf[Logging.Error]) + app.eventStream.subscribe(testActor, classOf[Logging.Debug]) + app.eventStream.subscribe(testActor, classOf[Logging.Error]) val actor = TestActorRef(new Actor { def receive = loggable(this) { case _ ⇒ sender ! "x" @@ -95,7 +95,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "not duplicate logging" in { new TestKit(appLogging) with ImplicitSender { - app.mainbus.subscribe(testActor, classOf[Logging.Debug]) + app.eventStream.subscribe(testActor, classOf[Logging.Debug]) val actor = TestActorRef(new Actor { def receive = loggable(this)(loggable(this) { case _ ⇒ sender ! "x" @@ -115,7 +115,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "log AutoReceiveMessages if requested" in { new TestKit(appAuto) { - app.mainbus.subscribe(testActor, classOf[Logging.Debug]) + app.eventStream.subscribe(testActor, classOf[Logging.Debug]) val actor = TestActorRef(new Actor { def receive = { case _ ⇒ @@ -135,8 +135,8 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val s = ref.toString s.contains("MainBusReaper") || s.contains("Supervisor") } - app.mainbus.subscribe(testActor, classOf[Logging.Debug]) - app.mainbus.subscribe(testActor, classOf[Logging.Error]) + app.eventStream.subscribe(testActor, classOf[Logging.Debug]) + app.eventStream.subscribe(testActor, classOf[Logging.Error]) within(3 seconds) { val lifecycleGuardian = appLifecycle.guardian val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 027e614c49..f967e9c5f3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -16,7 +16,7 @@ import akka.testkit.AkkaSpec class RestartStrategySpec extends AkkaSpec { override def atStartup { - app.mainbus.publish(Mute(EventFilter[Exception]("Crashing..."))) + app.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) } object Ping diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 6c09a2ab80..dd20c236cb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -122,7 +122,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } override def atStartup() { - app.mainbus.publish(Mute(EventFilter[RuntimeException](ExceptionMessage))) + app.eventStream.publish(Mute(EventFilter[RuntimeException](ExceptionMessage))) } override def beforeEach() = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 3bd02716a1..8d1230a3df 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -155,7 +155,7 @@ object ActorModelSpec { await(deadline)(stops == dispatcher.stops.get) } catch { case e ⇒ - app.mainbus.publish(Error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get + + app.eventStream.publish(Error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get + " required: starts=" + starts + ",stops=" + stops)) throw e } @@ -212,7 +212,7 @@ object ActorModelSpec { await(deadline)(stats.restarts.get() == restarts) } catch { case e ⇒ - app.mainbus.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + + app.eventStream.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) throw e @@ -318,7 +318,7 @@ abstract class ActorModelSpec extends AkkaSpec { try { f } catch { - case e ⇒ app.mainbus.publish(Error(e, this, "error in spawned thread")) + case e ⇒ app.eventStream.publish(Error(e, this, "error in spawned thread")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala similarity index 93% rename from akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala rename to akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index 2116f49151..04888b61a0 100644 --- a/akka-actor-tests/src/test/scala/akka/event/MainBusSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -29,7 +29,7 @@ object MainBusSpec { class C extends B1 } -class MainBusSpec extends AkkaSpec(Configuration( +class EventStreamSpec extends AkkaSpec(Configuration( "akka.stdout-loglevel" -> "WARNING", "akka.loglevel" -> "INFO", "akka.event-handlers" -> Seq("akka.event.MainBusSpec$MyLog", Logging.StandardOutLoggerName))) { @@ -39,7 +39,7 @@ class MainBusSpec extends AkkaSpec(Configuration( "A MainBus" must { "manage subscriptions" in { - val bus = new MainBus(true) + val bus = new EventStream(true) bus.start(app) bus.subscribe(testActor, classOf[M]) bus.publish(M(42)) @@ -52,7 +52,7 @@ class MainBusSpec extends AkkaSpec(Configuration( } "manage log levels" in { - val bus = new MainBus(false) + val bus = new EventStream(false) bus.start(app) bus.startDefaultLoggers(app, app.AkkaConfig) bus.publish(SetTarget(testActor)) @@ -73,7 +73,7 @@ class MainBusSpec extends AkkaSpec(Configuration( val b1 = new B1 val b2 = new B2 val c = new C - val bus = new MainBus(false) + val bus = new EventStream(false) bus.start(app) within(2 seconds) { bus.subscribe(testActor, classOf[B2]) === true diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index e7d7d66228..88dfcbfec6 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -155,7 +155,7 @@ object Timeout { } trait ActorLogging { this: Actor ⇒ - val log = akka.event.Logging(app.mainbus, context.self) + val log = akka.event.Logging(app.eventStream, context.self) } object Actor { @@ -168,7 +168,7 @@ object Actor { class LoggingReceive(source: AnyRef, r: Receive)(implicit app: ActorSystem) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - app.mainbus.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) + app.eventStream.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) handled } def apply(o: Any): Unit = r(o) @@ -414,7 +414,7 @@ trait Actor { private[akka] final def apply(msg: Any) = { def autoReceiveMessage(msg: AutoReceivedMessage) { - if (app.AkkaConfig.DebugAutoReceive) app.mainbus.publish(Debug(this, "received AutoReceiveMessage " + msg)) + if (app.AkkaConfig.DebugAutoReceive) app.eventStream.publish(Debug(this, "received AutoReceiveMessage " + msg)) msg match { case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 9ec863974f..15479761f2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -167,11 +167,11 @@ private[akka] class ActorCell( actor = created created.preStart() checkReceiveTimeout - if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "started (" + actor + ")")) + if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "started (" + actor + ")")) } catch { case e ⇒ try { - app.mainbus.publish(Error(e, self, "error while creating actor")) + app.eventStream.publish(Error(e, self, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -181,7 +181,7 @@ private[akka] class ActorCell( def recreate(cause: Throwable): Unit = try { val failedActor = actor - if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "restarting")) + if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "restarting")) val freshActor = newActor() if (failedActor ne null) { val c = currentMessage //One read only plz @@ -195,14 +195,14 @@ private[akka] class ActorCell( } actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call freshActor.postRestart(cause) - if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "restarted")) + if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "restarted")) dispatcher.resume(this) //FIXME should this be moved down? props.faultHandler.handleSupervisorRestarted(cause, self, children) } catch { case e ⇒ try { - app.mainbus.publish(Error(e, self, "error while creating actor")) + app.eventStream.publish(Error(e, self, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -223,7 +223,7 @@ private[akka] class ActorCell( try { try { val a = actor - if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "stopping")) + if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "stopping")) if (a ne null) a.postStop() } finally { //Stop supervised actors @@ -250,8 +250,8 @@ private[akka] class ActorCell( if (!stats.contains(child)) { childrenRefs = childrenRefs.updated(child.name, child) childrenStats = childrenStats.updated(child, ChildRestartStats()) - if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "now supervising " + child)) - } else app.mainbus.publish(Warning(self, "Already supervising " + child)) + if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "now supervising " + child)) + } else app.eventStream.publish(Warning(self, "Already supervising " + child)) } try { @@ -262,10 +262,10 @@ private[akka] class ActorCell( case Recreate(cause) ⇒ recreate(cause) case Link(subject) ⇒ app.deathWatch.subscribe(self, subject) - if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "now monitoring " + subject)) + if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "now monitoring " + subject)) case Unlink(subject) ⇒ app.deathWatch.unsubscribe(self, subject) - if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "stopped monitoring " + subject)) + if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "stopped monitoring " + subject)) case Suspend() ⇒ suspend() case Resume() ⇒ resume() case Terminate() ⇒ terminate() @@ -274,7 +274,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ //Should we really catch everything here? - app.mainbus.publish(Error(e, self, "error while processing " + message)) + app.eventStream.publish(Error(e, self, "error while processing " + message)) //TODO FIXME How should problems here be handled? throw e } @@ -293,7 +293,7 @@ private[akka] class ActorCell( currentMessage = null // reset current message after successful invocation } catch { case e ⇒ - app.mainbus.publish(Error(e, self, e.getMessage)) + app.eventStream.publish(Error(e, self, e.getMessage)) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -313,7 +313,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ - app.mainbus.publish(Error(e, self, e.getMessage)) + app.eventStream.publish(Error(e, self, e.getMessage)) throw e } } @@ -322,7 +322,7 @@ private[akka] class ActorCell( final def handleFailure(fail: Failed): Unit = childrenStats.get(fail.actor) match { case Some(stats) ⇒ if (!props.faultHandler.handleFailure(fail, stats, childrenStats)) throw fail.cause - case None ⇒ app.mainbus.publish(Warning(self, "dropping " + fail + " from unknown child")) + case None ⇒ app.eventStream.publish(Warning(self, "dropping " + fail + " from unknown child")) } final def handleChildTerminated(child: ActorRef): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c436f17d43..cf572490fb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -384,10 +384,10 @@ class DeadLetterActorRef(val app: ActorSystem) extends MinimalActorRef { override def isShutdown(): Boolean = true protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = - app.mainbus.publish(DeadLetter(message, sender)) + app.eventStream.publish(DeadLetter(message, sender)) override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - app.mainbus.publish(DeadLetter(message, this)) + app.eventStream.publish(DeadLetter(message, this)) brokenPromise } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 49a38d7bcd..d448c5162b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -103,7 +103,7 @@ class ActorRefProviderException(message: String) extends AkkaException(message) */ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { - val log = Logging(app.mainbus, this) + val log = Logging(app.eventStream, this) private[akka] val deployer: Deployer = new Deployer(app) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 3c6ab39158..734973cc13 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -101,7 +101,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF val DebugAutoReceive = getBool("akka.actor.debug.autoreceive", false) val DebugLifecycle = getBool("akka.actor.debug.lifecycle", false) val FsmDebugEvent = getBool("akka.actor.debug.fsm", false) - val DebugMainBus = getBool("akka.actor.debug.mainbus", false) + val DebugMainBus = getBool("akka.actor.debug.eventStream", false) val DispatcherThroughput = getInt("akka.actor.throughput", 5) val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout"). @@ -161,9 +161,9 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF }) // this provides basic logging (to stdout) until .start() is called below - val mainbus = new MainBus(DebugMainBus) - mainbus.startStdoutLogger(AkkaConfig) - val log = new BusLogging(mainbus, this) + val eventStream = new EventStream(DebugMainBus) + eventStream.startStdoutLogger(AkkaConfig) + val log = new BusLogging(eventStream, this) // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(this) @@ -193,7 +193,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF private class SystemGuardian extends Actor { def receive = { case Terminated(_) ⇒ - mainbus.stopDefaultLoggers() + eventStream.stopDefaultLoggers() context.self.stop() } } @@ -226,8 +226,8 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF deathWatch.subscribe(rootGuardian, systemGuardian) // this starts the reaper actor and the user-configured logging subscribers, which are also actors - mainbus.start(this) - mainbus.startDefaultLoggers(this, AkkaConfig) + eventStream.start(this) + eventStream.startDefaultLoggers(this, AkkaConfig) // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor val deployer = new Deployer(this) diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index f35bc35e5a..8005d9c631 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -36,7 +36,7 @@ trait ActorDeployer { class Deployer(val app: ActorSystem) extends ActorDeployer { val deploymentConfig = new DeploymentConfig(app) - val log = Logging(app.mainbus, this) + val log = Logging(app.eventStream, this) val instance: ActorDeployer = { val deployer = new LocalDeployer() diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 5bdb782114..d4b65ba453 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -188,7 +188,7 @@ trait FSM[S, D] extends ListenerManagement { type Timeout = Option[Duration] type TransitionHandler = PartialFunction[(S, S), Unit] - val log = Logging(app.mainbus, context.self) + val log = Logging(app.eventStream, context.self) /** * **************************************** diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 0b6927ce41..144110db4b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -66,7 +66,7 @@ final case class TaskInvocation(app: ActorSystem, function: () ⇒ Unit, cleanup try { function() } catch { - case e ⇒ app.mainbus.publish(Error(e, this, e.getMessage)) + case e ⇒ app.eventStream.publish(Error(e, this, e.getMessage)) } finally { cleanup() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index e5adf00c92..065c2b4528 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -93,7 +93,7 @@ class Dispatcher( executorService.get() execute invocation } catch { case e: RejectedExecutionException ⇒ - app.mainbus.publish(Warning(this, e.toString)) + app.eventStream.publish(Warning(this, e.toString)) throw e } } @@ -120,7 +120,7 @@ class Dispatcher( } catch { case e: RejectedExecutionException ⇒ try { - app.mainbus.publish(Warning(this, e.toString)) + app.eventStream.publish(Warning(this, e.toString)) } finally { mbox.setAsIdle() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 945c3df5b1..990f3832f1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -262,7 +262,7 @@ object Future { result completeWithResult currentValue } catch { case e: Exception ⇒ - dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) + dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) result completeWithException e } finally { results.clear @@ -631,7 +631,7 @@ sealed trait Future[+T] extends japi.Future[T] { Right(f(res)) } catch { case e: Exception ⇒ - dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) + dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) Left(e) }) } @@ -683,7 +683,7 @@ sealed trait Future[+T] extends japi.Future[T] { future.completeWith(f(r)) } catch { case e: Exception ⇒ - dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) + dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) future complete Left(e) } } @@ -716,7 +716,7 @@ sealed trait Future[+T] extends japi.Future[T] { if (p(res)) r else Left(new MatchError(res)) } catch { case e: Exception ⇒ - dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) + dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) Left(e) }) } @@ -811,7 +811,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) + dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) fr completeWithException e } } @@ -825,7 +825,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.app.mainbus.publish(Error(e, this, e.getMessage)) + dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) fr completeWithException e } } @@ -1017,7 +1017,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { - try { func(this) } catch { case e ⇒ dispatcher.app.mainbus.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? + try { func(this) } catch { case e ⇒ dispatcher.app.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? } @inline diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 04dd2f8f0f..026ad67875 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -205,7 +205,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } } catch { case e ⇒ - actor.app.mainbus.publish(Error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) + actor.app.eventStream.publish(Error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index d4277f7212..b1a9547ccf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -227,10 +227,10 @@ class BoundedExecutorDecorator(val app: ActorSystem, val executor: ExecutorServi }) } catch { case e: RejectedExecutionException ⇒ - app.mainbus.publish(Warning(this, e.toString)) + app.eventStream.publish(Warning(this, e.toString)) semaphore.release case e: Throwable ⇒ - app.mainbus.publish(Error(e, this, e.getMessage)) + app.eventStream.publish(Error(e, this, e.getMessage)) throw e } } diff --git a/akka-actor/src/main/scala/akka/event/MainBus.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala similarity index 94% rename from akka-actor/src/main/scala/akka/event/MainBus.scala rename to akka-actor/src/main/scala/akka/event/EventStream.scala index 0bf1443e9c..d64fc0a6ce 100644 --- a/akka-actor/src/main/scala/akka/event/MainBus.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -8,7 +8,7 @@ import akka.actor.ActorSystem import akka.actor.Terminated import akka.util.Subclassification -class MainBus(debug: Boolean = false) extends LoggingBus with SubchannelClassification { +class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClassification { type Event = AnyRef type Classifier = Class[_] diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 425d3deafe..641271f5c0 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -220,7 +220,7 @@ object Logging { * Obtain LoggingAdapter for the given application and source object. The * source object is used to identify the source of this logging channel. */ - def apply(app: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(app.mainbus, source) + def apply(app: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(app.eventStream, source) /** * Java API: Obtain LoggingAdapter for the given application and source object. The * source object is used to identify the source of this logging channel. diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index b6e8006e74..d5837c95b1 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -162,7 +162,7 @@ abstract class RemoteSupport(val app: ActorSystem) { recipient: ActorRef, loader: Option[ClassLoader]): Unit - protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = app.mainbus.publish(message) + protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = app.eventStream.publish(message) override def toString = name } diff --git a/akka-actor/src/main/scala/akka/util/JMX.scala b/akka-actor/src/main/scala/akka/util/JMX.scala index f5baf3d0e2..1c7465882b 100644 --- a/akka-actor/src/main/scala/akka/util/JMX.scala +++ b/akka-actor/src/main/scala/akka/util/JMX.scala @@ -24,7 +24,7 @@ object JMX { case e: InstanceAlreadyExistsException ⇒ Some(mbeanServer.getObjectInstance(name)) case e: Exception ⇒ - app.mainbus.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean))) + app.eventStream.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean))) None } @@ -32,6 +32,6 @@ object JMX { mbeanServer.unregisterMBean(mbean) } catch { case e: InstanceNotFoundException ⇒ {} - case e: Exception ⇒ app.mainbus.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean))) + case e: Exception ⇒ app.eventStream.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean))) } } diff --git a/akka-docs/scala/code/ActorDocSpec.scala b/akka-docs/scala/code/ActorDocSpec.scala index 47f9fe0a89..fb2ccdde5b 100644 --- a/akka-docs/scala/code/ActorDocSpec.scala +++ b/akka-docs/scala/code/ActorDocSpec.scala @@ -36,8 +36,8 @@ class ActorDocSpec extends AkkaSpec(Configuration("akka.loglevel" -> "INFO")) { case e: Logging.Info ⇒ true case _ ⇒ false } - app.mainbus.publish(TestEvent.Mute(filter)) - app.mainbus.subscribe(testActor, classOf[Logging.Info]) + app.eventStream.publish(TestEvent.Mute(filter)) + app.eventStream.subscribe(testActor, classOf[Logging.Info]) myActor ! "test" expectMsgPF(1 second) { case Logging.Info(_, "received test") ⇒ true } @@ -45,8 +45,8 @@ class ActorDocSpec extends AkkaSpec(Configuration("akka.loglevel" -> "INFO")) { myActor ! "unknown" expectMsgPF(1 second) { case Logging.Info(_, "received unknown message") ⇒ true } - app.mainbus.unsubscribe(testActor) - app.mainbus.publish(TestEvent.UnMute(filter)) + app.eventStream.unsubscribe(testActor) + app.eventStream.publish(TestEvent.UnMute(filter)) myActor.stop() } diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 74a37b1d56..a56e6be38a 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -74,8 +74,8 @@ class Remote(val app: ActorSystem) { val remote = new akka.remote.netty.NettyRemoteSupport(app) remote.start() //TODO FIXME Any application loader here? - app.mainbus.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) - app.mainbus.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) + app.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) + app.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) // TODO actually register this provider in app in remote mode //provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider) @@ -256,7 +256,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo .newInstance(exception.getMessage).asInstanceOf[Throwable] } catch { case problem: Exception ⇒ - remote.app.mainbus.publish(Logging.Error(problem, remote, problem.getMessage)) + remote.app.eventStream.publish(Logging.Error(problem, remote, problem.getMessage)) CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage) } } diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index 7e521f729c..de86642d52 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -71,7 +71,7 @@ public class UntypedCoordinatedIncrementTest { EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - application.mainbus().publish(new TestEvent.Mute(ignoreExceptions)); + application.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); CountDownLatch incrementLatch = new CountDownLatch(numCounters); List actors = new ArrayList(counters); actors.add(failer); @@ -84,7 +84,7 @@ public class UntypedCoordinatedIncrementTest { Future future = counter.ask("GetCount", askTimeout); assertEquals(0, ((Integer)future.get()).intValue()); } - application.mainbus().publish(new TestEvent.UnMute(ignoreExceptions)); + application.eventStream().publish(new TestEvent.UnMute(ignoreExceptions)); } public Seq seq(A... args) { diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index b12d072769..e146fb3d21 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -75,7 +75,7 @@ public class UntypedTransactorTest { EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - application.mainbus().publish(new TestEvent.Mute(ignoreExceptions)); + application.eventStream().publish(new TestEvent.Mute(ignoreExceptions)); CountDownLatch incrementLatch = new CountDownLatch(numCounters); List actors = new ArrayList(counters); actors.add(failer); @@ -96,7 +96,7 @@ public class UntypedTransactorTest { } } } - application.mainbus().publish(new TestEvent.UnMute(ignoreExceptions)); + application.eventStream().publish(new TestEvent.UnMute(ignoreExceptions)); } public Seq seq(A... args) { diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 3d75938637..cb0d86099f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -211,12 +211,12 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr true } catch { case ie: InterruptedException ⇒ - app.mainbus.publish(Error(this, ie)) + app.eventStream.publish(Error(this, ie)) Thread.currentThread().interrupt() intex = ie true case e ⇒ - app.mainbus.publish(Error(this, e)) + app.eventStream.publish(Error(this, e)) queue.leave false } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 25bcf2ce17..5c3fa5334f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -79,7 +79,7 @@ abstract class EventFilter(occurrences: Int) { * remove the filter when the block is finished or aborted. */ def intercept[T](code: ⇒ T)(implicit app: ActorSystem): T = { - app.mainbus publish TestEvent.Mute(this) + app.eventStream publish TestEvent.Mute(this) try { val result = code if (!awaitDone(app.AkkaConfig.TestEventFilterLeeway)) @@ -88,7 +88,7 @@ abstract class EventFilter(occurrences: Int) { else throw new AssertionError("Received " + (-todo) + " messages too many on " + this) result - } finally app.mainbus publish TestEvent.UnMute(this) + } finally app.eventStream publish TestEvent.UnMute(this) } /* diff --git a/akka-testkit/src/main/scala/akka/testkit/package.scala b/akka-testkit/src/main/scala/akka/testkit/package.scala index 9a11ae3861..a7f2a9c81c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/package.scala +++ b/akka-testkit/src/main/scala/akka/testkit/package.scala @@ -8,7 +8,7 @@ package object testkit { def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit app: ActorSystem): T = { def now = System.currentTimeMillis - app.mainbus.publish(TestEvent.Mute(eventFilters.toSeq)) + app.eventStream.publish(TestEvent.Mute(eventFilters.toSeq)) try { val result = block @@ -19,7 +19,7 @@ package object testkit { result } finally { - app.mainbus.publish(TestEvent.UnMute(eventFilters.toSeq)) + app.eventStream.publish(TestEvent.UnMute(eventFilters.toSeq)) } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index f7f87c49c4..86faf1df28 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -16,7 +16,7 @@ import akka.dispatch.FutureTimeoutException abstract class AkkaSpec(_application: ActorSystem = ActorSystem()) extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll { - val log: LoggingAdapter = Logging(app.mainbus, this) + val log: LoggingAdapter = Logging(app.eventStream, this) final override def beforeAll { atStartup()