Use dedicated cluster scheduler only when default scheduler resolution isn't good enough, see #2214

* Config properties for scheduler
* Commented shutdown considerations
This commit is contained in:
Patrik Nordwall 2012-06-12 13:34:59 +02:00
parent a7d2be10eb
commit b27bae6554
4 changed files with 50 additions and 17 deletions

View file

@ -49,5 +49,14 @@ akka {
max-sample-size = 1000 max-sample-size = 1000
} }
# If the tick-duration of the default scheduler is longer than the tick-duration
# configured here a dedicated scheduler will be used for periodic tasks of the cluster,
# otherwise the default scheduler is used.
# See akka.scheduler settings for more details about the HashedWheelTimer.
scheduler {
tick-duration = 33ms
ticks-per-wheel = 512
}
} }
} }

View file

@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.jsr166y.ThreadLocalRandom import akka.jsr166y.ThreadLocalRandom
import java.lang.management.ManagementFactory import java.lang.management.ManagementFactory
import java.io.Closeable
import javax.management._ import javax.management._
import scala.collection.immutable.{ Map, SortedSet } import scala.collection.immutable.{ Map, SortedSet }
import scala.annotation.tailrec import scala.annotation.tailrec
@ -435,10 +436,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ===================== WORK DAEMONS ===================== // ===================== WORK DAEMONS =====================
// ======================================================== // ========================================================
private def hwt = new HashedWheelTimer(log, private def useDedicatedScheduler: Boolean = system.settings.SchedulerTickDuration > SchedulerTickDuration
MonitorableThreadFactory(system.name + "-cluster-scheduler", system.settings.Daemonicity, None), 50.millis,
system.settings.SchedulerTicksPerWheel) private val clusterScheduler: Scheduler = {
private val clusterScheduler = new DefaultScheduler(hwt, log, system.dispatcher) if (useDedicatedScheduler) {
val threadFactory = system.threadFactory match {
case tf: MonitorableThreadFactory tf.copy(name = tf.name + "-cluster-scheduler")
case tf tf
}
val hwt = new HashedWheelTimer(log,
threadFactory,
SchedulerTickDuration, SchedulerTicksPerWheel)
new DefaultScheduler(hwt, log, system.dispatcher)
} else
system.scheduler
}
// start periodic gossip to random nodes in cluster // start periodic gossip to random nodes in cluster
private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) { private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) {
@ -527,13 +539,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
def shutdown(): Unit = { def shutdown(): Unit = {
if (isRunning.compareAndSet(true, false)) { if (isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
// cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown
gossipTask.cancel() gossipTask.cancel()
heartbeatTask.cancel() heartbeatTask.cancel()
failureDetectorReaperTask.cancel() failureDetectorReaperTask.cancel()
leaderActionsTask.cancel() leaderActionsTask.cancel()
clusterScheduler.close() if (useDedicatedScheduler) clusterScheduler match {
case x: Closeable x.close()
case _
}
// FIXME isTerminated check can be removed when ticket #2221 is fixed
// now it prevents logging if system is shutdown (or in progress of shutdown)
if (!clusterDaemons.isTerminated) if (!clusterDaemons.isTerminated)
system.stop(clusterDaemons) system.stop(clusterDaemons)
try { try {
mBeanServer.unregisterMBean(clusterMBeanName) mBeanServer.unregisterMBean(clusterMBeanName)
} catch { } catch {

View file

@ -13,22 +13,24 @@ import akka.actor.AddressFromURIString
class ClusterSettings(val config: Config, val systemName: String) { class ClusterSettings(val config: Config, val systemName: String) {
import config._ import config._
val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") final val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { final val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match {
case "" None case "" None
case fqcn Some(fqcn) case fqcn Some(fqcn)
} }
val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
case "" None case "" None
case AddressFromURIString(addr) Some(addr) case AddressFromURIString(addr) Some(addr)
} }
val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS)
val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
val AutoDown = getBoolean("akka.cluster.auto-down") final val AutoDown = getBoolean("akka.cluster.auto-down")
final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel")
} }

View file

@ -28,6 +28,8 @@ class ClusterConfigSpec extends AkkaSpec {
NrOfGossipDaemons must be(4) NrOfGossipDaemons must be(4)
NrOfDeputyNodes must be(3) NrOfDeputyNodes must be(3)
AutoDown must be(true) AutoDown must be(true)
SchedulerTickDuration must be(33 millis)
SchedulerTicksPerWheel must be(512)
} }
} }
} }