Incorparate review comments, see #2473
* Also added ClusterSettings in constructor of ClusterDaemon, because that will be needed to decide if the metrics actor is to be started
This commit is contained in:
parent
6dd0d736f7
commit
83e7f5d6d6
2 changed files with 8 additions and 10 deletions
|
|
@ -139,7 +139,7 @@ private[cluster] object ClusterLeaderAction {
|
|||
*
|
||||
* Supervisor managing the different Cluster daemons.
|
||||
*/
|
||||
private[cluster] final class ClusterDaemon extends Actor with ActorLogging {
|
||||
private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging {
|
||||
|
||||
// Important - don't use Cluster(context.system) here because that would
|
||||
// cause deadlock. The Cluster extension is currently being created and is waiting
|
||||
|
|
@ -165,9 +165,7 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
|
|||
import ClusterHeartbeatSender._
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
def selfAddress = cluster.selfAddress
|
||||
def clusterScheduler = cluster.scheduler
|
||||
def failureDetector = cluster.failureDetector
|
||||
import cluster.{ selfAddress, scheduler, failureDetector }
|
||||
import cluster.settings._
|
||||
|
||||
val vclockNode = VectorClock.Node(selfAddress.toString)
|
||||
|
|
@ -191,32 +189,32 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
|
|||
|
||||
// start periodic gossip to random nodes in cluster
|
||||
val gossipTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
|
||||
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
|
||||
self ! GossipTick
|
||||
}
|
||||
|
||||
// start periodic heartbeat to all nodes in cluster
|
||||
val heartbeatTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
|
||||
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
|
||||
self ! HeartbeatTick
|
||||
}
|
||||
|
||||
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
|
||||
val failureDetectorReaperTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
|
||||
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
|
||||
self ! ReapUnreachableTick
|
||||
}
|
||||
|
||||
// start periodic leader action management (only applies for the current leader)
|
||||
private val leaderActionsTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
|
||||
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
|
||||
self ! LeaderActionsTick
|
||||
}
|
||||
|
||||
// start periodic publish of current state
|
||||
private val publishStateTask: Option[Cancellable] =
|
||||
if (PublishStatsInterval == Duration.Zero) None
|
||||
else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) {
|
||||
else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) {
|
||||
self ! PublishStatsTick
|
||||
})
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue