diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 42ce7e4a77..7dd511e34a 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -17,12 +17,21 @@ akka { # the number of gossip daemon actors nr-of-gossip-daemons = 4 + + # the number of deputy nodes (the nodes responsible for breaking network partitions) nr-of-deputy-nodes = 3 - gossip { - initialDelay = 5s - frequency = 1s - } + # how long should the node wait before starting the periodic tasks maintenance tasks? + periodic-tasks-initial-delay = 1s + + # how often should the node send out gossip information? + gossip-frequency = 1s + + # how often should the leader perform maintenance tasks? + leader-actions-frequency = 1s + + # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? + unreachable-nodes-reaper-frequency = 1s # accrual failure detection config failure-detector { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 50b0f5bd0b..a24c75b436 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -19,8 +19,10 @@ class ClusterSettings(val config: Config, val systemName: String) { case "" ⇒ None case AddressFromURIString(addr) ⇒ Some(addr) } - val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS) - val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS) + val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) + val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip-frequency"), MILLISECONDS) + val LeaderActionsFrequency = Duration(getMilliseconds("akka.cluster.leader-actions-frequency"), MILLISECONDS) + val UnreachableNodesReaperFrequency = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-frequency"), MILLISECONDS) val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") val AutoDown = getBoolean("akka.cluster.auto-down") diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala index 4c94434806..9cb5bb6f50 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Node.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala @@ -335,8 +335,8 @@ object Node extends ExtensionId[Node] with ExtensionIdProvider { class Node(system: ExtendedActorSystem) extends Extension { /** - * Represents the state for this Node. Implemented using optimistic lockless concurrency, - * all state is represented by this immutable case class and managed by an AtomicReference. + * Represents the state for this Node. Implemented using optimistic lockless concurrency. + * All state is represented by this immutable case class and managed by an AtomicReference. */ private case class State( latestGossip: Gossip, @@ -356,8 +356,10 @@ class Node(system: ExtendedActorSystem) extends Extension { private val vclockNode = VectorClock.Node(remoteAddress.toString) - private val gossipInitialDelay = clusterSettings.GossipInitialDelay + private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay private val gossipFrequency = clusterSettings.GossipFrequency + private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency + private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) @@ -397,17 +399,17 @@ class Node(system: ExtendedActorSystem) extends Extension { // ======================================================== // start periodic gossip to random nodes in cluster - private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { + private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) { gossip() } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - private val failureDetectorReaperCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { // TODO: should we use the same gossipFrequency for reaping? + private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) { reapUnreachableMembers() } // start periodic leader action management (only applies for the current leader) - private val leaderActionsCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { // TODO: should we use the same gossipFrequency for leaderActions? + private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) { leaderActions() } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 1a4b8a6082..3a92b46eec 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -19,8 +19,10 @@ class ClusterConfigSpec extends ClusterSpec { FailureDetectorThreshold must be(8) FailureDetectorMaxSampleSize must be(1000) NodeToJoin must be(None) - GossipInitialDelay must be(5 seconds) + PeriodicTasksInitialDelay must be(1 seconds) GossipFrequency must be(1 second) + LeaderActionsFrequency must be(1 second) + UnreachableNodesReaperFrequency must be(1 second) NrOfGossipDaemons must be(4) NrOfDeputyNodes must be(3) AutoDown must be(true)