diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 144bb5dc9f..f33dd3f0c5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -409,7 +409,12 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { } } -class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) extends Scheduler { +/** + * Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher. + * Note that dispatcher is by-name parameter, because dispatcher might not be initialized + * when the scheduler is created. + */ +class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: ⇒ MessageDispatcher) extends Scheduler { def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) @@ -429,7 +434,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { - system.dispatcher.dispatchTask(() ⇒ runnable.run()) + dispatcher.dispatchTask(() ⇒ runnable.run()) } } @@ -443,7 +448,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) private def createSingleTask(f: () ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - system.dispatcher.dispatchTask(f) + dispatcher.dispatchTask(f) } } @@ -455,7 +460,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) receiver ! message timeout.getTimer.newTimeout(this, delay) } else { - system.eventStream.publish(Warning(this.getClass.getSimpleName, "Could not reschedule message to be sent because receiving actor has been terminated.")) + log.warning("Could not reschedule message to be sent because receiving actor has been terminated.") } } } @@ -464,7 +469,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, system: ActorSystem) private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - system.dispatcher.dispatchTask(f) + dispatcher.dispatchTask(f) timeout.getTimer.newTimeout(this, delay) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index d66a66d3b1..83f88c1a61 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -302,7 +302,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A eventStream.startStdoutLogger(settings) val log = new BusLogging(eventStream, "ActorSystem") // “this” used only for .getClass in tagging messages - val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel), this) + val scheduler = createScheduler() val provider: ActorRefProvider = { val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { @@ -336,6 +336,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A } val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) + // TODO why implicit val dispatcher? implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher //FIXME Set this to a Failure when things bubble to the top @@ -376,10 +377,32 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A // TODO shutdown all that other stuff, whatever that may be def stop() { guardian.stop() - terminationFuture onComplete (_ ⇒ scheduler.stop()) + terminationFuture onComplete (_ ⇒ stopScheduler()) terminationFuture onComplete (_ ⇒ dispatcher.shutdown()) } + protected def createScheduler(): Scheduler = { + val threadFactory = new MonitorableThreadFactory("DefaultScheduler") + val hwt = new HashedWheelTimer(log, threadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel) + // note that dispatcher is by-name parameter in DefaultScheduler constructor, + // because dispatcher is not initialized when the scheduler is created + def safeDispatcher = { + if (dispatcher eq null) { + val exc = new IllegalStateException("Scheduler is using dispatcher before it has been initialized") + log.error(exc, exc.getMessage) + throw exc + } else { + dispatcher + } + } + new DefaultScheduler(hwt, log, safeDispatcher) + } + + protected def stopScheduler(): Unit = scheduler match { + case x: DefaultScheduler ⇒ x.stop() + case _ ⇒ + } + private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef] /**