diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 5d9f3e7146..79513d9764 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -27,6 +27,8 @@ import com.typesafe.config.ConfigFactory import akka.remote.DefaultFailureDetectorRegistry import akka.remote.FailureDetector import com.typesafe.config.Config +import akka.event.LoggingAdapter +import java.util.concurrent.ThreadFactory /** * Cluster Extension Id and factory for creating Cluster extension. @@ -96,20 +98,24 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { /** * INTERNAL API */ - private[cluster] val scheduler: Scheduler with Closeable = { + private[cluster] val scheduler: Scheduler = { if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) { import scala.collection.JavaConverters._ log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].", (1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis) - new DefaultScheduler( - ConfigFactory.parseString(s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback( - system.settings.config), - log, - system.threadFactory match { - case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler") - case tf ⇒ tf - }) + + val cfg = ConfigFactory.parseString( + s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback( + system.settings.config) + val threadFactory = system.threadFactory match { + case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler") + case tf ⇒ tf + } + system.dynamicAccess.createInstanceFor[Scheduler](system.settings.SchedulerClass, immutable.Seq( + classOf[Config] -> cfg, + classOf[LoggingAdapter] -> log, + classOf[ThreadFactory] -> threadFactory)).get } else { // delegate to system.scheduler, but don't close over system val systemScheduler = system.scheduler @@ -279,7 +285,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { system.stop(clusterDaemons) if (readViewStarted) readView.close() - scheduler.close() + closeScheduler() clusterJmx foreach { _.unregisterMBean() } @@ -287,4 +293,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { } } + private def closeScheduler(): Unit = scheduler match { + case x: Closeable ⇒ x.close() + case _ ⇒ + } + }