diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index dda05bf6b0..571a8eaf68 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -436,10 +436,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== WORK DAEMONS ===================== // ======================================================== - private def useDedicatedScheduler: Boolean = system.settings.SchedulerTickDuration > SchedulerTickDuration - - private val clusterScheduler: Scheduler = { - if (useDedicatedScheduler) { + private val clusterScheduler: Scheduler with Closeable = { + if (system.settings.SchedulerTickDuration > SchedulerTickDuration) { + log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + + "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].", + system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis) val threadFactory = system.threadFactory match { case tf: MonitorableThreadFactory ⇒ tf.copy(name = tf.name + "-cluster-scheduler") case tf ⇒ tf @@ -448,8 +449,26 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) threadFactory, SchedulerTickDuration, SchedulerTicksPerWheel) new DefaultScheduler(hwt, log, system.dispatcher) - } else - system.scheduler + } else { + // delegate to system.scheduler, but don't close + val systemScheduler = system.scheduler + new Scheduler with Closeable { + // we are using system.scheduler, which we are not responsible for closing + def close(): Unit = () + def schedule(initialDelay: Duration, frequency: Duration, receiver: ActorRef, message: Any): Cancellable = + systemScheduler.schedule(initialDelay, frequency, receiver, message) + def schedule(initialDelay: Duration, frequency: Duration)(f: ⇒ Unit): Cancellable = + systemScheduler.schedule(initialDelay, frequency)(f) + def schedule(initialDelay: Duration, frequency: Duration, runnable: Runnable): Cancellable = + systemScheduler.schedule(initialDelay, frequency, runnable) + def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = + systemScheduler.scheduleOnce(delay, runnable) + def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = + systemScheduler.scheduleOnce(delay, receiver, message) + def scheduleOnce(delay: Duration)(f: ⇒ Unit): Cancellable = + systemScheduler.scheduleOnce(delay)(f) + } + } } // start periodic gossip to random nodes in cluster @@ -545,10 +564,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) heartbeatTask.cancel() failureDetectorReaperTask.cancel() leaderActionsTask.cancel() - if (useDedicatedScheduler) clusterScheduler match { - case x: Closeable ⇒ x.close() - case _ ⇒ - } + clusterScheduler.close() + // FIXME isTerminated check can be removed when ticket #2221 is fixed // now it prevents logging if system is shutdown (or in progress of shutdown) if (!clusterDaemons.isTerminated)