Merge pull request #1400 from akka/wip-3307-prepare-EC-∂π

call executor.prepare() for scheduled jobs, see #3307
This commit is contained in:
Roland Kuhn 2013-05-03 12:11:51 -07:00
commit c84bbea4aa

View file

@ -230,16 +230,18 @@ class LightArrayRevolverScheduler(config: Config,
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = {
val preparedEC = executor.prepare()
try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self
compareAndSet(InitialRepeatMarker, schedule(
preparedEC,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
override def run(): Unit = {
try {
runnable.run()
val driftNanos = clock() - getAndAdd(delay.toNanos)
if (self.get != null)
swap(schedule(this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
swap(schedule(preparedEC, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
} catch {
case _: SchedulerException // ignore failure to enqueue or terminated target actor
}
@ -266,9 +268,10 @@ class LightArrayRevolverScheduler(config: Config,
} catch {
case SchedulerException(msg) throw new IllegalStateException(msg)
}
}
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
try schedule(runnable, roundUp(delay))
try schedule(executor.prepare(), runnable, roundUp(delay))
catch {
case SchedulerException(msg) throw new IllegalStateException(msg)
}
@ -294,7 +297,7 @@ class LightArrayRevolverScheduler(config: Config,
private val wheelMask = length() - 1
@volatile private var currentBucket = 0
private def schedule(r: Runnable, delay: FiniteDuration)(implicit ec: ExecutionContext): TimerTask =
private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask =
if (delay <= Duration.Zero) {
if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
ec.execute(r)
@ -341,7 +344,7 @@ class LightArrayRevolverScheduler(config: Config,
}
}
rec(new TaskHolder(r, null, rounds))
rec(new TaskHolder(r, null, rounds, ec))
}
private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]]
@ -451,8 +454,8 @@ object LightArrayRevolverScheduler {
*/
protected[actor] class TaskHolder(@volatile var task: Runnable,
@volatile var next: TaskHolder,
@volatile var rounds: Int)(
implicit executionContext: ExecutionContext) extends TimerTask {
@volatile var rounds: Int,
executionContext: ExecutionContext) extends TimerTask {
@tailrec
private final def extractTask(cancel: Boolean): Runnable = {
task match {
@ -501,9 +504,9 @@ object LightArrayRevolverScheduler {
def isCancelled: Boolean = false
}
// marker object during wheel movement
private val Pause = new TaskHolder(null, null, 0)(null)
private val Pause = new TaskHolder(null, null, 0, null)
// we need two empty tokens so wheel passing can be detected in schedule()
private val Empty = new TaskHolder(null, null, 0)(null)
private val Empty = new TaskHolder(null, null, 0, null)
}
/**
@ -531,12 +534,13 @@ class DefaultScheduler(config: Config,
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = {
val preparedEC = executor.prepare()
val continuousCancellable = new ContinuousCancellable
continuousCancellable.init(
hashedWheelTimer.newTimeout(
new AtomicLong(System.nanoTime + initialDelay.toNanos) with HWTimerTask with ContinuousScheduling {
override def run(timeout: HWTimeout): Unit =
executor.execute(new Runnable {
preparedEC.execute(new Runnable {
override def run = {
try {
runnable.run()
@ -551,11 +555,13 @@ class DefaultScheduler(config: Config,
initialDelay))
}
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = {
val preparedEC = executor.prepare()
new DefaultCancellable(
hashedWheelTimer.newTimeout(
new HWTimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) },
new HWTimerTask() { def run(timeout: HWTimeout): Unit = preparedEC.execute(runnable) },
delay))
}
private trait ContinuousScheduling { this: HWTimerTask
def scheduleNext(timeout: HWTimeout, delay: FiniteDuration, delegator: ContinuousCancellable) {