diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index e0a4095d3e..330e085b1a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -578,21 +578,24 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { * when the scheduler is created. */ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: ⇒ MessageDispatcher) extends Scheduler with Closeable { + import org.jboss.netty.akka.util.{ Timeout ⇒ HWTimeout } + + private def exec(task: TimerTask, delay: Duration): HWTimeout = hashedWheelTimer.newTimeout(task, delay) def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) + new DefaultCancellable(exec(createContinuousTask(receiver, message, delay), initialDelay)) def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay)) + new DefaultCancellable(exec(createContinuousTask(f, delay), initialDelay)) def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) + new DefaultCancellable(exec(createSingleTask(runnable), delay)) def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay)) + new DefaultCancellable(exec(createSingleTask(receiver, message), delay)) def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay)) + new DefaultCancellable(exec(createSingleTask(f), delay)) private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { @@ -621,7 +624,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, // Check if the receiver is still alive and kicking before sending it a message and reschedule the task if (!receiver.isTerminated) { receiver ! message - timeout.getTimer.newTimeout(this, delay) + try timeout.getTimer.newTimeout(this, delay) catch { + case _: IllegalStateException ⇒ // stop recurring if timer is stopped + } } else { log.warning("Could not reschedule message to be sent because receiving actor has been terminated.") } @@ -633,12 +638,24 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { dispatcher.dispatchTask(f) - timeout.getTimer.newTimeout(this, delay) + try timeout.getTimer.newTimeout(this, delay) catch { + case _: IllegalStateException ⇒ // stop recurring if timer is stopped + } } } } - def close() = hashedWheelTimer.stop() + private def execDirectly(t: HWTimeout): Unit = { + try t.getTask.run(t) catch { + case e: InterruptedException ⇒ throw e + case e: Exception ⇒ log.error(e, "exception while executing timer task") + } + } + + def close() = { + import scala.collection.JavaConverters._ + hashedWheelTimer.stop().asScala foreach (t ⇒ execDirectly(t)) + } } class DefaultCancellable(val timeout: org.jboss.netty.akka.util.Timeout) extends Cancellable { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index a38cdccb22..484c81db58 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -406,6 +406,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor // this starts the reaper actor and the user-configured logging subscribers, which are also actors eventStream.start(this) eventStream.startDefaultLoggers(this) + registerOnTermination(stopScheduler()) loadExtensions() if (LogConfigOnStart) logConfiguration() this @@ -416,16 +417,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor def registerOnTermination[T](code: ⇒ T) { terminationFuture onComplete (_ ⇒ code) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ ⇒ code.run) } - // TODO shutdown all that other stuff, whatever that may be def stop() { guardian.stop() - try terminationFuture.await(10 seconds) catch { - case _: FutureTimeoutException ⇒ log.warning("Failed to stop [{}] within 10 seconds", name) - } - // Dispatchers shutdown themselves, but requires the scheduler - terminationFuture onComplete (_ ⇒ stopScheduler()) } + /** + * Create the scheduler service. This one needs one special behavior: if + * Closeable, it MUST execute all outstanding tasks upon .close() in order + * to properly shutdown all dispatchers. + */ protected def createScheduler(): Scheduler = { val threadFactory = new MonitorableThreadFactory("DefaultScheduler") val hwt = new HashedWheelTimer(log, threadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel) @@ -443,12 +443,14 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor new DefaultScheduler(hwt, log, safeDispatcher) } + /* + * This is called after the last actor has signaled its termination, i.e. + * after the last dispatcher has had its chance to schedule its shutdown + * action. + */ protected def stopScheduler(): Unit = scheduler match { - case x: Closeable ⇒ - // Let dispatchers shutdown first. - // Dispatchers schedule shutdown and may also reschedule, therefore wait 4 times the shutdown delay. - x.scheduleOnce(() ⇒ { x.close(); dispatcher.shutdown() }, settings.DispatcherDefaultShutdown * 4) - case _ ⇒ + case x: Closeable ⇒ x.close() + case _ ⇒ } private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef] diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 0845978e4a..0dba2805ad 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -212,7 +212,9 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduler.scheduleOnce(this, shutdownTimeout) + try scheduler.scheduleOnce(this, shutdownTimeout) catch { + case _: IllegalStateException ⇒ shutdown() + } else run() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index b9def99301..cd8c22de6e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -956,7 +956,11 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + if (!isExpired) + try dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + catch { + case _: IllegalStateException ⇒ func(DefaultPromise.this) + } else func(DefaultPromise.this) } } @@ -983,8 +987,17 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) - else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418 + val done = + if (!isExpired) + try { + dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) + true + } catch { + case _: IllegalStateException ⇒ false + } + else false + if (!done) + promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418 } } }