Remove cluster.FixedRateTask, see #2606

This commit is contained in:
Patrik Nordwall 2012-10-08 12:17:40 +02:00
parent 5b0a2ec7ee
commit 1f3341713f
6 changed files with 41 additions and 128 deletions

View file

@ -214,5 +214,30 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
assert(elapsedTimeMs < 2000) // the precision is not ms exact assert(elapsedTimeMs < 2000) // the precision is not ms exact
cancellable.cancel() cancellable.cancel()
} }
"adjust for scheduler inaccuracy" taggedAs TimingTest in {
val startTime = System.nanoTime
val n = 33
val latch = new TestLatch(n)
system.scheduler.schedule(150.millis, 150.millis) {
latch.countDown()
}
Await.ready(latch, 6.seconds)
val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
rate must be(6.66 plusOrMinus (0.4))
}
"not be affected by long running task" taggedAs TimingTest in {
val startTime = System.nanoTime
val n = 22
val latch = new TestLatch(n)
system.scheduler.schedule(225.millis, 225.millis) {
Thread.sleep(80)
latch.countDown()
}
Await.ready(latch, 6.seconds)
val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
rate must be(4.4 plusOrMinus (0.3))
}
} }
} }

View file

@ -196,35 +196,26 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
import context.dispatcher import context.dispatcher
// start periodic gossip to random nodes in cluster // start periodic gossip to random nodes in cluster
val gossipTask = val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration],
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval).asInstanceOf[FiniteDuration], GossipInterval) { GossipInterval, self, GossipTick)
self ! GossipTick
}
// start periodic heartbeat to all nodes in cluster // start periodic heartbeat to all nodes in cluster
val heartbeatTask = val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration],
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval).asInstanceOf[FiniteDuration], HeartbeatInterval) { HeartbeatInterval, self, HeartbeatTick)
self ! HeartbeatTick
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
val failureDetectorReaperTask = val failureDetectorReaperTask = scheduler.schedule(PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration],
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval).asInstanceOf[FiniteDuration], UnreachableNodesReaperInterval) { UnreachableNodesReaperInterval, self, ReapUnreachableTick)
self ! ReapUnreachableTick
}
// start periodic leader action management (only applies for the current leader) // start periodic leader action management (only applies for the current leader)
private val leaderActionsTask = val leaderActionsTask = scheduler.schedule(PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration],
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval).asInstanceOf[FiniteDuration], LeaderActionsInterval) { LeaderActionsInterval, self, LeaderActionsTick)
self ! LeaderActionsTick
}
// start periodic publish of current stats // start periodic publish of current stats
private val publishStatsTask: Option[Cancellable] = val publishStatsTask: Option[Cancellable] =
if (PublishStatsInterval == Duration.Zero) None if (PublishStatsInterval == Duration.Zero) None
else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration], PublishStatsInterval) { else Some(scheduler.schedule(PeriodicTasksInitialDelay.max(PublishStatsInterval).asInstanceOf[FiniteDuration],
self ! PublishStatsTick PublishStatsInterval, self, PublishStatsTick))
})
override def preStart(): Unit = { override def preStart(): Unit = {
if (AutoJoin) self ! JoinSeedNodes(SeedNodes) if (AutoJoin) self ! JoinSeedNodes(SeedNodes)

View file

@ -62,16 +62,14 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
/** /**
* Start periodic gossip to random nodes in cluster * Start periodic gossip to random nodes in cluster
*/ */
val gossipTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration], MetricsGossipInterval) { val gossipTask = scheduler.schedule(PeriodicTasksInitialDelay.max(MetricsGossipInterval).asInstanceOf[FiniteDuration],
self ! GossipTick MetricsGossipInterval, self, GossipTick)
}
/** /**
* Start periodic metrics collection * Start periodic metrics collection
*/ */
val metricsTask = FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration], MetricsInterval) { val metricsTask = scheduler.schedule(PeriodicTasksInitialDelay.max(MetricsInterval).asInstanceOf[FiniteDuration],
self ! MetricsTick MetricsInterval, self, MetricsTick)
}
override def preStart(): Unit = { override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent]) cluster.subscribe(self, classOf[MemberEvent])

View file

@ -1,58 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import akka.actor.{ Scheduler, Cancellable }
import scala.concurrent.util.Duration
import concurrent.ExecutionContext
import scala.concurrent.util.FiniteDuration
/**
* INTERNAL API
*/
private[akka] object FixedRateTask {
def apply(scheduler: Scheduler,
initalDelay: FiniteDuration,
delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): FixedRateTask =
new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f })
}
/**
* INTERNAL API
*
* Task to be scheduled periodically at a fixed rate, compensating, on average,
* for inaccuracy in scheduler. It will start when constructed, using the
* initialDelay.
*/
private[akka] class FixedRateTask(scheduler: Scheduler,
initalDelay: FiniteDuration,
delay: FiniteDuration,
task: Runnable)(implicit executor: ExecutionContext)
extends Runnable with Cancellable {
private val delayNanos = delay.toNanos
private val cancelled = new AtomicBoolean(false)
private val counter = new AtomicLong(0L)
private val startTime = System.nanoTime + initalDelay.toNanos
scheduler.scheduleOnce(initalDelay, this)
def cancel(): Unit = cancelled.set(true)
def isCancelled: Boolean = cancelled.get
override final def run(): Unit = if (!isCancelled) try {
task.run()
} finally if (!isCancelled) {
val nextTime = startTime + delayNanos * counter.incrementAndGet
// it's ok to schedule with negative duration, will run asap
val nextDelay = Duration(nextTime - System.nanoTime, TimeUnit.NANOSECONDS)
try {
scheduler.scheduleOnce(nextDelay, this)
} catch { case e: IllegalStateException /* will happen when scheduler is closed, nothing wrong */ }
}
}

View file

@ -1,43 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.testkit.AkkaSpec
import scala.concurrent.util.duration._
import akka.testkit.TimingTest
import akka.testkit.TestLatch
import scala.concurrent.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FixedRateTaskSpec extends AkkaSpec {
import system.dispatcher
"Task scheduled at fixed rate" must {
"adjust for scheduler inaccuracy" taggedAs TimingTest in {
val startTime = System.nanoTime
val n = 33
val latch = new TestLatch(n)
FixedRateTask(system.scheduler, 150.millis, 150.millis) {
latch.countDown()
}
Await.ready(latch, 6.seconds)
val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
rate must be(6.66 plusOrMinus (0.4))
}
"compensate for long running task" taggedAs TimingTest in {
val startTime = System.nanoTime
val n = 22
val latch = new TestLatch(n)
FixedRateTask(system.scheduler, 225.millis, 225.millis) {
Thread.sleep(80)
latch.countDown()
}
Await.ready(latch, 6.seconds)
val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
rate must be(4.4 plusOrMinus (0.3))
}
}
}

View file

@ -119,7 +119,7 @@ class MetricsCollectorSpec extends AkkaSpec(MetricsEnabledSpec.config) with Impl
"collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in { "collect [" + samples + "] node metrics samples in an acceptable duration" taggedAs LongRunningTest in {
val latch = TestLatch(samples) val latch = TestLatch(samples)
val task = FixedRateTask(system.scheduler, 0 seconds, interval) { val task = system.scheduler.schedule(0 seconds, interval) {
val sample = collector.sample val sample = collector.sample
assertCreatedUninitialized(sample.metrics) assertCreatedUninitialized(sample.metrics)
assertExpectedSampleSize(collector.isSigar, window, sample) assertExpectedSampleSize(collector.isSigar, window, sample)