Merge pull request #1128 from akka/wip-3034-cluster-lars-patriknw
Create cluster scheduler from configured scheduler class, see #3034
This commit is contained in:
commit
0ff5f2d19a
1 changed files with 21 additions and 10 deletions
|
|
@ -27,6 +27,8 @@ import com.typesafe.config.ConfigFactory
|
||||||
import akka.remote.DefaultFailureDetectorRegistry
|
import akka.remote.DefaultFailureDetectorRegistry
|
||||||
import akka.remote.FailureDetector
|
import akka.remote.FailureDetector
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
import akka.event.LoggingAdapter
|
||||||
|
import java.util.concurrent.ThreadFactory
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Cluster Extension Id and factory for creating Cluster extension.
|
* Cluster Extension Id and factory for creating Cluster extension.
|
||||||
|
|
@ -96,20 +98,24 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[cluster] val scheduler: Scheduler with Closeable = {
|
private[cluster] val scheduler: Scheduler = {
|
||||||
if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) {
|
if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) {
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
|
log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
|
||||||
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
|
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
|
||||||
(1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis)
|
(1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis)
|
||||||
new DefaultScheduler(
|
|
||||||
ConfigFactory.parseString(s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback(
|
val cfg = ConfigFactory.parseString(
|
||||||
system.settings.config),
|
s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback(
|
||||||
log,
|
system.settings.config)
|
||||||
system.threadFactory match {
|
val threadFactory = system.threadFactory match {
|
||||||
case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler")
|
case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler")
|
||||||
case tf ⇒ tf
|
case tf ⇒ tf
|
||||||
})
|
}
|
||||||
|
system.dynamicAccess.createInstanceFor[Scheduler](system.settings.SchedulerClass, immutable.Seq(
|
||||||
|
classOf[Config] -> cfg,
|
||||||
|
classOf[LoggingAdapter] -> log,
|
||||||
|
classOf[ThreadFactory] -> threadFactory)).get
|
||||||
} else {
|
} else {
|
||||||
// delegate to system.scheduler, but don't close over system
|
// delegate to system.scheduler, but don't close over system
|
||||||
val systemScheduler = system.scheduler
|
val systemScheduler = system.scheduler
|
||||||
|
|
@ -279,7 +285,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
system.stop(clusterDaemons)
|
system.stop(clusterDaemons)
|
||||||
if (readViewStarted) readView.close()
|
if (readViewStarted) readView.close()
|
||||||
|
|
||||||
scheduler.close()
|
closeScheduler()
|
||||||
|
|
||||||
clusterJmx foreach { _.unregisterMBean() }
|
clusterJmx foreach { _.unregisterMBean() }
|
||||||
|
|
||||||
|
|
@ -287,4 +293,9 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def closeScheduler(): Unit = scheduler match {
|
||||||
|
case x: Closeable ⇒ x.close()
|
||||||
|
case _ ⇒
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue