rename MainBus to EventStream (incl. field in ActorSystem)
This commit is contained in:
parent
945b1aedf9
commit
53353d7031
30 changed files with 88 additions and 88 deletions
|
|
@ -167,12 +167,12 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true
|
|||
}
|
||||
})
|
||||
filterException[Logging.EventHandlerException] {
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Error])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Error])
|
||||
fsm ! "go"
|
||||
expectMsgPF(1 second, hint = "Next state 2 does not exist") {
|
||||
case Logging.Error(_, `fsm`, "Next state 2 does not exist") ⇒ true
|
||||
}
|
||||
app.mainbus.unsubscribe(testActor)
|
||||
app.eventStream.unsubscribe(testActor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -213,7 +213,7 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true
|
|||
case StopEvent(r, _, _) ⇒ testActor ! r
|
||||
}
|
||||
})
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Debug])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Debug])
|
||||
fsm ! "go"
|
||||
expectMsgPF(1 second, hint = "processing Event(go,null)") {
|
||||
case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[" + app.address + "/sys/testActor") ⇒ true
|
||||
|
|
@ -226,7 +226,7 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true
|
|||
}
|
||||
expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal)
|
||||
expectNoMsg(1 second)
|
||||
app.mainbus.unsubscribe(testActor)
|
||||
app.eventStream.unsubscribe(testActor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,9 +33,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
case _: Logging.Info ⇒ true
|
||||
case _ ⇒ false
|
||||
})
|
||||
appLogging.mainbus.publish(filter)
|
||||
appAuto.mainbus.publish(filter)
|
||||
appLifecycle.mainbus.publish(filter)
|
||||
appLogging.eventStream.publish(filter)
|
||||
appAuto.eventStream.publish(filter)
|
||||
appLifecycle.eventStream.publish(filter)
|
||||
|
||||
def ignoreMute(t: TestKit) {
|
||||
t.ignoreMsg {
|
||||
|
|
@ -53,7 +53,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
|
||||
"decorate a Receive" in {
|
||||
new TestKit(appLogging) {
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Debug])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Debug])
|
||||
val r: Actor.Receive = {
|
||||
case null ⇒
|
||||
}
|
||||
|
|
@ -66,8 +66,8 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
"be added on Actor if requested" in {
|
||||
new TestKit(appLogging) with ImplicitSender {
|
||||
ignoreMute(this)
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Debug])
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Error])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Debug])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Error])
|
||||
val actor = TestActorRef(new Actor {
|
||||
def receive = loggable(this) {
|
||||
case _ ⇒ sender ! "x"
|
||||
|
|
@ -95,7 +95,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
|
||||
"not duplicate logging" in {
|
||||
new TestKit(appLogging) with ImplicitSender {
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Debug])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Debug])
|
||||
val actor = TestActorRef(new Actor {
|
||||
def receive = loggable(this)(loggable(this) {
|
||||
case _ ⇒ sender ! "x"
|
||||
|
|
@ -115,7 +115,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
|
||||
"log AutoReceiveMessages if requested" in {
|
||||
new TestKit(appAuto) {
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Debug])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Debug])
|
||||
val actor = TestActorRef(new Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
|
|
@ -135,8 +135,8 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
|
|||
val s = ref.toString
|
||||
s.contains("MainBusReaper") || s.contains("Supervisor")
|
||||
}
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Debug])
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Error])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Debug])
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Error])
|
||||
within(3 seconds) {
|
||||
val lifecycleGuardian = appLifecycle.guardian
|
||||
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import akka.testkit.AkkaSpec
|
|||
class RestartStrategySpec extends AkkaSpec {
|
||||
|
||||
override def atStartup {
|
||||
app.mainbus.publish(Mute(EventFilter[Exception]("Crashing...")))
|
||||
app.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
|
||||
}
|
||||
|
||||
object Ping
|
||||
|
|
|
|||
|
|
@ -122,7 +122,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
}
|
||||
|
||||
override def atStartup() {
|
||||
app.mainbus.publish(Mute(EventFilter[RuntimeException](ExceptionMessage)))
|
||||
app.eventStream.publish(Mute(EventFilter[RuntimeException](ExceptionMessage)))
|
||||
}
|
||||
|
||||
override def beforeEach() = {
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ object ActorModelSpec {
|
|||
await(deadline)(stops == dispatcher.stops.get)
|
||||
} catch {
|
||||
case e ⇒
|
||||
app.mainbus.publish(Error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get +
|
||||
app.eventStream.publish(Error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get +
|
||||
" required: starts=" + starts + ",stops=" + stops))
|
||||
throw e
|
||||
}
|
||||
|
|
@ -212,7 +212,7 @@ object ActorModelSpec {
|
|||
await(deadline)(stats.restarts.get() == restarts)
|
||||
} catch {
|
||||
case e ⇒
|
||||
app.mainbus.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
|
||||
app.eventStream.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
|
||||
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
|
||||
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
|
||||
throw e
|
||||
|
|
@ -318,7 +318,7 @@ abstract class ActorModelSpec extends AkkaSpec {
|
|||
try {
|
||||
f
|
||||
} catch {
|
||||
case e ⇒ app.mainbus.publish(Error(e, this, "error in spawned thread"))
|
||||
case e ⇒ app.eventStream.publish(Error(e, this, "error in spawned thread"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ object MainBusSpec {
|
|||
class C extends B1
|
||||
}
|
||||
|
||||
class MainBusSpec extends AkkaSpec(Configuration(
|
||||
class EventStreamSpec extends AkkaSpec(Configuration(
|
||||
"akka.stdout-loglevel" -> "WARNING",
|
||||
"akka.loglevel" -> "INFO",
|
||||
"akka.event-handlers" -> Seq("akka.event.MainBusSpec$MyLog", Logging.StandardOutLoggerName))) {
|
||||
|
|
@ -39,7 +39,7 @@ class MainBusSpec extends AkkaSpec(Configuration(
|
|||
"A MainBus" must {
|
||||
|
||||
"manage subscriptions" in {
|
||||
val bus = new MainBus(true)
|
||||
val bus = new EventStream(true)
|
||||
bus.start(app)
|
||||
bus.subscribe(testActor, classOf[M])
|
||||
bus.publish(M(42))
|
||||
|
|
@ -52,7 +52,7 @@ class MainBusSpec extends AkkaSpec(Configuration(
|
|||
}
|
||||
|
||||
"manage log levels" in {
|
||||
val bus = new MainBus(false)
|
||||
val bus = new EventStream(false)
|
||||
bus.start(app)
|
||||
bus.startDefaultLoggers(app, app.AkkaConfig)
|
||||
bus.publish(SetTarget(testActor))
|
||||
|
|
@ -73,7 +73,7 @@ class MainBusSpec extends AkkaSpec(Configuration(
|
|||
val b1 = new B1
|
||||
val b2 = new B2
|
||||
val c = new C
|
||||
val bus = new MainBus(false)
|
||||
val bus = new EventStream(false)
|
||||
bus.start(app)
|
||||
within(2 seconds) {
|
||||
bus.subscribe(testActor, classOf[B2]) === true
|
||||
|
|
@ -155,7 +155,7 @@ object Timeout {
|
|||
}
|
||||
|
||||
trait ActorLogging { this: Actor ⇒
|
||||
val log = akka.event.Logging(app.mainbus, context.self)
|
||||
val log = akka.event.Logging(app.eventStream, context.self)
|
||||
}
|
||||
|
||||
object Actor {
|
||||
|
|
@ -168,7 +168,7 @@ object Actor {
|
|||
class LoggingReceive(source: AnyRef, r: Receive)(implicit app: ActorSystem) extends Receive {
|
||||
def isDefinedAt(o: Any) = {
|
||||
val handled = r.isDefinedAt(o)
|
||||
app.mainbus.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
|
||||
app.eventStream.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
|
||||
handled
|
||||
}
|
||||
def apply(o: Any): Unit = r(o)
|
||||
|
|
@ -414,7 +414,7 @@ trait Actor {
|
|||
private[akka] final def apply(msg: Any) = {
|
||||
|
||||
def autoReceiveMessage(msg: AutoReceivedMessage) {
|
||||
if (app.AkkaConfig.DebugAutoReceive) app.mainbus.publish(Debug(this, "received AutoReceiveMessage " + msg))
|
||||
if (app.AkkaConfig.DebugAutoReceive) app.eventStream.publish(Debug(this, "received AutoReceiveMessage " + msg))
|
||||
|
||||
msg match {
|
||||
case HotSwap(code, discardOld) ⇒ become(code(self), discardOld)
|
||||
|
|
|
|||
|
|
@ -167,11 +167,11 @@ private[akka] class ActorCell(
|
|||
actor = created
|
||||
created.preStart()
|
||||
checkReceiveTimeout
|
||||
if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "started (" + actor + ")"))
|
||||
if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "started (" + actor + ")"))
|
||||
} catch {
|
||||
case e ⇒
|
||||
try {
|
||||
app.mainbus.publish(Error(e, self, "error while creating actor"))
|
||||
app.eventStream.publish(Error(e, self, "error while creating actor"))
|
||||
// prevent any further messages to be processed until the actor has been restarted
|
||||
dispatcher.suspend(this)
|
||||
} finally {
|
||||
|
|
@ -181,7 +181,7 @@ private[akka] class ActorCell(
|
|||
|
||||
def recreate(cause: Throwable): Unit = try {
|
||||
val failedActor = actor
|
||||
if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "restarting"))
|
||||
if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "restarting"))
|
||||
val freshActor = newActor()
|
||||
if (failedActor ne null) {
|
||||
val c = currentMessage //One read only plz
|
||||
|
|
@ -195,14 +195,14 @@ private[akka] class ActorCell(
|
|||
}
|
||||
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
|
||||
freshActor.postRestart(cause)
|
||||
if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "restarted"))
|
||||
if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "restarted"))
|
||||
|
||||
dispatcher.resume(this) //FIXME should this be moved down?
|
||||
|
||||
props.faultHandler.handleSupervisorRestarted(cause, self, children)
|
||||
} catch {
|
||||
case e ⇒ try {
|
||||
app.mainbus.publish(Error(e, self, "error while creating actor"))
|
||||
app.eventStream.publish(Error(e, self, "error while creating actor"))
|
||||
// prevent any further messages to be processed until the actor has been restarted
|
||||
dispatcher.suspend(this)
|
||||
} finally {
|
||||
|
|
@ -223,7 +223,7 @@ private[akka] class ActorCell(
|
|||
try {
|
||||
try {
|
||||
val a = actor
|
||||
if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "stopping"))
|
||||
if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "stopping"))
|
||||
if (a ne null) a.postStop()
|
||||
} finally {
|
||||
//Stop supervised actors
|
||||
|
|
@ -250,8 +250,8 @@ private[akka] class ActorCell(
|
|||
if (!stats.contains(child)) {
|
||||
childrenRefs = childrenRefs.updated(child.name, child)
|
||||
childrenStats = childrenStats.updated(child, ChildRestartStats())
|
||||
if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "now supervising " + child))
|
||||
} else app.mainbus.publish(Warning(self, "Already supervising " + child))
|
||||
if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "now supervising " + child))
|
||||
} else app.eventStream.publish(Warning(self, "Already supervising " + child))
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
@ -262,10 +262,10 @@ private[akka] class ActorCell(
|
|||
case Recreate(cause) ⇒ recreate(cause)
|
||||
case Link(subject) ⇒
|
||||
app.deathWatch.subscribe(self, subject)
|
||||
if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "now monitoring " + subject))
|
||||
if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "now monitoring " + subject))
|
||||
case Unlink(subject) ⇒
|
||||
app.deathWatch.unsubscribe(self, subject)
|
||||
if (app.AkkaConfig.DebugLifecycle) app.mainbus.publish(Debug(self, "stopped monitoring " + subject))
|
||||
if (app.AkkaConfig.DebugLifecycle) app.eventStream.publish(Debug(self, "stopped monitoring " + subject))
|
||||
case Suspend() ⇒ suspend()
|
||||
case Resume() ⇒ resume()
|
||||
case Terminate() ⇒ terminate()
|
||||
|
|
@ -274,7 +274,7 @@ private[akka] class ActorCell(
|
|||
}
|
||||
} catch {
|
||||
case e ⇒ //Should we really catch everything here?
|
||||
app.mainbus.publish(Error(e, self, "error while processing " + message))
|
||||
app.eventStream.publish(Error(e, self, "error while processing " + message))
|
||||
//TODO FIXME How should problems here be handled?
|
||||
throw e
|
||||
}
|
||||
|
|
@ -293,7 +293,7 @@ private[akka] class ActorCell(
|
|||
currentMessage = null // reset current message after successful invocation
|
||||
} catch {
|
||||
case e ⇒
|
||||
app.mainbus.publish(Error(e, self, e.getMessage))
|
||||
app.eventStream.publish(Error(e, self, e.getMessage))
|
||||
|
||||
// prevent any further messages to be processed until the actor has been restarted
|
||||
dispatcher.suspend(this)
|
||||
|
|
@ -313,7 +313,7 @@ private[akka] class ActorCell(
|
|||
}
|
||||
} catch {
|
||||
case e ⇒
|
||||
app.mainbus.publish(Error(e, self, e.getMessage))
|
||||
app.eventStream.publish(Error(e, self, e.getMessage))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
@ -322,7 +322,7 @@ private[akka] class ActorCell(
|
|||
|
||||
final def handleFailure(fail: Failed): Unit = childrenStats.get(fail.actor) match {
|
||||
case Some(stats) ⇒ if (!props.faultHandler.handleFailure(fail, stats, childrenStats)) throw fail.cause
|
||||
case None ⇒ app.mainbus.publish(Warning(self, "dropping " + fail + " from unknown child"))
|
||||
case None ⇒ app.eventStream.publish(Warning(self, "dropping " + fail + " from unknown child"))
|
||||
}
|
||||
|
||||
final def handleChildTerminated(child: ActorRef): Unit = {
|
||||
|
|
|
|||
|
|
@ -384,10 +384,10 @@ class DeadLetterActorRef(val app: ActorSystem) extends MinimalActorRef {
|
|||
override def isShutdown(): Boolean = true
|
||||
|
||||
protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit =
|
||||
app.mainbus.publish(DeadLetter(message, sender))
|
||||
app.eventStream.publish(DeadLetter(message, sender))
|
||||
|
||||
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
|
||||
app.mainbus.publish(DeadLetter(message, this))
|
||||
app.eventStream.publish(DeadLetter(message, this))
|
||||
brokenPromise
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ class ActorRefProviderException(message: String) extends AkkaException(message)
|
|||
*/
|
||||
class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
|
||||
|
||||
val log = Logging(app.mainbus, this)
|
||||
val log = Logging(app.eventStream, this)
|
||||
|
||||
private[akka] val deployer: Deployer = new Deployer(app)
|
||||
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
|||
val DebugAutoReceive = getBool("akka.actor.debug.autoreceive", false)
|
||||
val DebugLifecycle = getBool("akka.actor.debug.lifecycle", false)
|
||||
val FsmDebugEvent = getBool("akka.actor.debug.fsm", false)
|
||||
val DebugMainBus = getBool("akka.actor.debug.mainbus", false)
|
||||
val DebugMainBus = getBool("akka.actor.debug.eventStream", false)
|
||||
|
||||
val DispatcherThroughput = getInt("akka.actor.throughput", 5)
|
||||
val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout").
|
||||
|
|
@ -161,9 +161,9 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
|||
})
|
||||
|
||||
// this provides basic logging (to stdout) until .start() is called below
|
||||
val mainbus = new MainBus(DebugMainBus)
|
||||
mainbus.startStdoutLogger(AkkaConfig)
|
||||
val log = new BusLogging(mainbus, this)
|
||||
val eventStream = new EventStream(DebugMainBus)
|
||||
eventStream.startStdoutLogger(AkkaConfig)
|
||||
val log = new BusLogging(eventStream, this)
|
||||
|
||||
// TODO correctly pull its config from the config
|
||||
val dispatcherFactory = new Dispatchers(this)
|
||||
|
|
@ -193,7 +193,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
|||
private class SystemGuardian extends Actor {
|
||||
def receive = {
|
||||
case Terminated(_) ⇒
|
||||
mainbus.stopDefaultLoggers()
|
||||
eventStream.stopDefaultLoggers()
|
||||
context.self.stop()
|
||||
}
|
||||
}
|
||||
|
|
@ -226,8 +226,8 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
|||
deathWatch.subscribe(rootGuardian, systemGuardian)
|
||||
|
||||
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
|
||||
mainbus.start(this)
|
||||
mainbus.startDefaultLoggers(this, AkkaConfig)
|
||||
eventStream.start(this)
|
||||
eventStream.startDefaultLoggers(this, AkkaConfig)
|
||||
|
||||
// TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor
|
||||
val deployer = new Deployer(this)
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ trait ActorDeployer {
|
|||
class Deployer(val app: ActorSystem) extends ActorDeployer {
|
||||
|
||||
val deploymentConfig = new DeploymentConfig(app)
|
||||
val log = Logging(app.mainbus, this)
|
||||
val log = Logging(app.eventStream, this)
|
||||
|
||||
val instance: ActorDeployer = {
|
||||
val deployer = new LocalDeployer()
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ trait FSM[S, D] extends ListenerManagement {
|
|||
type Timeout = Option[Duration]
|
||||
type TransitionHandler = PartialFunction[(S, S), Unit]
|
||||
|
||||
val log = Logging(app.mainbus, context.self)
|
||||
val log = Logging(app.eventStream, context.self)
|
||||
|
||||
/**
|
||||
* ****************************************
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ final case class TaskInvocation(app: ActorSystem, function: () ⇒ Unit, cleanup
|
|||
try {
|
||||
function()
|
||||
} catch {
|
||||
case e ⇒ app.mainbus.publish(Error(e, this, e.getMessage))
|
||||
case e ⇒ app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
} finally {
|
||||
cleanup()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -93,7 +93,7 @@ class Dispatcher(
|
|||
executorService.get() execute invocation
|
||||
} catch {
|
||||
case e: RejectedExecutionException ⇒
|
||||
app.mainbus.publish(Warning(this, e.toString))
|
||||
app.eventStream.publish(Warning(this, e.toString))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
@ -120,7 +120,7 @@ class Dispatcher(
|
|||
} catch {
|
||||
case e: RejectedExecutionException ⇒
|
||||
try {
|
||||
app.mainbus.publish(Warning(this, e.toString))
|
||||
app.eventStream.publish(Warning(this, e.toString))
|
||||
} finally {
|
||||
mbox.setAsIdle()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -262,7 +262,7 @@ object Future {
|
|||
result completeWithResult currentValue
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
dispatcher.app.mainbus.publish(Error(e, this, e.getMessage))
|
||||
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
result completeWithException e
|
||||
} finally {
|
||||
results.clear
|
||||
|
|
@ -631,7 +631,7 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
Right(f(res))
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
dispatcher.app.mainbus.publish(Error(e, this, e.getMessage))
|
||||
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
Left(e)
|
||||
})
|
||||
}
|
||||
|
|
@ -683,7 +683,7 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
future.completeWith(f(r))
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
dispatcher.app.mainbus.publish(Error(e, this, e.getMessage))
|
||||
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
future complete Left(e)
|
||||
}
|
||||
}
|
||||
|
|
@ -716,7 +716,7 @@ sealed trait Future[+T] extends japi.Future[T] {
|
|||
if (p(res)) r else Left(new MatchError(res))
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
dispatcher.app.mainbus.publish(Error(e, this, e.getMessage))
|
||||
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
Left(e)
|
||||
})
|
||||
}
|
||||
|
|
@ -811,7 +811,7 @@ trait Promise[T] extends Future[T] {
|
|||
fr completeWith cont(f)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
dispatcher.app.mainbus.publish(Error(e, this, e.getMessage))
|
||||
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
fr completeWithException e
|
||||
}
|
||||
}
|
||||
|
|
@ -825,7 +825,7 @@ trait Promise[T] extends Future[T] {
|
|||
fr completeWith cont(f)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
dispatcher.app.mainbus.publish(Error(e, this, e.getMessage))
|
||||
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
fr completeWithException e
|
||||
}
|
||||
}
|
||||
|
|
@ -1017,7 +1017,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
} else this
|
||||
|
||||
private def notifyCompleted(func: Future[T] ⇒ Unit) {
|
||||
try { func(this) } catch { case e ⇒ dispatcher.app.mainbus.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
|
||||
try { func(this) } catch { case e ⇒ dispatcher.app.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
|
||||
}
|
||||
|
||||
@inline
|
||||
|
|
|
|||
|
|
@ -205,7 +205,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
|
|||
}
|
||||
} catch {
|
||||
case e ⇒
|
||||
actor.app.mainbus.publish(Error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
|
||||
actor.app.eventStream.publish(Error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -227,10 +227,10 @@ class BoundedExecutorDecorator(val app: ActorSystem, val executor: ExecutorServi
|
|||
})
|
||||
} catch {
|
||||
case e: RejectedExecutionException ⇒
|
||||
app.mainbus.publish(Warning(this, e.toString))
|
||||
app.eventStream.publish(Warning(this, e.toString))
|
||||
semaphore.release
|
||||
case e: Throwable ⇒
|
||||
app.mainbus.publish(Error(e, this, e.getMessage))
|
||||
app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import akka.actor.ActorSystem
|
|||
import akka.actor.Terminated
|
||||
import akka.util.Subclassification
|
||||
|
||||
class MainBus(debug: Boolean = false) extends LoggingBus with SubchannelClassification {
|
||||
class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClassification {
|
||||
|
||||
type Event = AnyRef
|
||||
type Classifier = Class[_]
|
||||
|
|
@ -220,7 +220,7 @@ object Logging {
|
|||
* Obtain LoggingAdapter for the given application and source object. The
|
||||
* source object is used to identify the source of this logging channel.
|
||||
*/
|
||||
def apply(app: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(app.mainbus, source)
|
||||
def apply(app: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(app.eventStream, source)
|
||||
/**
|
||||
* Java API: Obtain LoggingAdapter for the given application and source object. The
|
||||
* source object is used to identify the source of this logging channel.
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ abstract class RemoteSupport(val app: ActorSystem) {
|
|||
recipient: ActorRef,
|
||||
loader: Option[ClassLoader]): Unit
|
||||
|
||||
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = app.mainbus.publish(message)
|
||||
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = app.eventStream.publish(message)
|
||||
|
||||
override def toString = name
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ object JMX {
|
|||
case e: InstanceAlreadyExistsException ⇒
|
||||
Some(mbeanServer.getObjectInstance(name))
|
||||
case e: Exception ⇒
|
||||
app.mainbus.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean)))
|
||||
app.eventStream.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean)))
|
||||
None
|
||||
}
|
||||
|
||||
|
|
@ -32,6 +32,6 @@ object JMX {
|
|||
mbeanServer.unregisterMBean(mbean)
|
||||
} catch {
|
||||
case e: InstanceNotFoundException ⇒ {}
|
||||
case e: Exception ⇒ app.mainbus.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean)))
|
||||
case e: Exception ⇒ app.eventStream.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean)))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,8 +36,8 @@ class ActorDocSpec extends AkkaSpec(Configuration("akka.loglevel" -> "INFO")) {
|
|||
case e: Logging.Info ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
app.mainbus.publish(TestEvent.Mute(filter))
|
||||
app.mainbus.subscribe(testActor, classOf[Logging.Info])
|
||||
app.eventStream.publish(TestEvent.Mute(filter))
|
||||
app.eventStream.subscribe(testActor, classOf[Logging.Info])
|
||||
|
||||
myActor ! "test"
|
||||
expectMsgPF(1 second) { case Logging.Info(_, "received test") ⇒ true }
|
||||
|
|
@ -45,8 +45,8 @@ class ActorDocSpec extends AkkaSpec(Configuration("akka.loglevel" -> "INFO")) {
|
|||
myActor ! "unknown"
|
||||
expectMsgPF(1 second) { case Logging.Info(_, "received unknown message") ⇒ true }
|
||||
|
||||
app.mainbus.unsubscribe(testActor)
|
||||
app.mainbus.publish(TestEvent.UnMute(filter))
|
||||
app.eventStream.unsubscribe(testActor)
|
||||
app.eventStream.publish(TestEvent.UnMute(filter))
|
||||
|
||||
myActor.stop()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,8 +74,8 @@ class Remote(val app: ActorSystem) {
|
|||
val remote = new akka.remote.netty.NettyRemoteSupport(app)
|
||||
remote.start() //TODO FIXME Any application loader here?
|
||||
|
||||
app.mainbus.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
|
||||
app.mainbus.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
|
||||
app.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent])
|
||||
app.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent])
|
||||
|
||||
// TODO actually register this provider in app in remote mode
|
||||
//provider.register(ActorRefProvider.RemoteProvider, new RemoteActorRefProvider)
|
||||
|
|
@ -256,7 +256,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
|
|||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
} catch {
|
||||
case problem: Exception ⇒
|
||||
remote.app.mainbus.publish(Logging.Error(problem, remote, problem.getMessage))
|
||||
remote.app.eventStream.publish(Logging.Error(problem, remote, problem.getMessage))
|
||||
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ public class UntypedCoordinatedIncrementTest {
|
|||
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class);
|
||||
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class);
|
||||
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter);
|
||||
application.mainbus().publish(new TestEvent.Mute(ignoreExceptions));
|
||||
application.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
|
||||
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
|
||||
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
|
||||
actors.add(failer);
|
||||
|
|
@ -84,7 +84,7 @@ public class UntypedCoordinatedIncrementTest {
|
|||
Future future = counter.ask("GetCount", askTimeout);
|
||||
assertEquals(0, ((Integer)future.get()).intValue());
|
||||
}
|
||||
application.mainbus().publish(new TestEvent.UnMute(ignoreExceptions));
|
||||
application.eventStream().publish(new TestEvent.UnMute(ignoreExceptions));
|
||||
}
|
||||
|
||||
public <A> Seq<A> seq(A... args) {
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ public class UntypedTransactorTest {
|
|||
EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class);
|
||||
EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class);
|
||||
Seq<EventFilter> ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter);
|
||||
application.mainbus().publish(new TestEvent.Mute(ignoreExceptions));
|
||||
application.eventStream().publish(new TestEvent.Mute(ignoreExceptions));
|
||||
CountDownLatch incrementLatch = new CountDownLatch(numCounters);
|
||||
List<ActorRef> actors = new ArrayList<ActorRef>(counters);
|
||||
actors.add(failer);
|
||||
|
|
@ -96,7 +96,7 @@ public class UntypedTransactorTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
application.mainbus().publish(new TestEvent.UnMute(ignoreExceptions));
|
||||
application.eventStream().publish(new TestEvent.UnMute(ignoreExceptions));
|
||||
}
|
||||
|
||||
public <A> Seq<A> seq(A... args) {
|
||||
|
|
|
|||
|
|
@ -211,12 +211,12 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr
|
|||
true
|
||||
} catch {
|
||||
case ie: InterruptedException ⇒
|
||||
app.mainbus.publish(Error(this, ie))
|
||||
app.eventStream.publish(Error(this, ie))
|
||||
Thread.currentThread().interrupt()
|
||||
intex = ie
|
||||
true
|
||||
case e ⇒
|
||||
app.mainbus.publish(Error(this, e))
|
||||
app.eventStream.publish(Error(this, e))
|
||||
queue.leave
|
||||
false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ abstract class EventFilter(occurrences: Int) {
|
|||
* remove the filter when the block is finished or aborted.
|
||||
*/
|
||||
def intercept[T](code: ⇒ T)(implicit app: ActorSystem): T = {
|
||||
app.mainbus publish TestEvent.Mute(this)
|
||||
app.eventStream publish TestEvent.Mute(this)
|
||||
try {
|
||||
val result = code
|
||||
if (!awaitDone(app.AkkaConfig.TestEventFilterLeeway))
|
||||
|
|
@ -88,7 +88,7 @@ abstract class EventFilter(occurrences: Int) {
|
|||
else
|
||||
throw new AssertionError("Received " + (-todo) + " messages too many on " + this)
|
||||
result
|
||||
} finally app.mainbus publish TestEvent.UnMute(this)
|
||||
} finally app.eventStream publish TestEvent.UnMute(this)
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ package object testkit {
|
|||
def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit app: ActorSystem): T = {
|
||||
def now = System.currentTimeMillis
|
||||
|
||||
app.mainbus.publish(TestEvent.Mute(eventFilters.toSeq))
|
||||
app.eventStream.publish(TestEvent.Mute(eventFilters.toSeq))
|
||||
try {
|
||||
val result = block
|
||||
|
||||
|
|
@ -19,7 +19,7 @@ package object testkit {
|
|||
|
||||
result
|
||||
} finally {
|
||||
app.mainbus.publish(TestEvent.UnMute(eventFilters.toSeq))
|
||||
app.eventStream.publish(TestEvent.UnMute(eventFilters.toSeq))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import akka.dispatch.FutureTimeoutException
|
|||
abstract class AkkaSpec(_application: ActorSystem = ActorSystem())
|
||||
extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll {
|
||||
|
||||
val log: LoggingAdapter = Logging(app.mainbus, this)
|
||||
val log: LoggingAdapter = Logging(app.eventStream, this)
|
||||
|
||||
final override def beforeAll {
|
||||
atStartup()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue