From 1f3341713f97a0191b12981cb49d3e2e3eb8f637 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 8 Oct 2012 12:17:40 +0200 Subject: [PATCH] Remove cluster.FixedRateTask, see #2606 --- .../test/scala/akka/actor/SchedulerSpec.scala | 25 ++++++++ .../scala/akka/cluster/ClusterDaemon.scala | 31 ++++------ .../cluster/ClusterMetricsCollector.scala | 10 ++-- .../scala/akka/cluster/FixedRateTask.scala | 58 ------------------- .../akka/cluster/FixedRateTaskSpec.scala | 43 -------------- .../akka/cluster/MetricsCollectorSpec.scala | 2 +- 6 files changed, 41 insertions(+), 128 deletions(-) delete mode 100644 akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala delete mode 100644 akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 8d1d2fa965..86cde2fb47 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -214,5 +214,30 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout assert(elapsedTimeMs < 2000) // the precision is not ms exact cancellable.cancel() } + + "adjust for scheduler inaccuracy" taggedAs TimingTest in { + val startTime = System.nanoTime + val n = 33 + val latch = new TestLatch(n) + system.scheduler.schedule(150.millis, 150.millis) { + latch.countDown() + } + Await.ready(latch, 6.seconds) + val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis + rate must be(6.66 plusOrMinus (0.4)) + } + + "not be affected by long running task" taggedAs TimingTest in { + val startTime = System.nanoTime + val n = 22 + val latch = new TestLatch(n) + system.scheduler.schedule(225.millis, 225.millis) { + Thread.sleep(80) + latch.countDown() + } + Await.ready(latch, 6.seconds) + val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis + rate must be(4.4 plusOrMinus (0.3)) + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 6012c48f45..00d70afb5b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -196,35 +196,26 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto import context.dispatcher // start periodic gossip to random nodes in cluster - val gossipTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], GossipInterval) { - self ! GossipTick - } + val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], + GossipInterval, self, GossipTick) // start periodic heartbeat to all nodes in cluster - val heartbeatTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) { - self ! HeartbeatTick - } + val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], + HeartbeatInterval, self, HeartbeatTick) // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - val failureDetectorReaperTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) { - self ! ReapUnreachableTick - } + val failureDetectorReaperTask = scheduler.schedule(PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], + UnreachableNodesReaperInterval, self, ReapUnreachableTick) // start periodic leader action management (only applies for the current leader) - private val leaderActionsTask = - FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], LeaderActionsInterval) { - self ! LeaderActionsTick - } + val leaderActionsTask = scheduler.schedule(PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], + LeaderActionsInterval, self, LeaderActionsTick) // start periodic publish of current stats - private val publishStatsTask: Option[Cancellable] = + val publishStatsTask: Option[Cancellable] = if (PublishStatsInterval == Duration.Zero) None - else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) { - self ! PublishStatsTick - }) + else Some(scheduler.schedule(PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], + PublishStatsInterval, self, PublishStatsTick)) override def preStart(): Unit = { if (AutoJoin) self ! JoinSeedNodes(SeedNodes) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 87bb15450b..7040c322fd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -62,16 +62,14 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto /** * Start periodic gossip to random nodes in cluster */ - val gossipTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration], MetricsGossipInterval) { - self ! GossipTick - } + val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration], + MetricsGossipInterval, self, GossipTick) /** * Start periodic metrics collection */ - val metricsTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration], MetricsInterval) { - self ! MetricsTick - } + val metricsTask = scheduler.schedule(PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration], + MetricsInterval, self, MetricsTick) override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) diff --git a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala deleted file mode 100644 index 9e6eedf659..0000000000 --- a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } -import akka.actor.{ Scheduler, Cancellable } -import scala.concurrent.util.Duration -import concurrent.ExecutionContext -import scala.concurrent.util.FiniteDuration - -/** - * INTERNAL API - */ -private[akka] object FixedRateTask { - def apply(scheduler: Scheduler, - initalDelay: FiniteDuration, - delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): FixedRateTask = - new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f }) -} - -/** - * INTERNAL API - * - * Task to be scheduled periodically at a fixed rate, compensating, on average, - * for inaccuracy in scheduler. It will start when constructed, using the - * initialDelay. - */ -private[akka] class FixedRateTask(scheduler: Scheduler, - initalDelay: FiniteDuration, - delay: FiniteDuration, - task: Runnable)(implicit executor: ExecutionContext) - extends Runnable with Cancellable { - - private val delayNanos = delay.toNanos - private val cancelled = new AtomicBoolean(false) - private val counter = new AtomicLong(0L) - private val startTime = System.nanoTime + initalDelay.toNanos - scheduler.scheduleOnce(initalDelay, this) - - def cancel(): Unit = cancelled.set(true) - - def isCancelled: Boolean = cancelled.get - - override final def run(): Unit = if (!isCancelled) try { - task.run() - } finally if (!isCancelled) { - val nextTime = startTime + delayNanos * counter.incrementAndGet - // it's ok to schedule with negative duration, will run asap - val nextDelay = Duration(nextTime - System.nanoTime, TimeUnit.NANOSECONDS) - try { - scheduler.scheduleOnce(nextDelay, this) - } catch { case e: IllegalStateException ⇒ /* will happen when scheduler is closed, nothing wrong */ } - } - -} diff --git a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala deleted file mode 100644 index e6590cf9c3..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.cluster - -import akka.testkit.AkkaSpec -import scala.concurrent.util.duration._ -import akka.testkit.TimingTest -import akka.testkit.TestLatch -import scala.concurrent.Await - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FixedRateTaskSpec extends AkkaSpec { - import system.dispatcher - "Task scheduled at fixed rate" must { - "adjust for scheduler inaccuracy" taggedAs TimingTest in { - val startTime = System.nanoTime - val n = 33 - val latch = new TestLatch(n) - FixedRateTask(system.scheduler, 150.millis, 150.millis) { - latch.countDown() - } - Await.ready(latch, 6.seconds) - val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis - rate must be(6.66 plusOrMinus (0.4)) - } - - "compensate for long running task" taggedAs TimingTest in { - val startTime = System.nanoTime - val n = 22 - val latch = new TestLatch(n) - FixedRateTask(system.scheduler, 225.millis, 225.millis) { - Thread.sleep(80) - latch.countDown() - } - Await.ready(latch, 6.seconds) - val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis - rate must be(4.4 plusOrMinus (0.3)) - } - } -} - diff --git a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala index 2288279a03..ea881f5a71 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MetricsCollectorSpec.scala @@ -119,7 +119,7 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl "collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in { val latch = TestLatch(samples) - val task = FixedRateTask(system.scheduler, 0 seconds, interval) { + val task = system.scheduler.schedule(0 seconds, interval) { val sample = collector.sample assertCreatedUninitialized(sample.metrics) assertExpectedSampleSize(collector.isSigar, window, sample)