diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index 1656c6efdc..9a82cc2489 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -61,7 +61,7 @@ object FSMActorSpec { whenUnhandled { case Ev(msg) ⇒ { - EventHandler.info(this, "unhandled event " + msg + " in state " + stateName + " with data " + stateData) + app.eventHandler.info(this, "unhandled event " + msg + " in state " + stateName + " with data " + stateData) unhandledLatch.open stay } @@ -102,13 +102,13 @@ object FSMActorSpec { class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true)) with BeforeAndAfterAll with BeforeAndAfterEach with ImplicitSender { import FSMActorSpec._ - val eh_level = EventHandler.level + val eh_level = app.eventHandler.level var logger: ActorRef = _ override def afterEach { - EventHandler.level = eh_level + app.eventHandler.level = eh_level if (logger ne null) { - EventHandler.removeListener(logger) + app.eventHandler.removeListener(logger) logger = null } } @@ -178,7 +178,7 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true case x ⇒ testActor forward x } }) - EventHandler.addListener(logger) + app.eventHandler.addListener(logger) fsm ! "go" expectMsgPF(1 second) { case EventHandler.Error(_: EventHandler.EventHandlerException, ref, "Next state 2 does not exist") if ref eq fsm.underlyingActor ⇒ true @@ -224,8 +224,8 @@ class FSMActorSpec extends AkkaSpec(Configuration("akka.actor.debug.fsm" -> true case x ⇒ testActor forward x } }) - EventHandler.addListener(logger) - EventHandler.level = EventHandler.DebugLevel + app.eventHandler.addListener(logger) + app.eventHandler.level = EventHandler.DebugLevel fsmref ! "go" expectMsgPF(1 second) { case EventHandler.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[testActor") ⇒ true diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 40a451060c..3eb6de4cf3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -5,7 +5,6 @@ package akka.actor import akka.testkit._ import akka.util.duration._ -import akka.event.EventHandler import FSM._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index a95ddbb007..8d15aeaf57 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -5,13 +5,13 @@ package akka.actor import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.testkit.{ TestKit, TestActorRef, EventFilter, TestEvent, ImplicitSender } -import akka.event.EventHandler import akka.util.duration._ import akka.testkit.AkkaSpec import org.scalatest.WordSpec import akka.AkkaApplication import akka.AkkaApplication.defaultConfig import akka.config.Configuration +import akka.event.EventHandler object LoggingReceiveSpec { class TestLogActor extends Actor { @@ -29,9 +29,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val appLifecycle = AkkaApplication("lifecycle", config ++ Configuration("akka.actor.debug.lifecycle" -> true)) // override def beforeAll { - // EventHandler.notify(TestEvent.Mute(EventFilter[UnhandledMessageException], + // app.eventHandler.notify(TestEvent.Mute(EventFilter[UnhandledMessageException], // EventFilter[ActorKilledException], EventFilter.custom { - // case d: EventHandler.Debug ⇒ true + // case d: app.eventHandler.Debug ⇒ true // case _ ⇒ false // })) // } @@ -43,14 +43,14 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd // !s.startsWith("now monitoring") && !s.startsWith("stopped monitoring") // case EventHandler.Debug(_, _) ⇒ true // case EventHandler.Error(_: UnhandledMessageException, _, _) ⇒ false - // case _: EventHandler.Error ⇒ true + // case _: app.eventHandler.Error ⇒ true // } "A LoggingReceive" ignore { "decorate a Receive" in { new TestKit(appLogging) { - EventHandler.addListener(testActor) + app.eventHandler.addListener(testActor) val r: Actor.Receive = { case null ⇒ } @@ -62,7 +62,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "be added on Actor if requested" in { new TestKit(appLogging) with ImplicitSender { - EventHandler.addListener(testActor) + app.eventHandler.addListener(testActor) val actor = TestActorRef(new Actor { def receive = loggable(this) { case _ ⇒ reply("x") @@ -89,7 +89,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "not duplicate logging" in { new TestKit(appLogging) with ImplicitSender { - EventHandler.addListener(testActor) + app.eventHandler.addListener(testActor) val actor = TestActorRef(new Actor { def receive = loggable(this)(loggable(this) { case _ ⇒ reply("x") @@ -109,7 +109,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "log AutoReceiveMessages if requested" in { new TestKit(appAuto) { - EventHandler.addListener(testActor) + app.eventHandler.addListener(testActor) val actor = TestActorRef(new Actor { def receive = { case _ ⇒ @@ -123,7 +123,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd "log LifeCycle changes if requested" in { new TestKit(appLifecycle) { - EventHandler.addListener(testActor) + app.eventHandler.addListener(testActor) within(2 seconds) { val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index dafe164fbb..d3caac95ef 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -6,7 +6,6 @@ package akka.actor import java.lang.Thread.sleep import org.scalatest.BeforeAndAfterAll -import akka.event.EventHandler import akka.testkit.TestEvent._ import akka.testkit.EventFilter import java.util.concurrent.{ TimeUnit, CountDownLatch } @@ -16,11 +15,11 @@ import akka.testkit.AkkaSpec class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { override def beforeAll() { - EventHandler.notify(Mute(EventFilter[Exception]("Crashing..."))) + app.eventHandler.notify(Mute(EventFilter[Exception]("Crashing..."))) } override def afterAll() { - EventHandler.notify(UnMuteAll) + app.eventHandler.notify(UnMuteAll) } object Ping diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index f324c83c4f..81d4e64fd6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -1,7 +1,6 @@ package akka.actor import org.scalatest.BeforeAndAfterEach -import akka.event.EventHandler import akka.testkit.TestEvent._ import akka.testkit.EventFilter import org.multiverse.api.latches.StandardLatch @@ -18,13 +17,13 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { } override def beforeEach { - EventHandler.notify(Mute(EventFilter[Exception]("CRASH"))) + app.eventHandler.notify(Mute(EventFilter[Exception]("CRASH"))) } override def afterEach { while (futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) } app.registry.local.shutdownAll - EventHandler.start() + app.eventHandler.start() } "A Scheduler" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index e9824a9242..a227f5c4cb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -12,7 +12,6 @@ import akka.testkit.Testing.sleepFor import akka.util.duration._ import akka.{ Die, Ping } import akka.actor.Actor._ -import akka.event.EventHandler import akka.testkit.TestEvent._ import akka.testkit.EventFilter import java.util.concurrent.atomic.AtomicInteger @@ -123,13 +122,13 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte } override def beforeAll() = { - EventHandler notify Mute(EventFilter[Exception]("Die"), + app.eventHandler notify Mute(EventFilter[Exception]("Die"), EventFilter[IllegalStateException]("Don't wanna!"), EventFilter[RuntimeException]("Expected")) } override def afterAll() = { - EventHandler notify UnMuteAll + app.eventHandler notify UnMuteAll } override def beforeEach() = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index 2769ced074..d671625e0f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -17,7 +17,6 @@ class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender override def afterAll = { app.registry.local.shutdownAll - akka.event.EventHandler.start() } "A supervised actor with lifecycle PERMANENT" should { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 712273a14a..435a4df2be 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -3,7 +3,6 @@ */ package akka.actor.dispatch -import akka.event.EventHandler import org.scalatest.Assertions._ import akka.testkit.{ Testing, filterEvents, EventFilter, AkkaSpec } import akka.dispatch._ @@ -153,14 +152,14 @@ object ActorModelSpec { def assertDispatcher(dispatcher: MessageDispatcherInterceptor)( starts: Long = dispatcher.starts.get(), - stops: Long = dispatcher.stops.get()) { + stops: Long = dispatcher.stops.get())(implicit app: AkkaApplication) { val deadline = System.currentTimeMillis + dispatcher.timeoutMs * 5 try { await(deadline)(starts == dispatcher.starts.get) await(deadline)(stops == dispatcher.stops.get) } catch { case e ⇒ - EventHandler.error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get + + app.eventHandler.error(e, dispatcher, "actual: starts=" + dispatcher.starts.get + ",stops=" + dispatcher.stops.get + " required: starts=" + starts + ",stops=" + stops) throw e } @@ -190,7 +189,7 @@ object ActorModelSpec { unregisters: Long = 0, msgsReceived: Long = 0, msgsProcessed: Long = 0, - restarts: Long = 0) { + restarts: Long = 0)(implicit app: AkkaApplication) { assertRef(actorRef, dispatcher)( suspensions, resumes, @@ -208,7 +207,7 @@ object ActorModelSpec { unregisters: Long = statsFor(actorRef).unregisters.get(), msgsReceived: Long = statsFor(actorRef).msgsReceived.get(), msgsProcessed: Long = statsFor(actorRef).msgsProcessed.get(), - restarts: Long = statsFor(actorRef).restarts.get()) { + restarts: Long = statsFor(actorRef).restarts.get())(implicit app: AkkaApplication) { val stats = statsFor(actorRef, Option(dispatcher).getOrElse(actorRef.asInstanceOf[LocalActorRef].underlying.dispatcher)) val deadline = System.currentTimeMillis + 1000 try { @@ -221,7 +220,7 @@ object ActorModelSpec { await(deadline)(stats.restarts.get() == restarts) } catch { case e ⇒ - EventHandler.error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + + app.eventHandler.error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts) throw e @@ -324,7 +323,7 @@ abstract class ActorModelSpec extends AkkaSpec { try { f } catch { - case e ⇒ EventHandler.error(e, this, "error in spawned thread") + case e ⇒ app.eventHandler.error(e, this, "error in spawned thread") } } } @@ -399,7 +398,7 @@ abstract class ActorModelSpec extends AkkaSpec { } catch { case e ⇒ System.err.println("Error: " + e.getMessage + " missing count downs == " + cachedMessage.latch.getCount() + " out of " + num) - //EventHandler.error(new Exception with NoStackTrace, null, cachedMessage.latch.getCount()) + //app.eventHandler.error(new Exception with NoStackTrace, null, cachedMessage.latch.getCount()) } } for (run ← 1 to 3) { @@ -480,10 +479,10 @@ class DispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher("foo", app.AkkaConfig.DispatcherThroughput, + new Dispatcher(app, "foo", app.AkkaConfig.DispatcherThroughput, app.dispatcherFactory.ThroughputDeadlineTimeMillis, app.dispatcherFactory.MailboxType, config, app.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor, - ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor] + ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor] def dispatcherType = "Dispatcher" @@ -509,10 +508,10 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher("foo", 1, // TODO check why 1 here? (came from old test) + new BalancingDispatcher(app, "foo", 1, // TODO check why 1 here? (came from old test) app.dispatcherFactory.ThroughputDeadlineTimeMillis, app.dispatcherFactory.MailboxType, config, app.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor, - ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor] + ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor] def dispatcherType = "Balancing Dispatcher" } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index c0f9b99d65..3f3a7c156f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -2,7 +2,6 @@ package akka.actor.dispatch import java.util.concurrent.{ CountDownLatch, TimeUnit } -import akka.event.EventHandler import akka.testkit.TestEvent._ import akka.testkit.EventFilter import akka.dispatch.{ PinnedDispatcher, Dispatchers } @@ -25,11 +24,11 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach { private val unit = TimeUnit.MILLISECONDS override def beforeEach { - EventHandler.notify(Mute(EventFilter[RuntimeException]("Failure"))) + app.eventHandler.notify(Mute(EventFilter[RuntimeException]("Failure"))) } override def afterEach { - EventHandler.notify(UnMuteAll) + app.eventHandler.notify(UnMuteAll) } "A PinnedActor" must { @@ -51,7 +50,7 @@ class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach { "support ask/exception" in { val actor = createActor(Props[TestActor].withDispatcher(app.dispatcherFactory.newPinnedDispatcher("test"))) - EventHandler.notify(Mute(EventFilter[RuntimeException]("Expected exception; to test fault-tolerance"))) + app.eventHandler.notify(Mute(EventFilter[RuntimeException]("Expected exception; to test fault-tolerance"))) try { (actor ? "Failure").get fail("Should have thrown an exception") diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala index 4b35f1ccef..556d37d735 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/AkkaPerformanceTest.scala @@ -4,7 +4,6 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import akka.performance.trading.domain._ import akka.performance.trading.common._ -import akka.event.EventHandler import akka.actor.{ Props, ActorRef, Actor, PoisonPill } import akka.AkkaApplication @@ -62,7 +61,7 @@ abstract class AkkaPerformanceTest(val app: AkkaApplication) extends BenchmarkSc val duration = System.nanoTime - t0 stat.addValue(duration) if (!rsp.status) { - EventHandler.error(this, "Invalid rsp") + app.eventHandler.error(this, "Invalid rsp") } delay(delayMs) } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala index 6c310264fb..5f2989fa97 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/MatchingEngine.scala @@ -5,7 +5,6 @@ import akka.actor._ import akka.dispatch.Future import akka.dispatch.FutureTimeoutException import akka.dispatch.MessageDispatcher -import akka.event.EventHandler trait MatchingEngine { val meId: String @@ -27,7 +26,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) case order: Order ⇒ handleOrder(order) case unknown ⇒ - EventHandler.warning(this, "Received unknown message: " + unknown) + app.eventHandler.warning(this, "Received unknown message: " + unknown) } def handleOrder(order: Order) { @@ -42,7 +41,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) pendingStandbyReply.foreach(waitForStandby(_)) done(true) case None ⇒ - EventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) + app.eventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) done(false) } } @@ -56,7 +55,7 @@ class AkkaMatchingEngine(val meId: String, val orderbooks: List[Orderbook]) pendingStandbyFuture.await } catch { case e: FutureTimeoutException ⇒ - EventHandler.error(this, "Standby timeout: " + e) + app.eventHandler.error(this, "Standby timeout: " + e) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala index 2c50bffdb0..c1f811b425 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/OrderReceiver.scala @@ -3,7 +3,6 @@ package akka.performance.trading.common import akka.performance.trading.domain._ import akka.actor._ import akka.dispatch.MessageDispatcher -import akka.event.EventHandler trait OrderReceiver { type ME @@ -32,7 +31,7 @@ class AkkaOrderReceiver extends Actor with OrderReceiver { case routing @ MatchingEngineRouting(mapping) ⇒ refreshMatchingEnginePartitions(routing.asInstanceOf[MatchingEngineRouting[ActorRef]]) case order: Order ⇒ placeOrder(order) - case unknown ⇒ EventHandler.warning(this, "Received unknown message: " + unknown) + case unknown ⇒ app.eventHandler.warning(this, "Received unknown message: " + unknown) } def placeOrder(order: Order) = { @@ -41,7 +40,7 @@ class AkkaOrderReceiver extends Actor with OrderReceiver { case Some(m) ⇒ m.forward(order) case None ⇒ - EventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) + app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) channel ! new Rsp(false) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala index fbd388dc6d..2f35c4ad7e 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/common/PerformanceTest.scala @@ -51,7 +51,7 @@ trait PerformanceTest extends JUnitSuite { var stat: DescriptiveStatistics = _ - val resultRepository = BenchResultRepository() + val resultRepository = BenchResultRepository(app) lazy val report = new Report(app, resultRepository, compareResultWith) type TS <: TradingSystem diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala index 728c86c67c..44ce92a92d 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayMatchingEngine.scala @@ -2,7 +2,6 @@ package akka.performance.trading.oneway import akka.actor._ import akka.dispatch.MessageDispatcher -import akka.event.EventHandler import akka.performance.trading.domain.Order import akka.performance.trading.domain.Orderbook import akka.performance.trading.common.AkkaMatchingEngine @@ -18,7 +17,7 @@ class OneWayMatchingEngine(meId: String, orderbooks: List[Orderbook]) extends Ak orderbook.matchOrders() case None ⇒ - EventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) + app.eventHandler.warning(this, "Orderbook not handled by this MatchingEngine: " + order.orderbookSymbol) } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala index 604dcadb5f..daeabfb36b 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/trading/oneway/OneWayOrderReceiver.scala @@ -2,7 +2,6 @@ package akka.performance.trading.oneway import akka.actor._ import akka.dispatch.MessageDispatcher -import akka.event.EventHandler import akka.performance.trading.domain._ import akka.performance.trading.common.AkkaOrderReceiver @@ -14,7 +13,7 @@ class OneWayOrderReceiver extends AkkaOrderReceiver { case Some(m) ⇒ m ! order case None ⇒ - EventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) + app.eventHandler.warning(this, "Unknown orderbook: " + order.orderbookSymbol) } } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala index 048606d322..c1b2456b2d 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala @@ -11,10 +11,9 @@ import java.io.ObjectOutputStream import java.io.PrintWriter import java.text.SimpleDateFormat import java.util.Date - import scala.collection.mutable.{ Map ⇒ MutableMap } +import akka.AkkaApplication -import akka.event.EventHandler trait BenchResultRepository { def add(stats: Stats) @@ -32,11 +31,10 @@ trait BenchResultRepository { } object BenchResultRepository { - private val repository = new FileBenchResultRepository - def apply(): BenchResultRepository = repository + def apply(app: AkkaApplication): BenchResultRepository = new FileBenchResultRepository(app) } -class FileBenchResultRepository extends BenchResultRepository { +class FileBenchResultRepository(val app: AkkaApplication) extends BenchResultRepository { private val statsByName = MutableMap[String, Seq[Stats]]() private val baselineStats = MutableMap[Key, Stats]() private val historicalStats = MutableMap[Key, Seq[Stats]]() @@ -105,7 +103,7 @@ class FileBenchResultRepository extends BenchResultRepository { out.writeObject(stats) } catch { case e: Exception ⇒ - EventHandler.error(this, "Failed to save [%s] to [%s], due to [%s]". + app.eventHandler.error(this, "Failed to save [%s] to [%s], due to [%s]". format(stats, f.getAbsolutePath, e.getMessage)) } finally { if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ } @@ -122,7 +120,7 @@ class FileBenchResultRepository extends BenchResultRepository { Some(stats) } catch { case e: Throwable ⇒ - EventHandler.error(this, "Failed to load from [%s], due to [%s]". + app.eventHandler.error(this, "Failed to load from [%s], due to [%s]". format(f.getAbsolutePath, e.getMessage)) None } finally { @@ -146,7 +144,7 @@ class FileBenchResultRepository extends BenchResultRepository { writer.flush() } catch { case e: Exception ⇒ - EventHandler.error(this, "Failed to save report to [%s], due to [%s]". + app.eventHandler.error(this, "Failed to save report to [%s], due to [%s]". format(f.getAbsolutePath, e.getMessage)) } finally { if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala index c3f188a85e..d3a5f020d0 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -5,7 +5,6 @@ import java.text.SimpleDateFormat import java.util.Date import scala.collection.JavaConversions.asScalaBuffer import scala.collection.JavaConversions.enumerationAsScalaIterator -import akka.event.EventHandler import akka.AkkaApplication class Report(app: AkkaApplication, @@ -53,7 +52,7 @@ class Report(app: AkkaApplication, resultRepository.saveHtmlReport(sb.toString, reportName) if (log) { - EventHandler.info(this, resultTable + "Charts in html report: " + resultRepository.htmlReportUrl(reportName)) + app.eventHandler.info(this, resultTable + "Charts in html report: " + resultRepository.htmlReportUrl(reportName)) } } diff --git a/akka-actor/src/main/java/akka/event/JavaEventHandler.java b/akka-actor/src/main/java/akka/event/JavaEventHandler.java deleted file mode 100644 index f6e0224ed3..0000000000 --- a/akka-actor/src/main/java/akka/event/JavaEventHandler.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.event; - -import akka.actor.ActorRef; - -/** - * Java API for Akka EventHandler - */ -public class JavaEventHandler { - - - public static void notify(Object message) { - EventHandler$.MODULE$.notify(message); - } - - public static void debug(ActorRef instance, Object message) { - EventHandler$.MODULE$.debug(instance, message); - } - - public static void info(ActorRef instance, Object message) { - EventHandler$.MODULE$.info(instance,message); - } - - public static void warning(ActorRef instance, Object message) { - EventHandler$.MODULE$.warning(instance,message); - } - - public static void error(ActorRef instance, Object message) { - EventHandler$.MODULE$.debug(instance,message); - } - -} - - diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 7547157e38..7012431835 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -16,6 +16,9 @@ import akka.dispatch.UnboundedMailbox import akka.routing.Routing import remote.RemoteSupport import akka.serialization.Serialization +import akka.event.EventHandler +import akka.event.EventHandlerLogging +import akka.event.Logging object AkkaApplication { @@ -124,6 +127,14 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor import AkkaConfig._ + if (ConfigVersion != Version) + throw new ConfigurationException("Akka JAR version [" + Version + + "] does not match the provided config version [" + ConfigVersion + "]") + + val eventHandler = new EventHandler(this) + + val log: Logging = new EventHandlerLogging(eventHandler, this) + val startTime = System.currentTimeMillis def uptime = (System.currentTimeMillis - startTime) / 1000 @@ -137,10 +148,6 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor case value ⇒ value } - if (ConfigVersion != Version) - throw new ConfigurationException("Akka JAR version [" + Version + - "] does not match the provided config version [" + ConfigVersion + "]") - // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(this) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index bb7cc469f2..d8d81b90c8 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -149,17 +149,17 @@ object Actor { /** * This decorator adds invocation logging to a Receive function. */ - class LoggingReceive(source: AnyRef, r: Receive) extends Receive { + class LoggingReceive(source: AnyRef, r: Receive)(implicit app: AkkaApplication) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - EventHandler.debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o) + app.eventHandler.debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o) handled } def apply(o: Any): Unit = r(o) } object LoggingReceive { - def apply(source: AnyRef, r: Receive): Receive = r match { + def apply(source: AnyRef, r: Receive)(implicit app: AkkaApplication): Receive = r match { case _: LoggingReceive ⇒ r case _ ⇒ new LoggingReceive(source, r) } @@ -211,9 +211,9 @@ trait Actor { context } - implicit def app = context.application + implicit def app = context.app - private def config = context.application.AkkaConfig + private def config = context.app.AkkaConfig /** * The default timeout, based on the config setting 'akka.actor.timeout' @@ -423,7 +423,7 @@ trait Actor { throw new InvalidMessageException("Message from [" + channel + "] to [" + self.toString + "] is null") def autoReceiveMessage(msg: AutoReceivedMessage) { - if (config.DebugAutoReceive) EventHandler.debug(this, "received AutoReceiveMessage " + msg) + if (config.DebugAutoReceive) app.eventHandler.debug(this, "received AutoReceiveMessage " + msg) msg match { case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) @@ -468,7 +468,6 @@ object Address { def validate(address: String) { if (!validAddressPattern.matcher(address).matches) { val e = new IllegalArgumentException("Address [" + address + "] is not valid, need to follow pattern: " + validAddressPattern.pattern) - EventHandler.error(e, this, e.getMessage) throw e } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 84f0897842..913b30ce88 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -9,7 +9,7 @@ import akka.util._ import scala.annotation.tailrec import scala.collection.immutable.Stack import scala.collection.JavaConverters -import akka.event.{ InVMMonitoring, EventHandler } +import akka.event.InVMMonitoring import java.util.concurrent.{ ScheduledFuture, TimeUnit } import java.util.{ Collection ⇒ JCollection, Collections ⇒ JCollections } import akka.AkkaApplication @@ -49,7 +49,8 @@ private[akka] trait ActorContext extends ActorRefFactory { def handleChildTerminated(child: ActorRef): Unit - def application: AkkaApplication + def app: AkkaApplication + } case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { @@ -195,7 +196,7 @@ case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]], fail.actor.restart(fail.cause) else fail.actor.stop() //TODO optimization to drop child here already? - case None ⇒ EventHandler.warning(this, "Got Failure from non-child: " + fail) + case None ⇒ throw new AssertionError("Got Failure from non-child: " + fail) } } } @@ -207,7 +208,7 @@ private[akka] object ActorCell { } private[akka] class ActorCell( - val application: AkkaApplication, + val app: AkkaApplication, val self: ActorRef with ScalaActorRef, val props: Props, @volatile var receiveTimeout: Option[Long], @@ -215,7 +216,7 @@ private[akka] class ActorCell( import ActorCell._ - def provider = application.provider + def provider = app.provider @volatile var futureTimeout: Option[ScheduledFuture[AnyRef]] = None //FIXME TODO Doesn't need to be volatile either, since it will only ever be accessed when a message is processed @@ -251,7 +252,7 @@ private[akka] class ActorCell( } } - application.registry.register(self) + app.registry.register(self) dispatcher.attach(this) } @@ -333,10 +334,10 @@ private[akka] class ActorCell( actor = created created.preStart() checkReceiveTimeout - if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(created, "started") + if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "started") } catch { case e ⇒ try { - EventHandler.error(e, this, "error while creating actor") + app.eventHandler.error(e, self, "error while creating actor") // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) envelope.channel.sendException(e) @@ -347,7 +348,7 @@ private[akka] class ActorCell( def recreate(cause: Throwable): Unit = try { val failedActor = actor - if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(failedActor, "restarting") + if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "restarting") val freshActor = newActor() if (failedActor ne null) { val c = currentMessage //One read only plz @@ -361,14 +362,14 @@ private[akka] class ActorCell( } actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call freshActor.postRestart(cause) - if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(freshActor, "restarted") + if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "restarted") dispatcher.resume(this) //FIXME should this be moved down? props.faultHandler.handleSupervisorRestarted(cause, self, _children) } catch { case e ⇒ try { - EventHandler.error(e, this, "error while creating actor") + app.eventHandler.error(e, self, "error while creating actor") // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) envelope.channel.sendException(e) @@ -384,13 +385,13 @@ private[akka] class ActorCell( def terminate() { receiveTimeout = None cancelReceiveTimeout - application.provider.evict(self.address) - application.registry.unregister(self) + app.provider.evict(self.address) + app.registry.unregister(self) dispatcher.detach(this) try { val a = actor - if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(a, "stopping") + if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "stopping") if (a ne null) a.postStop() //Stop supervised actors @@ -416,8 +417,8 @@ private[akka] class ActorCell( val links = _children if (!links.exists(_.child == child)) { _children = links :+ ChildRestartStats(child) - if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(actor, "now supervising " + child) - } else EventHandler.warning(actor, "Already supervising " + child) + if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now supervising " + child) + } else app.eventHandler.warning(self, "Already supervising " + child) } try { @@ -428,10 +429,10 @@ private[akka] class ActorCell( case Recreate(cause) ⇒ recreate(cause) case Link(subject) ⇒ akka.event.InVMMonitoring.link(self, subject) - if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(actor, "now monitoring " + subject) + if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now monitoring " + subject) case Unlink(subject) ⇒ akka.event.InVMMonitoring.unlink(self, subject) - if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(actor, "stopped monitoring " + subject) + if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "stopped monitoring " + subject) case Suspend ⇒ suspend() case Resume ⇒ resume() case Terminate ⇒ terminate() @@ -440,7 +441,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ //Should we really catch everything here? - EventHandler.error(e, actor, "error while processing " + envelope.message) + app.eventHandler.error(e, self, "error while processing " + envelope.message) //TODO FIXME How should problems here be handled? throw e } finally { @@ -461,7 +462,7 @@ private[akka] class ActorCell( currentMessage = null // reset current message after successful invocation } catch { case e ⇒ - EventHandler.error(e, self, e.getMessage) + app.eventHandler.error(e, self, e.getMessage) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) @@ -480,7 +481,7 @@ private[akka] class ActorCell( } } catch { case e ⇒ - EventHandler.error(e, actor, e.getMessage) + app.eventHandler.error(e, self, e.getMessage) throw e } } else { diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index b60f275de8..3653069e95 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -26,15 +26,15 @@ trait ActorDeployer { * * @author Jonas Bonér */ -class Deployer(val application: AkkaApplication) extends ActorDeployer { +class Deployer(val app: AkkaApplication) extends ActorDeployer { - val deploymentConfig = new DeploymentConfig(application) + val deploymentConfig = new DeploymentConfig(app) // val defaultAddress = Node(Config.nodename) lazy val instance: ActorDeployer = { - val deployer = if (application.reflective.ClusterModule.isEnabled) { - application.reflective.ClusterModule.clusterDeployer + val deployer = if (app.reflective.ClusterModule.isEnabled) { + app.reflective.ClusterModule.clusterDeployer } else { LocalDeployer } @@ -80,14 +80,14 @@ class Deployer(val application: AkkaApplication) extends ActorDeployer { lookupInConfig(address) } catch { case e: ConfigurationException ⇒ - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) throw e } newDeployment foreach { d ⇒ if (d eq null) { val e = new IllegalStateException("Deployment for address [" + address + "] is null") - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) throw e } deploy(d) // deploy and cache it @@ -106,7 +106,7 @@ class Deployer(val application: AkkaApplication) extends ActorDeployer { private[akka] def addressesInConfig: List[String] = { val deploymentPath = "akka.actor.deployment" - application.config.getSection(deploymentPath) match { + app.config.getSection(deploymentPath) match { case None ⇒ Nil case Some(addressConfig) ⇒ addressConfig.map.keySet @@ -118,7 +118,7 @@ class Deployer(val application: AkkaApplication) extends ActorDeployer { /** * Lookup deployment in 'akka.conf' configuration file. */ - private[akka] def lookupInConfig(address: String, configuration: Configuration = application.config): Option[Deploy] = { + private[akka] def lookupInConfig(address: String, configuration: Configuration = app.config): Option[Deploy] = { import akka.util.ReflectiveAccess.{ createInstance, emptyArguments, emptyParams, getClassFor } // -------------------------------- @@ -332,13 +332,13 @@ class Deployer(val application: AkkaApplication) extends ActorDeployer { private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = { val e = new DeploymentAlreadyBoundException("Address [" + deployment.address + "] already bound to [" + deployment + "]") - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) throw e } private[akka] def thrownNoDeploymentBoundException(address: String): Nothing = { val e = new NoDeploymentBoundException("Address [" + address + "] is not bound to a deployment") - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) throw e } } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index da95c8745a..5cbeec8b09 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -420,7 +420,7 @@ trait FSM[S, D] extends ListenerManagement { */ private val handleEventDefault: StateFunction = { case Event(value, stateData) ⇒ - EventHandler.warning(this, "unhandled event " + value + " in state " + stateName) + app.eventHandler.warning(context.self, "unhandled event " + value + " in state " + stateName) stay } private var handleEvent: StateFunction = handleEventDefault @@ -471,7 +471,7 @@ trait FSM[S, D] extends ListenerManagement { actorRef ! CurrentState(self, currentState.stateName) } catch { case e: ActorInitializationException ⇒ - EventHandler.warning(this, "trying to register not running listener") + app.eventHandler.warning(context.self, "trying to register not running listener") } case UnsubscribeTransitionCallBack(actorRef) ⇒ removeListener(actorRef) @@ -537,8 +537,8 @@ trait FSM[S, D] extends ListenerManagement { if (!currentState.stopReason.isDefined) { val reason = nextState.stopReason.get reason match { - case Failure(ex: Throwable) ⇒ EventHandler.error(ex, this, "terminating due to Failure") - case Failure(msg) ⇒ EventHandler.error(this, msg) + case Failure(ex: Throwable) ⇒ app.eventHandler.error(ex, context.self, "terminating due to Failure") + case Failure(msg) ⇒ app.eventHandler.error(context.self, msg) case _ ⇒ } val stopEvent = StopEvent(reason, currentState.stateName, currentState.stateData) @@ -568,7 +568,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ def logDepth: Int = 0 - private val debugEvent = context.application.AkkaConfig.FsmDebugEvent + private val debugEvent = context.app.AkkaConfig.FsmDebugEvent private val events = new Array[Event](logDepth) private val states = new Array[AnyRef](logDepth) @@ -587,13 +587,13 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ protected[akka] abstract override def setTimer(name: String, msg: Any, timeout: Duration, repeat: Boolean): State = { if (debugEvent) - EventHandler.debug(this, "setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) + app.eventHandler.debug(context.self, "setting " + (if (repeat) "repeating " else "") + "timer '" + name + "'/" + timeout + ": " + msg) super.setTimer(name, msg, timeout, repeat) } protected[akka] abstract override def cancelTimer(name: String) = { if (debugEvent) - EventHandler.debug(this, "canceling timer '" + name + "'") + app.eventHandler.debug(context.self, "canceling timer '" + name + "'") super.cancelTimer(name) } @@ -605,7 +605,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ case c: UntypedChannel ⇒ c.toString case _ ⇒ "unknown" } - EventHandler.debug(this, "processing " + event + " from " + srcstr) + app.eventHandler.debug(context.self, "processing " + event + " from " + srcstr) } if (logDepth > 0) { @@ -619,7 +619,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ val newState = stateName if (debugEvent && oldState != newState) - EventHandler.debug(this, "transition " + oldState + " -> " + newState) + app.eventHandler.debug(context.self, "transition " + oldState + " -> " + newState) } /** diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 86b879ccff..a47e287bca 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -6,7 +6,6 @@ package akka.actor import akka.util.ByteString import akka.dispatch.Envelope import akka.event.EventHandler - import java.net.InetSocketAddress import java.io.IOException import java.util.concurrent.atomic.AtomicReference @@ -21,13 +20,12 @@ import java.nio.channels.{ SelectionKey, CancelledKeyException } - import scala.collection.mutable import scala.collection.immutable.Queue import scala.annotation.tailrec import scala.util.continuations._ - import com.eaio.uuid.UUID +import akka.AkkaApplication object IO { @@ -257,7 +255,7 @@ class IOManager(bufferSize: Int = 8192) extends Actor { var worker: IOWorker = _ override def preStart { - worker = new IOWorker(self, bufferSize) + worker = new IOWorker(app, self, bufferSize) worker.start() } @@ -294,7 +292,7 @@ private[akka] object IOWorker { case object Shutdown extends Request } -private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) { +private[akka] class IOWorker(app: AkkaApplication, ioManager: ActorRef, val bufferSize: Int) { import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT } import IOWorker._ @@ -400,7 +398,7 @@ private[akka] class IOWorker(ioManager: ActorRef, val bufferSize: Int) { handle.owner ! IO.Closed(handle, cause) } catch { case e: ActorInitializationException ⇒ - EventHandler debug (this, "IO.Handle's owner not running") + app.eventHandler debug (ioManager, "IO.Handle's owner not running") } case None ⇒ } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index f35861e6df..47bdc7a5e3 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -48,7 +48,6 @@ object Scheduler { } catch { case e: Exception ⇒ val error = SchedulerException(message + " could not be scheduled on " + receiver, e) - EventHandler.error(error, this, "%s @ %s".format(receiver, message)) throw error } } @@ -74,7 +73,6 @@ object Scheduler { } catch { case e: Exception ⇒ val error = SchedulerException("Failed to schedule a Runnable", e) - EventHandler.error(error, this, error.getMessage) throw error } } @@ -90,7 +88,6 @@ object Scheduler { } catch { case e: Exception ⇒ val error = SchedulerException(message + " could not be scheduleOnce'd on " + receiver, e) - EventHandler.error(e, this, receiver + " @ " + message) throw error } } @@ -116,7 +113,6 @@ object Scheduler { } catch { case e: Exception ⇒ val error = SchedulerException("Failed to scheduleOnce a Runnable", e) - EventHandler.error(e, this, error.getMessage) throw error } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 4302423903..9836152027 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -44,12 +44,12 @@ final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMess } } -final case class TaskInvocation(function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { +final case class TaskInvocation(app: AkkaApplication, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { def run() { try { function() } catch { - case e ⇒ EventHandler.error(e, this, e.getMessage) + case e ⇒ app.eventHandler.error(e, this, e.getMessage) } finally { cleanup() } @@ -67,7 +67,7 @@ object MessageDispatcher { /** * @author Jonas Bonér */ -abstract class MessageDispatcher extends Serializable { +abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable { import MessageDispatcher._ protected val uuids = new ConcurrentSkipListSet[Uuid] @@ -144,7 +144,7 @@ abstract class MessageDispatcher extends Serializable { _tasks.getAndIncrement() try { startIfUnstarted() - executeTask(TaskInvocation(block, taskCleanup)) + executeTask(TaskInvocation(app, block, taskCleanup)) } catch { case e ⇒ _tasks.decrementAndGet @@ -331,7 +331,7 @@ abstract class MessageDispatcherConfigurator(val application: AkkaApplication) { import ThreadPoolConfigDispatcherBuilder.conf_? //Apply the following options to the config if they are present in the config - ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure( + ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(application)).configure( conf_?(config getInt "keep-alive-time")(time ⇒ _.setKeepAliveTime(Duration(time, application.AkkaConfig.DefaultTimeUnit))), conf_?(config getDouble "core-pool-size-factor")(factor ⇒ _.setCorePoolSizeFromFactor(factor)), conf_?(config getDouble "max-pool-size-factor")(factor ⇒ _.setMaxPoolSizeFromFactor(factor)), diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index dc1f197ef2..5a267c24fc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -9,6 +9,7 @@ import akka.actor.{ ActorCell, Actor, IllegalActorStateException } import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import java.util.{ Comparator, Queue } import annotation.tailrec +import akka.AkkaApplication /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -27,13 +28,14 @@ import annotation.tailrec * @author Viktor Klang */ class BalancingDispatcher( + _app: AkkaApplication, _name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType, config: ThreadPoolConfig, _timeoutMs: Long) - extends Dispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { + extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { private val buddies = new ConcurrentSkipListSet[ActorCell](new Comparator[ActorCell] { def compare(a: ActorCell, b: ActorCell) = a.uuid.compareTo(b.uuid) }) //new ConcurrentLinkedQueue[ActorCell]() diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index bdfd411cd6..7455f7970b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -8,6 +8,7 @@ import akka.event.EventHandler import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue } import akka.actor.{ ActorCell, ActorKilledException } +import akka.AkkaApplication /** * Default settings are: @@ -63,13 +64,14 @@ import akka.actor.{ ActorCell, ActorKilledException } * Larger values (or zero or negative) increase throughput, smaller values increase fairness */ class Dispatcher( + _app: AkkaApplication, _name: String, val throughput: Int, val throughputDeadlineTime: Int, val mailboxType: MailboxType, executorServiceFactoryProvider: ExecutorServiceFactoryProvider, val timeoutMs: Long) - extends MessageDispatcher { + extends MessageDispatcher(_app) { val name = "akka:event-driven:dispatcher:" + _name @@ -93,7 +95,7 @@ class Dispatcher( executorService.get() execute invocation } catch { case e: RejectedExecutionException ⇒ - EventHandler.warning(this, e.toString) + app.eventHandler.warning(this, e.toString) throw e } } @@ -119,7 +121,7 @@ class Dispatcher( true } catch { case e: RejectedExecutionException ⇒ - EventHandler.warning(this, e.toString) + app.eventHandler.warning(this, e.toString) mbox.setAsIdle() throw e } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index f3b2456450..f8141925fe 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -43,15 +43,15 @@ import akka.AkkaApplication * * @author Jonas Bonér */ -class Dispatchers(val application: AkkaApplication) { - val ThroughputDeadlineTimeMillis = application.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt +class Dispatchers(val app: AkkaApplication) { + val ThroughputDeadlineTimeMillis = app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt val MailboxType: MailboxType = - if (application.AkkaConfig.MailboxCapacity < 1) UnboundedMailbox() - else BoundedMailbox(application.AkkaConfig.MailboxCapacity, application.AkkaConfig.MailboxPushTimeout) - val DispatcherShutdownMillis = application.AkkaConfig.DispatcherDefaultShutdown.toMillis + if (app.AkkaConfig.MailboxCapacity < 1) UnboundedMailbox() + else BoundedMailbox(app.AkkaConfig.MailboxCapacity, app.AkkaConfig.MailboxPushTimeout) + val DispatcherShutdownMillis = app.AkkaConfig.DispatcherDefaultShutdown.toMillis lazy val defaultGlobalDispatcher = - application.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build + app.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -60,8 +60,8 @@ class Dispatchers(val application: AkkaApplication) { * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(actor: LocalActorRef) = actor match { - case null ⇒ new PinnedDispatcher(null, "anon", MailboxType, DispatcherShutdownMillis) - case some ⇒ new PinnedDispatcher(some.underlying, some.underlying.uuid.toString, MailboxType, DispatcherShutdownMillis) + case null ⇒ new PinnedDispatcher(app, null, "anon", MailboxType, DispatcherShutdownMillis) + case some ⇒ new PinnedDispatcher(app, some.underlying, some.underlying.uuid.toString, MailboxType, DispatcherShutdownMillis) } /** @@ -71,8 +71,8 @@ class Dispatchers(val application: AkkaApplication) { * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match { - case null ⇒ new PinnedDispatcher(null, "anon", mailboxType, DispatcherShutdownMillis) - case some ⇒ new PinnedDispatcher(some.underlying, some.underlying.uuid.toString, mailboxType, DispatcherShutdownMillis) + case null ⇒ new PinnedDispatcher(app, null, "anon", mailboxType, DispatcherShutdownMillis) + case some ⇒ new PinnedDispatcher(app, some.underlying, some.underlying.uuid.toString, mailboxType, DispatcherShutdownMillis) } /** @@ -81,7 +81,7 @@ class Dispatchers(val application: AkkaApplication) { * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(name: String, mailboxType: MailboxType) = - new PinnedDispatcher(null, name, mailboxType, DispatcherShutdownMillis) + new PinnedDispatcher(app, null, name, mailboxType, DispatcherShutdownMillis) /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -89,7 +89,7 @@ class Dispatchers(val application: AkkaApplication) { * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(name: String) = - new PinnedDispatcher(null, name, MailboxType, DispatcherShutdownMillis) + new PinnedDispatcher(app, null, name, MailboxType, DispatcherShutdownMillis) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -97,8 +97,8 @@ class Dispatchers(val application: AkkaApplication) { * Has a fluent builder interface for configuring its semantics. */ def newDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(name, application.AkkaConfig.DispatcherThroughput, - ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(app, name, app.AkkaConfig.DispatcherThroughput, + ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -107,7 +107,7 @@ class Dispatchers(val application: AkkaApplication) { */ def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new Dispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -116,7 +116,7 @@ class Dispatchers(val application: AkkaApplication) { */ def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new Dispatcher(app, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -124,8 +124,8 @@ class Dispatchers(val application: AkkaApplication) { * Has a fluent builder interface for configuring its semantics. */ def newBalancingDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(name, application.AkkaConfig.DispatcherThroughput, - ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(app, name, app.AkkaConfig.DispatcherThroughput, + ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -134,7 +134,7 @@ class Dispatchers(val application: AkkaApplication) { */ def newBalancingDispatcher(name: String, throughput: Int) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new BalancingDispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -143,7 +143,7 @@ class Dispatchers(val application: AkkaApplication) { */ def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new BalancingDispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -152,13 +152,13 @@ class Dispatchers(val application: AkkaApplication) { */ def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new BalancingDispatcher(app, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) /** * Utility function that tries to load the specified dispatcher config from the akka.conf * or else use the supplied default dispatcher */ def fromConfig(key: String, default: ⇒ MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = - application.config getSection key flatMap from getOrElse default + app.config getSection key flatMap from getOrElse default /* * Creates of obtains a dispatcher from a ConfigMap according to the format below @@ -185,8 +185,8 @@ class Dispatchers(val application: AkkaApplication) { */ def from(cfg: Configuration): Option[MessageDispatcher] = { cfg.getString("type") flatMap { - case "Dispatcher" ⇒ Some(new DispatcherConfigurator(application)) - case "BalancingDispatcher" ⇒ Some(new BalancingDispatcherConfigurator(application)) + case "Dispatcher" ⇒ Some(new DispatcherConfigurator(app)) + case "BalancingDispatcher" ⇒ Some(new BalancingDispatcherConfigurator(app)) case "GlobalDispatcher" ⇒ None //TODO FIXME remove this case fqn ⇒ ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { @@ -206,26 +206,26 @@ class Dispatchers(val application: AkkaApplication) { } } -class DispatcherConfigurator(application: AkkaApplication) extends MessageDispatcherConfigurator(application) { +class DispatcherConfigurator(app: AkkaApplication) extends MessageDispatcherConfigurator(app) { def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher( + configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher(app, config.getString("name", newUuid.toString), - config.getInt("throughput", application.AkkaConfig.DispatcherThroughput), - config.getInt("throughput-deadline-time", application.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt), + config.getInt("throughput", app.AkkaConfig.DispatcherThroughput), + config.getInt("throughput-deadline-time", app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt), mailboxType(config), threadPoolConfig, - application.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build + app.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build } } -class BalancingDispatcherConfigurator(application: AkkaApplication) extends MessageDispatcherConfigurator(application) { +class BalancingDispatcherConfigurator(app: AkkaApplication) extends MessageDispatcherConfigurator(app) { def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig ⇒ new BalancingDispatcher( + configureThreadPool(config, threadPoolConfig ⇒ new BalancingDispatcher(app, config.getString("name", newUuid.toString), - config.getInt("throughput", application.AkkaConfig.DispatcherThroughput), - config.getInt("throughput-deadline-time", application.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt), + config.getInt("throughput", app.AkkaConfig.DispatcherThroughput), + config.getInt("throughput-deadline-time", app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt), mailboxType(config), threadPoolConfig, - application.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build + app.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index bad2d0c294..d6a46904ce 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -262,7 +262,7 @@ object Future { result completeWithResult currentValue } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + dispatcher.app.eventHandler.error(e, this, e.getMessage) result completeWithException e } finally { results.clear @@ -596,7 +596,7 @@ sealed trait Future[+T] extends japi.Future[T] { Right(f(res)) } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + dispatcher.app.eventHandler.error(e, this, e.getMessage) Left(e) }) } @@ -648,7 +648,7 @@ sealed trait Future[+T] extends japi.Future[T] { future.completeWith(f(r)) } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + dispatcher.app.eventHandler.error(e, this, e.getMessage) future complete Left(e) } } @@ -681,7 +681,7 @@ sealed trait Future[+T] extends japi.Future[T] { if (p(res)) r else Left(new MatchError(res)) } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + dispatcher.app.eventHandler.error(e, this, e.getMessage) Left(e) }) } @@ -781,7 +781,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + dispatcher.app.eventHandler.error(e, this, e.getMessage) fr completeWithException e } } @@ -795,7 +795,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + dispatcher.app.eventHandler.error(e, this, e.getMessage) fr completeWithException e } } @@ -957,7 +957,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { - try { func(this) } catch { case e ⇒ EventHandler.error(e, this, "Future onComplete-callback raised an exception") } //TODO catch, everything? Really? + try { func(this) } catch { case e ⇒ dispatcher.app.eventHandler.error(e, this, "Future onComplete-callback raised an exception") } //TODO catch, everything? Really? } @inline diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index 679ef6f810..7215a7379a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -6,14 +6,15 @@ package akka.dispatch import java.util.concurrent.atomic.AtomicReference import akka.actor.ActorCell +import akka.AkkaApplication /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class PinnedDispatcher(_actor: ActorCell, _name: String, _mailboxType: MailboxType, _timeoutMs: Long) - extends Dispatcher(_name, Int.MaxValue, -1, _mailboxType, PinnedDispatcher.oneThread, _timeoutMs) { +class PinnedDispatcher(_app: AkkaApplication, _actor: ActorCell, _name: String, _mailboxType: MailboxType, _timeoutMs: Long) + extends Dispatcher(_app, _name, Int.MaxValue, -1, _mailboxType, PinnedDispatcher.oneThread(_app), _timeoutMs) { protected[akka] val owner = new AtomicReference[ActorCell](_actor) @@ -32,6 +33,6 @@ class PinnedDispatcher(_actor: ActorCell, _name: String, _mailboxType: MailboxTy } object PinnedDispatcher { - val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1) + def oneThread(app: AkkaApplication): ThreadPoolConfig = ThreadPoolConfig(app, allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1) } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 314796d61b..4d9ec28014 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -8,9 +8,9 @@ import java.util.Collection import java.util.concurrent._ import atomic.{ AtomicLong, AtomicInteger } import ThreadPoolExecutor.CallerRunsPolicy - import akka.util.Duration import akka.event.EventHandler +import akka.AkkaApplication object ThreadPoolConfig { type Bounds = Int @@ -59,7 +59,8 @@ trait ExecutorServiceFactoryProvider { def createExecutorServiceFactory(name: String): ExecutorServiceFactory } -case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, +case class ThreadPoolConfig(app: AkkaApplication, + allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, @@ -76,7 +77,7 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def case Right(bounds) ⇒ val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory) service.allowCoreThreadTimeOut(allowCorePoolTimeout) - new BoundedExecutorDecorator(service, bounds) + new BoundedExecutorDecorator(app, service, bounds) } } } @@ -197,7 +198,7 @@ class MonitorableThread(runnable: Runnable, name: String) /** * @author Jonas Bonér */ -class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate { +class BoundedExecutorDecorator(val app: AkkaApplication, val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate { protected val semaphore = new Semaphore(bound) override def execute(command: Runnable) = { @@ -214,10 +215,10 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend }) } catch { case e: RejectedExecutionException ⇒ - EventHandler.warning(this, e.toString) + app.eventHandler.warning(this, e.toString) semaphore.release case e: Throwable ⇒ - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) throw e } } diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 00867ffb0c..5382e95076 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -12,6 +12,124 @@ import akka.serialization._ import akka.AkkaException import akka.AkkaApplication +object EventHandler { + + val ErrorLevel = 1 + val WarningLevel = 2 + val InfoLevel = 3 + val DebugLevel = 4 + + val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern + val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern + val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern + val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern + val genericFormat = "[GENERIC] [%s] [%s]".intern + + class EventHandlerException extends AkkaException + + lazy val StandardOutLogger = new StandardOutLogger {} + + sealed trait Event { + @transient + val thread: Thread = Thread.currentThread + def level: Int + } + + case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends Event { + def level = ErrorLevel + } + + case class Warning(instance: AnyRef, message: Any = "") extends Event { + def level = WarningLevel + } + + case class Info(instance: AnyRef, message: Any = "") extends Event { + def level = InfoLevel + } + + case class Debug(instance: AnyRef, message: Any = "") extends Event { + def level = DebugLevel + } + + trait StandardOutLogger { + import java.text.SimpleDateFormat + import java.util.Date + + val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.S") + + def timestamp = dateFormat.format(new Date) + + def print(event: Any) { + event match { + case e: Error ⇒ error(e) + case e: Warning ⇒ warning(e) + case e: Info ⇒ info(e) + case e: Debug ⇒ debug(e) + case e ⇒ generic(e) + } + } + + def error(event: Error) = + println(errorFormat.format( + timestamp, + event.thread.getName, + instanceName(event.instance), + event.message, + stackTraceFor(event.cause))) + + def warning(event: Warning) = + println(warningFormat.format( + timestamp, + event.thread.getName, + instanceName(event.instance), + event.message)) + + def info(event: Info) = + println(infoFormat.format( + timestamp, + event.thread.getName, + instanceName(event.instance), + event.message)) + + def debug(event: Debug) = + println(debugFormat.format( + timestamp, + event.thread.getName, + instanceName(event.instance), + event.message)) + + def generic(event: Any) = + println(genericFormat.format(timestamp, event.toString)) + + def instanceName(instance: AnyRef): String = instance match { + case null ⇒ "NULL" + case a: ActorRef ⇒ a.address + case _ ⇒ instance.getClass.getSimpleName + } + } + + class DefaultListener extends Actor with StandardOutLogger { + def receive = { case event ⇒ print(event) } + } + + def stackTraceFor(e: Throwable) = { + import java.io.{ StringWriter, PrintWriter } + val sw = new StringWriter + val pw = new PrintWriter(sw) + e.printStackTrace(pw) + sw.toString + } + + private def levelFor(eventClass: Class[_ <: Event]) = { + if (classOf[Error].isAssignableFrom(eventClass)) ErrorLevel + else if (classOf[Warning].isAssignableFrom(eventClass)) WarningLevel + else if (classOf[Info].isAssignableFrom(eventClass)) InfoLevel + else if (classOf[Debug].isAssignableFrom(eventClass)) DebugLevel + else DebugLevel + } + +} + /** * Event handler. *

@@ -53,60 +171,22 @@ import akka.AkkaApplication * * @author Jonas Bonér */ -object EventHandler extends ListenerManagement { +class EventHandler(app: AkkaApplication) extends ListenerManagement { - // TODO remove this EVIL thing! - private val appl = AkkaApplication("akka-reference.conf") + import EventHandler._ val synchronousLogging: Boolean = System.getProperty("akka.event.force-sync") match { case null | "" ⇒ false case _ ⇒ true } - val ErrorLevel = 1 - val WarningLevel = 2 - val InfoLevel = 3 - val DebugLevel = 4 - - sealed trait Event { - @transient - val thread: Thread = Thread.currentThread - def level: Int - } - - case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends Event { - def level = ErrorLevel - } - - case class Warning(instance: AnyRef, message: Any = "") extends Event { - def level = WarningLevel - } - - case class Info(instance: AnyRef, message: Any = "") extends Event { - def level = InfoLevel - } - - case class Debug(instance: AnyRef, message: Any = "") extends Event { - def level = DebugLevel - } - - val errorFormat = "[ERROR] [%s] [%s] [%s] %s\n%s".intern - val warningFormat = "[WARN] [%s] [%s] [%s] %s".intern - val infoFormat = "[INFO] [%s] [%s] [%s] %s".intern - val debugFormat = "[DEBUG] [%s] [%s] [%s] %s".intern - val genericFormat = "[GENERIC] [%s] [%s]".intern - - class EventHandlerException extends AkkaException - - lazy val StandardOutLogger = new StandardOutLogger {} - lazy val EventHandlerDispatcher = - appl.dispatcherFactory.fromConfig("akka.event-handler-dispatcher", appl.dispatcherFactory.newDispatcher("event-handler-dispatcher").setCorePoolSize(2).build) + app.dispatcherFactory.fromConfig("akka.event-handler-dispatcher", app.dispatcherFactory.newDispatcher("event-handler-dispatcher").setCorePoolSize(2).build) implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener] @volatile - var level: Int = appl.AkkaConfig.LogLevel match { + var level: Int = app.AkkaConfig.LogLevel match { case "ERROR" | "error" ⇒ ErrorLevel case "WARNING" | "warning" ⇒ WarningLevel case "INFO" | "info" ⇒ InfoLevel @@ -117,14 +197,14 @@ object EventHandler extends ListenerManagement { def start() { try { - val defaultListeners = appl.AkkaConfig.EventHandlers match { + val defaultListeners = app.AkkaConfig.EventHandlers match { case Nil ⇒ "akka.event.EventHandler$DefaultListener" :: Nil case listeners ⇒ listeners } defaultListeners foreach { listenerName ⇒ try { ReflectiveAccess.getClassFor[Actor](listenerName) match { - case Right(actorClass) ⇒ addListener(new LocalActorRef(appl, Props(actorClass).withDispatcher(EventHandlerDispatcher), newUuid.toString, systemService = true)) + case Right(actorClass) ⇒ addListener(new LocalActorRef(app, Props(actorClass).withDispatcher(EventHandlerDispatcher), newUuid.toString, systemService = true)) case Left(exception) ⇒ throw exception } } catch { @@ -207,87 +287,10 @@ object EventHandler extends ListenerManagement { def isDebugEnabled = level >= DebugLevel - def stackTraceFor(e: Throwable) = { - import java.io.{ StringWriter, PrintWriter } - val sw = new StringWriter - val pw = new PrintWriter(sw) - e.printStackTrace(pw) - sw.toString - } - - private def levelFor(eventClass: Class[_ <: Event]) = { - if (classOf[Error].isAssignableFrom(eventClass)) ErrorLevel - else if (classOf[Warning].isAssignableFrom(eventClass)) WarningLevel - else if (classOf[Info].isAssignableFrom(eventClass)) InfoLevel - else if (classOf[Debug].isAssignableFrom(eventClass)) DebugLevel - else DebugLevel - } - private def log(event: Any) { if (synchronousLogging) StandardOutLogger.print(event) else notifyListeners(event) } - trait StandardOutLogger { - import java.text.SimpleDateFormat - import java.util.Date - - val dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss.S") - - def timestamp = dateFormat.format(new Date) - - def print(event: Any) { - event match { - case e: Error ⇒ error(e) - case e: Warning ⇒ warning(e) - case e: Info ⇒ info(e) - case e: Debug ⇒ debug(e) - case e ⇒ generic(e) - } - } - - def error(event: Error) = - println(errorFormat.format( - timestamp, - event.thread.getName, - instanceName(event.instance), - event.message, - stackTraceFor(event.cause))) - - def warning(event: Warning) = - println(warningFormat.format( - timestamp, - event.thread.getName, - instanceName(event.instance), - event.message)) - - def info(event: Info) = - println(infoFormat.format( - timestamp, - event.thread.getName, - instanceName(event.instance), - event.message)) - - def debug(event: Debug) = - println(debugFormat.format( - timestamp, - event.thread.getName, - instanceName(event.instance), - event.message)) - - def generic(event: Any) = - println(genericFormat.format(timestamp, event.toString)) - - def instanceName(instance: AnyRef): String = instance match { - case null ⇒ "NULL" - case a: ActorRef ⇒ a.address - case _ ⇒ instance.getClass.getSimpleName - } - } - - class DefaultListener extends Actor with StandardOutLogger { - def receive = { case event ⇒ print(event) } - } - start() } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala new file mode 100644 index 0000000000..92ff0da5e1 --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.event +import akka.actor.Actor + +/** + * Logging wrapper to make nicer and optimize: provide template versions which + * evaluate .toString only if the log level is actually enabled. + */ +trait Logging { + + /* + * implement these as precisely as needed/possible: always returning true + * just makes the notify... methods be called every time. + */ + def isErrorEnabled: Boolean + def isWarningEnabled: Boolean + def isInfoEnabled: Boolean + def isDebugEnabled: Boolean + + /* + * These actually implement the passing on of the messages to be logged. + * Will not be called if is...Enabled returned false. + */ + protected def notifyError(cause: Throwable, message: String) + protected def notifyWarning(message: String) + protected def notifyInfo(message: String) + protected def notifyDebug(message: String) + + /* + * The rest is just the widening of the API for the user's convenience. + */ + + def error(cause: Throwable, message: String) { if (isErrorEnabled) notifyError(cause, message) } + def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) error(cause, format(template, arg1)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3, arg4)) } + + def error(message: String) { if (isErrorEnabled) error(null: Throwable, message) } + def error(template: String, arg1: Any) { if (isErrorEnabled) error(format(template, arg1)) } + def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(format(template, arg1, arg2)) } + def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3)) } + def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3, arg4)) } + + def warning(message: String) { if (isWarningEnabled) notifyWarning(message) } + def warning(template: String, arg1: Any) { if (isWarningEnabled) warning(format(template, arg1)) } + def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2)) } + def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3)) } + def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3, arg4)) } + + def info(message: String) { if (isInfoEnabled) notifyInfo(message) } + def info(template: String, arg1: Any) { if (isInfoEnabled) info(format(template, arg1)) } + def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) info(format(template, arg1, arg2)) } + def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3)) } + def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3, arg4)) } + + def debug(message: String) { if (isDebugEnabled) notifyDebug(message) } + def debug(template: String, arg1: Any) { if (isDebugEnabled) debug(format(template, arg1)) } + def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2)) } + def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3)) } + def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3, arg4)) } + + def format(t: String, arg1: Any) = t.replaceFirst("{}", arg1.asInstanceOf[AnyRef].toString) + def format(t: String, arg1: Any, arg2: Any) = t.replaceFirst("{}", arg1.asInstanceOf[AnyRef].toString).replaceFirst("{}", arg2.asInstanceOf[AnyRef].toString) + def format(t: String, arg1: Any, arg2: Any, arg3: Any) = t.replaceFirst("{}", arg1.asInstanceOf[AnyRef].toString).replaceFirst("{}", arg2.asInstanceOf[AnyRef].toString).replaceFirst("{}", arg3.asInstanceOf[AnyRef].toString) + def format(t: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) = t.replaceFirst("{}", arg1.asInstanceOf[AnyRef].toString).replaceFirst("{}", arg2.asInstanceOf[AnyRef].toString).replaceFirst("{}", arg3.asInstanceOf[AnyRef].toString).replaceFirst("{}", arg4.asInstanceOf[AnyRef].toString) + +} + +trait ActorLogging extends Logging { this: Actor => + + import EventHandler._ + + def isErrorEnabled = app.eventHandler.level >= ErrorLevel + def isWarningEnabled = app.eventHandler.level >= WarningLevel + def isInfoEnabled = app.eventHandler.level >= InfoLevel + def isDebugEnabled = app.eventHandler.level >= DebugLevel + + protected def notifyError(cause: Throwable, message: String) { app.eventHandler.notifyListeners(Error(cause, context.self, message)) } + + protected def notifyWarning(message: String) { app.eventHandler.notifyListeners(Warning(context.self, message)) } + + protected def notifyInfo(message: String) { app.eventHandler.notifyListeners(Info(context.self, message)) } + + protected def notifyDebug(message: String) { app.eventHandler.notifyListeners(Debug(context.self, message)) } + +} + +class EventHandlerLogging(val eventHandler: EventHandler, val loggingInstance: AnyRef) extends Logging { + + import EventHandler._ + + def isErrorEnabled = eventHandler.level >= ErrorLevel + def isWarningEnabled = eventHandler.level >= WarningLevel + def isInfoEnabled = eventHandler.level >= InfoLevel + def isDebugEnabled = eventHandler.level >= DebugLevel + + protected def notifyError(cause: Throwable, message: String) { eventHandler.notifyListeners(Error(cause, loggingInstance, message)) } + + protected def notifyWarning(message: String) { eventHandler.notifyListeners(Warning(loggingInstance, message)) } + + protected def notifyInfo(message: String) { eventHandler.notifyListeners(Info(loggingInstance, message)) } + + protected def notifyDebug(message: String) { eventHandler.notifyListeners(Debug(loggingInstance, message)) } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/remote/RemoteEventHandler.scala b/akka-actor/src/main/scala/akka/remote/RemoteEventHandler.scala index 4d122a1447..1cc70ad6ee 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteEventHandler.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteEventHandler.scala @@ -5,7 +5,6 @@ package akka.remote import akka.actor.Actor -import akka.event.EventHandler /** * RemoteModule client and server event listener that pipes the events to the standard Akka EventHander. @@ -18,33 +17,33 @@ class RemoteEventHandler extends Actor { // client case RemoteClientError(cause, client, address) ⇒ - EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString) + app.eventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString) case RemoteClientWriteFailed(request, cause, client, address) ⇒ - EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(request, address.toString)) + app.eventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(request, address.toString)) case RemoteClientDisconnected(client, address) ⇒ - EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString) + app.eventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString) case RemoteClientConnected(client, address) ⇒ - EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString) + app.eventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString) case RemoteClientStarted(client, address) ⇒ - EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString) + app.eventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString) case RemoteClientShutdown(client, address) ⇒ - EventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString) + app.eventHandler.info(client, "RemoteClientShutdown - Address[%s]" format address.toString) // server case RemoteServerError(cause, server) ⇒ - EventHandler.error(cause, server, "RemoteServerError") + app.eventHandler.error(cause, server, "RemoteServerError") case RemoteServerWriteFailed(request, cause, server, clientAddress) ⇒ - EventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString)) + app.eventHandler.error(cause, server, "RemoteServerWriteFailed - Request[%s] Address[%s]" format (request, clientAddress.toString)) case RemoteServerStarted(server) ⇒ - EventHandler.info(server, "RemoteServerStarted") + app.eventHandler.info(server, "RemoteServerStarted") case RemoteServerShutdown(server) ⇒ - EventHandler.info(server, "RemoteServerShutdown") + app.eventHandler.info(server, "RemoteServerShutdown") case RemoteServerClientConnected(server, clientAddress) ⇒ - EventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString) + app.eventHandler.info(server, "RemoteServerClientConnected - Address[%s]" format clientAddress.toString) case RemoteServerClientDisconnected(server, clientAddress) ⇒ - EventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString) + app.eventHandler.info(server, "RemoteServerClientDisconnected - Address[%s]" format clientAddress.toString) case RemoteServerClientClosed(server, clientAddress) ⇒ - EventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString) + app.eventHandler.info(server, "RemoteServerClientClosed - Address[%s]" format clientAddress.toString) case _ ⇒ //ignore other } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 2a0dc2a7b3..6d627f013e 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -315,7 +315,6 @@ trait BasicRouter extends Router { private def throwNoConnectionsError = { val error = new RoutingException("No replica connections for router") - EventHandler.error(error, this, error.toString) throw error } } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 1436f76580..da44abbe25 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -45,7 +45,6 @@ object Helpers { while (root.getCause ne null) root = e.getCause root.printStackTrace(new java.io.PrintWriter(sw)) System.err.println(sw.toString) - EventHandler.error(e, this, e.toString) throw e } } diff --git a/akka-actor/src/main/scala/akka/util/JMX.scala b/akka-actor/src/main/scala/akka/util/JMX.scala index 33b118d547..d4e287c2e3 100644 --- a/akka-actor/src/main/scala/akka/util/JMX.scala +++ b/akka-actor/src/main/scala/akka/util/JMX.scala @@ -5,9 +5,9 @@ package akka.util import akka.event.EventHandler - import java.lang.management.ManagementFactory import javax.management.{ ObjectInstance, ObjectName, InstanceAlreadyExistsException, InstanceNotFoundException } +import akka.AkkaApplication /** * @author Jonas Bonér @@ -18,20 +18,20 @@ object JMX { def nameFor(hostname: String, service: String, bean: String): ObjectName = new ObjectName("akka.%s:type=%s,name=%s".format(hostname, service, bean.replace(":", "_"))) - def register(name: ObjectName, mbean: AnyRef): Option[ObjectInstance] = try { + def register(name: ObjectName, mbean: AnyRef)(implicit app: AkkaApplication): Option[ObjectInstance] = try { Some(mbeanServer.registerMBean(mbean, name)) } catch { case e: InstanceAlreadyExistsException ⇒ Some(mbeanServer.getObjectInstance(name)) case e: Exception ⇒ - EventHandler.error(e, this, "Error when registering mbean [%s]".format(mbean)) + app.eventHandler.error(e, this, "Error when registering mbean [%s]".format(mbean)) None } - def unregister(mbean: ObjectName) = try { + def unregister(mbean: ObjectName)(implicit app: AkkaApplication) = try { mbeanServer.unregisterMBean(mbean) } catch { case e: InstanceNotFoundException ⇒ {} - case e: Exception ⇒ EventHandler.error(e, this, "Error while unregistering mbean [%s]".format(mbean)) + case e: Exception ⇒ app.eventHandler.error(e, this, "Error while unregistering mbean [%s]".format(mbean)) } } diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 09f963e421..ce419ce33a 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -106,11 +106,8 @@ class Switch(startAsOn: Boolean = false) { if (switch.compareAndSet(from, !from)) { try { action - } catch { - case e: Throwable ⇒ - EventHandler.error(e, this, e.getMessage) - switch.compareAndSet(!from, from) // revert status - throw e + } finally { + switch.compareAndSet(!from, from) // revert status } true } else false diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 3634eeae2b..f0b298b29a 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -34,11 +34,7 @@ object ReflectiveAccess { ctor.setAccessible(true) Right(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { - case e: java.lang.reflect.InvocationTargetException ⇒ - EventHandler.debug(this, e.getCause.toString) - Left(e) case e: Exception ⇒ - EventHandler.debug(this, e.toString) Left(e) } @@ -154,7 +150,7 @@ class ReflectiveAccess(val application: AkkaApplication) { if (!isEnabled) { val e = new ModuleNotAvailableException( "Can't load the cluster module, make sure it is enabled in the config ('akka.enabled-modules = [\"cluster\"])' and that akka-cluster.jar is on the classpath") - EventHandler.debug(this, e.toString) + application.eventHandler.debug(this, e.toString) throw e } } @@ -162,21 +158,21 @@ class ReflectiveAccess(val application: AkkaApplication) { lazy val clusterInstance: Option[Cluster] = getObjectFor("akka.cluster.Cluster$") match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ - EventHandler.debug(this, exception.toString) + application.eventHandler.debug(this, exception.toString) None } lazy val clusterDeployerInstance: Option[ActorDeployer] = getObjectFor("akka.cluster.ClusterDeployer$") match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ - EventHandler.debug(this, exception.toString) + application.eventHandler.debug(this, exception.toString) None } lazy val transactionLogInstance: Option[TransactionLogObject] = getObjectFor("akka.cluster.TransactionLog$") match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ - EventHandler.debug(this, exception.toString) + application.eventHandler.debug(this, exception.toString) None } @@ -249,7 +245,7 @@ class ReflectiveAccess(val application: AkkaApplication) { if (!isEnabled) { val e = new ModuleNotAvailableException( "Can't load the remote module, make sure it is enabled in the config ('akka.enabled-modules = [\"remote\"])' and that akka-remote.jar is on the classpath") - EventHandler.debug(this, e.toString) + application.eventHandler.debug(this, e.toString) throw e } } @@ -257,7 +253,7 @@ class ReflectiveAccess(val application: AkkaApplication) { lazy val remoteInstance: Option[RemoteService] = getObjectFor("akka.remote.Remote$") match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ - EventHandler.debug(this, exception.toString) + application.eventHandler.debug(this, exception.toString) None } @@ -269,7 +265,7 @@ class ReflectiveAccess(val application: AkkaApplication) { val remoteSupportClass = getClassFor[RemoteSupport](TRANSPORT) match { case Right(value) ⇒ Some(value) case Left(exception) ⇒ - EventHandler.debug(this, exception.toString) + application.eventHandler.debug(this, exception.toString) None } @@ -283,7 +279,7 @@ class ReflectiveAccess(val application: AkkaApplication) { case Left(exception) ⇒ val e = new ModuleNotAvailableException( "Can't instantiate [%s] - make sure that akka-remote.jar is on the classpath".format(remoteClass.getName), exception) - EventHandler.debug(this, e.toString) + application.eventHandler.debug(this, e.toString) throw e } } diff --git a/akka-http/src/main/scala/akka/http/JettyContinuation.scala b/akka-http/src/main/scala/akka/http/JettyContinuation.scala index d6665b00e3..8fcd6b21a9 100644 --- a/akka-http/src/main/scala/akka/http/JettyContinuation.scala +++ b/akka-http/src/main/scala/akka/http/JettyContinuation.scala @@ -16,8 +16,8 @@ import akka.AkkaApplication */ trait JettyContinuation extends ContinuationListener { import javax.servlet.http.HttpServletResponse - - protected def application: AkkaApplication + + def app: AkkaApplication val builder: () ⇒ tAsyncRequestContext val context: Option[tAsyncRequestContext] = Some(builder()) @@ -35,7 +35,7 @@ trait JettyContinuation extends ContinuationListener { // the fresh continuation (coming through getAsyncContinuation) // case (true, false, false) ⇒ { - continuation.setTimeout(application.MistSettings.DefaultTimeout) + continuation.setTimeout(app.MistSettings.DefaultTimeout) continuation.addContinuationListener(this) continuation.suspend @@ -47,7 +47,7 @@ trait JettyContinuation extends ContinuationListener { // case (true, true, false) ⇒ { - continuation.setTimeout(application.MistSettings.DefaultTimeout) + continuation.setTimeout(app.MistSettings.DefaultTimeout) continuation.addContinuationListener(this) Some(continuation) @@ -58,9 +58,9 @@ trait JettyContinuation extends ContinuationListener { // case (false, false, false) ⇒ { - continuation.setTimeout(continuation.getAttribute(application.MistSettings.TimeoutAttribute).asInstanceOf[Long]) + continuation.setTimeout(continuation.getAttribute(app.MistSettings.TimeoutAttribute).asInstanceOf[Long]) continuation.suspend - continuation.removeAttribute(application.MistSettings.TimeoutAttribute) + continuation.removeAttribute(app.MistSettings.TimeoutAttribute) None } @@ -70,8 +70,8 @@ trait JettyContinuation extends ContinuationListener { // case (false, true, false) ⇒ { - continuation.setTimeout(continuation.getAttribute(application.MistSettings.TimeoutAttribute).asInstanceOf[Long]) - continuation.removeAttribute(application.MistSettings.TimeoutAttribute) + continuation.setTimeout(continuation.getAttribute(app.MistSettings.TimeoutAttribute).asInstanceOf[Long]) + continuation.removeAttribute(app.MistSettings.TimeoutAttribute) None } @@ -87,13 +87,13 @@ trait JettyContinuation extends ContinuationListener { def suspended: Boolean = _continuation match { case None ⇒ false - case Some(continuation) ⇒ (continuation.isSuspended || (continuation.getAttribute(application.MistSettings.TimeoutAttribute) ne null)) + case Some(continuation) ⇒ (continuation.isSuspended || (continuation.getAttribute(app.MistSettings.TimeoutAttribute) ne null)) } def timeout(ms: Long): Boolean = _continuation match { case None ⇒ false case Some(continuation) ⇒ - continuation.setAttribute(application.MistSettings.TimeoutAttribute, ms) + continuation.setAttribute(app.MistSettings.TimeoutAttribute, ms) continuation.resume true } @@ -103,21 +103,19 @@ trait JettyContinuation extends ContinuationListener { // def onComplete(c: Continuation) = {} def onTimeout(c: Continuation) = { - c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(application.MistSettings.ExpiredHeaderName, application.MistSettings.ExpiredHeaderValue) + c.getServletResponse.asInstanceOf[HttpServletResponse].addHeader(app.MistSettings.ExpiredHeaderName, app.MistSettings.ExpiredHeaderValue) c.complete } } -class JettyContinuationMethodFactory(val _application: AkkaApplication) extends RequestMethodFactory { - trait App { - def application = _application - } - def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod = new Delete(f) with JettyContinuation with App - def Get(f: () ⇒ tAsyncRequestContext): RequestMethod = new Get(f) with JettyContinuation with App - def Head(f: () ⇒ tAsyncRequestContext): RequestMethod = new Head(f) with JettyContinuation with App - def Options(f: () ⇒ tAsyncRequestContext): RequestMethod = new Options(f) with JettyContinuation with App - def Post(f: () ⇒ tAsyncRequestContext): RequestMethod = new Post(f) with JettyContinuation with App - def Put(f: () ⇒ tAsyncRequestContext): RequestMethod = new Put(f) with JettyContinuation with App - def Trace(f: () ⇒ tAsyncRequestContext): RequestMethod = new Trace(f) with JettyContinuation with App +class JettyContinuationMethodFactory(_app: AkkaApplication) extends RequestMethodFactory { + implicit val app = _app + def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod = new Delete(f) with JettyContinuation + def Get(f: () ⇒ tAsyncRequestContext): RequestMethod = new Get(f) with JettyContinuation + def Head(f: () ⇒ tAsyncRequestContext): RequestMethod = new Head(f) with JettyContinuation + def Options(f: () ⇒ tAsyncRequestContext): RequestMethod = new Options(f) with JettyContinuation + def Post(f: () ⇒ tAsyncRequestContext): RequestMethod = new Post(f) with JettyContinuation + def Put(f: () ⇒ tAsyncRequestContext): RequestMethod = new Put(f) with JettyContinuation + def Trace(f: () ⇒ tAsyncRequestContext): RequestMethod = new Trace(f) with JettyContinuation } diff --git a/akka-http/src/main/scala/akka/http/Mist.scala b/akka-http/src/main/scala/akka/http/Mist.scala index 829db96286..6d530ad0f3 100644 --- a/akka-http/src/main/scala/akka/http/Mist.scala +++ b/akka-http/src/main/scala/akka/http/Mist.scala @@ -53,7 +53,7 @@ object Types { trait Mist { import javax.servlet.ServletContext - protected def application: AkkaApplication + protected def app: AkkaApplication /** * The root endpoint actor @@ -99,7 +99,7 @@ trait Mist { // shoot the message to the root endpoint for processing // IMPORTANT: the suspend method is invoked on the server thread not in the actor - val method = builder(() ⇒ suspend(application.MistSettings.ConnectionClose)) + val method = builder(() ⇒ suspend(app.MistSettings.ConnectionClose)) if (method.go) root ! method } @@ -111,9 +111,9 @@ trait Mist { val server = context.getServerInfo val (major, minor) = (context.getMajorVersion, context.getMinorVersion) factory = if (major >= 3) { - Some(new Servlet30ContextMethodFactory(application)) - } else if (server.toLowerCase startsWith application.MistSettings.JettyServer) { - Some(new JettyContinuationMethodFactory(application)) + Some(new Servlet30ContextMethodFactory(app)) + } else if (server.toLowerCase startsWith app.MistSettings.JettyServer) { + Some(new JettyContinuationMethodFactory(app)) } else { None } @@ -123,14 +123,14 @@ trait Mist { trait RootEndpointLocator { var root: ActorRef = null - protected def application: AkkaApplication + protected def app: AkkaApplication def configureRoot(address: String) { def findRoot(address: String): ActorRef = - application.registry.actorFor(address).getOrElse( + app.registry.actorFor(address).getOrElse( throw new ConfigurationException("akka.http.root-actor-id configuration option does not have a valid actor address [" + address + "]")) - root = if ((address eq null) || address == "") findRoot(application.MistSettings.RootActorID) else findRoot(address) + root = if ((address eq null) || address == "") findRoot(app.MistSettings.RootActorID) else findRoot(address) } } @@ -138,7 +138,7 @@ trait RootEndpointLocator { * AkkaMistServlet adds support to bridge Http and Actors in an asynchronous fashion * Async impls currently supported: Servlet3.0, Jetty Continuations */ -class AkkaMistServlet(val application: AkkaApplication) extends HttpServlet with Mist with RootEndpointLocator { +class AkkaMistServlet(val app: AkkaApplication) extends HttpServlet with Mist with RootEndpointLocator { import javax.servlet.{ ServletConfig } /** @@ -157,7 +157,7 @@ class AkkaMistServlet(val application: AkkaApplication) extends HttpServlet with * Proof-of-concept, use at own risk * Will be officially supported in a later release */ -class AkkaMistFilter(val application: AkkaApplication) extends Filter with Mist with RootEndpointLocator { +class AkkaMistFilter(val app: AkkaApplication) extends Filter with Mist with RootEndpointLocator { import javax.servlet.{ ServletRequest, ServletResponse, FilterConfig, FilterChain } /** @@ -294,6 +294,8 @@ class RootEndpoint extends Actor with Endpoint { trait RequestMethod { import java.io.IOException import javax.servlet.http.{ HttpServletResponse, HttpServletRequest } + + def app: AkkaApplication // required implementations val builder: () ⇒ tAsyncRequestContext @@ -358,7 +360,7 @@ trait RequestMethod { } } catch { case io: Exception ⇒ - EventHandler.error(io, this, io.getMessage) + app.eventHandler.error(io, this, io.getMessage) false } case None ⇒ false @@ -374,7 +376,7 @@ trait RequestMethod { } } catch { case io: IOException ⇒ - EventHandler.error(io, this, io.getMessage) + app.eventHandler.error(io, this, io.getMessage) } case None ⇒ {} } @@ -401,13 +403,13 @@ trait RequestMethod { def Unavailable(body: String, retry: Int): Boolean = complete(HttpServletResponse.SC_SERVICE_UNAVAILABLE, body, List(("Retry-After", retry.toString))) } -abstract class Delete(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod -abstract class Get(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod -abstract class Head(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod -abstract class Options(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod -abstract class Post(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod -abstract class Put(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod -abstract class Trace(val builder: () ⇒ tAsyncRequestContext) extends RequestMethod +abstract class Delete(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod +abstract class Get(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod +abstract class Head(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod +abstract class Options(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod +abstract class Post(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod +abstract class Put(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod +abstract class Trace(val builder: () ⇒ tAsyncRequestContext)(implicit val app: AkkaApplication) extends RequestMethod trait RequestMethodFactory { def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod diff --git a/akka-http/src/main/scala/akka/http/Servlet30Context.scala b/akka-http/src/main/scala/akka/http/Servlet30Context.scala index 8f3c191d32..fd797b8a5c 100644 --- a/akka-http/src/main/scala/akka/http/Servlet30Context.scala +++ b/akka-http/src/main/scala/akka/http/Servlet30Context.scala @@ -6,7 +6,6 @@ package akka.http import javax.servlet.{ AsyncContext, AsyncListener, AsyncEvent } import Types._ -import akka.event.EventHandler import akka.AkkaApplication /** @@ -15,7 +14,7 @@ import akka.AkkaApplication trait Servlet30Context extends AsyncListener { import javax.servlet.http.HttpServletResponse - protected def application: AkkaApplication + def app: AkkaApplication val builder: () ⇒ tAsyncRequestContext val context: Option[tAsyncRequestContext] = Some(builder()) @@ -23,7 +22,7 @@ trait Servlet30Context extends AsyncListener { protected val _ac: AsyncContext = { val ac = context.get.asInstanceOf[AsyncContext] - ac setTimeout application.MistSettings.DefaultTimeout + ac setTimeout app.MistSettings.DefaultTimeout ac addListener this ac } @@ -36,7 +35,7 @@ trait Servlet30Context extends AsyncListener { true } catch { case e: IllegalStateException ⇒ - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) false } } @@ -47,25 +46,23 @@ trait Servlet30Context extends AsyncListener { def onComplete(e: AsyncEvent) {} def onError(e: AsyncEvent) = e.getThrowable match { case null ⇒ - case t ⇒ EventHandler.error(t, this, t.getMessage) + case t ⇒ app.eventHandler.error(t, this, t.getMessage) } def onStartAsync(e: AsyncEvent) {} def onTimeout(e: AsyncEvent) = { - e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(application.MistSettings.ExpiredHeaderName, application.MistSettings.ExpiredHeaderValue) + e.getSuppliedResponse.asInstanceOf[HttpServletResponse].addHeader(app.MistSettings.ExpiredHeaderName, app.MistSettings.ExpiredHeaderValue) e.getAsyncContext.complete } } -class Servlet30ContextMethodFactory(val _application: AkkaApplication) extends RequestMethodFactory { - trait App { - def application = _application - } - def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod = new Delete(f) with Servlet30Context with App - def Get(f: () ⇒ tAsyncRequestContext): RequestMethod = new Get(f) with Servlet30Context with App - def Head(f: () ⇒ tAsyncRequestContext): RequestMethod = new Head(f) with Servlet30Context with App - def Options(f: () ⇒ tAsyncRequestContext): RequestMethod = new Options(f) with Servlet30Context with App - def Post(f: () ⇒ tAsyncRequestContext): RequestMethod = new Post(f) with Servlet30Context with App - def Put(f: () ⇒ tAsyncRequestContext): RequestMethod = new Put(f) with Servlet30Context with App - def Trace(f: () ⇒ tAsyncRequestContext): RequestMethod = new Trace(f) with Servlet30Context with App +class Servlet30ContextMethodFactory(_app: AkkaApplication) extends RequestMethodFactory { + implicit val app = _app + def Delete(f: () ⇒ tAsyncRequestContext): RequestMethod = new Delete(f) with Servlet30Context + def Get(f: () ⇒ tAsyncRequestContext): RequestMethod = new Get(f) with Servlet30Context + def Head(f: () ⇒ tAsyncRequestContext): RequestMethod = new Head(f) with Servlet30Context + def Options(f: () ⇒ tAsyncRequestContext): RequestMethod = new Options(f) with Servlet30Context + def Post(f: () ⇒ tAsyncRequestContext): RequestMethod = new Post(f) with Servlet30Context + def Put(f: () ⇒ tAsyncRequestContext): RequestMethod = new Put(f) with Servlet30Context + def Trace(f: () ⇒ tAsyncRequestContext): RequestMethod = new Trace(f) with Servlet30Context } diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index 3850e61daf..0f423db230 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -26,19 +26,19 @@ trait BootableRemoteActorService extends Bootable { abstract override def onLoad() { if (app.reflective.ClusterModule.isEnabled && settings.isRemotingEnabled) { - EventHandler.info(this, "Initializing Remote Actors Service...") + app.eventHandler.info(this, "Initializing Remote Actors Service...") startRemoteService() - EventHandler.info(this, "Remote Actors Service initialized") + app.eventHandler.info(this, "Remote Actors Service initialized") } super.onLoad() } abstract override def onUnload() { - EventHandler.info(this, "Shutting down Remote Actors Service") + app.eventHandler.info(this, "Shutting down Remote Actors Service") app.remote.shutdown() if (remoteServerThread.isAlive) remoteServerThread.join(1000) - EventHandler.info(this, "Remote Actors Service has been shut down") + app.eventHandler.info(this, "Remote Actors Service has been shut down") super.onUnload() } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 119a84ca4c..a124dde295 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -35,7 +35,7 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] - private val failureDetector = new BannagePeriodFailureDetector(remote, timeToBan = 60 seconds) // FIXME make timeToBan configurable + private val failureDetector = new BannagePeriodFailureDetector(app, remote, timeToBan = 60 seconds) // FIXME make timeToBan configurable def actorOf(props: Props, address: String): Option[ActorRef] = { Address.validate(address) @@ -129,7 +129,7 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten * Using (checking out) actor on a specific node. */ def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () ⇒ Actor) { - EventHandler.debug(this, "Instantiating Actor [%s] on node [%s]".format(actorAddress, remoteAddress)) + app.eventHandler.debug(this, "Instantiating Actor [%s] on node [%s]".format(actorAddress, remoteAddress)) val actorFactoryBytes = app.serialization.serialize(actorFactory) match { @@ -164,20 +164,20 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten try { (connection ? (command, remote.remoteDaemonAckTimeout)).as[Status] match { case Some(Success(receiver)) ⇒ - EventHandler.debug(this, "Remote command sent to [%s] successfully received".format(receiver)) + app.eventHandler.debug(this, "Remote command sent to [%s] successfully received".format(receiver)) case Some(Failure(cause)) ⇒ - EventHandler.error(cause, this, cause.toString) + app.eventHandler.error(cause, this, cause.toString) throw cause case None ⇒ val error = new RemoteException("Remote command to [%s] timed out".format(connection.address)) - EventHandler.error(error, this, error.toString) + app.eventHandler.error(error, this, error.toString) throw error } } catch { case e: Exception ⇒ - EventHandler.error(e, this, "Could not send remote command to [%s] due to: %s".format(connection.address, e.toString)) + app.eventHandler.error(e, this, "Could not send remote command to [%s] due to: %s".format(connection.address, e.toString)) throw e } } else { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index fb9ed34c6f..b4360de1d2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -76,7 +76,7 @@ class Remote(val app: AkkaApplication) extends RemoteService { def start() { val triggerLazyServerVal = address.toString - EventHandler.info(this, "Starting remote server on [%s]".format(triggerLazyServerVal)) + app.eventHandler.info(this, "Starting remote server on [%s]".format(triggerLazyServerVal)) } def uuidProtocolToUuid(uuid: UuidProtocol): UUID = new UUID(uuid.getHigh, uuid.getLow) @@ -100,12 +100,12 @@ class RemoteDaemon(val remote: Remote) extends Actor { import remote._ override def preRestart(reason: Throwable, msg: Option[Any]) { - EventHandler.debug(this, "RemoteDaemon failed due to [%s] restarting...".format(reason)) + app.eventHandler.debug(this, "RemoteDaemon failed due to [%s] restarting...".format(reason)) } def receive: Actor.Receive = { case message: RemoteDaemonMessageProtocol ⇒ - EventHandler.debug(this, + app.eventHandler.debug(this, "Received command [\n%s] to RemoteDaemon on [%s]".format(message, app.nodename)) message.getMessageType match { @@ -123,7 +123,7 @@ class RemoteDaemon(val remote: Remote) extends Actor { //TODO: should we not deal with unrecognized message types? } - case unknown ⇒ EventHandler.warning(this, "Unknown message [%s]".format(unknown)) + case unknown ⇒ app.eventHandler.warning(this, "Unknown message [%s]".format(unknown)) } def handleUse(message: RemoteProtocol.RemoteDaemonMessageProtocol) { @@ -146,7 +146,7 @@ class RemoteDaemon(val remote: Remote) extends Actor { remote.server.register(actorAddress, newActorRef) } else { - EventHandler.error(this, "Actor 'address' is not defined, ignoring remote daemon command [%s]".format(message)) + app.eventHandler.error(this, "Actor 'address' is not defined, ignoring remote daemon command [%s]".format(message)) } reply(Success(address.toString)) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala index d4c44f9030..4b5f100374 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteFailureDetector.scala @@ -16,13 +16,14 @@ import scala.annotation.tailrec import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference import System.{ currentTimeMillis ⇒ newTimestamp } +import akka.AkkaApplication /** * Base class for remote failure detection management. * * @author Jonas Bonér */ -abstract class RemoteFailureDetectorBase(remote: Remote, initialConnections: Map[InetSocketAddress, ActorRef]) +abstract class RemoteFailureDetectorBase(app: AkkaApplication, remote: Remote, initialConnections: Map[InetSocketAddress, ActorRef]) extends FailureDetector with NetworkEventStream.Listener { @@ -81,7 +82,7 @@ abstract class RemoteFailureDetectorBase(remote: Remote, initialConnections: Map @tailrec final def failOver(from: InetSocketAddress, to: InetSocketAddress) { - EventHandler.debug(this, "RemoteFailureDetector failover from [%s] to [%s]".format(from, to)) + app.eventHandler.debug(this, "RemoteFailureDetector failover from [%s] to [%s]".format(from, to)) val oldState = state.get var changed = false @@ -132,7 +133,7 @@ abstract class RemoteFailureDetectorBase(remote: Remote, initialConnections: Map if (!state.compareAndSet(oldState, newState)) { remove(faultyConnection) // recur } else { - EventHandler.debug(this, "Removing connection [%s]".format(faultyAddress)) + app.eventHandler.debug(this, "Removing connection [%s]".format(faultyAddress)) remote.eventStream.unregister(this, faultyAddress) // unregister the connections - e.g stop listen to events from it } } @@ -160,7 +161,7 @@ abstract class RemoteFailureDetectorBase(remote: Remote, initialConnections: Map putIfAbsent(address, newConnectionFactory) // recur } else { // we succeeded - EventHandler.debug(this, "Adding connection [%s]".format(address)) + app.eventHandler.debug(this, "Adding connection [%s]".format(address)) remote.eventStream.register(this, address) // register the connection - e.g listen to events from it newConnection // return new connection actor } @@ -175,9 +176,9 @@ abstract class RemoteFailureDetectorBase(remote: Remote, initialConnections: Map /** * Simple failure detector that removes the failing connection permanently on first error. */ -class RemoveConnectionOnFirstFailureRemoteFailureDetector(_remote: Remote, +class RemoveConnectionOnFirstFailureRemoteFailureDetector(_app: AkkaApplication, _remote: Remote, initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef]) - extends RemoteFailureDetectorBase(_remote, initialConnections) { + extends RemoteFailureDetectorBase(_app, _remote, initialConnections) { protected def newState() = State(Long.MinValue, initialConnections) @@ -213,10 +214,10 @@ class RemoveConnectionOnFirstFailureRemoteFailureDetector(_remote: Remote, * * @author Jonas Bonér */ -class BannagePeriodFailureDetector(_remote: Remote, +class BannagePeriodFailureDetector(_app: AkkaApplication, _remote: Remote, initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef], timeToBan: Duration) - extends RemoteFailureDetectorBase(_remote, initialConnections) { + extends RemoteFailureDetectorBase(_app, _remote, initialConnections) { // FIXME considering adding a Scheduler event to notify the BannagePeriodFailureDetector unban the banned connection after the timeToBan have exprired diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 232e902fae..c2581e3da6 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -9,7 +9,6 @@ import akka.dispatch.{ ActorPromise, DefaultPromise, Promise } import akka.remote._ import RemoteProtocol._ import akka.util._ -import akka.event.EventHandler import org.jboss.netty.channel._ import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture } import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory @@ -209,7 +208,7 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[Promise[T]]): Option[Promise[T]] = { if (isRunning) { - EventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request)) + app.eventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request)) // tell if (request.getOneWay) { @@ -279,7 +278,7 @@ abstract class RemoteClient private[akka] ( private[remote] def sendPendingRequests() = pendingRequests synchronized { // ensure only one thread at a time can flush the log val nrOfMessages = pendingRequests.size - if (nrOfMessages > 0) EventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages) + if (nrOfMessages > 0) app.eventHandler.info(this, "Resending [%s] previously failed messages after remote client reconnect" format nrOfMessages) var pendingRequest = pendingRequests.peek while (pendingRequest ne null) { @@ -362,14 +361,14 @@ class ActiveRemoteClient private[akka] ( } def attemptReconnect(): Boolean = { - EventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress)) + app.eventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress)) val connection = bootstrap.connect(remoteAddress) openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) - EventHandler.error(connection.getCause, this, "Reconnection to [%s] has failed".format(remoteAddress)) + app.eventHandler.error(connection.getCause, this, "Reconnection to [%s] has failed".format(remoteAddress)) false } else { @@ -387,7 +386,7 @@ class ActiveRemoteClient private[akka] ( bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) - EventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress)) + app.eventHandler.debug(this, "Starting remote client connection to [%s]".format(remoteAddress)) connection = bootstrap.connect(remoteAddress) @@ -396,7 +395,7 @@ class ActiveRemoteClient private[akka] ( if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) - EventHandler.error(connection.getCause, this, "Remote client connection to [%s] has failed".format(remoteAddress)) + app.eventHandler.error(connection.getCause, this, "Remote client connection to [%s] has failed".format(remoteAddress)) false } else { @@ -423,7 +422,7 @@ class ActiveRemoteClient private[akka] ( case false if reconnectIfAlreadyConnected ⇒ closeChannel(connection) - EventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress)) + app.eventHandler.debug(this, "Remote client reconnecting to [%s]".format(remoteAddress)) attemptReconnect() case false ⇒ false @@ -432,7 +431,7 @@ class ActiveRemoteClient private[akka] ( // Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients def shutdown() = runSwitch switchOff { - EventHandler.info(this, "Shutting down remote client [%s]".format(name)) + app.eventHandler.info(this, "Shutting down remote client [%s]".format(name)) notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop() @@ -444,7 +443,7 @@ class ActiveRemoteClient private[akka] ( connection = null pendingRequests.clear() - EventHandler.info(this, "[%s] has been shut down".format(name)) + app.eventHandler.info(this, "[%s] has been shut down".format(name)) } private[akka] def isWithinReconnectionTimeWindow: Boolean = { @@ -454,7 +453,7 @@ class ActiveRemoteClient private[akka] ( } else { val timeLeft = (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 if (timeLeft) { - EventHandler.info(this, "Will try to reconnect to remote server for another [%s] milliseconds".format(timeLeft)) + app.eventHandler.info(this, "Will try to reconnect to remote server for another [%s] milliseconds".format(timeLeft)) } timeLeft } @@ -521,7 +520,7 @@ class ActiveRemoteClientHandler( case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ val reply = arp.getMessage val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) - EventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]\nTrying to map back to future [%s]".format(reply, replyUuid)) + app.eventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]\nTrying to map back to future [%s]".format(reply, replyUuid)) futures.remove(replyUuid).asInstanceOf[Promise[Any]] match { case null ⇒ @@ -543,7 +542,7 @@ class ActiveRemoteClientHandler( } } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) } } @@ -567,24 +566,24 @@ class ActiveRemoteClientHandler( try { if (client.useTransactionLog) client.sendPendingRequests() // try to send pending requests (still there after client/server crash ard reconnect client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) - EventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress)) + app.eventHandler.debug(this, "Remote client connected to [%s]".format(ctx.getChannel.getRemoteAddress)) client.resetReconnectionTimeWindow } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) } } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress)) - EventHandler.debug(this, "Remote client disconnected from [%s]".format(ctx.getChannel.getRemoteAddress)) + app.eventHandler.debug(this, "Remote client disconnected from [%s]".format(ctx.getChannel.getRemoteAddress)) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { val cause = event.getCause if (cause ne null) { - EventHandler.error(event.getCause, this, "Unexpected exception [%s] from downstream in remote client [%s]".format(event.getCause, event)) + app.eventHandler.error(event.getCause, this, "Unexpected exception [%s] from downstream in remote client [%s]".format(event.getCause, event)) cause match { case e: ReadTimeoutException ⇒ @@ -596,7 +595,7 @@ class ActiveRemoteClientHandler( event.getChannel.close //FIXME Is this the correct behavior? } - } else EventHandler.error(this, "Unexpected exception from downstream in remote client [%s]".format(event)) + } else app.eventHandler.error(this, "Unexpected exception from downstream in remote client [%s]".format(event)) } private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = { @@ -611,7 +610,7 @@ class ActiveRemoteClientHandler( .newInstance(exception.getMessage).asInstanceOf[Throwable] } catch { case problem: Exception ⇒ - EventHandler.error(problem, this, problem.getMessage) + app.eventHandler.error(problem, this, problem.getMessage) CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage) } } @@ -646,7 +645,7 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with } val remoteInetSocketAddress = new InetSocketAddress(host, port) - EventHandler.debug(this, + app.eventHandler.debug(this, "Creating RemoteActorRef with address [%s] connected to [%s]" .format(actorAddress, remoteInetSocketAddress)) RemoteActorRef(app, app.remote, remoteInetSocketAddress, actorAddress, loader) @@ -689,7 +688,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod serverModule.notifyListeners(RemoteServerStarted(serverModule)) def shutdown() { - EventHandler.info(this, "Shutting down remote server [%s]".format(name)) + app.eventHandler.info(this, "Shutting down remote server [%s]".format(name)) try { val shutdownSignal = { val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) @@ -705,7 +704,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod serverModule.notifyListeners(RemoteServerShutdown(serverModule)) } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) } } } @@ -736,13 +735,13 @@ trait NettyRemoteServerModule extends RemoteServerModule { def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard { try { _isRunning switchOn { - EventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port)) + app.eventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port)) currentServer.set(Some(new NettyRemoteServer(app, this, _hostname, _port, loader))) } } catch { case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) notifyListeners(RemoteServerError(e, this)) } this @@ -751,7 +750,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { def shutdownServerModule() = guard withGuard { _isRunning switchOff { currentServer.getAndSet(None) foreach { instance ⇒ - EventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port)) + app.eventHandler.debug(this, "Shutting down remote server on %s:%s".format(instance.host, instance.port)) instance.shutdown() } } @@ -796,7 +795,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { def unregister(actorRef: ActorRef): Unit = guard withGuard { if (_isRunning.isOn) { - EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid)) + app.eventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid)) actors.remove(actorRef.address, actorRef) actorsByUuid.remove(actorRef.uuid.toString, actorRef) @@ -811,7 +810,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { def unregister(id: String): Unit = guard withGuard { if (_isRunning.isOn) { - EventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id)) + app.eventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id)) if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length)) else { @@ -830,7 +829,7 @@ trait NettyRemoteServerModule extends RemoteServerModule { def unregisterPerSession(id: String) { if (_isRunning.isOn) { - EventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id)) + app.eventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id)) actorsFactories.remove(id) } @@ -936,7 +935,7 @@ class RemoteServerHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - EventHandler.debug(this, "Remote client [%s] connected to [%s]".format(clientAddress, server.name)) + app.eventHandler.debug(this, "Remote client [%s] connected to [%s]".format(clientAddress, server.name)) sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) @@ -945,7 +944,7 @@ class RemoteServerHandler( override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - EventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name)) + app.eventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name)) // stop all session actors for ( @@ -955,7 +954,7 @@ class RemoteServerHandler( try { actor ! PoisonPill } catch { - case e: Exception ⇒ EventHandler.error(e, this, "Couldn't stop %s".format(actor)) + case e: Exception ⇒ app.eventHandler.error(e, this, "Couldn't stop %s".format(actor)) } } @@ -964,7 +963,7 @@ class RemoteServerHandler( override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { val clientAddress = getClientAddress(ctx) - EventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name), this) + app.eventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name), this) server.notifyListeners(RemoteServerClientClosed(server, clientAddress)) } @@ -984,7 +983,7 @@ class RemoteServerHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - EventHandler.error(event.getCause, this, "Unexpected exception from remote downstream") + app.eventHandler.error(event.getCause, this, "Unexpected exception from remote downstream") event.getChannel.close server.notifyListeners(RemoteServerError(event.getCause, server)) @@ -997,25 +996,25 @@ class RemoteServerHandler( } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try { - EventHandler.debug(this, "Received remote message [%s]".format(request)) + app.eventHandler.debug(this, "Received remote message [%s]".format(request)) dispatchToActor(request, channel) } catch { case e: Exception ⇒ server.notifyListeners(RemoteServerError(e, server)) - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) } private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) { val actorInfo = request.getActorInfo - EventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid)) + app.eventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid)) val actorRef = try { createActor(actorInfo, channel) } catch { case e: SecurityException ⇒ - EventHandler.error(e, this, e.getMessage) + app.eventHandler.error(e, this, e.getMessage) write(channel, createErrorReplyMessage(e, request)) server.notifyListeners(RemoteServerError(e, server)) return @@ -1078,7 +1077,7 @@ class RemoteServerHandler( // else addr // } - EventHandler.debug(this, + app.eventHandler.debug(this, "Looking up a remotely available actor for address [%s] on node [%s]" .format(address, app.nodename)) diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 593fa65de0..2277b9b6c2 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -144,7 +144,7 @@ class ActorSerialization(val app: AkkaApplication) { overriddenUuid: Option[UUID], loader: Option[ClassLoader]): ActorRef = { - EventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol)) + app.eventHandler.debug(this, "Deserializing SerializedActorRefProtocol to LocalActorRef:\n%s".format(protocol)) // import ReplicationStorageType._ // import ReplicationStrategyType._ @@ -240,7 +240,7 @@ class RemoteActorSerialization(val app: AkkaApplication) { * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - EventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) + app.eventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) val ref = RemoteActorRef( app, app.remote, @@ -248,7 +248,7 @@ class RemoteActorSerialization(val app: AkkaApplication) { protocol.getAddress, loader) - EventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid)) + app.eventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid)) ref } @@ -267,7 +267,7 @@ class RemoteActorSerialization(val app: AkkaApplication) { app.reflective.RemoteModule.configDefaultAddress } - EventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) + app.eventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress)) RemoteActorRefProtocol.newBuilder .setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress))) diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 2d8f541195..a32a669f95 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -153,7 +153,7 @@ class Agent[T](initialValue: T, application: AkkaApplication) { def sendOff(f: T ⇒ T): Unit = { send((value: T) ⇒ { suspend() - val pinnedDispatcher = new PinnedDispatcher(null, "agent-send-off", UnboundedMailbox(), application.AkkaConfig.ActorTimeoutMillis) + val pinnedDispatcher = new PinnedDispatcher(application, null, "agent-send-off", UnboundedMailbox(), application.AkkaConfig.ActorTimeoutMillis) val threadBased = application.createActor(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) threadBased ! Update(f) value @@ -171,7 +171,7 @@ class Agent[T](initialValue: T, application: AkkaApplication) { val result = new DefaultPromise[T](timeout)(application.dispatcher) send((value: T) ⇒ { suspend() - val pinnedDispatcher = new PinnedDispatcher(null, "agent-alter-off", UnboundedMailbox(), application.AkkaConfig.ActorTimeoutMillis) + val pinnedDispatcher = new PinnedDispatcher(application, null, "agent-alter-off", UnboundedMailbox(), application.AkkaConfig.ActorTimeoutMillis) val threadBased = application.createActor(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] value diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java index 652f8896f0..0e0daa9bb1 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedCoordinatedIncrementTest.java @@ -77,7 +77,7 @@ public class UntypedCoordinatedIncrementTest { EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - EventHandler.notify(new TestEvent.Mute(ignoreExceptions)); + application.eventHandler().notify(new TestEvent.Mute(ignoreExceptions)); CountDownLatch incrementLatch = new CountDownLatch(numCounters); List actors = new ArrayList(counters); actors.add(failer); @@ -98,7 +98,7 @@ public class UntypedCoordinatedIncrementTest { } } } - EventHandler.notify(new TestEvent.UnMute(ignoreExceptions)); + application.eventHandler().notify(new TestEvent.UnMute(ignoreExceptions)); } public Seq seq(A... args) { diff --git a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java index 0bd7402036..938ac39ae8 100644 --- a/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java +++ b/akka-stm/src/test/java/akka/transactor/test/UntypedTransactorTest.java @@ -76,7 +76,7 @@ public class UntypedTransactorTest { EventFilter expectedFailureFilter = (EventFilter) new ErrorFilter(ExpectedFailureException.class); EventFilter coordinatedFilter = (EventFilter) new ErrorFilter(CoordinatedTransactionException.class); Seq ignoreExceptions = seq(expectedFailureFilter, coordinatedFilter); - EventHandler.notify(new TestEvent.Mute(ignoreExceptions)); + application.eventHandler().notify(new TestEvent.Mute(ignoreExceptions)); CountDownLatch incrementLatch = new CountDownLatch(numCounters); List actors = new ArrayList(counters); actors.add(failer); @@ -97,7 +97,7 @@ public class UntypedTransactorTest { } } } - EventHandler.notify(new TestEvent.UnMute(ignoreExceptions)); + application.eventHandler().notify(new TestEvent.UnMute(ignoreExceptions)); } public Seq seq(A... args) { diff --git a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala index 5cd0459b5e..18dea7136d 100644 --- a/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala +++ b/akka-stm/src/test/scala/transactor/CoordinatedIncrementSpec.scala @@ -1,7 +1,5 @@ package akka.transactor.test -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers import org.scalatest.BeforeAndAfterAll import akka.AkkaApplication @@ -55,7 +53,7 @@ object CoordinatedIncrement { } } -class CoordinatedIncrementSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { +class CoordinatedIncrementSpec extends AkkaSpec with BeforeAndAfterAll { import CoordinatedIncrement._ val application = AkkaApplication("CoordinatedIncrementSpec") @@ -88,7 +86,7 @@ class CoordinatedIncrementSpec extends WordSpec with MustMatchers with BeforeAnd EventFilter[ExpectedFailureException], EventFilter[CoordinatedTransactionException], EventFilter[ActorTimeoutException]) - EventHandler.notify(TestEvent.Mute(ignoreExceptions)) + app.eventHandler.notify(TestEvent.Mute(ignoreExceptions)) val (counters, failer) = createActors val coordinated = Coordinated() counters(0) ! Coordinated(Increment(counters.tail :+ failer)) @@ -98,7 +96,7 @@ class CoordinatedIncrementSpec extends WordSpec with MustMatchers with BeforeAnd } counters foreach (_.stop()) failer.stop() - EventHandler.notify(TestEvent.UnMute(ignoreExceptions)) + app.eventHandler.notify(TestEvent.UnMute(ignoreExceptions)) } } } diff --git a/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala b/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala index fc7d84e861..e7144ee755 100644 --- a/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala +++ b/akka-stm/src/test/scala/transactor/FickleFriendsSpec.scala @@ -98,7 +98,7 @@ object FickleFriends { } } -class FickleFriendsSpec extends WordSpec with MustMatchers with BeforeAndAfterAll { +class FickleFriendsSpec extends AkkaSpec with BeforeAndAfterAll { import FickleFriends._ val application = AkkaApplication("FickleFriendsSpec") @@ -119,7 +119,7 @@ class FickleFriendsSpec extends WordSpec with MustMatchers with BeforeAndAfterAl EventFilter[ExpectedFailureException], EventFilter[CoordinatedTransactionException], EventFilter[ActorTimeoutException]) - EventHandler.notify(TestEvent.Mute(ignoreExceptions)) + app.eventHandler.notify(TestEvent.Mute(ignoreExceptions)) val (counters, coordinator) = createActors val latch = new CountDownLatch(1) coordinator ! FriendlyIncrement(counters, latch) @@ -130,7 +130,7 @@ class FickleFriendsSpec extends WordSpec with MustMatchers with BeforeAndAfterAl } counters foreach (_.stop()) coordinator.stop() - EventHandler.notify(TestEvent.UnMute(ignoreExceptions)) + app.eventHandler.notify(TestEvent.UnMute(ignoreExceptions)) } } } diff --git a/akka-stm/src/test/scala/transactor/TransactorSpec.scala b/akka-stm/src/test/scala/transactor/TransactorSpec.scala index 7d72665402..5ebbc0d5d3 100644 --- a/akka-stm/src/test/scala/transactor/TransactorSpec.scala +++ b/akka-stm/src/test/scala/transactor/TransactorSpec.scala @@ -75,7 +75,7 @@ object SimpleTransactor { } } -class TransactorSpec extends WordSpec with MustMatchers { +class TransactorSpec extends AkkaSpec { import TransactorIncrement._ import SimpleTransactor._ @@ -109,7 +109,7 @@ class TransactorSpec extends WordSpec with MustMatchers { EventFilter[ExpectedFailureException], EventFilter[CoordinatedTransactionException], EventFilter[ActorTimeoutException]) - EventHandler.notify(TestEvent.Mute(ignoreExceptions)) + app.eventHandler.notify(TestEvent.Mute(ignoreExceptions)) val (counters, failer) = createTransactors val failLatch = TestLatch(numCounters) counters(0) ! Increment(counters.tail :+ failer, failLatch) @@ -119,7 +119,7 @@ class TransactorSpec extends WordSpec with MustMatchers { } counters foreach (_.stop()) failer.stop() - EventHandler.notify(TestEvent.UnMute(ignoreExceptions)) + app.eventHandler.notify(TestEvent.UnMute(ignoreExceptions)) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 64b08ac537..9cf9992809 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -12,6 +12,7 @@ import java.lang.ref.WeakReference import scala.annotation.tailrec import akka.actor.ActorCell import akka.dispatch._ +import akka.AkkaApplication /* * Locking rules: @@ -32,8 +33,6 @@ import akka.dispatch._ object CallingThreadDispatcher { - lazy val global = new CallingThreadDispatcher("global-calling-thread") - // PRIVATE DATA private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() @@ -104,7 +103,7 @@ object CallingThreadDispatcher { * @author Roland Kuhn * @since 1.1 */ -class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher { +class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher(_app) { import CallingThreadDispatcher._ protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this) @@ -157,14 +156,14 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: val execute = mbox.suspendSwitch.fold { queue.push(handle) if (warnings && handle.channel.isInstanceOf[Promise[_]]) { - EventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format handle.receiver) + app.eventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format handle.receiver) } false } { queue.push(handle) if (queue.isActive) { if (warnings && handle.channel.isInstanceOf[Promise[_]]) { - EventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format handle.receiver) + app.eventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format handle.receiver) } false } else { @@ -204,13 +203,13 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: handle.invoke if (warnings) handle.channel match { case f: ActorPromise if !f.isCompleted ⇒ - EventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message)) + app.eventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message)) case _ ⇒ } true } catch { case e ⇒ - EventHandler.error(this, e) + app.eventHandler.error(this, e) queue.leave false } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index a872fd5bbd..ad8720d397 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -19,8 +19,8 @@ import akka.AkkaApplication * @author Roland Kuhn * @since 1.1 */ -class TestActorRef[T <: Actor](application: AkkaApplication, props: Props, address: String) - extends LocalActorRef(application, props.withDispatcher(CallingThreadDispatcher.global), address, false) { +class TestActorRef[T <: Actor](_app: AkkaApplication, props: Props, address: String) + extends LocalActorRef(_app, props.withDispatcher(new CallingThreadDispatcher(_app)), address, false) { /** * Directly inject messages into actor receive behavior. Any exceptions * thrown will be available to you, while still being able to use diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index bdce4702a6..5fa8d136f1 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -99,7 +99,7 @@ class TestKit(_app: AkkaApplication) { * ActorRef of the test actor. Access is provided to enable e.g. * registration as message target. */ - val testActor: ActorRef = new LocalActorRef(app, Props(new TestActor(queue)).copy(dispatcher = CallingThreadDispatcher.global), "testActor" + TestKit.testActorId.incrementAndGet(), true) + val testActor: ActorRef = new LocalActorRef(app, Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)), "testActor" + TestKit.testActorId.incrementAndGet(), true) private var end: Duration = Duration.Inf diff --git a/akka-testkit/src/main/scala/akka/testkit/package.scala b/akka-testkit/src/main/scala/akka/testkit/package.scala index c24c26539c..54c6ed8182 100644 --- a/akka-testkit/src/main/scala/akka/testkit/package.scala +++ b/akka-testkit/src/main/scala/akka/testkit/package.scala @@ -3,16 +3,16 @@ package akka import akka.event.EventHandler package object testkit { - def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T): T = { - EventHandler.notify(TestEvent.Mute(eventFilters.toSeq)) + def filterEvents[T](eventFilters: Iterable[EventFilter])(block: ⇒ T)(implicit app: AkkaApplication): T = { + app.eventHandler.notify(TestEvent.Mute(eventFilters.toSeq)) try { block } finally { - EventHandler.notify(TestEvent.UnMute(eventFilters.toSeq)) + app.eventHandler.notify(TestEvent.UnMute(eventFilters.toSeq)) } } - def filterEvents[T](eventFilters: EventFilter*)(block: ⇒ T): T = filterEvents(eventFilters.toSeq)(block) + def filterEvents[T](eventFilters: EventFilter*)(block: ⇒ T)(implicit app: AkkaApplication): T = filterEvents(eventFilters.toSeq)(block) - def filterException[T <: Throwable: Manifest](block: ⇒ Unit): Unit = filterEvents(Seq(EventFilter[T]))(block) + def filterException[T <: Throwable](block: ⇒ Unit)(implicit app: AkkaApplication, m: Manifest[T]): Unit = filterEvents(Seq(EventFilter[T]))(block) } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 023b72425c..c147a496e8 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -94,8 +94,6 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { import TestActorRefSpec._ - EventHandler.start() - override def beforeEach { otherthread = null } diff --git a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala index 899b5e09fc..4d0a19f9cc 100644 --- a/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-second/src/main/scala/Pi.scala @@ -108,9 +108,9 @@ object Pi extends App { master.?(Calculate, Timeout(60000)). await.resultOrException match { //wait for the result, with a 60 seconds timeout case Some(pi) ⇒ - EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start))) + app.eventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start))) case None ⇒ - EventHandler.error(this, "Pi calculation did not complete within the timeout.") + app.eventHandler.error(this, "Pi calculation did not complete within the timeout.") } } }