diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index c30792271e..452b37fddd 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -230,16 +230,18 @@ class LightArrayRevolverScheduler(config: Config, override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, - runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = + runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { + val preparedEC = executor.prepare() try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self ⇒ compareAndSet(InitialRepeatMarker, schedule( + preparedEC, new AtomicLong(clock() + initialDelay.toNanos) with Runnable { override def run(): Unit = { try { runnable.run() val driftNanos = clock() - getAndAdd(delay.toNanos) if (self.get != null) - swap(schedule(this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) + swap(schedule(preparedEC, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) } catch { case _: SchedulerException ⇒ // ignore failure to enqueue or terminated target actor } @@ -266,9 +268,10 @@ class LightArrayRevolverScheduler(config: Config, } catch { case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) } + } override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = - try schedule(runnable, roundUp(delay)) + try schedule(executor.prepare(), runnable, roundUp(delay)) catch { case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) } @@ -294,7 +297,7 @@ class LightArrayRevolverScheduler(config: Config, private val wheelMask = length() - 1 @volatile private var currentBucket = 0 - private def schedule(r: Runnable, delay: FiniteDuration)(implicit ec: ExecutionContext): TimerTask = + private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask = if (delay <= Duration.Zero) { if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") ec.execute(r) @@ -341,7 +344,7 @@ class LightArrayRevolverScheduler(config: Config, } } - rec(new TaskHolder(r, null, rounds)) + rec(new TaskHolder(r, null, rounds, ec)) } private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] @@ -451,8 +454,8 @@ object LightArrayRevolverScheduler { */ protected[actor] class TaskHolder(@volatile var task: Runnable, @volatile var next: TaskHolder, - @volatile var rounds: Int)( - implicit executionContext: ExecutionContext) extends TimerTask { + @volatile var rounds: Int, + executionContext: ExecutionContext) extends TimerTask { @tailrec private final def extractTask(cancel: Boolean): Runnable = { task match { @@ -501,9 +504,9 @@ object LightArrayRevolverScheduler { def isCancelled: Boolean = false } // marker object during wheel movement - private val Pause = new TaskHolder(null, null, 0)(null) + private val Pause = new TaskHolder(null, null, 0, null) // we need two empty tokens so wheel passing can be detected in schedule() - private val Empty = new TaskHolder(null, null, 0)(null) + private val Empty = new TaskHolder(null, null, 0, null) } /** @@ -531,12 +534,13 @@ class DefaultScheduler(config: Config, override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { + val preparedEC = executor.prepare() val continuousCancellable = new ContinuousCancellable continuousCancellable.init( hashedWheelTimer.newTimeout( new AtomicLong(System.nanoTime + initialDelay.toNanos) with HWTimerTask with ContinuousScheduling { override def run(timeout: HWTimeout): Unit = - executor.execute(new Runnable { + preparedEC.execute(new Runnable { override def run = { try { runnable.run() @@ -551,11 +555,13 @@ class DefaultScheduler(config: Config, initialDelay)) } - override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = + override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { + val preparedEC = executor.prepare() new DefaultCancellable( hashedWheelTimer.newTimeout( - new HWTimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) }, + new HWTimerTask() { def run(timeout: HWTimeout): Unit = preparedEC.execute(runnable) }, delay)) + } private trait ContinuousScheduling { this: HWTimerTask ⇒ def scheduleNext(timeout: HWTimeout, delay: FiniteDuration, delegator: ContinuousCancellable) {