Fix for "#2579: Scheduler drifts away"

This commit is contained in:
Evgeny Chukreev 2012-10-05 22:09:16 +03:00
parent 89c1f66b1f
commit 3783be2f58

View file

@ -9,7 +9,7 @@ import akka.util.internal.{ TimerTask, HashedWheelTimer, Timeout ⇒ HWTimeout,
import akka.event.LoggingAdapter
import akka.dispatch.MessageDispatcher
import java.io.Closeable
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.{ AtomicReference, AtomicLong }
import scala.annotation.tailrec
import akka.util.internal._
import concurrent.ExecutionContext
@ -137,14 +137,17 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
val continuousCancellable = new ContinuousCancellable
continuousCancellable.init(
hashedWheelTimer.newTimeout(
new TimerTask with ContinuousScheduling {
new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling {
def run(timeout: HWTimeout) {
executor execute new Runnable {
override def run = {
receiver ! message
// Check if the receiver is still alive and kicking before reschedule the task
if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver)
else scheduleNext(timeout, delay, continuousCancellable)
else {
val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
}
}
}
}
@ -162,11 +165,12 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
val continuousCancellable = new ContinuousCancellable
continuousCancellable.init(
hashedWheelTimer.newTimeout(
new TimerTask with ContinuousScheduling {
new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling {
override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable {
override def run = {
runnable.run()
scheduleNext(timeout, delay, continuousCancellable)
val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
}
})
},