#1692 - Adding config option for making the Scheduler daemonic
This commit is contained in:
parent
c4b4302266
commit
03bc15feb1
4 changed files with 21 additions and 8 deletions
|
|
@ -36,9 +36,18 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
|
||||||
getMilliseconds("akka.actor.default-dispatcher.shutdown-timeout") must equal(1 * 1000)
|
getMilliseconds("akka.actor.default-dispatcher.shutdown-timeout") must equal(1 * 1000)
|
||||||
getInt("akka.actor.default-dispatcher.throughput") must equal(5)
|
getInt("akka.actor.default-dispatcher.throughput") must equal(5)
|
||||||
getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0)
|
getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0)
|
||||||
|
|
||||||
getBoolean("akka.actor.serialize-messages") must equal(false)
|
getBoolean("akka.actor.serialize-messages") must equal(false)
|
||||||
settings.SerializeAllMessages must equal(false)
|
settings.SerializeAllMessages must equal(false)
|
||||||
|
|
||||||
|
getInt("akka.scheduler.ticksPerWheel") must equal(512)
|
||||||
|
settings.SchedulerTicksPerWheel must equal(512)
|
||||||
|
|
||||||
|
getMilliseconds("akka.scheduler.tickDuration") must equal(100)
|
||||||
|
settings.SchedulerTickDuration must equal(100 millis)
|
||||||
|
|
||||||
|
getBoolean("akka.scheduler.daemonic") must equal(true)
|
||||||
|
settings.SchedulerDaemonicity must equal(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -271,5 +271,6 @@ akka {
|
||||||
# For more information see: http://www.jboss.org/netty/
|
# For more information see: http://www.jboss.org/netty/
|
||||||
tickDuration = 100ms
|
tickDuration = 100ms
|
||||||
ticksPerWheel = 512
|
ticksPerWheel = 512
|
||||||
|
daemonic = on
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -96,6 +96,7 @@ object ActorSystem {
|
||||||
|
|
||||||
final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
|
final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
|
||||||
final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
|
final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
|
||||||
|
final val SchedulerDaemonicity = getBoolean("akka.scheduler.daemonic")
|
||||||
|
|
||||||
if (ConfigVersion != Version)
|
if (ConfigVersion != Version)
|
||||||
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
|
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
|
||||||
|
|
@ -408,18 +409,18 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
||||||
* executed upon close(), the task may execute before its timeout.
|
* executed upon close(), the task may execute before its timeout.
|
||||||
*/
|
*/
|
||||||
protected def createScheduler(): Scheduler = {
|
protected def createScheduler(): Scheduler = {
|
||||||
val threadFactory = new MonitorableThreadFactory("DefaultScheduler")
|
val hwt = new HashedWheelTimer(log,
|
||||||
val hwt = new HashedWheelTimer(log, threadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel)
|
new MonitorableThreadFactory("DefaultScheduler", settings.SchedulerDaemonicity),
|
||||||
|
settings.SchedulerTickDuration,
|
||||||
|
settings.SchedulerTicksPerWheel)
|
||||||
// note that dispatcher is by-name parameter in DefaultScheduler constructor,
|
// note that dispatcher is by-name parameter in DefaultScheduler constructor,
|
||||||
// because dispatcher is not initialized when the scheduler is created
|
// because dispatcher is not initialized when the scheduler is created
|
||||||
def safeDispatcher = {
|
def safeDispatcher = dispatcher match {
|
||||||
if (dispatcher eq null) {
|
case null ⇒
|
||||||
val exc = new IllegalStateException("Scheduler is using dispatcher before it has been initialized")
|
val exc = new IllegalStateException("Scheduler is using dispatcher before it has been initialized")
|
||||||
log.error(exc, exc.getMessage)
|
log.error(exc, exc.getMessage)
|
||||||
throw exc
|
throw exc
|
||||||
} else {
|
case dispatcher ⇒ dispatcher
|
||||||
dispatcher
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
new DefaultScheduler(hwt, log, safeDispatcher)
|
new DefaultScheduler(hwt, log, safeDispatcher)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -123,7 +123,9 @@ trait Cancellable {
|
||||||
* if it does not enqueue a task. Once a task is queued, it MUST be executed or
|
* if it does not enqueue a task. Once a task is queued, it MUST be executed or
|
||||||
* returned from stop().
|
* returned from stop().
|
||||||
*/
|
*/
|
||||||
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: ⇒ MessageDispatcher) extends Scheduler with Closeable {
|
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer,
|
||||||
|
log: LoggingAdapter,
|
||||||
|
dispatcher: ⇒ MessageDispatcher) extends Scheduler with Closeable {
|
||||||
|
|
||||||
def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
|
def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
|
||||||
val continuousCancellable = new ContinuousCancellable
|
val continuousCancellable = new ContinuousCancellable
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue