diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 7fb930eaef..b9104fe6cf 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -49,5 +49,14 @@ akka { max-sample-size = 1000 } + + # If the tick-duration of the default scheduler is longer than the tick-duration + # configured here a dedicated scheduler will be used for periodic tasks of the cluster, + # otherwise the default scheduler is used. + # See akka.scheduler settings for more details about the HashedWheelTimer. + scheduler { + tick-duration = 33ms + ticks-per-wheel = 512 + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 46c6919cc1..dda05bf6b0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit._ import java.util.concurrent.TimeoutException import akka.jsr166y.ThreadLocalRandom import java.lang.management.ManagementFactory +import java.io.Closeable import javax.management._ import scala.collection.immutable.{ Map, SortedSet } import scala.annotation.tailrec @@ -435,10 +436,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== WORK DAEMONS ===================== // ======================================================== - private def hwt = new HashedWheelTimer(log, - MonitorableThreadFactory(system.name + "-cluster-scheduler", system.settings.Daemonicity, None), 50.millis, - system.settings.SchedulerTicksPerWheel) - private val clusterScheduler = new DefaultScheduler(hwt, log, system.dispatcher) + private def useDedicatedScheduler: Boolean = system.settings.SchedulerTickDuration > SchedulerTickDuration + + private val clusterScheduler: Scheduler = { + if (useDedicatedScheduler) { + val threadFactory = system.threadFactory match { + case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler") + case tf ⇒ tf + } + val hwt = new HashedWheelTimer(log, + threadFactory, + SchedulerTickDuration, SchedulerTicksPerWheel) + new DefaultScheduler(hwt, log, system.dispatcher) + } else + system.scheduler + } // start periodic gossip to random nodes in cluster private val gossipTask = FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay, GossipInterval) { @@ -527,13 +539,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) def shutdown(): Unit = { if (isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) + + // cancel the periodic tasks, note that otherwise they will be run when scheduler is shutdown gossipTask.cancel() heartbeatTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() - clusterScheduler.close() + if (useDedicatedScheduler) clusterScheduler match { + case x: Closeable ⇒ x.close() + case _ ⇒ + } + // FIXME isTerminated check can be removed when ticket #2221 is fixed + // now it prevents logging if system is shutdown (or in progress of shutdown) if (!clusterDaemons.isTerminated) system.stop(clusterDaemons) + try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 9a17f2a0eb..ee4f6a03d2 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -13,22 +13,24 @@ import akka.actor.AddressFromURIString class ClusterSettings(val config: Config, val systemName: String) { import config._ - val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") - val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") - val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { + final val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") + final val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") + final val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { case "" ⇒ None case fqcn ⇒ Some(fqcn) } - val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { + final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { case "" ⇒ None case AddressFromURIString(addr) ⇒ Some(addr) } - val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) - val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) - val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) - val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) - val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) - val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") - val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") - val AutoDown = getBoolean("akka.cluster.auto-down") + final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) + final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) + final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) + final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) + final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") + final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") + final val AutoDown = getBoolean("akka.cluster.auto-down") + final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) + final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel") } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 6c9023d410..481d9f7e5a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -28,6 +28,8 @@ class ClusterConfigSpec extends AkkaSpec { NrOfGossipDaemons must be(4) NrOfDeputyNodes must be(3) AutoDown must be(true) + SchedulerTickDuration must be(33 millis) + SchedulerTicksPerWheel must be(512) } } }