From 7b6ae2f5c91e266a2202c192d00f3f1baeb8b22d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Jun 2012 13:37:21 +0200 Subject: [PATCH] Use nanoTime in FixedRateTask, see #2214 * Rewrote test to use latch and assert rate instead --- .../scala/akka/cluster/FixedRateTask.scala | 13 ++++------ .../akka/cluster/FixedRateTaskSpec.scala | 25 ++++++++++++------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala index 0f594316d9..25ef058465 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala @@ -29,11 +29,10 @@ private[akka] object FixedRateTask { */ private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, delay: Duration, task: Runnable) extends Runnable { - private val delayMillis = delay.toMillis - private val minDelayMillis = 1L + private val delayNanos = delay.toNanos private val cancelled = new AtomicBoolean(false) private val counter = new AtomicLong(0L) - private val startTime = System.currentTimeMillis + initalDelay.toMillis + private val startTime = System.nanoTime + initalDelay.toNanos scheduler.scheduleOnce(initalDelay, this) def cancel(): Unit = cancelled.set(true) @@ -41,11 +40,9 @@ private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, d override final def run(): Unit = if (!cancelled.get) try { task.run() } finally if (!cancelled.get) { - val nextTime = startTime + delayMillis * counter.incrementAndGet - val nextDelayMillis = nextTime - System.currentTimeMillis - val nextDelay = Duration( - (if (nextDelayMillis <= minDelayMillis) minDelayMillis else nextDelayMillis), - TimeUnit.MILLISECONDS) + val nextTime = startTime + delayNanos * counter.incrementAndGet + // it's ok to schedule with negative duration, will run asap + val nextDelay = Duration(nextTime - System.nanoTime, TimeUnit.NANOSECONDS) try { scheduler.scheduleOnce(nextDelay, this) } catch { case e: IllegalStateException ⇒ /* will happen when scheduler is closed, nothing wrong */ } diff --git a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala index 3efa3ab3ab..d259a5310b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala @@ -4,32 +4,39 @@ package akka.cluster -import java.util.concurrent.atomic.AtomicInteger import akka.testkit.AkkaSpec import akka.util.duration._ import akka.testkit.TimingTest +import akka.testkit.TestLatch +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FixedRateTaskSpec extends AkkaSpec { "Task scheduled at fixed rate" must { "adjust for scheduler inaccuracy" taggedAs TimingTest in { - val counter = new AtomicInteger + val startTime = System.nanoTime + val n = 33 + val latch = new TestLatch(n) FixedRateTask(system.scheduler, 150.millis, 150.millis) { - counter.incrementAndGet() + latch.countDown() } - 5000.millis.sleep() - counter.get must (be(33) or be(34)) + Await.ready(latch, 6.seconds) + val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis + rate must be(6.66 plusOrMinus (0.4)) } "compensate for long running task" taggedAs TimingTest in { - val counter = new AtomicInteger + val startTime = System.nanoTime + val n = 22 + val latch = new TestLatch(n) FixedRateTask(system.scheduler, 225.millis, 225.millis) { - counter.incrementAndGet() 80.millis.sleep() + latch.countDown() } - 5000.millis.sleep() - counter.get must (be(22) or be(23)) + Await.ready(latch, 6.seconds) + val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis + rate must be(4.4 plusOrMinus (0.3)) } } }