make scheduler shutdown more stringent

- run closeScheduler upon ActorSystem termination (directly)
- this will execute all outstanding tasks (dispatcher shutdowns have
  been queued already, because the last actor has already exited)
- further use of the scheduler (e.g. from tasks just being run) results
  in IllegalStateException
- catch such exceptions in DefaultPromise&MessageDispatcher in case of
  self-rescheduling tasks and execute final action immediately
- also silently stop recurring tasks
This commit is contained in:
Roland 2011-12-03 21:26:32 +01:00
parent ed4e302d2a
commit 1755aedb58
4 changed files with 57 additions and 23 deletions

View file

@ -578,21 +578,24 @@ class LocalDeathWatch extends DeathWatch with ActorClassification {
* when the scheduler is created.
*/
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: MessageDispatcher) extends Scheduler with Closeable {
import org.jboss.netty.akka.util.{ Timeout HWTimeout }
private def exec(task: TimerTask, delay: Duration): HWTimeout = hashedWheelTimer.newTimeout(task, delay)
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay))
new DefaultCancellable(exec(createContinuousTask(receiver, message, delay), initialDelay))
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay))
new DefaultCancellable(exec(createContinuousTask(f, delay), initialDelay))
def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay))
new DefaultCancellable(exec(createSingleTask(runnable), delay))
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay))
new DefaultCancellable(exec(createSingleTask(receiver, message), delay))
def scheduleOnce(f: () Unit, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay))
new DefaultCancellable(exec(createSingleTask(f), delay))
private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() {
@ -621,7 +624,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
// Check if the receiver is still alive and kicking before sending it a message and reschedule the task
if (!receiver.isTerminated) {
receiver ! message
timeout.getTimer.newTimeout(this, delay)
try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
} else {
log.warning("Could not reschedule message to be sent because receiving actor has been terminated.")
}
@ -633,12 +638,24 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(f)
timeout.getTimer.newTimeout(this, delay)
try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
}
}
}
def close() = hashedWheelTimer.stop()
private def execDirectly(t: HWTimeout): Unit = {
try t.getTask.run(t) catch {
case e: InterruptedException throw e
case e: Exception log.error(e, "exception while executing timer task")
}
}
def close() = {
import scala.collection.JavaConverters._
hashedWheelTimer.stop().asScala foreach (t execDirectly(t))
}
}
class DefaultCancellable(val timeout: org.jboss.netty.akka.util.Timeout) extends Cancellable {

View file

@ -406,6 +406,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
eventStream.start(this)
eventStream.startDefaultLoggers(this)
registerOnTermination(stopScheduler())
loadExtensions()
if (LogConfigOnStart) logConfiguration()
this
@ -416,16 +417,15 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def registerOnTermination[T](code: T) { terminationFuture onComplete (_ code) }
def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ code.run) }
// TODO shutdown all that other stuff, whatever that may be
def stop() {
guardian.stop()
try terminationFuture.await(10 seconds) catch {
case _: FutureTimeoutException log.warning("Failed to stop [{}] within 10 seconds", name)
}
// Dispatchers shutdown themselves, but requires the scheduler
terminationFuture onComplete (_ stopScheduler())
}
/**
* Create the scheduler service. This one needs one special behavior: if
* Closeable, it MUST execute all outstanding tasks upon .close() in order
* to properly shutdown all dispatchers.
*/
protected def createScheduler(): Scheduler = {
val threadFactory = new MonitorableThreadFactory("DefaultScheduler")
val hwt = new HashedWheelTimer(log, threadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel)
@ -443,12 +443,14 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
new DefaultScheduler(hwt, log, safeDispatcher)
}
/*
* This is called after the last actor has signaled its termination, i.e.
* after the last dispatcher has had its chance to schedule its shutdown
* action.
*/
protected def stopScheduler(): Unit = scheduler match {
case x: Closeable
// Let dispatchers shutdown first.
// Dispatchers schedule shutdown and may also reschedule, therefore wait 4 times the shutdown delay.
x.scheduleOnce(() { x.close(); dispatcher.shutdown() }, settings.DispatcherDefaultShutdown * 4)
case _
case x: Closeable x.close()
case _
}
private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef]

View file

@ -212,7 +212,9 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
case RESCHEDULED
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
scheduler.scheduleOnce(this, shutdownTimeout)
try scheduler.scheduleOnce(this, shutdownTimeout) catch {
case _: IllegalStateException shutdown()
}
else run()
}
}

View file

@ -956,7 +956,11 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable {
def run() {
if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
if (!isExpired)
try dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
catch {
case _: IllegalStateException func(DefaultPromise.this)
}
else func(DefaultPromise.this)
}
}
@ -983,8 +987,17 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable {
def run() {
if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
else promise complete (try { Right(fallback) } catch { case e Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418
val done =
if (!isExpired)
try {
dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
true
} catch {
case _: IllegalStateException false
}
else false
if (!done)
promise complete (try { Right(fallback) } catch { case e Left(e) }) // FIXME catching all and continue isn't good for OOME, ticket #1418
}
}
}