Merge pull request #786 from akshaal/wip-2579-squashed-scheduler-drift-akshaal

Fix for "#2579: Scheduler drifts away" (Squashed)
This commit is contained in:
Viktor Klang (√) 2012-10-07 12:50:19 -07:00
commit 079b844e40

View file

@ -9,7 +9,7 @@ import akka.util.internal.{ TimerTask, HashedWheelTimer, Timeout ⇒ HWTimeout,
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import java.io.Closeable import java.io.Closeable
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.{ AtomicReference, AtomicLong }
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.util.internal._ import akka.util.internal._
import concurrent.ExecutionContext import concurrent.ExecutionContext
@ -137,14 +137,17 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
val continuousCancellable = new ContinuousCancellable val continuousCancellable = new ContinuousCancellable
continuousCancellable.init( continuousCancellable.init(
hashedWheelTimer.newTimeout( hashedWheelTimer.newTimeout(
new TimerTask with ContinuousScheduling { new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling {
def run(timeout: HWTimeout) { def run(timeout: HWTimeout) {
executor execute new Runnable { executor execute new Runnable {
override def run = { override def run = {
receiver ! message receiver ! message
// Check if the receiver is still alive and kicking before reschedule the task // Check if the receiver is still alive and kicking before reschedule the task
if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver) if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver)
else scheduleNext(timeout, delay, continuousCancellable) else {
val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
}
} }
} }
} }
@ -162,11 +165,12 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
val continuousCancellable = new ContinuousCancellable val continuousCancellable = new ContinuousCancellable
continuousCancellable.init( continuousCancellable.init(
hashedWheelTimer.newTimeout( hashedWheelTimer.newTimeout(
new TimerTask with ContinuousScheduling { new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling {
override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable { override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable {
override def run = { override def run = {
runnable.run() runnable.run()
scheduleNext(timeout, delay, continuousCancellable) val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
} }
}) })
}, },