LightArrayRevolverScheduler, see #2904

- based on a wheel (AtomicReferenceArray) from which atomic
  single-linked lists dangle
- no locks
- deterministic tests due to overridable time source
- also bring docs up to date
This commit is contained in:
Roland 2013-01-14 23:21:51 +01:00
parent 9f2a0afc05
commit 8dea20a1f1
20 changed files with 863 additions and 202 deletions

View file

@ -1,45 +1,37 @@
package akka.actor
import language.postfixOps
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterEach
import scala.concurrent.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
import akka.testkit._
import scala.concurrent.Await
import akka.pattern.ask
import java.io.Closeable
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.TimeoutException
import scala.concurrent.{ future, Await, ExecutionContext }
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
import org.scalatest.BeforeAndAfterEach
import com.typesafe.config.{ Config, ConfigFactory }
import akka.pattern.ask
import akka.testkit._
object SchedulerSpec {
val testConf = ConfigFactory.parseString("""
akka.scheduler.class = akka.actor.DefaultScheduler
akka.scheduler.ticks-per-wheel = 32
""").withFallback(AkkaSpec.testConf)
val testConfRevolver = ConfigFactory.parseString("""
akka.scheduler.class = akka.actor.LightArrayRevolverScheduler
""").withFallback(testConf)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfterEach with DefaultTimeout with ImplicitSender {
private val cancellables = new ConcurrentLinkedQueue[Cancellable]()
trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with ImplicitSender { this: AkkaSpec
import system.dispatcher
def collectCancellable(c: Cancellable): Cancellable = {
cancellables.add(c)
c
}
override def afterEach {
while (cancellables.peek() ne null) {
for (c Option(cancellables.poll())) {
c.cancel()
c.isCancelled must be === true
}
}
}
def collectCancellable(c: Cancellable): Cancellable
"A Scheduler" must {
"schedule more than once" in {
"schedule more than once" taggedAs TimingTest in {
case object Tick
case object Tock
@ -84,7 +76,7 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter
expectNoMsg(500 millis)
}
"schedule once" in {
"schedule once" taggedAs TimingTest in {
case object Tick
val countDownLatch = new CountDownLatch(3)
val tickActor = system.actorOf(Props(new Actor {
@ -108,7 +100,7 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter
/**
* ticket #372
*/
"be cancellable" in {
"be cancellable" taggedAs TimingTest in {
for (_ 1 to 10) system.scheduler.scheduleOnce(1 second, testActor, "fail").cancel()
expectNoMsg(2 seconds)
@ -132,12 +124,12 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter
"be cancellable after initial delay" taggedAs TimingTest in {
val ticks = new AtomicInteger
val initialDelay = 20.milliseconds.dilated
val delay = 200.milliseconds.dilated
val initialDelay = 90.milliseconds.dilated
val delay = 500.milliseconds.dilated
val timeout = collectCancellable(system.scheduler.schedule(initialDelay, delay) {
ticks.incrementAndGet()
})
Thread.sleep((initialDelay + 100.milliseconds.dilated).toMillis)
Thread.sleep((initialDelay + 200.milliseconds.dilated).toMillis)
timeout.cancel()
Thread.sleep((delay + 100.milliseconds.dilated).toMillis)
@ -147,7 +139,7 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter
/**
* ticket #307
*/
"pick up schedule after actor restart" in {
"pick up schedule after actor restart" taggedAs TimingTest in {
object Ping
object Crash
@ -177,7 +169,7 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter
Await.ready(pingLatch, 5 seconds)
}
"never fire prematurely" in {
"never fire prematurely" taggedAs TimingTest in {
val ticks = new TestLatch(300)
case class Msg(ts: Long)
@ -238,6 +230,28 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter
// Rate
n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis must be(4.4 plusOrMinus 0.3)
}
"survive being stressed without cancellation" taggedAs TimingTest in {
val r = ThreadLocalRandom.current()
val N = 100000
for (_ 1 to N) {
val next = r.nextInt(3000)
val now = System.nanoTime
system.scheduler.scheduleOnce(next.millis) {
val stop = System.nanoTime
testActor ! (stop - now - next * 1000000L)
}
}
val latencies = within(5.seconds) {
for (i 1 to N) yield try expectMsgType[Long] catch {
case NonFatal(e) throw new Exception(s"failed expecting the $i-th latency", e)
}
}
val histogram = latencies groupBy (_ / 100000000L)
for (k histogram.keys.toSeq.sorted) {
system.log.info(f"${k * 100}%3d: ${histogram(k).size}")
}
}
}
"A HashedWheelTimer" must {
@ -267,3 +281,218 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter
}
}
}
class DefaultSchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with SchedulerSpec {
private val cancellables = new ConcurrentLinkedQueue[Cancellable]()
def collectCancellable(c: Cancellable): Cancellable = {
cancellables.add(c)
c
}
override def afterEach {
while (cancellables.peek() ne null) {
for (c Option(cancellables.poll())) {
c.cancel()
c.isCancelled must be === true
}
}
}
}
class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRevolver) with SchedulerSpec {
def collectCancellable(c: Cancellable): Cancellable = c
"A LightArrayRevolverScheduler" must {
"survive being stressed with cancellation" taggedAs TimingTest in {
import system.dispatcher
val r = ThreadLocalRandom.current
val N = 1000000
val tasks = for (_ 1 to N) yield {
val next = r.nextInt(3000)
val now = System.nanoTime
system.scheduler.scheduleOnce(next.millis) {
val stop = System.nanoTime
testActor ! (stop - now - next * 1000000L)
}
}
// get somewhat into the middle of things
Thread.sleep(500)
val cancellations = for (t tasks) yield {
t.cancel()
if (t.isCancelled) 1 else 0
}
val cancelled = cancellations.sum
println(cancelled)
val latencies = within(5.seconds) {
for (i 1 to (N - cancelled)) yield try expectMsgType[Long] catch {
case NonFatal(e) throw new Exception(s"failed expecting the $i-th latency", e)
}
}
val histogram = latencies groupBy (_ / 100000000L)
for (k histogram.keys.toSeq.sorted) {
system.log.info(f"${k * 100}%3d: ${histogram(k).size}")
}
expectNoMsg(1.second)
}
"survive vicious enqueueing" in {
withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver)
import driver._
import system.dispatcher
val counter = new AtomicInteger
val terminated = future {
var rounds = 0
while (Try(sched.scheduleOnce(Duration.Zero)(())(localEC)).isSuccess) {
Thread.sleep(1)
driver.wakeUp(step)
rounds += 1
}
rounds
}
def delay = if (ThreadLocalRandom.current.nextBoolean) step * 2 else step
val N = 1000000
(1 to N) foreach (_ sched.scheduleOnce(delay)(counter.incrementAndGet()))
sched.close()
Await.result(terminated, 3.seconds.dilated) must be > 10
awaitCond(counter.get == N)
}
}
"execute multiple jobs at once when expiring multiple buckets" in {
withScheduler() { (sched, driver)
implicit def ec = localEC
import driver._
val start = step / 2
(0 to 3) foreach (i sched.scheduleOnce(start + step * i, testActor, "hello"))
expectNoMsg(step)
wakeUp(step)
expectWait(step)
wakeUp(step * 4 + step / 2)
expectWait(step / 2)
(0 to 3) foreach (_ expectMsg(Duration.Zero, "hello"))
}
}
"correctly wrap around wheel rounds" in {
withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver)
implicit def ec = localEC
import driver._
val start = step / 2
(0 to 3) foreach (i sched.scheduleOnce(start + step * i, probe.ref, "hello"))
probe.expectNoMsg(step)
wakeUp(step)
expectWait(step)
// the following are no for-comp to see which iteration fails
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
wakeUp(step)
expectWait(step)
}
}
"correctly execute jobs when clock wraps around" in {
withScheduler(Long.MaxValue - 200000000L) { (sched, driver)
implicit def ec = localEC
import driver._
val start = step / 2
(0 to 3) foreach (i sched.scheduleOnce(start + step * i, testActor, "hello"))
expectNoMsg(step)
wakeUp(step)
expectWait(step)
// the following are no for-comp to see which iteration fails
wakeUp(step)
expectMsg("hello")
expectWait(step)
wakeUp(step)
expectMsg("hello")
expectWait(step)
wakeUp(step)
expectMsg("hello")
expectWait(step)
wakeUp(step)
expectMsg("hello")
expectWait(step)
wakeUp(step)
expectWait(step)
}
}
"reliably reject jobs when shutting down" in {
withScheduler() { (sched, driver)
import system.dispatcher
val counter = new AtomicInteger
future { Thread.sleep(5); sched.close() }
val headroom = 200
var overrun = headroom
val cap = 1000000
val (success, failure) = Iterator
.continually(Try(sched.scheduleOnce(100.millis)(counter.incrementAndGet())))
.take(cap)
.takeWhile(_.isSuccess || { overrun -= 1; overrun >= 0 })
.partition(_.isSuccess)
val s = success.size
s must be < cap
awaitCond(s == counter.get, message = s"$s was not ${counter.get}")
failure.size must be === headroom
}
}
}
trait Driver {
def wakeUp(d: FiniteDuration): Unit
def expectWait(): FiniteDuration
def expectWait(d: FiniteDuration) { expectWait() must be(d) }
def probe: TestProbe
def step: FiniteDuration
}
val localEC = new ExecutionContext {
def execute(runnable: Runnable) { runnable.run() }
def reportFailure(t: Throwable) { t.printStackTrace() }
}
def withScheduler(start: Long = 0L, config: Config = ConfigFactory.empty)(thunk: (Scheduler with Closeable, Driver) Unit): Unit = {
import akka.actor.{ LightArrayRevolverScheduler LARS }
val lbq = new LinkedBlockingQueue[Long]
val prb = TestProbe()
val tf = system.asInstanceOf[ActorSystemImpl].threadFactory
val sched =
new { @volatile var time = start } with LARS(config.withFallback(system.settings.config), log, tf) {
override protected def clock(): Long = {
// println(s"clock=$time")
time
}
override protected def waitNanos(ns: Long): Unit = {
// println(s"waiting $ns")
prb.ref ! ns
try time += lbq.take()
catch {
case _: InterruptedException
}
}
}
val driver = new Driver {
def wakeUp(d: FiniteDuration) { lbq.offer(d.toNanos) }
def expectWait(): FiniteDuration = probe.expectMsgType[Long].nanos
def probe = prb
def step = sched.TickDuration
}
driver.expectWait()
try thunk(sched, driver)
finally sched.close()
}
}

View file

@ -32,10 +32,8 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
settings.SerializeAllMessages must equal(false)
getInt("akka.scheduler.ticks-per-wheel") must equal(512)
settings.SchedulerTicksPerWheel must equal(512)
getMilliseconds("akka.scheduler.tick-duration") must equal(100)
settings.SchedulerTickDuration must equal(100 millis)
getString("akka.scheduler.implementation") must equal("akka.actor.LightArrayRevolverScheduler")
getBoolean("akka.daemonic") must be(false)
settings.Daemonicity must be(false)

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
//#scheduler
/**
* An Akka scheduler service. This one needs one special behavior: if
* Closeable, it MUST execute all outstanding tasks upon .close() in order
* to properly shutdown all dispatchers.
*
* Furthermore, this timer service MUST throw IllegalStateException if it
* cannot schedule a task. Once scheduled, the task MUST be executed. If
* executed upon close(), the task may execute before its timeout.
*
* Scheduler implementation are loaded reflectively at ActorSystem start-up
* with the following constructor arguments:
* 1) the systems com.typesafe.config.Config (from system.settings.config)
* 2) a akka.event.LoggingAdapter
* 3) a java.util.concurrent.ThreadFactory
*/
public abstract class AbstractScheduler extends AbstractSchedulerBase {
/**
* Schedules a function to be run repeatedly with an initial delay and
* a frequency. E.g. if you would like the function to be run after 2
* seconds and thereafter every 100ms you would set delay = Duration(2,
* TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS)
*/
@Override
public abstract Cancellable schedule(FiniteDuration initialDelay,
FiniteDuration interval, Runnable runnable, ExecutionContext executor);
/**
* Schedules a Runnable to be run once with a delay, i.e. a time period that
* has to pass before the runnable is executed.
*/
@Override
public abstract Cancellable scheduleOnce(FiniteDuration delay, Runnable runnable,
ExecutionContext executor);
/**
* The maximum supported task frequency of this scheduler, i.e. the inverse
* of the minimum time interval between executions of a recurring task, in Hz.
*/
@Override
public abstract double maxFrequency();
}
//#scheduler

View file

@ -457,10 +457,11 @@ public class HashedWheelTimer implements Timer {
return Unsafe.instance.compareAndSwapInt(this, _stateOffset, old, future);
}
public void cancel() {
public boolean cancel() {
if (updateState(ST_INIT, ST_CANCELLED)) {
parent.wheel[stopIndex].remove(this);
}
return true;
} else return false;
}
public boolean isCancelled() {

View file

@ -51,6 +51,10 @@ public interface Timeout {
* Cancels the {@link TimerTask} associated with this handle. It the
* task has been executed or cancelled already, it will return with no
* side effect.
*
* @return whether the caller was the one who actually cancelled this
* timeout (there can be at most one; never returns true if the Timeout
* expired)
*/
void cancel();
boolean cancel();
}

View file

@ -364,7 +364,31 @@ akka {
# ticks per wheel.
# For more information see: http://www.jboss.org/netty/
tick-duration = 100ms
# The timer uses a circular wheel of buckets to store the timer tasks.
# This should be set such that the majority of scheduled timeouts (for high
# scheduling frequency) will be shorter than one rotation of the wheel
# (ticks-per-wheel * ticks-duration)
# THIS MUST BE A POWER OF TWO!
ticks-per-wheel = 512
# This setting selects the timer implementation which shall be loaded at
# system start-up. Built-in choices are:
# - akka.actor.DefaultScheduler (HWT)
# - akka.actor.LightArrayRevolverScheduler
# (to be benchmarked and evaluated)
# The class given here must implement the akka.actor.Scheduler interface
# and offer a constructor which takes three arguments:
# 1) com.typesafe.config.Config
# 2) akka.event.LoggingAdapter
# 3) java.util.concurrent.ThreadFactory
implementation = akka.actor.LightArrayRevolverScheduler
# When shutting down the scheduler, there will typically be a thread which
# needs to be stopped, and this timeout determines how long to wait for
# that to happen. In case of timeout the shutdown of the actor system will
# proceed without running possibly still enqueued tasks.
shutdown-timeout = 5s
}
io {

View file

@ -286,8 +286,8 @@ private[akka] object ActorCell {
}
final val emptyCancellable: Cancellable = new Cancellable {
def isCancelled = false
def cancel() {}
def isCancelled: Boolean = false
def cancel(): Boolean = false
}
final val emptyBehaviorStack: List[Actor.Receive] = Nil

View file

@ -20,6 +20,7 @@ import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.dungeon.ChildrenContainer
import scala.concurrent.ExecutionContext
object ActorSystem {
@ -161,8 +162,7 @@ object ActorSystem {
case x Some(x)
}
final val SchedulerTickDuration: FiniteDuration = Duration(getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS)
final val SchedulerTicksPerWheel: Int = getInt("akka.scheduler.ticks-per-wheel")
final val SchedulerClass: String = getString("akka.scheduler.implementation")
final val Daemonicity: Boolean = getBoolean("akka.daemonic")
final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error")
@ -601,6 +601,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def shutdown(): Unit = guardian.stop()
//#create-scheduler
/**
* Create the scheduler service. This one needs one special behavior: if
* Closeable, it MUST execute all outstanding tasks upon .close() in order
@ -611,12 +612,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
* executed upon close(), the task may execute before its timeout.
*/
protected def createScheduler(): Scheduler =
new DefaultScheduler(
new HashedWheelTimer(log,
threadFactory.withName(threadFactory.name + "-scheduler"),
settings.SchedulerTickDuration,
settings.SchedulerTicksPerWheel),
log)
dynamicAccess.createInstanceFor[Scheduler](settings.SchedulerClass, immutable.Seq(
classOf[Config] -> settings.config,
classOf[LoggingAdapter] -> log,
classOf[ThreadFactory] -> threadFactory.withName(threadFactory.name + "-scheduler"))).get
//#create-scheduler
/*
* This is called after the last actor has signaled its termination, i.e.
@ -635,8 +635,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
*/
@tailrec
private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match {
case c: CountDownLatch c.await(); findExtension(ext) //Registration in process, await completion and retry
case other other.asInstanceOf[T] //could be a T or null, in which case we return the null as T
case c: CountDownLatch
c.await(); findExtension(ext) //Registration in process, await completion and retry
case other
other.asInstanceOf[T] //could be a T or null, in which case we return the null as T
}
@tailrec

View file

@ -4,16 +4,28 @@
package akka.actor
import scala.concurrent.duration.Duration
import akka.util.internal.{ TimerTask, HashedWheelTimer, Timeout HWTimeout, Timer }
import akka.event.LoggingAdapter
import akka.dispatch.MessageDispatcher
import java.io.Closeable
import java.util.concurrent.atomic.{ AtomicReference, AtomicLong }
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray }
import scala.annotation.tailrec
import akka.util.internal._
import concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.util.control.{ NoStackTrace, NonFatal }
import com.typesafe.config.Config
import akka.event.LoggingAdapter
import akka.util.Helpers
import akka.util.Unsafe.{ instance unsafe }
import akka.util.internal.{ HashedWheelTimer, Timeout HWTimeout, Timer HWTimer, TimerTask HWTimerTask }
/**
* This exception is thrown by Scheduler.schedule* when scheduling is not
* possible, e.g. after shutting down the Scheduler.
*/
private case class SchedulerException(msg: String) extends akka.AkkaException(msg) with NoStackTrace
// The Scheduler trait is included in the documentation. KEEP THE LINES SHORT!!!
//#scheduler
@ -25,6 +37,12 @@ import scala.concurrent.duration.FiniteDuration
* Furthermore, this timer service MUST throw IllegalStateException if it
* cannot schedule a task. Once scheduled, the task MUST be executed. If
* executed upon close(), the task may execute before its timeout.
*
* Scheduler implementation are loaded reflectively at ActorSystem start-up
* with the following constructor arguments:
* 1) the systems com.typesafe.config.Config (from system.settings.config)
* 2) a akka.event.LoggingAdapter
* 3) a java.util.concurrent.ThreadFactory
*/
trait Scheduler {
/**
@ -35,11 +53,19 @@ trait Scheduler {
*
* Java & Scala API
*/
def schedule(
final def schedule(
initialDelay: FiniteDuration,
interval: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable
message: Any)(implicit executor: ExecutionContext,
sender: ActorRef = Actor.noSender): Cancellable =
schedule(initialDelay, interval, new Runnable {
def run = {
receiver ! message
if (receiver.isTerminated)
throw new SchedulerException("timer active for terminated actor")
}
})
/**
* Schedules a function to be run repeatedly with an initial delay and a
@ -49,10 +75,11 @@ trait Scheduler {
*
* Scala API
*/
def schedule(
final def schedule(
initialDelay: FiniteDuration,
interval: FiniteDuration)(f: Unit)(
implicit executor: ExecutionContext): Cancellable
implicit executor: ExecutionContext): Cancellable =
schedule(initialDelay, interval, new Runnable { override def run = f })
/**
* Schedules a function to be run repeatedly with an initial delay and
@ -67,6 +94,31 @@ trait Scheduler {
interval: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a message to be sent once with a delay, i.e. a time period that has
* to pass before the message is sent.
*
* Java & Scala API
*/
final def scheduleOnce(
delay: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext,
sender: ActorRef = Actor.noSender): Cancellable =
scheduleOnce(delay, new Runnable {
override def run = receiver ! message
})
/**
* Schedules a function to be run once with a delay, i.e. a time period that has
* to pass before the function is run.
*
* Scala API
*/
final def scheduleOnce(delay: FiniteDuration)(f: Unit)(
implicit executor: ExecutionContext): Cancellable =
scheduleOnce(delay, new Runnable { override def run = f })
/**
* Schedules a Runnable to be run once with a delay, i.e. a time period that
* has to pass before the runnable is executed.
@ -78,28 +130,17 @@ trait Scheduler {
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
/**
* Schedules a message to be sent once with a delay, i.e. a time period that has
* to pass before the message is sent.
*
* Java & Scala API
* The maximum supported task frequency of this scheduler, i.e. the inverse
* of the minimum time interval between executions of a recurring task, in Hz.
*/
def scheduleOnce(
delay: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable
def maxFrequency: Double
/**
* Schedules a function to be run once with a delay, i.e. a time period that has
* to pass before the function is run.
*
* Scala API
*/
def scheduleOnce(
delay: FiniteDuration)(f: Unit)(
implicit executor: ExecutionContext): Cancellable
}
//#scheduler
// this one is just here so we can present a nice AbstractScheduler for Java
abstract class AbstractSchedulerBase extends Scheduler
//#cancellable
/**
* Signifies something that can be cancelled
@ -108,14 +149,16 @@ trait Scheduler {
*/
trait Cancellable {
/**
* Cancels this Cancellable
* Cancels this Cancellable and returns true if that was successful.
* If this cancellable was (concurrently) cancelled already, then this method
* will return false although isCancelled will return true.
*
* Java & Scala API
*/
def cancel(): Unit
def cancel(): Boolean
/**
* Returns whether this Cancellable has been cancelled
* Returns true if and only if this Cancellable has been successfully cancelled
*
* Java & Scala API
*/
@ -123,6 +166,308 @@ trait Cancellable {
}
//#cancellable
/**
* This scheduler implementation is based on a revolving wheel of buckets,
* like Nettys HashedWheelTimer, which it advances at a fixed tick rate and
* dispatches tasks it finds in the current bucket to their respective
* ExecutionContexts. The tasks are held in TaskHolders, which upon
* cancellation null out their reference to the actual task, leaving only this
* shell to be cleaned up when the wheel reaches that bucket next time. This
* enables the use of a simple linked list to chain the TaskHolders off the
* wheel.
*
* Also noteworthy is that this scheduler does not obtain a current time stamp
* when scheduling single-shot tasks, instead it always rounds up the task
* delay to a full multiple of the TickDuration. This means that tasks are
* scheduled possibly one tick later than they could be (if checking that
* now() + delay <= nextTick were done).
*/
class LightArrayRevolverScheduler(config: Config,
log: LoggingAdapter,
threadFactory: ThreadFactory)
extends {
val WheelShift = {
val ticks = config.getInt("akka.scheduler.ticks-per-wheel")
val shift = 31 - Integer.numberOfLeadingZeros(ticks)
if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2")
shift
}
val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS)
val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS)
} with AtomicReferenceArray[LightArrayRevolverScheduler.TaskHolder](1 << WheelShift) with Scheduler with Closeable {
import LightArrayRevolverScheduler._
private val oneNs = Duration.fromNanos(1l)
private def roundUp(d: FiniteDuration): FiniteDuration =
try {
((d + TickDuration - oneNs) / TickDuration).toLong * TickDuration
} catch {
case _: IllegalArgumentException d // rouding up Long.MaxValue.nanos overflows
}
/**
* Clock implementation is replaceable (for testing); the implementation must
* return a monotonically increasing series of Long nanoseconds.
*/
protected def clock(): Long = System.nanoTime
/**
* Overridable for tests
*/
protected def waitNanos(nanos: Long): Unit = {
// see http://www.javamex.com/tutorials/threads/sleep_issues.shtml
val sleepMs = if (Helpers.isWindows) (nanos + 4999999) / 10000000 * 10 else (nanos + 999999) / 1000000
try Thread.sleep(sleepMs) catch {
case _: InterruptedException Thread.currentThread.interrupt() // we got woken up
}
}
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
try new AtomicReference[Cancellable] with Cancellable { self
set(schedule(
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
override def run(): Unit = {
try {
runnable.run()
val driftNanos = clock() - getAndAdd(delay.toNanos)
if (self.get != null)
swap(schedule(this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
} catch {
case _: SchedulerException // ignore failure to enqueue or terminated target actor
}
}
}, roundUp(initialDelay)))
@tailrec private def swap(c: Cancellable): Unit = {
get match {
case null if (c != null) c.cancel()
case old if (!compareAndSet(old, c)) swap(c)
}
}
@tailrec final def cancel(): Boolean = {
get match {
case null false
case c
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || cancel()
}
}
override def isCancelled: Boolean = get == null
} catch {
case SchedulerException(msg) throw new IllegalStateException(msg)
}
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
try schedule(runnable, roundUp(delay))
catch {
case SchedulerException(msg) throw new IllegalStateException(msg)
}
private def execDirectly(t: TimerTask): Unit = {
try t.run() catch {
case e: InterruptedException throw e
case _: SchedulerException // ignore terminated actors
case NonFatal(e) log.error(e, "exception while executing timer task")
}
}
override def close(): Unit = Await.result(stop(), ShutdownTimeout) foreach execDirectly
override val maxFrequency: Double = 1.second / TickDuration
/*
* BELOW IS THE ACTUAL TIMER IMPLEMENTATION
*/
private val start = clock()
private val tickNanos = TickDuration.toNanos
private val wheelMask = length() - 1
@volatile private var currentBucket = 0
private def schedule(r: Runnable, delay: FiniteDuration)(implicit ec: ExecutionContext): TimerTask =
if (delay <= Duration.Zero) {
if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
ec.execute(r)
NotCancellable
} else {
val ticks = (delay.toNanos / tickNanos).toInt
val rounds = (ticks >> WheelShift).toInt
/*
* works as follows:
* - ticks are calculated to be never too early
* - base off of currentBucket, even after that was moved in the meantime
* - timer thread will swap in Pause, increment currentBucket, swap in null
* - hence spin on Pause, else normal CAS
* - stopping will set all buckets to Pause (in clearAll), so we need only check there
*/
@tailrec
def rec(t: TaskHolder): TimerTask = {
val bucket = (currentBucket + ticks) & wheelMask
get(bucket) match {
case Pause
if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
rec(t)
case tail
t.next = tail
if (compareAndSet(bucket, tail, t)) t
else rec(t)
}
}
rec(new TaskHolder(r, null, rounds))
}
private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]]
def stop(): Future[immutable.Seq[TimerTask]] =
if (stopped.compareAndSet(null, Promise())) {
timerThread.interrupt()
stopped.get.future
} else Future.successful(Nil)
private def clearAll(): immutable.Seq[TimerTask] = {
def collect(curr: TaskHolder, acc: Vector[TimerTask]): Vector[TimerTask] = {
curr match {
case null acc
case x collect(x.next, acc :+ x)
}
}
(0 until length()) flatMap (i collect(getAndSet(i, Pause), Vector.empty))
}
@volatile private var timerThread: Thread = threadFactory.newThread(new Runnable {
var tick = 0
override final def run =
try nextTick()
catch {
case t: Throwable
val thread = threadFactory.newThread(this)
try thread.start()
finally timerThread = thread
throw t
}
@tailrec final def nextTick(): Unit = {
val sleepTime = start + tick * tickNanos - clock()
if (sleepTime > 0) {
waitNanos(sleepTime)
} else {
// first get the list of tasks out and turn the wheel
val bucket = currentBucket
val tasks = getAndSet(bucket, Pause)
val next = (bucket + 1) & wheelMask
currentBucket = next
set(bucket, if (tasks eq null) Empty else null)
// then process the tasks and keep the non-ripe ones in a list
var last: TaskHolder = null // the last element of the putBack list
@tailrec def rec1(task: TaskHolder, nonRipe: TaskHolder): TaskHolder = {
if ((task eq null) || (task eq Empty)) nonRipe
else if (task.isCancelled) rec1(task.next, nonRipe)
else if (task.rounds > 0) {
task.rounds -= 1
val next = task.next
task.next = nonRipe
if (last == null) last = task
rec1(next, task)
} else {
task.executeTask()
rec1(task.next, nonRipe)
}
}
val putBack = rec1(tasks, null)
// finally put back the non-ripe ones, who had their rounds decremented
@tailrec def rec2() {
val tail = get(bucket)
last.next = tail
if (!compareAndSet(bucket, tail, putBack)) rec2()
}
if (last != null) rec2()
// and off to the next tick
tick += 1
}
stopped.get match {
case null nextTick()
case x x success clearAll()
}
}
})
timerThread.start()
}
object LightArrayRevolverScheduler {
private val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task"))
/**
* INTERNAL API
*/
protected[actor] trait TimerTask extends Runnable with Cancellable
/**
* INTERNAL API
*/
protected[actor] class TaskHolder(@volatile var task: Runnable,
@volatile var next: TaskHolder,
@volatile var rounds: Int)(
implicit executionContext: ExecutionContext) extends TimerTask {
@tailrec
private final def extractTask(cancel: Boolean): Runnable = {
task match {
case null | CancelledTask null // null means expired
case x
if (unsafe.compareAndSwapObject(this, taskOffset, x, if (cancel) CancelledTask else null)) x
else extractTask(cancel)
}
}
private[akka] final def executeTask(): Boolean = extractTask(cancel = false) match {
case null | CancelledTask false
case other
try {
executionContext execute other
true
} catch {
case _: InterruptedException { Thread.currentThread.interrupt(); false }
case NonFatal(e) { executionContext.reportFailure(e); false }
}
}
/**
* utility method to directly run the task, e.g. as clean-up action
*/
def run(): Unit = extractTask(cancel = false) match {
case null
case r r.run()
}
override def cancel(): Boolean = extractTask(cancel = true) != null
override def isCancelled: Boolean = task eq CancelledTask
}
private val CancelledTask = new Runnable { def run = () }
private val NotCancellable = new TimerTask {
def cancel(): Boolean = false
def isCancelled: Boolean = false
def run(): Unit = ()
}
// marker object during wheel movement
private val Pause = new TaskHolder(null, null, 0)(null)
// we need two empty tokens so wheel passing can be detected in schedule()
private val Empty = new TaskHolder(null, null, 0)(null)
}
/**
* Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher.
* Note that dispatcher is by-name parameter, because dispatcher might not be initialized
@ -132,35 +477,19 @@ trait Cancellable {
* if it does not enqueue a task. Once a task is queued, it MUST be executed or
* returned from stop().
*/
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) extends Scheduler with Closeable {
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = {
val continuousCancellable = new ContinuousCancellable
continuousCancellable.init(
hashedWheelTimer.newTimeout(
new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling {
def run(timeout: HWTimeout) {
executor execute new Runnable {
override def run = {
receiver ! message
// Check if the receiver is still alive and kicking before reschedule the task
if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver)
else {
val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
}
}
}
}
},
initialDelay))
}
class DefaultScheduler(config: Config,
log: LoggingAdapter,
threadFactory: ThreadFactory) extends Scheduler with Closeable {
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
schedule(initialDelay, delay, new Runnable { override def run = f })
val TicksPerWheel = {
val ticks = config.getInt("akka.scheduler.ticks-per-wheel")
val shift = 31 - Integer.numberOfLeadingZeros(ticks)
if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2")
ticks
}
val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS)
private val hashedWheelTimer = new HashedWheelTimer(log, threadFactory, TickDuration, TicksPerWheel)
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
@ -168,14 +497,19 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
val continuousCancellable = new ContinuousCancellable
continuousCancellable.init(
hashedWheelTimer.newTimeout(
new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling {
override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable {
override def run = {
runnable.run()
val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
}
})
new AtomicLong(System.nanoTime + initialDelay.toNanos) with HWTimerTask with ContinuousScheduling {
override def run(timeout: HWTimeout): Unit =
executor.execute(new Runnable {
override def run = {
try {
runnable.run()
val driftNanos = System.nanoTime - getAndAdd(delay.toNanos)
scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable)
} catch {
case _: SchedulerException // actor target terminated
}
}
})
},
initialDelay))
}
@ -183,16 +517,10 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
new DefaultCancellable(
hashedWheelTimer.newTimeout(
new TimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) },
new HWTimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) },
delay))
override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable =
scheduleOnce(delay, new Runnable { override def run = receiver ! message })
override def scheduleOnce(delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
scheduleOnce(delay, new Runnable { override def run = f })
private trait ContinuousScheduling { this: TimerTask
private trait ContinuousScheduling { this: HWTimerTask
def scheduleNext(timeout: HWTimeout, delay: FiniteDuration, delegator: ContinuousCancellable) {
try delegator.swap(timeout.getTimer.newTimeout(this, delay)) catch { case _: IllegalStateException } // stop recurring if timer is stopped
}
@ -209,23 +537,25 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
val i = hashedWheelTimer.stop().iterator()
while (i.hasNext) execDirectly(i.next())
}
override def maxFrequency: Double = 1.second / TickDuration
}
private[akka] object ContinuousCancellable {
val initial: HWTimeout = new HWTimeout {
override def getTimer: Timer = null
override def getTask: TimerTask = null
override def getTimer: HWTimer = null
override def getTask: HWTimerTask = null
override def isExpired: Boolean = false
override def isCancelled: Boolean = false
override def cancel: Unit = ()
override def cancel: Boolean = true
}
val cancelled: HWTimeout = new HWTimeout {
override def getTimer: Timer = null
override def getTask: TimerTask = null
override def getTimer: HWTimer = null
override def getTask: HWTimerTask = null
override def isExpired: Boolean = false
override def isCancelled: Boolean = true
override def cancel: Unit = ()
override def cancel: Boolean = false
}
}
/**
@ -245,10 +575,10 @@ private[akka] class ContinuousCancellable extends AtomicReference[HWTimeout](Con
}
def isCancelled(): Boolean = get().isCancelled()
def cancel(): Unit = getAndSet(ContinuousCancellable.cancelled).cancel()
def cancel(): Boolean = getAndSet(ContinuousCancellable.cancelled).cancel()
}
private[akka] class DefaultCancellable(timeout: HWTimeout) extends AtomicReference[HWTimeout](timeout) with Cancellable {
override def cancel(): Unit = getAndSet(ContinuousCancellable.cancelled).cancel()
override def cancel(): Boolean = getAndSet(ContinuousCancellable.cancelled).cancel()
override def isCancelled: Boolean = get().isCancelled
}

View file

@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import akka.util.internal.HashedWheelTimer
import concurrent.{ ExecutionContext, Await }
import com.typesafe.config.ConfigFactory
/**
* Cluster Extension Id and factory for creating Cluster extension.
@ -88,31 +89,26 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* INTERNAL API
*/
private[cluster] val scheduler: Scheduler with Closeable = {
if (system.settings.SchedulerTickDuration > SchedulerTickDuration) {
if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) {
import scala.collection.JavaConverters._
log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis)
1000 / system.scheduler.maxFrequency, SchedulerTickDuration.toMillis)
new DefaultScheduler(
new HashedWheelTimer(log,
system.threadFactory match {
case tf: MonitorableThreadFactory tf.withName(tf.name + "-cluster-scheduler")
case tf tf
},
SchedulerTickDuration,
SchedulerTicksPerWheel),
log)
ConfigFactory.parseString(s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback(
system.settings.config),
log,
system.threadFactory match {
case tf: MonitorableThreadFactory tf.withName(tf.name + "-cluster-scheduler")
case tf tf
})
} else {
// delegate to system.scheduler, but don't close over system
val systemScheduler = system.scheduler
new Scheduler with Closeable {
override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
receiver: ActorRef, message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable =
systemScheduler.schedule(initialDelay, interval, receiver, message)
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.schedule(initialDelay, interval)(f)
override def maxFrequency: Double = systemScheduler.maxFrequency
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
@ -121,13 +117,6 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
override def scheduleOnce(delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay, runnable)
override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay, receiver, message)
override def scheduleOnce(delay: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =
systemScheduler.scheduleOnce(delay)(f)
}
}
}

