Schedule cluster tasks with more accurate, see #2114

* Use scheduler with more accurate settings
* New FixedRateTask that compensates for inaccuracy
This commit is contained in:
Patrik Nordwall 2012-06-11 22:12:45 +02:00
parent d957c68639
commit 34c9e49ee0
3 changed files with 110 additions and 9 deletions

View file

@ -23,6 +23,8 @@ import javax.management._
import scala.collection.immutable.{ Map, SortedSet } import scala.collection.immutable.{ Map, SortedSet }
import scala.annotation.tailrec import scala.annotation.tailrec
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import akka.util.internal.HashedWheelTimer
import akka.dispatch.MonitorableThreadFactory
/** /**
* Interface for membership change listener. * Interface for membership change listener.
@ -422,28 +424,35 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// ===================== WORK DAEMONS ===================== // ===================== WORK DAEMONS =====================
// ======================================================== // ========================================================
private def hwt = new HashedWheelTimer(log,
MonitorableThreadFactory(system.name + "-cluster-scheduler", system.settings.Daemonicity, None), 50.millis,
system.settings.SchedulerTicksPerWheel)
private val clusterScheduler = new DefaultScheduler(hwt, log, system.dispatcher)
// start periodic gossip to random nodes in cluster // start periodic gossip to random nodes in cluster
private val gossipCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, GossipInterval) { private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) {
gossip() gossip()
} }
// start periodic heartbeat to all nodes in cluster // start periodic heartbeat to all nodes in cluster
private val heartbeatCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, HeartbeatInterval) { private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) {
heartbeat() heartbeat()
} }
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) {
reapUnreachableMembers() reapUnreachableMembers()
} }
// start periodic leader action management (only applies for the current leader) // start periodic leader action management (only applies for the current leader)
private val leaderActionsCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, LeaderActionsInterval) { private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) {
leaderActions() leaderActions()
} }
createMBean() createMBean()
system.registerOnTermination(shutdown())
log.info("Cluster Node [{}] - has started up successfully", selfAddress) log.info("Cluster Node [{}] - has started up successfully", selfAddress)
// ====================================================== // ======================================================
@ -507,11 +516,13 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
def shutdown(): Unit = { def shutdown(): Unit = {
if (isRunning.compareAndSet(true, false)) { if (isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
gossipCanceller.cancel() gossipTask.cancel()
heartbeatCanceller.cancel() heartbeatTask.cancel()
failureDetectorReaperCanceller.cancel() failureDetectorReaperTask.cancel()
leaderActionsCanceller.cancel() leaderActionsTask.cancel()
system.stop(clusterDaemons) clusterScheduler.close()
if (!clusterDaemons.isTerminated)
system.stop(clusterDaemons)
try { try {
mBeanServer.unregisterMBean(clusterMBeanName) mBeanServer.unregisterMBean(clusterMBeanName)
} catch { } catch {

View file

@ -0,0 +1,54 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import akka.actor.Scheduler
import akka.util.Duration
/**
* INTERNAL API
*/
private[akka] object FixedRateTask {
def apply(scheduler: Scheduler, initalDelay: Duration, delay: Duration)(f: Unit): FixedRateTask = {
new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f })
}
}
/**
* INTERNAL API
*
* Task to be scheduled periodically at a fixed rate, compensating, on average,
* for inaccuracy in scheduler. It will start when constructed, using the
* initialDelay.
*/
private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, delay: Duration, task: Runnable) extends Runnable {
private val delayMillis = delay.toMillis
private val minDelayMillis = 1L
private val cancelled = new AtomicBoolean(false)
private val counter = new AtomicLong(0L)
private val startTime = System.currentTimeMillis + initalDelay.toMillis
scheduler.scheduleOnce(initalDelay, this)
def cancel(): Unit = cancelled.set(true)
override final def run(): Unit = if (!cancelled.get) try {
task.run()
} finally if (!cancelled.get) {
val nextTime = startTime + delayMillis * counter.incrementAndGet
val nextDelayMillis = nextTime - System.currentTimeMillis
val nextDelay = Duration(
(if (nextDelayMillis <= minDelayMillis) minDelayMillis else nextDelayMillis),
TimeUnit.MILLISECONDS)
try {
scheduler.scheduleOnce(nextDelay, this)
} catch { case e: IllegalStateException /* will happen when scheduler is closed, nothing wrong */ }
}
}

View file

@ -0,0 +1,36 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import java.util.concurrent.atomic.AtomicInteger
import akka.testkit.AkkaSpec
import akka.util.duration._
import akka.testkit.TimingTest
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FixedRateTaskSpec extends AkkaSpec {
"Task scheduled at fixed rate" must {
"adjust for scheduler inaccuracy" taggedAs TimingTest in {
val counter = new AtomicInteger
FixedRateTask(system.scheduler, 150.millis, 150.millis) {
counter.incrementAndGet()
}
5000.millis.sleep()
counter.get must (be(33) or be(34))
}
"compensate for long running task" taggedAs TimingTest in {
val counter = new AtomicInteger
FixedRateTask(system.scheduler, 225.millis, 225.millis) {
counter.incrementAndGet()
80.millis.sleep()
}
5000.millis.sleep()
counter.get must (be(22) or be(23))
}
}
}