diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 02c67c6423..023acca4c1 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -9,7 +9,7 @@ import akka.util.internal.{ TimerTask, HashedWheelTimer, Timeout ⇒ HWTimeout, import akka.event.LoggingAdapter import akka.dispatch.MessageDispatcher import java.io.Closeable -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicReference, AtomicLong } import scala.annotation.tailrec import akka.util.internal._ import concurrent.ExecutionContext @@ -137,14 +137,17 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) val continuousCancellable = new ContinuousCancellable continuousCancellable.init( hashedWheelTimer.newTimeout( - new TimerTask with ContinuousScheduling { + new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling { def run(timeout: HWTimeout) { executor execute new Runnable { override def run = { receiver ! message // Check if the receiver is still alive and kicking before reschedule the task if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver) - else scheduleNext(timeout, delay, continuousCancellable) + else { + val driftNanos = System.nanoTime - getAndAdd(delay.toNanos) + scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable) + } } } } @@ -162,11 +165,12 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) val continuousCancellable = new ContinuousCancellable continuousCancellable.init( hashedWheelTimer.newTimeout( - new TimerTask with ContinuousScheduling { + new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling { override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable { override def run = { runnable.run() - scheduleNext(timeout, delay, continuousCancellable) + val driftNanos = System.nanoTime - getAndAdd(delay.toNanos) + scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable) } }) },