Merge pull request #1027 from akka/wip-2904-timer-∂π

first cut of new AkkaTimer, see #2904
This commit is contained in:
Roland Kuhn 2013-01-23 11:06:41 -08:00
commit 7066b37077
19 changed files with 861 additions and 200 deletions

View file

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import akka.util.internal.HashedWheelTimer
import concurrent.{ ExecutionContext, Await }
import com.typesafe.config.ConfigFactory
/**
* Cluster Extension Id and factory for creating Cluster extension.
@ -88,31 +89,26 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* INTERNAL API
*/
private[cluster] val scheduler: Scheduler with Closeable = {
if (system.settings.SchedulerTickDuration > SchedulerTickDuration) {
if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) {
import scala.collection.JavaConverters._
log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis)
1000 / system.scheduler.maxFrequency, SchedulerTickDuration.toMillis)
new DefaultScheduler(
new HashedWheelTimer(log,
system.threadFactory match {
case tf: MonitorableThreadFactory tf.withName(tf.name + "-cluster-scheduler")
case tf tf
},
SchedulerTickDuration,
SchedulerTicksPerWheel),
log)
ConfigFactory.parseString(s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback(
system.settings.config),
log,
system.threadFactory match {
case tf: MonitorableThreadFactory tf.withName(tf.name + "-cluster-scheduler")
case tf tf
})
} else {
// delegate to system.scheduler, but don't close over system
val systemScheduler = system.scheduler
new Scheduler with Closeable {
override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
receiver: ActorRef, message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable =
systemScheduler.schedule(initialDelay, interval, receiver, message)
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.schedule(initialDelay, interval)(f)
override def maxFrequency: Double = systemScheduler.maxFrequency
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
@ -121,13 +117,6 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
override def scheduleOnce(delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay, runnable)
override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay, receiver, message)
override def scheduleOnce(delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay)(f)
}
}
}