diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index b463c3b0ea..8be6b21d25 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -23,6 +23,8 @@ import javax.management._ import scala.collection.immutable.{ Map, SortedSet } import scala.annotation.tailrec import com.google.protobuf.ByteString +import akka.util.internal.HashedWheelTimer +import akka.dispatch.MonitorableThreadFactory /** * Interface for membership change listener. @@ -422,28 +424,35 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // ===================== WORK DAEMONS ===================== // ======================================================== + private def hwt = new HashedWheelTimer(log, + MonitorableThreadFactory(system.name + "-cluster-scheduler", system.settings.Daemonicity, None), 50.millis, + system.settings.SchedulerTicksPerWheel) + private val clusterScheduler = new DefaultScheduler(hwt, log, system.dispatcher) + // start periodic gossip to random nodes in cluster - private val gossipCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, GossipInterval) { + private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) { gossip() } // start periodic heartbeat to all nodes in cluster - private val heartbeatCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, HeartbeatInterval) { + private val heartbeatTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, HeartbeatInterval) { heartbeat() } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - private val failureDetectorReaperCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { + private val failureDetectorReaperTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, UnreachableNodesReaperInterval) { reapUnreachableMembers() } // start periodic leader action management (only applies for the current leader) - private val leaderActionsCanceller = system.scheduler.schedule(PeriodicTasksInitialDelay, LeaderActionsInterval) { + private val leaderActionsTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, LeaderActionsInterval) { leaderActions() } createMBean() + system.registerOnTermination(shutdown()) + log.info("Cluster Node [{}] - has started up successfully", selfAddress) // ====================================================== @@ -507,11 +516,13 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ def shutdown(): Unit = { if (isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) - gossipCanceller.cancel() - heartbeatCanceller.cancel() - failureDetectorReaperCanceller.cancel() - leaderActionsCanceller.cancel() - system.stop(clusterDaemons) + gossipTask.cancel() + heartbeatTask.cancel() + failureDetectorReaperTask.cancel() + leaderActionsTask.cancel() + clusterScheduler.close() + if (!clusterDaemons.isTerminated) + system.stop(clusterDaemons) try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { diff --git a/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala new file mode 100644 index 0000000000..0f594316d9 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/FixedRateTask.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong + +import akka.actor.Scheduler +import akka.util.Duration + +/** + * INTERNAL API + */ +private[akka] object FixedRateTask { + def apply(scheduler: Scheduler, initalDelay: Duration, delay: Duration)(f: ⇒ Unit): FixedRateTask = { + new FixedRateTask(scheduler, initalDelay, delay, new Runnable { def run(): Unit = f }) + } +} + +/** + * INTERNAL API + * + * Task to be scheduled periodically at a fixed rate, compensating, on average, + * for inaccuracy in scheduler. It will start when constructed, using the + * initialDelay. + */ +private[akka] class FixedRateTask(scheduler: Scheduler, initalDelay: Duration, delay: Duration, task: Runnable) extends Runnable { + + private val delayMillis = delay.toMillis + private val minDelayMillis = 1L + private val cancelled = new AtomicBoolean(false) + private val counter = new AtomicLong(0L) + private val startTime = System.currentTimeMillis + initalDelay.toMillis + scheduler.scheduleOnce(initalDelay, this) + + def cancel(): Unit = cancelled.set(true) + + override final def run(): Unit = if (!cancelled.get) try { + task.run() + } finally if (!cancelled.get) { + val nextTime = startTime + delayMillis * counter.incrementAndGet + val nextDelayMillis = nextTime - System.currentTimeMillis + val nextDelay = Duration( + (if (nextDelayMillis <= minDelayMillis) minDelayMillis else nextDelayMillis), + TimeUnit.MILLISECONDS) + try { + scheduler.scheduleOnce(nextDelay, this) + } catch { case e: IllegalStateException ⇒ /* will happen when scheduler is closed, nothing wrong */ } + } + +} diff --git a/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala new file mode 100644 index 0000000000..3efa3ab3ab --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/FixedRateTaskSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import java.util.concurrent.atomic.AtomicInteger +import akka.testkit.AkkaSpec +import akka.util.duration._ +import akka.testkit.TimingTest + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FixedRateTaskSpec extends AkkaSpec { + + "Task scheduled at fixed rate" must { + "adjust for scheduler inaccuracy" taggedAs TimingTest in { + val counter = new AtomicInteger + FixedRateTask(system.scheduler, 150.millis, 150.millis) { + counter.incrementAndGet() + } + 5000.millis.sleep() + counter.get must (be(33) or be(34)) + } + + "compensate for long running task" taggedAs TimingTest in { + val counter = new AtomicInteger + FixedRateTask(system.scheduler, 225.millis, 225.millis) { + counter.incrementAndGet() + 80.millis.sleep() + } + 5000.millis.sleep() + counter.get must (be(22) or be(23)) + } + } +} +