diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 0710be6ad5..a2a7c645fa 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -1,45 +1,37 @@ package akka.actor import language.postfixOps - -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfterEach -import scala.concurrent.duration._ -import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit } -import akka.testkit._ -import scala.concurrent.Await -import akka.pattern.ask +import java.io.Closeable +import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.TimeoutException +import scala.concurrent.{ future, Await, ExecutionContext } +import scala.concurrent.duration._ +import scala.util.Try +import scala.util.control.NonFatal +import org.scalatest.BeforeAndAfterEach +import com.typesafe.config.{ Config, ConfigFactory } +import akka.pattern.ask +import akka.testkit._ object SchedulerSpec { val testConf = ConfigFactory.parseString(""" + akka.scheduler.class = akka.actor.DefaultScheduler akka.scheduler.ticks-per-wheel = 32 """).withFallback(AkkaSpec.testConf) + + val testConfRevolver = ConfigFactory.parseString(""" + akka.scheduler.class = akka.actor.LightArrayRevolverScheduler + """).withFallback(testConf) } -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfterEach with DefaultTimeout with ImplicitSender { - private val cancellables = new ConcurrentLinkedQueue[Cancellable]() +trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with ImplicitSender { this: AkkaSpec ⇒ import system.dispatcher - def collectCancellable(c: Cancellable): Cancellable = { - cancellables.add(c) - c - } - - override def afterEach { - while (cancellables.peek() ne null) { - for (c ← Option(cancellables.poll())) { - c.cancel() - c.isCancelled must be === true - } - } - } + def collectCancellable(c: Cancellable): Cancellable "A Scheduler" must { - "schedule more than once" in { + "schedule more than once" taggedAs TimingTest in { case object Tick case object Tock @@ -84,7 +76,7 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter expectNoMsg(500 millis) } - "schedule once" in { + "schedule once" taggedAs TimingTest in { case object Tick val countDownLatch = new CountDownLatch(3) val tickActor = system.actorOf(Props(new Actor { @@ -108,7 +100,7 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter /** * ticket #372 */ - "be cancellable" in { + "be cancellable" taggedAs TimingTest in { for (_ ← 1 to 10) system.scheduler.scheduleOnce(1 second, testActor, "fail").cancel() expectNoMsg(2 seconds) @@ -132,12 +124,12 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter "be cancellable after initial delay" taggedAs TimingTest in { val ticks = new AtomicInteger - val initialDelay = 20.milliseconds.dilated - val delay = 200.milliseconds.dilated + val initialDelay = 90.milliseconds.dilated + val delay = 500.milliseconds.dilated val timeout = collectCancellable(system.scheduler.schedule(initialDelay, delay) { ticks.incrementAndGet() }) - Thread.sleep((initialDelay + 100.milliseconds.dilated).toMillis) + Thread.sleep((initialDelay + 200.milliseconds.dilated).toMillis) timeout.cancel() Thread.sleep((delay + 100.milliseconds.dilated).toMillis) @@ -147,7 +139,7 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter /** * ticket #307 */ - "pick up schedule after actor restart" in { + "pick up schedule after actor restart" taggedAs TimingTest in { object Ping object Crash @@ -177,7 +169,7 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter Await.ready(pingLatch, 5 seconds) } - "never fire prematurely" in { + "never fire prematurely" taggedAs TimingTest in { val ticks = new TestLatch(300) case class Msg(ts: Long) @@ -238,6 +230,28 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter // Rate n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis must be(4.4 plusOrMinus 0.3) } + + "survive being stressed without cancellation" taggedAs TimingTest in { + val r = ThreadLocalRandom.current() + val N = 100000 + for (_ ← 1 to N) { + val next = r.nextInt(3000) + val now = System.nanoTime + system.scheduler.scheduleOnce(next.millis) { + val stop = System.nanoTime + testActor ! (stop - now - next * 1000000L) + } + } + val latencies = within(5.seconds) { + for (i ← 1 to N) yield try expectMsgType[Long] catch { + case NonFatal(e) ⇒ throw new Exception(s"failed expecting the $i-th latency", e) + } + } + val histogram = latencies groupBy (_ / 100000000L) + for (k ← histogram.keys.toSeq.sorted) { + system.log.info(f"${k * 100}%3d: ${histogram(k).size}") + } + } } "A HashedWheelTimer" must { @@ -267,3 +281,218 @@ class SchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with BeforeAndAfter } } } + +class DefaultSchedulerSpec extends AkkaSpec(SchedulerSpec.testConf) with SchedulerSpec { + private val cancellables = new ConcurrentLinkedQueue[Cancellable]() + + def collectCancellable(c: Cancellable): Cancellable = { + cancellables.add(c) + c + } + + override def afterEach { + while (cancellables.peek() ne null) { + for (c ← Option(cancellables.poll())) { + c.cancel() + c.isCancelled must be === true + } + } + } +} + +class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRevolver) with SchedulerSpec { + + def collectCancellable(c: Cancellable): Cancellable = c + + "A LightArrayRevolverScheduler" must { + + "survive being stressed with cancellation" taggedAs TimingTest in { + import system.dispatcher + val r = ThreadLocalRandom.current + val N = 1000000 + val tasks = for (_ ← 1 to N) yield { + val next = r.nextInt(3000) + val now = System.nanoTime + system.scheduler.scheduleOnce(next.millis) { + val stop = System.nanoTime + testActor ! (stop - now - next * 1000000L) + } + } + // get somewhat into the middle of things + Thread.sleep(500) + val cancellations = for (t ← tasks) yield { + t.cancel() + if (t.isCancelled) 1 else 0 + } + val cancelled = cancellations.sum + println(cancelled) + val latencies = within(5.seconds) { + for (i ← 1 to (N - cancelled)) yield try expectMsgType[Long] catch { + case NonFatal(e) ⇒ throw new Exception(s"failed expecting the $i-th latency", e) + } + } + val histogram = latencies groupBy (_ / 100000000L) + for (k ← histogram.keys.toSeq.sorted) { + system.log.info(f"${k * 100}%3d: ${histogram(k).size}") + } + expectNoMsg(1.second) + } + + "survive vicious enqueueing" in { + withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver) ⇒ + import driver._ + import system.dispatcher + val counter = new AtomicInteger + val terminated = future { + var rounds = 0 + while (Try(sched.scheduleOnce(Duration.Zero)(())(localEC)).isSuccess) { + Thread.sleep(1) + driver.wakeUp(step) + rounds += 1 + } + rounds + } + def delay = if (ThreadLocalRandom.current.nextBoolean) step * 2 else step + val N = 1000000 + (1 to N) foreach (_ ⇒ sched.scheduleOnce(delay)(counter.incrementAndGet())) + sched.close() + Await.result(terminated, 3.seconds.dilated) must be > 10 + awaitCond(counter.get == N) + } + } + + "execute multiple jobs at once when expiring multiple buckets" in { + withScheduler() { (sched, driver) ⇒ + implicit def ec = localEC + import driver._ + val start = step / 2 + (0 to 3) foreach (i ⇒ sched.scheduleOnce(start + step * i, testActor, "hello")) + expectNoMsg(step) + wakeUp(step) + expectWait(step) + wakeUp(step * 4 + step / 2) + expectWait(step / 2) + (0 to 3) foreach (_ ⇒ expectMsg(Duration.Zero, "hello")) + } + } + + "correctly wrap around wheel rounds" in { + withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver) ⇒ + implicit def ec = localEC + import driver._ + val start = step / 2 + (0 to 3) foreach (i ⇒ sched.scheduleOnce(start + step * i, probe.ref, "hello")) + probe.expectNoMsg(step) + wakeUp(step) + expectWait(step) + // the following are no for-comp to see which iteration fails + wakeUp(step) + probe.expectMsg("hello") + expectWait(step) + wakeUp(step) + probe.expectMsg("hello") + expectWait(step) + wakeUp(step) + probe.expectMsg("hello") + expectWait(step) + wakeUp(step) + probe.expectMsg("hello") + expectWait(step) + wakeUp(step) + expectWait(step) + } + } + + "correctly execute jobs when clock wraps around" in { + withScheduler(Long.MaxValue - 200000000L) { (sched, driver) ⇒ + implicit def ec = localEC + import driver._ + val start = step / 2 + (0 to 3) foreach (i ⇒ sched.scheduleOnce(start + step * i, testActor, "hello")) + expectNoMsg(step) + wakeUp(step) + expectWait(step) + // the following are no for-comp to see which iteration fails + wakeUp(step) + expectMsg("hello") + expectWait(step) + wakeUp(step) + expectMsg("hello") + expectWait(step) + wakeUp(step) + expectMsg("hello") + expectWait(step) + wakeUp(step) + expectMsg("hello") + expectWait(step) + wakeUp(step) + expectWait(step) + } + } + + "reliably reject jobs when shutting down" in { + withScheduler() { (sched, driver) ⇒ + import system.dispatcher + val counter = new AtomicInteger + future { Thread.sleep(5); sched.close() } + val headroom = 200 + var overrun = headroom + val cap = 1000000 + val (success, failure) = Iterator + .continually(Try(sched.scheduleOnce(100.millis)(counter.incrementAndGet()))) + .take(cap) + .takeWhile(_.isSuccess || { overrun -= 1; overrun >= 0 }) + .partition(_.isSuccess) + val s = success.size + s must be < cap + awaitCond(s == counter.get, message = s"$s was not ${counter.get}") + failure.size must be === headroom + } + } + } + + trait Driver { + def wakeUp(d: FiniteDuration): Unit + def expectWait(): FiniteDuration + def expectWait(d: FiniteDuration) { expectWait() must be(d) } + def probe: TestProbe + def step: FiniteDuration + } + + val localEC = new ExecutionContext { + def execute(runnable: Runnable) { runnable.run() } + def reportFailure(t: Throwable) { t.printStackTrace() } + } + + def withScheduler(start: Long = 0L, config: Config = ConfigFactory.empty)(thunk: (Scheduler with Closeable, Driver) ⇒ Unit): Unit = { + import akka.actor.{ LightArrayRevolverScheduler ⇒ LARS } + val lbq = new LinkedBlockingQueue[Long] + val prb = TestProbe() + val tf = system.asInstanceOf[ActorSystemImpl].threadFactory + val sched = + new { @volatile var time = start } with LARS(config.withFallback(system.settings.config), log, tf) { + override protected def clock(): Long = { + // println(s"clock=$time") + time + } + override protected def waitNanos(ns: Long): Unit = { + // println(s"waiting $ns") + prb.ref ! ns + try time += lbq.take() + catch { + case _: InterruptedException ⇒ + } + } + } + val driver = new Driver { + def wakeUp(d: FiniteDuration) { lbq.offer(d.toNanos) } + def expectWait(): FiniteDuration = probe.expectMsgType[Long].nanos + def probe = prb + def step = sched.TickDuration + } + driver.expectWait() + try thunk(sched, driver) + finally sched.close() + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 9a43631894..0e080118b1 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -32,10 +32,8 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin settings.SerializeAllMessages must equal(false) getInt("akka.scheduler.ticks-per-wheel") must equal(512) - settings.SchedulerTicksPerWheel must equal(512) - getMilliseconds("akka.scheduler.tick-duration") must equal(100) - settings.SchedulerTickDuration must equal(100 millis) + getString("akka.scheduler.implementation") must equal("akka.actor.LightArrayRevolverScheduler") getBoolean("akka.daemonic") must be(false) settings.Daemonicity must be(false) diff --git a/akka-actor/src/main/java/akka/actor/AbstractScheduler.java b/akka-actor/src/main/java/akka/actor/AbstractScheduler.java new file mode 100644 index 0000000000..fdf5fe9001 --- /dev/null +++ b/akka-actor/src/main/java/akka/actor/AbstractScheduler.java @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor; + +import scala.concurrent.ExecutionContext; +import scala.concurrent.duration.FiniteDuration; + +//#scheduler +/** + * An Akka scheduler service. This one needs one special behavior: if + * Closeable, it MUST execute all outstanding tasks upon .close() in order + * to properly shutdown all dispatchers. + * + * Furthermore, this timer service MUST throw IllegalStateException if it + * cannot schedule a task. Once scheduled, the task MUST be executed. If + * executed upon close(), the task may execute before its timeout. + * + * Scheduler implementation are loaded reflectively at ActorSystem start-up + * with the following constructor arguments: + * 1) the system’s com.typesafe.config.Config (from system.settings.config) + * 2) a akka.event.LoggingAdapter + * 3) a java.util.concurrent.ThreadFactory + */ +public abstract class AbstractScheduler extends AbstractSchedulerBase { + + /** + * Schedules a function to be run repeatedly with an initial delay and + * a frequency. E.g. if you would like the function to be run after 2 + * seconds and thereafter every 100ms you would set delay = Duration(2, + * TimeUnit.SECONDS) and interval = Duration(100, TimeUnit.MILLISECONDS) + */ + @Override + public abstract Cancellable schedule(FiniteDuration initialDelay, + FiniteDuration interval, Runnable runnable, ExecutionContext executor); + + /** + * Schedules a Runnable to be run once with a delay, i.e. a time period that + * has to pass before the runnable is executed. + */ + @Override + public abstract Cancellable scheduleOnce(FiniteDuration delay, Runnable runnable, + ExecutionContext executor); + + /** + * The maximum supported task frequency of this scheduler, i.e. the inverse + * of the minimum time interval between executions of a recurring task, in Hz. + */ + @Override + public abstract double maxFrequency(); +} +//#scheduler diff --git a/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java index fcdee4866f..e7addaa3d5 100644 --- a/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java +++ b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java @@ -457,10 +457,11 @@ public class HashedWheelTimer implements Timer { return Unsafe.instance.compareAndSwapInt(this, _stateOffset, old, future); } - public void cancel() { + public boolean cancel() { if (updateState(ST_INIT, ST_CANCELLED)) { parent.wheel[stopIndex].remove(this); - } + return true; + } else return false; } public boolean isCancelled() { diff --git a/akka-actor/src/main/java/akka/util/internal/Timeout.java b/akka-actor/src/main/java/akka/util/internal/Timeout.java index a03534bb8d..b417796bfc 100644 --- a/akka-actor/src/main/java/akka/util/internal/Timeout.java +++ b/akka-actor/src/main/java/akka/util/internal/Timeout.java @@ -51,6 +51,10 @@ public interface Timeout { * Cancels the {@link TimerTask} associated with this handle. It the * task has been executed or cancelled already, it will return with no * side effect. + * + * @return whether the caller was the one who actually cancelled this + * timeout (there can be at most one; never returns true if the Timeout + * expired) */ - void cancel(); + boolean cancel(); } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index aeee89a65f..0039320631 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -364,7 +364,31 @@ akka { # ticks per wheel. # For more information see: http://www.jboss.org/netty/ tick-duration = 100ms + + # The timer uses a circular wheel of buckets to store the timer tasks. + # This should be set such that the majority of scheduled timeouts (for high + # scheduling frequency) will be shorter than one rotation of the wheel + # (ticks-per-wheel * ticks-duration) + # THIS MUST BE A POWER OF TWO! ticks-per-wheel = 512 + + # This setting selects the timer implementation which shall be loaded at + # system start-up. Built-in choices are: + # - akka.actor.DefaultScheduler (HWT) + # - akka.actor.LightArrayRevolverScheduler + # (to be benchmarked and evaluated) + # The class given here must implement the akka.actor.Scheduler interface + # and offer a constructor which takes three arguments: + # 1) com.typesafe.config.Config + # 2) akka.event.LoggingAdapter + # 3) java.util.concurrent.ThreadFactory + implementation = akka.actor.LightArrayRevolverScheduler + + # When shutting down the scheduler, there will typically be a thread which + # needs to be stopped, and this timeout determines how long to wait for + # that to happen. In case of timeout the shutdown of the actor system will + # proceed without running possibly still enqueued tasks. + shutdown-timeout = 5s } io { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 51f11c044c..6f632dac45 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -286,8 +286,8 @@ private[akka] object ActorCell { } final val emptyCancellable: Cancellable = new Cancellable { - def isCancelled = false - def cancel() {} + def isCancelled: Boolean = false + def cancel(): Boolean = false } final val emptyBehaviorStack: List[Actor.Receive] = Nil diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 45025f1887..4f4b29480e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -20,6 +20,7 @@ import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.dungeon.ChildrenContainer +import scala.concurrent.ExecutionContext object ActorSystem { @@ -161,8 +162,7 @@ object ActorSystem { case x ⇒ Some(x) } - final val SchedulerTickDuration: FiniteDuration = Duration(getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS) - final val SchedulerTicksPerWheel: Int = getInt("akka.scheduler.ticks-per-wheel") + final val SchedulerClass: String = getString("akka.scheduler.implementation") final val Daemonicity: Boolean = getBoolean("akka.daemonic") final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error") @@ -601,6 +601,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def shutdown(): Unit = guardian.stop() + //#create-scheduler /** * Create the scheduler service. This one needs one special behavior: if * Closeable, it MUST execute all outstanding tasks upon .close() in order @@ -611,12 +612,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, * executed upon close(), the task may execute before its timeout. */ protected def createScheduler(): Scheduler = - new DefaultScheduler( - new HashedWheelTimer(log, - threadFactory.withName(threadFactory.name + "-scheduler"), - settings.SchedulerTickDuration, - settings.SchedulerTicksPerWheel), - log) + dynamicAccess.createInstanceFor[Scheduler](settings.SchedulerClass, immutable.Seq( + classOf[Config] -> settings.config, + classOf[LoggingAdapter] -> log, + classOf[ThreadFactory] -> threadFactory.withName(threadFactory.name + "-scheduler"))).get + //#create-scheduler /* * This is called after the last actor has signaled its termination, i.e. @@ -635,8 +635,10 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, */ @tailrec private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match { - case c: CountDownLatch ⇒ c.await(); findExtension(ext) //Registration in process, await completion and retry - case other ⇒ other.asInstanceOf[T] //could be a T or null, in which case we return the null as T + case c: CountDownLatch ⇒ + c.await(); findExtension(ext) //Registration in process, await completion and retry + case other ⇒ + other.asInstanceOf[T] //could be a T or null, in which case we return the null as T } @tailrec diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index bbb830110d..52efe7455b 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -4,16 +4,28 @@ package akka.actor -import scala.concurrent.duration.Duration -import akka.util.internal.{ TimerTask, HashedWheelTimer, Timeout ⇒ HWTimeout, Timer } -import akka.event.LoggingAdapter -import akka.dispatch.MessageDispatcher import java.io.Closeable -import java.util.concurrent.atomic.{ AtomicReference, AtomicLong } +import java.util.concurrent.ThreadFactory +import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray } + import scala.annotation.tailrec -import akka.util.internal._ -import concurrent.ExecutionContext -import scala.concurrent.duration.FiniteDuration +import scala.collection.immutable +import scala.concurrent.{ Await, ExecutionContext, Future, Promise } +import scala.concurrent.duration._ +import scala.util.control.{ NoStackTrace, NonFatal } + +import com.typesafe.config.Config + +import akka.event.LoggingAdapter +import akka.util.Helpers +import akka.util.Unsafe.{ instance ⇒ unsafe } +import akka.util.internal.{ HashedWheelTimer, Timeout ⇒ HWTimeout, Timer ⇒ HWTimer, TimerTask ⇒ HWTimerTask } + +/** + * This exception is thrown by Scheduler.schedule* when scheduling is not + * possible, e.g. after shutting down the Scheduler. + */ +private case class SchedulerException(msg: String) extends akka.AkkaException(msg) with NoStackTrace // The Scheduler trait is included in the documentation. KEEP THE LINES SHORT!!! //#scheduler @@ -25,6 +37,12 @@ import scala.concurrent.duration.FiniteDuration * Furthermore, this timer service MUST throw IllegalStateException if it * cannot schedule a task. Once scheduled, the task MUST be executed. If * executed upon close(), the task may execute before its timeout. + * + * Scheduler implementation are loaded reflectively at ActorSystem start-up + * with the following constructor arguments: + * 1) the system’s com.typesafe.config.Config (from system.settings.config) + * 2) a akka.event.LoggingAdapter + * 3) a java.util.concurrent.ThreadFactory */ trait Scheduler { /** @@ -35,11 +53,19 @@ trait Scheduler { * * Java & Scala API */ - def schedule( + final def schedule( initialDelay: FiniteDuration, interval: FiniteDuration, receiver: ActorRef, - message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable + message: Any)(implicit executor: ExecutionContext, + sender: ActorRef = Actor.noSender): Cancellable = + schedule(initialDelay, interval, new Runnable { + def run = { + receiver ! message + if (receiver.isTerminated) + throw new SchedulerException("timer active for terminated actor") + } + }) /** * Schedules a function to be run repeatedly with an initial delay and a @@ -49,10 +75,11 @@ trait Scheduler { * * Scala API */ - def schedule( + final def schedule( initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit)( - implicit executor: ExecutionContext): Cancellable + implicit executor: ExecutionContext): Cancellable = + schedule(initialDelay, interval, new Runnable { override def run = f }) /** * Schedules a function to be run repeatedly with an initial delay and @@ -67,6 +94,31 @@ trait Scheduler { interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable + /** + * Schedules a message to be sent once with a delay, i.e. a time period that has + * to pass before the message is sent. + * + * Java & Scala API + */ + final def scheduleOnce( + delay: FiniteDuration, + receiver: ActorRef, + message: Any)(implicit executor: ExecutionContext, + sender: ActorRef = Actor.noSender): Cancellable = + scheduleOnce(delay, new Runnable { + override def run = receiver ! message + }) + + /** + * Schedules a function to be run once with a delay, i.e. a time period that has + * to pass before the function is run. + * + * Scala API + */ + final def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)( + implicit executor: ExecutionContext): Cancellable = + scheduleOnce(delay, new Runnable { override def run = f }) + /** * Schedules a Runnable to be run once with a delay, i.e. a time period that * has to pass before the runnable is executed. @@ -78,28 +130,17 @@ trait Scheduler { runnable: Runnable)(implicit executor: ExecutionContext): Cancellable /** - * Schedules a message to be sent once with a delay, i.e. a time period that has - * to pass before the message is sent. - * - * Java & Scala API + * The maximum supported task frequency of this scheduler, i.e. the inverse + * of the minimum time interval between executions of a recurring task, in Hz. */ - def scheduleOnce( - delay: FiniteDuration, - receiver: ActorRef, - message: Any)(implicit executor: ExecutionContext): Cancellable + def maxFrequency: Double - /** - * Schedules a function to be run once with a delay, i.e. a time period that has - * to pass before the function is run. - * - * Scala API - */ - def scheduleOnce( - delay: FiniteDuration)(f: ⇒ Unit)( - implicit executor: ExecutionContext): Cancellable } //#scheduler +// this one is just here so we can present a nice AbstractScheduler for Java +abstract class AbstractSchedulerBase extends Scheduler + //#cancellable /** * Signifies something that can be cancelled @@ -108,14 +149,16 @@ trait Scheduler { */ trait Cancellable { /** - * Cancels this Cancellable + * Cancels this Cancellable and returns true if that was successful. + * If this cancellable was (concurrently) cancelled already, then this method + * will return false although isCancelled will return true. * * Java & Scala API */ - def cancel(): Unit + def cancel(): Boolean /** - * Returns whether this Cancellable has been cancelled + * Returns true if and only if this Cancellable has been successfully cancelled * * Java & Scala API */ @@ -123,6 +166,308 @@ trait Cancellable { } //#cancellable +/** + * This scheduler implementation is based on a revolving wheel of buckets, + * like Netty’s HashedWheelTimer, which it advances at a fixed tick rate and + * dispatches tasks it finds in the current bucket to their respective + * ExecutionContexts. The tasks are held in TaskHolders, which upon + * cancellation null out their reference to the actual task, leaving only this + * shell to be cleaned up when the wheel reaches that bucket next time. This + * enables the use of a simple linked list to chain the TaskHolders off the + * wheel. + * + * Also noteworthy is that this scheduler does not obtain a current time stamp + * when scheduling single-shot tasks, instead it always rounds up the task + * delay to a full multiple of the TickDuration. This means that tasks are + * scheduled possibly one tick later than they could be (if checking that + * “now() + delay <= nextTick” were done). + */ +class LightArrayRevolverScheduler(config: Config, + log: LoggingAdapter, + threadFactory: ThreadFactory) + extends { + val WheelShift = { + val ticks = config.getInt("akka.scheduler.ticks-per-wheel") + val shift = 31 - Integer.numberOfLeadingZeros(ticks) + if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2") + shift + } + val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS) + val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS) + } with AtomicReferenceArray[LightArrayRevolverScheduler.TaskHolder](1 << WheelShift) with Scheduler with Closeable { + + import LightArrayRevolverScheduler._ + + private val oneNs = Duration.fromNanos(1l) + private def roundUp(d: FiniteDuration): FiniteDuration = + try { + ((d + TickDuration - oneNs) / TickDuration).toLong * TickDuration + } catch { + case _: IllegalArgumentException ⇒ d // rouding up Long.MaxValue.nanos overflows + } + + /** + * Clock implementation is replaceable (for testing); the implementation must + * return a monotonically increasing series of Long nanoseconds. + */ + protected def clock(): Long = System.nanoTime + + /** + * Overridable for tests + */ + protected def waitNanos(nanos: Long): Unit = { + // see http://www.javamex.com/tutorials/threads/sleep_issues.shtml + val sleepMs = if (Helpers.isWindows) (nanos + 4999999) / 10000000 * 10 else (nanos + 999999) / 1000000 + try Thread.sleep(sleepMs) catch { + case _: InterruptedException ⇒ Thread.currentThread.interrupt() // we got woken up + } + } + + override def schedule(initialDelay: FiniteDuration, + delay: FiniteDuration, + runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = + try new AtomicReference[Cancellable] with Cancellable { self ⇒ + set(schedule( + new AtomicLong(clock() + initialDelay.toNanos) with Runnable { + override def run(): Unit = { + try { + runnable.run() + val driftNanos = clock() - getAndAdd(delay.toNanos) + if (self.get != null) + swap(schedule(this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) + } catch { + case _: SchedulerException ⇒ // ignore failure to enqueue or terminated target actor + } + } + }, roundUp(initialDelay))) + + @tailrec private def swap(c: Cancellable): Unit = { + get match { + case null ⇒ if (c != null) c.cancel() + case old ⇒ if (!compareAndSet(old, c)) swap(c) + } + } + + @tailrec final def cancel(): Boolean = { + get match { + case null ⇒ false + case c ⇒ + if (c.cancel()) compareAndSet(c, null) + else compareAndSet(c, null) || cancel() + } + } + + override def isCancelled: Boolean = get == null + } catch { + case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) + } + + override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = + try schedule(runnable, roundUp(delay)) + catch { + case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) + } + + private def execDirectly(t: TimerTask): Unit = { + try t.run() catch { + case e: InterruptedException ⇒ throw e + case _: SchedulerException ⇒ // ignore terminated actors + case NonFatal(e) ⇒ log.error(e, "exception while executing timer task") + } + } + + override def close(): Unit = Await.result(stop(), ShutdownTimeout) foreach execDirectly + + override val maxFrequency: Double = 1.second / TickDuration + + /* + * BELOW IS THE ACTUAL TIMER IMPLEMENTATION + */ + + private val start = clock() + private val tickNanos = TickDuration.toNanos + private val wheelMask = length() - 1 + @volatile private var currentBucket = 0 + + private def schedule(r: Runnable, delay: FiniteDuration)(implicit ec: ExecutionContext): TimerTask = + if (delay <= Duration.Zero) { + if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") + ec.execute(r) + NotCancellable + } else { + val ticks = (delay.toNanos / tickNanos).toInt + val rounds = (ticks >> WheelShift).toInt + + /* + * works as follows: + * - ticks are calculated to be never “too early” + * - base off of currentBucket, even after that was moved in the meantime + * - timer thread will swap in Pause, increment currentBucket, swap in null + * - hence spin on Pause, else normal CAS + * - stopping will set all buckets to Pause (in clearAll), so we need only check there + */ + @tailrec + def rec(t: TaskHolder): TimerTask = { + val bucket = (currentBucket + ticks) & wheelMask + get(bucket) match { + case Pause ⇒ + if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") + rec(t) + case tail ⇒ + t.next = tail + if (compareAndSet(bucket, tail, t)) t + else rec(t) + } + } + + rec(new TaskHolder(r, null, rounds)) + } + + private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] + def stop(): Future[immutable.Seq[TimerTask]] = + if (stopped.compareAndSet(null, Promise())) { + timerThread.interrupt() + stopped.get.future + } else Future.successful(Nil) + + private def clearAll(): immutable.Seq[TimerTask] = { + def collect(curr: TaskHolder, acc: Vector[TimerTask]): Vector[TimerTask] = { + curr match { + case null ⇒ acc + case x ⇒ collect(x.next, acc :+ x) + } + } + (0 until length()) flatMap (i ⇒ collect(getAndSet(i, Pause), Vector.empty)) + } + + @volatile private var timerThread: Thread = threadFactory.newThread(new Runnable { + var tick = 0 + override final def run = + try nextTick() + catch { + case t: Throwable ⇒ + val thread = threadFactory.newThread(this) + try thread.start() + finally timerThread = thread + throw t + } + @tailrec final def nextTick(): Unit = { + val sleepTime = start + tick * tickNanos - clock() + + if (sleepTime > 0) { + waitNanos(sleepTime) + } else { + // first get the list of tasks out and turn the wheel + val bucket = currentBucket + val tasks = getAndSet(bucket, Pause) + val next = (bucket + 1) & wheelMask + currentBucket = next + set(bucket, if (tasks eq null) Empty else null) + + // then process the tasks and keep the non-ripe ones in a list + var last: TaskHolder = null // the last element of the putBack list + @tailrec def rec1(task: TaskHolder, nonRipe: TaskHolder): TaskHolder = { + if ((task eq null) || (task eq Empty)) nonRipe + else if (task.isCancelled) rec1(task.next, nonRipe) + else if (task.rounds > 0) { + task.rounds -= 1 + + val next = task.next + task.next = nonRipe + + if (last == null) last = task + rec1(next, task) + } else { + task.executeTask() + rec1(task.next, nonRipe) + } + } + val putBack = rec1(tasks, null) + + // finally put back the non-ripe ones, who had their rounds decremented + @tailrec def rec2() { + val tail = get(bucket) + last.next = tail + if (!compareAndSet(bucket, tail, putBack)) rec2() + } + if (last != null) rec2() + + // and off to the next tick + tick += 1 + } + stopped.get match { + case null ⇒ nextTick() + case x ⇒ x success clearAll() + } + } + }) + + timerThread.start() +} + +object LightArrayRevolverScheduler { + private val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task")) + + /** + * INTERNAL API + */ + protected[actor] trait TimerTask extends Runnable with Cancellable + + /** + * INTERNAL API + */ + protected[actor] class TaskHolder(@volatile var task: Runnable, + @volatile var next: TaskHolder, + @volatile var rounds: Int)( + implicit executionContext: ExecutionContext) extends TimerTask { + @tailrec + private final def extractTask(cancel: Boolean): Runnable = { + task match { + case null | CancelledTask ⇒ null // null means expired + case x ⇒ + if (unsafe.compareAndSwapObject(this, taskOffset, x, if (cancel) CancelledTask else null)) x + else extractTask(cancel) + } + } + + private[akka] final def executeTask(): Boolean = extractTask(cancel = false) match { + case null | CancelledTask ⇒ false + case other ⇒ + try { + executionContext execute other + true + } catch { + case _: InterruptedException ⇒ { Thread.currentThread.interrupt(); false } + case NonFatal(e) ⇒ { executionContext.reportFailure(e); false } + } + } + + /** + * utility method to directly run the task, e.g. as clean-up action + */ + def run(): Unit = extractTask(cancel = false) match { + case null ⇒ + case r ⇒ r.run() + } + + override def cancel(): Boolean = extractTask(cancel = true) != null + + override def isCancelled: Boolean = task eq CancelledTask + } + + private val CancelledTask = new Runnable { def run = () } + + private val NotCancellable = new TimerTask { + def cancel(): Boolean = false + def isCancelled: Boolean = false + def run(): Unit = () + } + // marker object during wheel movement + private val Pause = new TaskHolder(null, null, 0)(null) + // we need two empty tokens so wheel passing can be detected in schedule() + private val Empty = new TaskHolder(null, null, 0)(null) +} + /** * Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher. * Note that dispatcher is by-name parameter, because dispatcher might not be initialized @@ -132,35 +477,19 @@ trait Cancellable { * if it does not enqueue a task. Once a task is queued, it MUST be executed or * returned from stop(). */ -class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) extends Scheduler with Closeable { - override def schedule(initialDelay: FiniteDuration, - delay: FiniteDuration, - receiver: ActorRef, - message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = { - val continuousCancellable = new ContinuousCancellable - continuousCancellable.init( - hashedWheelTimer.newTimeout( - new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling { - def run(timeout: HWTimeout) { - executor execute new Runnable { - override def run = { - receiver ! message - // Check if the receiver is still alive and kicking before reschedule the task - if (receiver.isTerminated) log.debug("Could not reschedule message to be sent because receiving actor {} has been terminated.", receiver) - else { - val driftNanos = System.nanoTime - getAndAdd(delay.toNanos) - scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable) - } - } - } - } - }, - initialDelay)) - } +class DefaultScheduler(config: Config, + log: LoggingAdapter, + threadFactory: ThreadFactory) extends Scheduler with Closeable { - override def schedule(initialDelay: FiniteDuration, - delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = - schedule(initialDelay, delay, new Runnable { override def run = f }) + val TicksPerWheel = { + val ticks = config.getInt("akka.scheduler.ticks-per-wheel") + val shift = 31 - Integer.numberOfLeadingZeros(ticks) + if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2") + ticks + } + val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS) + + private val hashedWheelTimer = new HashedWheelTimer(log, threadFactory, TickDuration, TicksPerWheel) override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, @@ -168,14 +497,19 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) val continuousCancellable = new ContinuousCancellable continuousCancellable.init( hashedWheelTimer.newTimeout( - new AtomicLong(System.nanoTime + initialDelay.toNanos) with TimerTask with ContinuousScheduling { - override def run(timeout: HWTimeout): Unit = executor.execute(new Runnable { - override def run = { - runnable.run() - val driftNanos = System.nanoTime - getAndAdd(delay.toNanos) - scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable) - } - }) + new AtomicLong(System.nanoTime + initialDelay.toNanos) with HWTimerTask with ContinuousScheduling { + override def run(timeout: HWTimeout): Unit = + executor.execute(new Runnable { + override def run = { + try { + runnable.run() + val driftNanos = System.nanoTime - getAndAdd(delay.toNanos) + scheduleNext(timeout, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)), continuousCancellable) + } catch { + case _: SchedulerException ⇒ // actor target terminated + } + } + }) }, initialDelay)) } @@ -183,16 +517,10 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = new DefaultCancellable( hashedWheelTimer.newTimeout( - new TimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) }, + new HWTimerTask() { def run(timeout: HWTimeout): Unit = executor.execute(runnable) }, delay)) - override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = - scheduleOnce(delay, new Runnable { override def run = receiver ! message }) - - override def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = - scheduleOnce(delay, new Runnable { override def run = f }) - - private trait ContinuousScheduling { this: TimerTask ⇒ + private trait ContinuousScheduling { this: HWTimerTask ⇒ def scheduleNext(timeout: HWTimeout, delay: FiniteDuration, delegator: ContinuousCancellable) { try delegator.swap(timeout.getTimer.newTimeout(this, delay)) catch { case _: IllegalStateException ⇒ } // stop recurring if timer is stopped } @@ -209,23 +537,25 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) val i = hashedWheelTimer.stop().iterator() while (i.hasNext) execDirectly(i.next()) } + + override def maxFrequency: Double = 1.second / TickDuration } private[akka] object ContinuousCancellable { val initial: HWTimeout = new HWTimeout { - override def getTimer: Timer = null - override def getTask: TimerTask = null + override def getTimer: HWTimer = null + override def getTask: HWTimerTask = null override def isExpired: Boolean = false override def isCancelled: Boolean = false - override def cancel: Unit = () + override def cancel: Boolean = true } val cancelled: HWTimeout = new HWTimeout { - override def getTimer: Timer = null - override def getTask: TimerTask = null + override def getTimer: HWTimer = null + override def getTask: HWTimerTask = null override def isExpired: Boolean = false override def isCancelled: Boolean = true - override def cancel: Unit = () + override def cancel: Boolean = false } } /** @@ -245,10 +575,10 @@ private[akka] class ContinuousCancellable extends AtomicReference[HWTimeout](Con } def isCancelled(): Boolean = get().isCancelled() - def cancel(): Unit = getAndSet(ContinuousCancellable.cancelled).cancel() + def cancel(): Boolean = getAndSet(ContinuousCancellable.cancelled).cancel() } private[akka] class DefaultCancellable(timeout: HWTimeout) extends AtomicReference[HWTimeout](timeout) with Cancellable { - override def cancel(): Unit = getAndSet(ContinuousCancellable.cancelled).cancel() + override def cancel(): Boolean = getAndSet(ContinuousCancellable.cancelled).cancel() override def isCancelled: Boolean = get().isCancelled } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 54d3e6bfa3..cc2e67e7e3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import akka.util.internal.HashedWheelTimer import concurrent.{ ExecutionContext, Await } +import com.typesafe.config.ConfigFactory /** * Cluster Extension Id and factory for creating Cluster extension. @@ -88,31 +89,26 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { * INTERNAL API */ private[cluster] val scheduler: Scheduler with Closeable = { - if (system.settings.SchedulerTickDuration > SchedulerTickDuration) { + if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) { + import scala.collection.JavaConverters._ log.info("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].", - system.settings.SchedulerTickDuration.toMillis, SchedulerTickDuration.toMillis) + 1000 / system.scheduler.maxFrequency, SchedulerTickDuration.toMillis) new DefaultScheduler( - new HashedWheelTimer(log, - system.threadFactory match { - case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler") - case tf ⇒ tf - }, - SchedulerTickDuration, - SchedulerTicksPerWheel), - log) + ConfigFactory.parseString(s"akka.scheduler.tick-duration=${SchedulerTickDuration.toMillis}ms").withFallback( + system.settings.config), + log, + system.threadFactory match { + case tf: MonitorableThreadFactory ⇒ tf.withName(tf.name + "-cluster-scheduler") + case tf ⇒ tf + }) } else { // delegate to system.scheduler, but don't close over system val systemScheduler = system.scheduler new Scheduler with Closeable { override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing - override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, - receiver: ActorRef, message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = - systemScheduler.schedule(initialDelay, interval, receiver, message) - - override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = - systemScheduler.schedule(initialDelay, interval)(f) + override def maxFrequency: Double = systemScheduler.maxFrequency override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = @@ -121,13 +117,6 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = systemScheduler.scheduleOnce(delay, runnable) - - override def scheduleOnce(delay: FiniteDuration, receiver: ActorRef, - message: Any)(implicit executor: ExecutionContext): Cancellable = - systemScheduler.scheduleOnce(delay, receiver, message) - - override def scheduleOnce(delay: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = - systemScheduler.scheduleOnce(delay)(f) } } } diff --git a/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java b/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java index 66b5181ba8..d49ddda85e 100644 --- a/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java @@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit; //#imports1 //#imports2 +import akka.actor.Actor; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.actor.Cancellable; @@ -43,7 +44,7 @@ public class SchedulerDocTestBase { public void scheduleOneOffTask() { //#schedule-one-off-message system.scheduler().scheduleOnce(Duration.create(50, TimeUnit.MILLISECONDS), - testActor, "foo", system.dispatcher()); + testActor, "foo", system.dispatcher(), null); //#schedule-one-off-message //#schedule-one-off-thunk diff --git a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java index 37d1da703a..74c7035fe6 100644 --- a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -311,9 +311,8 @@ public class FaultHandlingDocSample { counter.tell(new UseStorage(null), getSelf()); // Try to re-establish storage after while getContext().system().scheduler().scheduleOnce( - Duration.create(10, "seconds"), getSelf(), Reconnect, - getContext().dispatcher() - ); + Duration.create(10, "seconds"), getSelf(), Reconnect, + getContext().dispatcher(), null); } else if (msg.equals(Reconnect)) { // Re-establish storage after the scheduled delay initStorage(); diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java index bb3eb8b777..16080e948a 100644 --- a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java @@ -84,7 +84,7 @@ public class SchedulerPatternTest { public void preStart() { getContext().system().scheduler().scheduleOnce( Duration.create(500, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().dispatcher()); + getSelf(), "tick", getContext().dispatcher(), null); } // override postRestart so we don't call preStart and schedule a new message @@ -98,7 +98,7 @@ public class SchedulerPatternTest { // send another periodic tick after the specified delay getContext().system().scheduler().scheduleOnce( Duration.create(1000, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().dispatcher()); + getSelf(), "tick", getContext().dispatcher(), null); // do something useful here //#schedule-receive target.tell(message, getSelf()); diff --git a/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java b/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java index 248b8acd42..1c8d92166c 100644 --- a/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java +++ b/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java @@ -5,6 +5,7 @@ import java.util.concurrent.TimeoutException; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import akka.actor.Actor; import akka.actor.ActorKilledException; import akka.actor.ActorRef; import akka.actor.ActorRefFactory; @@ -79,7 +80,7 @@ public class SupervisedAsk { targetActor.forward(askParam.message, getContext()); Scheduler scheduler = getContext().system().scheduler(); timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(), - self(), new AskTimeout(), context().dispatcher()); + self(), new AskTimeout(), context().dispatcher(), null); } else if (message instanceof Terminated) { Throwable ex = new ActorKilledException("Target actor terminated."); caller.tell(new Status.Failure(ex), self()); diff --git a/akka-docs/rst/java/scheduler.rst b/akka-docs/rst/java/scheduler.rst index 6676dcc6f0..42c44443f4 100644 --- a/akka-docs/rst/java/scheduler.rst +++ b/akka-docs/rst/java/scheduler.rst @@ -5,22 +5,23 @@ Scheduler (Java) ################## -Sometimes the need for making things happen in the future arises, and where do you go look then? -Look no further than ``ActorSystem``! There you find the :meth:`scheduler` method that returns an instance -of akka.actor.Scheduler, this instance is unique per ActorSystem and is used internally for scheduling things -to happen at specific points in time. Please note that the scheduled tasks are executed by the default -``MessageDispatcher`` of the ``ActorSystem``. +Sometimes the need for making things happen in the future arises, and where do +you go look then? Look no further than ``ActorSystem``! There you find the +:meth:`scheduler` method that returns an instance of +:class:`akka.actor.Scheduler`, this instance is unique per ActorSystem and is +used internally for scheduling things to happen at specific points in time. -You can schedule sending of messages to actors and execution of tasks (functions or Runnable). -You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the -scheduled operation. +You can schedule sending of messages to actors and execution of tasks +(functions or Runnable). You will get a ``Cancellable`` back that you can call +:meth:`cancel` on to cancel the execution of the scheduled operation. .. warning:: - The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``. - It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue. - The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration - properties. For more information, see: `HashedWheelTimers `_. + The default implementation of ``Scheduler`` used by Akka is based on job + buckets which are emptied according to a fixed schedule. It does not + execute tasks at the exact time, but on every tick, it will run everything + that is (over)due. The accuracy of the default Scheduler can be modified + by the ``akka.scheduler.tick-duration`` configuration property. Some examples ------------- @@ -53,19 +54,29 @@ From ``akka.actor.ActorSystem`` :include: scheduler -The Scheduler interface ------------------------ +The Scheduler Interface for Implementors +---------------------------------------- -.. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Scheduler.scala +The actual scheduler implementation is loaded reflectively upon +:class:`ActorSystem` start-up, which means that it is possible to provide a +different one using the ``akka.scheduler.implementation`` configuration +property. The referenced class must implement the following interface: + +.. includecode:: ../../../akka-actor/src/main/java/akka/actor/AbstractScheduler.java :include: scheduler The Cancellable interface ------------------------- -This allows you to ``cancel`` something that has been scheduled for execution. +Scheduling a task will result in a :class:`Cancellable` (or throw an +:class:`IllegalStateException` if attempted after the scheduler’s shutdown). +This allows you to cancel something that has been scheduled for execution. .. warning:: - This does not abort the execution of the task, if it had already been started. + + This does not abort the execution of the task, if it had already been + started. Check the return value of ``cancel`` to detect whether the + scheduled task was canceled or will (eventually) have run. .. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Scheduler.scala :include: cancellable diff --git a/akka-docs/rst/scala/code/docs/actor/SchedulerDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/SchedulerDocSpec.scala index 5b46d94298..d1344f1c72 100644 --- a/akka-docs/rst/scala/code/docs/actor/SchedulerDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/SchedulerDocSpec.scala @@ -29,7 +29,7 @@ class SchedulerDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { expectMsg(1 second, "foo") //#schedule-one-off-thunk - //Schedules a function to be executed (send the current time) to the testActor after 50ms + //Schedules a function to be executed (send a message to the testActor) after 50ms system.scheduler.scheduleOnce(50 milliseconds) { testActor ! System.currentTimeMillis } diff --git a/akka-docs/rst/scala/scheduler.rst b/akka-docs/rst/scala/scheduler.rst index e221752fac..b67a9a7de5 100644 --- a/akka-docs/rst/scala/scheduler.rst +++ b/akka-docs/rst/scala/scheduler.rst @@ -5,22 +5,23 @@ Scheduler (Scala) ################### -Sometimes the need for making things happen in the future arises, and where do you go look then? -Look no further than ``ActorSystem``! There you find the :meth:`scheduler` method that returns an instance -of akka.actor.Scheduler, this instance is unique per ActorSystem and is used internally for scheduling things -to happen at specific points in time. Please note that the scheduled tasks are executed by the default -``MessageDispatcher`` of the ``ActorSystem``. +Sometimes the need for making things happen in the future arises, and where do +you go look then? Look no further than ``ActorSystem``! There you find the +:meth:`scheduler` method that returns an instance of +:class:`akka.actor.Scheduler`, this instance is unique per ActorSystem and is +used internally for scheduling things to happen at specific points in time. -You can schedule sending of messages to actors and execution of tasks (functions or Runnable). -You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the -scheduled operation. +You can schedule sending of messages to actors and execution of tasks +(functions or Runnable). You will get a ``Cancellable`` back that you can call +:meth:`cancel` on to cancel the execution of the scheduled operation. .. warning:: - The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``. - It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue. - The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration - properties. For more information, see: `HashedWheelTimers `_. + The default implementation of ``Scheduler`` used by Akka is based on job + buckets which are emptied according to a fixed schedule. It does not + execute tasks at the exact time, but on every tick, it will run everything + that is (over)due. The accuracy of the default Scheduler can be modified + by the ``akka.scheduler.tick-duration`` configuration property. Some examples ------------- @@ -44,16 +45,26 @@ From ``akka.actor.ActorSystem`` The Scheduler interface ----------------------- +The actual scheduler implementation is loaded reflectively upon +:class:`ActorSystem` start-up, which means that it is possible to provide a +different one using the ``akka.scheduler.implementation`` configuration +property. The referenced class must implement the following interface: + .. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Scheduler.scala :include: scheduler The Cancellable interface ------------------------- -This allows you to ``cancel`` something that has been scheduled for execution. +Scheduling a task will result in a :class:`Cancellable` (or throw an +:class:`IllegalStateException` if attempted after the scheduler’s shutdown). +This allows you to cancel something that has been scheduled for execution. .. warning:: - This does not abort the execution of the task, if it had already been started. + + This does not abort the execution of the task, if it had already been + started. Check the return value of ``cancel`` to detect whether the + scheduled task was canceled or will (eventually) have run. .. includecode:: ../../../akka-actor/src/main/scala/akka/actor/Scheduler.scala :include: cancellable diff --git a/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala b/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala index 620e4bfc5a..b0732559a2 100644 --- a/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala +++ b/akka-osgi/src/main/scala/akka/osgi/OsgiActorSystemFactory.scala @@ -44,8 +44,8 @@ class OsgiActorSystemFactory(val context: BundleContext, val fallbackClassLoader def actorSystemConfig(context: BundleContext): Config = { val bundleSymbolicName = context.getBundle.getSymbolicName val bundleId = context.getBundle.getBundleId - val acceptedFilePath = List(s"bundle-$bundleSymbolicName", s"bundle-$bundleId", "akka").map(x => s"etc/$x") - val applicationConfiguration = acceptedFilePath.foldLeft(ConfigFactory.empty())((x, y) => x.withFallback(ConfigFactory.parseFileAnySyntax(new File(y)))) + val acceptedFilePath = List(s"bundle-$bundleSymbolicName", s"bundle-$bundleId", "akka").map(x ⇒ s"etc/$x") + val applicationConfiguration = acceptedFilePath.foldLeft(ConfigFactory.empty())((x, y) ⇒ x.withFallback(ConfigFactory.parseFileAnySyntax(new File(y)))) applicationConfiguration.withFallback(ConfigFactory.load(classloader).withFallback(ConfigFactory.defaultReference(OsgiActorSystemFactory.akkaActorClassLoader))) } diff --git a/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java b/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java index 835fd1939a..46d819edd1 100644 --- a/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java +++ b/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java @@ -135,7 +135,15 @@ public class JavaTestKit { public Object apply() { return cond(); } - }, max, interval); + }, max, interval, p.awaitCond$default$4()); + } + + public AwaitCond(Duration max, Duration interval, String message) { + p.awaitCond(new AbstractFunction0() { + public Object apply() { + return cond(); + } + }, max, interval, message); } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index e81acb23a3..45ccdbb8c7 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -215,14 +215,14 @@ trait TestKitBase { * Note that the timeout is scaled using Duration.dilated, * which uses the configuration entry "akka.test.timefactor". */ - def awaitCond(p: ⇒ Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis) { + def awaitCond(p: ⇒ Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis, message: String = "") { val _max = remainingOrDilated(max) val stop = now + _max @tailrec def poll(t: Duration) { if (!p) { - assert(now < stop, "timeout " + _max + " expired") + assert(now < stop, "timeout " + _max + " expired: " + message) Thread.sleep(t.toMillis) poll((stop - now) min interval) }