View file

@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit;
//#imports1
//#imports2
import akka.actor.Actor;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.actor.Cancellable;
@ -43,7 +44,7 @@ public class SchedulerDocTestBase {
public void scheduleOneOffTask() {
//#schedule-one-off-message
system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS),
testActor, "foo", system.dispatcher());
testActor, "foo", system.dispatcher(), null);
//#schedule-one-off-message
//#schedule-one-off-thunk

View file

@ -311,9 +311,8 @@ public class FaultHandlingDocSample {
counter.tell(new UseStorage(null), getSelf());
// Try to re-establish storage after while
getContext().system().scheduler().scheduleOnce(
Duration.create(10, "seconds"), getSelf(), Reconnect,
getContext().dispatcher()
);
Duration.create(10, "seconds"), getSelf(), Reconnect,
getContext().dispatcher(), null);
} else if (msg.equals(Reconnect)) {
// Re-establish storage after the scheduled delay
initStorage();

View file

@ -84,7 +84,7 @@ public class SchedulerPatternTest {
public void preStart() {
getContext().system().scheduler().scheduleOnce(
Duration.create(500, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
getSelf(), "tick", getContext().dispatcher(), null);
}
// override postRestart so we don't call preStart and schedule a new message
@ -98,7 +98,7 @@ public class SchedulerPatternTest {
// send another periodic tick after the specified delay
getContext().system().scheduler().scheduleOnce(
Duration.create(1000, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
getSelf(), "tick", getContext().dispatcher(), null);
// do something useful here
//#schedule-receive
target.tell(message, getSelf());

View file

@ -5,6 +5,7 @@ import java.util.concurrent.TimeoutException;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.Actor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
@ -79,7 +80,7 @@ public class SupervisedAsk {
targetActor.forward(askParam.message, getContext());
Scheduler scheduler = getContext().system().scheduler();
timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(),
self(), new AskTimeout(), context().dispatcher());
self(), new AskTimeout(), context().dispatcher(), null);
} else if (message instanceof Terminated) {
Throwable ex = new ActorKilledException("Target actor terminated.");
caller.tell(new Status.Failure(ex), self());

View file

@ -5,22 +5,23 @@
Scheduler (Java)
##################
Sometimes the need for making things happen in the future arises, and where do you go look then?
Look no further than ``ActorSystem``! There you find the :meth:`scheduler` method that returns an instance
of akka.actor.Scheduler, this instance is unique per ActorSystem and is used internally for scheduling things
to happen at specific points in time. Please note that the scheduled tasks are executed by the default
``MessageDispatcher`` of the ``ActorSystem``.
Sometimes the need for making things happen in the future arises, and where do
you go look then? Look no further than ``ActorSystem``! There you find the
:meth:`scheduler` method that returns an instance of
:class:`akka.actor.Scheduler`, this instance is unique per ActorSystem and is
used internally for scheduling things to happen at specific points in time.
You can schedule sending of messages to actors and execution of tasks (functions or Runnable).
You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the
scheduled operation.
You can schedule sending of messages to actors and execution of tasks
(functions or Runnable). You will get a ``Cancellable`` back that you can call
:meth:`cancel` on to cancel the execution of the scheduled operation.
.. warning::
The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``.
It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue.
The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration
properties. For more information, see: `HashedWheelTimers <http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt>`_.
The default implementation of ``Scheduler`` used by Akka is based on job
buckets which are emptied according to a fixed schedule. It does not
execute tasks at the exact time, but on every tick, it will run everything
that is (over)due. The accuracy of the default Scheduler can be modified
by the ``akka.scheduler.tick-duration`` configuration property.
Some examples
-------------
@ -53,19 +54,29 @@ From ``akka.actor.ActorSystem``
:include: scheduler
The Scheduler interface
-----------------------
The Scheduler Interface for Implementors
----------------------------------------
.. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Scheduler.scala
The actual scheduler implementation is loaded reflectively upon
:class:`ActorSystem` start-up, which means that it is possible to provide a
different one using the ``akka.scheduler.implementation`` configuration
property. The referenced class must implement the following interface:
.. includecode:: ../../../akka-actor/src/main/java/akka/actor/AbstractScheduler.java
:include: scheduler
The Cancellable interface
-------------------------
This allows you to ``cancel`` something that has been scheduled for execution.
Scheduling a task will result in a :class:`Cancellable` (or throw an
:class:`IllegalStateException` if attempted after the schedulers shutdown).
This allows you to cancel something that has been scheduled for execution.
.. warning::
This does not abort the execution of the task, if it had already been started.
This does not abort the execution of the task, if it had already been
started. Check the return value of ``cancel`` to detect whether the
scheduled task was canceled or will (eventually) have run.
.. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Scheduler.scala
:include: cancellable

View file

@ -29,7 +29,7 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
expectMsg(1 second, "foo")
//#schedule-one-off-thunk
//Schedules a function to be executed (send the current time) to the testActor after 50ms
//Schedules a function to be executed (send a message to the testActor) after 50ms
system.scheduler.scheduleOnce(50 milliseconds) {
testActor ! System.currentTimeMillis
}

View file

@ -5,22 +5,23 @@
Scheduler (Scala)
###################
Sometimes the need for making things happen in the future arises, and where do you go look then?
Look no further than ``ActorSystem``! There you find the :meth:`scheduler` method that returns an instance
of akka.actor.Scheduler, this instance is unique per ActorSystem and is used internally for scheduling things
to happen at specific points in time. Please note that the scheduled tasks are executed by the default
``MessageDispatcher`` of the ``ActorSystem``.
Sometimes the need for making things happen in the future arises, and where do
you go look then? Look no further than ``ActorSystem``! There you find the
:meth:`scheduler` method that returns an instance of
:class:`akka.actor.Scheduler`, this instance is unique per ActorSystem and is
used internally for scheduling things to happen at specific points in time.
You can schedule sending of messages to actors and execution of tasks (functions or Runnable).
You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the
scheduled operation.
You can schedule sending of messages to actors and execution of tasks
(functions or Runnable). You will get a ``Cancellable`` back that you can call
:meth:`cancel` on to cancel the execution of the scheduled operation.
.. warning::
The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``.
It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue.
The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration
properties. For more information, see: `HashedWheelTimers <http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt>`_.
The default implementation of ``Scheduler`` used by Akka is based on job
buckets which are emptied according to a fixed schedule. It does not
execute tasks at the exact time, but on every tick, it will run everything
that is (over)due. The accuracy of the default Scheduler can be modified
by the ``akka.scheduler.tick-duration`` configuration property.
Some examples
-------------
@ -44,16 +45,26 @@ From ``akka.actor.ActorSystem``
The Scheduler interface
-----------------------
The actual scheduler implementation is loaded reflectively upon
:class:`ActorSystem` start-up, which means that it is possible to provide a
different one using the ``akka.scheduler.implementation`` configuration
property. The referenced class must implement the following interface:
.. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Scheduler.scala
:include: scheduler
The Cancellable interface
-------------------------
This allows you to ``cancel`` something that has been scheduled for execution.
Scheduling a task will result in a :class:`Cancellable` (or throw an
:class:`IllegalStateException` if attempted after the schedulers shutdown).
This allows you to cancel something that has been scheduled for execution.
.. warning::
This does not abort the execution of the task, if it had already been started.
This does not abort the execution of the task, if it had already been
started. Check the return value of ``cancel`` to detect whether the
scheduled task was canceled or will (eventually) have run.
.. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Scheduler.scala
:include: cancellable

View file

@ -44,8 +44,8 @@ class OsgiActorSystemFactory(val context: BundleContext, val fallbackClassLoader
def actorSystemConfig(context: BundleContext): Config = {
val bundleSymbolicName = context.getBundle.getSymbolicName
val bundleId = context.getBundle.getBundleId
val acceptedFilePath = List(s"bundle-$bundleSymbolicName", s"bundle-$bundleId", "akka").map(x => s"etc/$x")
val applicationConfiguration = acceptedFilePath.foldLeft(ConfigFactory.empty())((x, y) => x.withFallback(ConfigFactory.parseFileAnySyntax(new File(y))))
val acceptedFilePath = List(s"bundle-$bundleSymbolicName", s"bundle-$bundleId", "akka").map(x s"etc/$x")
val applicationConfiguration = acceptedFilePath.foldLeft(ConfigFactory.empty())((x, y) x.withFallback(ConfigFactory.parseFileAnySyntax(new File(y))))
applicationConfiguration.withFallback(ConfigFactory.load(classloader).withFallback(ConfigFactory.defaultReference(OsgiActorSystemFactory.akkaActorClassLoader)))
}

View file

@ -135,7 +135,15 @@ public class JavaTestKit {
public Object apply() {
return cond();
}
}, max, interval);
}, max, interval, p.awaitCond$default$4());
}
public AwaitCond(Duration max, Duration interval, String message) {
p.awaitCond(new AbstractFunction0<Object>() {
public Object apply() {
return cond();
}
}, max, interval, message);
}
}

View file

@ -215,14 +215,14 @@ trait TestKitBase {
* Note that the timeout is scaled using Duration.dilated,
* which uses the configuration entry "akka.test.timefactor".
*/
def awaitCond(p: Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis) {
def awaitCond(p: Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis, message: String = "") {
val _max = remainingOrDilated(max)
val stop = now + _max
@tailrec
def poll(t: Duration) {
if (!p) {
assert(now < stop, "timeout " + _max + " expired")
assert(now < stop, "timeout " + _max + " expired: " + message)
Thread.sleep(t.toMillis)
poll((stop - now) min interval)
}