Info log about dedicated scheduler, and refactoring, see #2214
* Refactoring with wrapping of Scheduler according to @viktorklang's wish
This commit is contained in:
parent
7b6ae2f5c9
commit
40d9b27e73
1 changed files with 27 additions and 10 deletions
|
|
@ -436,10 +436,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
// ===================== WORK DAEMONS =====================
|
||||
// ========================================================
|
||||
|
||||
private def useDedicatedScheduler: Boolean = system.settings.SchedulerTickDuration > SchedulerTickDuration
|
||||
|
||||
private val clusterScheduler: Scheduler = {
|
||||
if (useDedicatedScheduler) {
|
||||
private val clusterScheduler: Scheduler with Closeable = {
|
||||
if (system.settings.SchedulerTickDuration > SchedulerTickDuration) {
|
||||
log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
|
||||
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
|
||||
system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis)
|
||||
val threadFactory = system.threadFactory match {
|
||||
case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler")
|
||||
case tf ⇒ tf
|
||||
|
|
@ -448,8 +449,26 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
threadFactory,
|
||||
SchedulerTickDuration, SchedulerTicksPerWheel)
|
||||
new DefaultScheduler(hwt, log, system.dispatcher)
|
||||
} else
|
||||
system.scheduler
|
||||
} else {
|
||||
// delegate to system.scheduler, but don't close
|
||||
val systemScheduler = system.scheduler
|
||||
new Scheduler with Closeable {
|
||||
// we are using system.scheduler, which we are not responsible for closing
|
||||
def close(): Unit = ()
|
||||
def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable =
|
||||
systemScheduler.schedule(initialDelay, frequency, receiver, message)
|
||||
def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable =
|
||||
systemScheduler.schedule(initialDelay, frequency)(f)
|
||||
def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable =
|
||||
systemScheduler.schedule(initialDelay, frequency, runnable)
|
||||
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable =
|
||||
systemScheduler.scheduleOnce(delay, runnable)
|
||||
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable =
|
||||
systemScheduler.scheduleOnce(delay, receiver, message)
|
||||
def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable =
|
||||
systemScheduler.scheduleOnce(delay)(f)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// start periodic gossip to random nodes in cluster
|
||||
|
|
@ -545,10 +564,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
|||
heartbeatTask.cancel()
|
||||
failureDetectorReaperTask.cancel()
|
||||
leaderActionsTask.cancel()
|
||||
if (useDedicatedScheduler) clusterScheduler match {
|
||||
case x: Closeable ⇒ x.close()
|
||||
case _ ⇒
|
||||
}
|
||||
clusterScheduler.close()
|
||||
|
||||
// FIXME isTerminated check can be removed when ticket #2221 is fixed
|
||||
// now it prevents logging if system is shutdown (or in progress of shutdown)
|
||||
if (!clusterDaemons.isTerminated)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue