Added config options for run frequency of the different periodic node tasks.
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
parent
50d11593c9
commit
1ae2c68d2f
4 changed files with 28 additions and 13 deletions
|
|
@ -17,12 +17,21 @@ akka {
|
||||||
|
|
||||||
# the number of gossip daemon actors
|
# the number of gossip daemon actors
|
||||||
nr-of-gossip-daemons = 4
|
nr-of-gossip-daemons = 4
|
||||||
|
|
||||||
|
# the number of deputy nodes (the nodes responsible for breaking network partitions)
|
||||||
nr-of-deputy-nodes = 3
|
nr-of-deputy-nodes = 3
|
||||||
|
|
||||||
gossip {
|
# how long should the node wait before starting the periodic tasks maintenance tasks?
|
||||||
initialDelay = 5s
|
periodic-tasks-initial-delay = 1s
|
||||||
frequency = 1s
|
|
||||||
}
|
# how often should the node send out gossip information?
|
||||||
|
gossip-frequency = 1s
|
||||||
|
|
||||||
|
# how often should the leader perform maintenance tasks?
|
||||||
|
leader-actions-frequency = 1s
|
||||||
|
|
||||||
|
# how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring?
|
||||||
|
unreachable-nodes-reaper-frequency = 1s
|
||||||
|
|
||||||
# accrual failure detection config
|
# accrual failure detection config
|
||||||
failure-detector {
|
failure-detector {
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,10 @@ class ClusterSettings(val config: Config, val systemName: String) {
|
||||||
case "" ⇒ None
|
case "" ⇒ None
|
||||||
case AddressFromURIString(addr) ⇒ Some(addr)
|
case AddressFromURIString(addr) ⇒ Some(addr)
|
||||||
}
|
}
|
||||||
val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
|
val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
|
||||||
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
|
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip-frequency"), MILLISECONDS)
|
||||||
|
val LeaderActionsFrequency = Duration(getMilliseconds("akka.cluster.leader-actions-frequency"), MILLISECONDS)
|
||||||
|
val UnreachableNodesReaperFrequency = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-frequency"), MILLISECONDS)
|
||||||
val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
|
val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
|
||||||
val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
|
val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
|
||||||
val AutoDown = getBoolean("akka.cluster.auto-down")
|
val AutoDown = getBoolean("akka.cluster.auto-down")
|
||||||
|
|
|
||||||
|
|
@ -335,8 +335,8 @@ object Node extends ExtensionId[Node] with ExtensionIdProvider {
|
||||||
class Node(system: ExtendedActorSystem) extends Extension {
|
class Node(system: ExtendedActorSystem) extends Extension {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents the state for this Node. Implemented using optimistic lockless concurrency,
|
* Represents the state for this Node. Implemented using optimistic lockless concurrency.
|
||||||
* all state is represented by this immutable case class and managed by an AtomicReference.
|
* All state is represented by this immutable case class and managed by an AtomicReference.
|
||||||
*/
|
*/
|
||||||
private case class State(
|
private case class State(
|
||||||
latestGossip: Gossip,
|
latestGossip: Gossip,
|
||||||
|
|
@ -356,8 +356,10 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
||||||
|
|
||||||
private val vclockNode = VectorClock.Node(remoteAddress.toString)
|
private val vclockNode = VectorClock.Node(remoteAddress.toString)
|
||||||
|
|
||||||
private val gossipInitialDelay = clusterSettings.GossipInitialDelay
|
private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay
|
||||||
private val gossipFrequency = clusterSettings.GossipFrequency
|
private val gossipFrequency = clusterSettings.GossipFrequency
|
||||||
|
private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency
|
||||||
|
private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency
|
||||||
|
|
||||||
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
|
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
|
||||||
|
|
||||||
|
|
@ -397,17 +399,17 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
||||||
// ========================================================
|
// ========================================================
|
||||||
|
|
||||||
// start periodic gossip to random nodes in cluster
|
// start periodic gossip to random nodes in cluster
|
||||||
private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
|
private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) {
|
||||||
gossip()
|
gossip()
|
||||||
}
|
}
|
||||||
|
|
||||||
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
|
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
|
||||||
private val failureDetectorReaperCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { // TODO: should we use the same gossipFrequency for reaping?
|
private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) {
|
||||||
reapUnreachableMembers()
|
reapUnreachableMembers()
|
||||||
}
|
}
|
||||||
|
|
||||||
// start periodic leader action management (only applies for the current leader)
|
// start periodic leader action management (only applies for the current leader)
|
||||||
private val leaderActionsCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { // TODO: should we use the same gossipFrequency for leaderActions?
|
private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) {
|
||||||
leaderActions()
|
leaderActions()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,10 @@ class ClusterConfigSpec extends ClusterSpec {
|
||||||
FailureDetectorThreshold must be(8)
|
FailureDetectorThreshold must be(8)
|
||||||
FailureDetectorMaxSampleSize must be(1000)
|
FailureDetectorMaxSampleSize must be(1000)
|
||||||
NodeToJoin must be(None)
|
NodeToJoin must be(None)
|
||||||
GossipInitialDelay must be(5 seconds)
|
PeriodicTasksInitialDelay must be(1 seconds)
|
||||||
GossipFrequency must be(1 second)
|
GossipFrequency must be(1 second)
|
||||||
|
LeaderActionsFrequency must be(1 second)
|
||||||
|
UnreachableNodesReaperFrequency must be(1 second)
|
||||||
NrOfGossipDaemons must be(4)
|
NrOfGossipDaemons must be(4)
|
||||||
NrOfDeputyNodes must be(3)
|
NrOfDeputyNodes must be(3)
|
||||||
AutoDown must be(true)
|
AutoDown must be(true)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue