From e14f22f2e3f7f98d3d481369cd07f819128e9c37 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 18 Jun 2013 13:10:34 +0200 Subject: [PATCH] rework LARS, see #3428 - tasks are still enqueued without reading the clock - in order to be resilient against timer thread over-sleeping the tasks are passed to the timer thread using an AbstractNodeQueue and the wheel itself is now private to the timer thread - reuse queue Nodes along the way to minimize allocation costs The problem with the old implementation was that the timer thread could sleep too long, then wake up and run multiple buckets in quick succession. Tasks enqueued just before that event could then get executed basically immediately, i.e. before their allotted time. --- .../java/akka/dispatch/AbstractNodeQueue.java | 36 +++- .../src/main/scala/akka/actor/Scheduler.scala | 189 ++++++++---------- 2 files changed, 114 insertions(+), 111 deletions(-) diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java index 33954553be..46c0d08e1f 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java @@ -21,6 +21,9 @@ public abstract class AbstractNodeQueue extends AtomicReference peekNode() { for(;;) { @@ -40,6 +43,11 @@ public abstract class AbstractNodeQueue extends AtomicReference n = new Node(value); getAndSet(n).setNext(n); } + + public final void addNode(final Node n) { + n.setNext(null); + getAndSet(n).setNext(n); + } public final boolean isEmpty() { return peek() == null; @@ -52,6 +60,9 @@ public abstract class AbstractNodeQueue extends AtomicReference next = peekNode(); @@ -63,6 +74,25 @@ public abstract class AbstractNodeQueue extends AtomicReference pollNode() { + Node tail; + Node next; + for(;;) { + tail = ((Node)Unsafe.instance.getObjectVolatile(this, tailOffset)); + next = tail.next(); + if (next != null || get() == tail) + break; + } + if (next == null) return null; + else { + tail.value = next.value; + next.value = null; + Unsafe.instance.putOrderedObject(this, tailOffset, next); + return tail; + } + } private final static long tailOffset; @@ -75,14 +105,14 @@ public abstract class AbstractNodeQueue extends AtomicReference { - T value; + public T value; private volatile Node _nextDoNotCallMeDirectly; - Node() { + public Node() { this(null); } - Node(final T value) { + public Node(final T value) { this.value = value; } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 935eec5030..4ffdf6bbad 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -7,19 +7,17 @@ package akka.actor import java.io.Closeable import java.util.concurrent.ThreadFactory import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray } - import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.duration._ import scala.util.control.{ NoStackTrace, NonFatal } - import com.typesafe.config.Config - import akka.event.LoggingAdapter import akka.util.Helpers import akka.util.Unsafe.{ instance ⇒ unsafe } import akka.util.internal.{ HashedWheelTimer, Timeout ⇒ HWTimeout, Timer ⇒ HWTimer, TimerTask ⇒ HWTimerTask } +import akka.dispatch.AbstractNodeQueue /** * This exception is thrown by Scheduler.schedule* when scheduling is not @@ -185,16 +183,18 @@ trait Cancellable { class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFactory: ThreadFactory) - extends { - val WheelShift = { - val ticks = config.getInt("akka.scheduler.ticks-per-wheel") - val shift = 31 - Integer.numberOfLeadingZeros(ticks) - if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2") - shift - } - val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS) - val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS) - } with AtomicReferenceArray[LightArrayRevolverScheduler.TaskHolder](1 << WheelShift) with Scheduler with Closeable { + extends Scheduler with Closeable { + + import Helpers.Requiring + + val WheelSize = + config.getInt("akka.scheduler.ticks-per-wheel") + .requiring(ticks ⇒ (ticks & (ticks - 1)) == 0, "ticks-per-wheel must be a power of 2") + val TickDuration = + Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS) + .requiring(_ >= 10.millis || !Helpers.isWindows, "minimum supported akka.scheduler.tick-duration on Windows is 10ms") + .requiring(_ >= 1.millis, "minimum supported akka.scheduler.tick-duration is ms") + val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS) import LightArrayRevolverScheduler._ @@ -294,57 +294,23 @@ class LightArrayRevolverScheduler(config: Config, private val start = clock() private val tickNanos = TickDuration.toNanos - private val wheelMask = length() - 1 - @volatile private var currentBucket = 0 + private val wheelMask = WheelSize - 1 + private val queue = new TaskQueue private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask = if (delay <= Duration.Zero) { if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") ec.execute(r) NotCancellable + } else if (stopped.get != null) { + throw new SchedulerException("cannot enqueue after timer shutdown") } else { val ticks = (delay.toNanos / tickNanos).toInt - val rounds = (ticks >> WheelShift).toInt - - /* - * works as follows: - * - ticks are calculated to be never “too early” - * - base off of currentBucket, even after that was moved in the meantime - * - timer thread will swap in Pause, increment currentBucket, swap in null - * - hence spin on Pause, else normal CAS - * - stopping will set all buckets to Pause (in clearAll), so we need only check there - */ - @tailrec - def rec(t: TaskHolder): TimerTask = { - val current = currentBucket - val bucket = (current + ticks) & wheelMask - val contents = get(bucket) - /* - * The timer thread does the following: - * - swap Pause into the currentBucket - * - increment currentBucket - * - swap empty list into the Paused bucket (guaranteed != previous contents, even if empty) - * - * If we read the bucket contents before the last step, everything is fine, - * it will either succeed (all done before the Pause) or fail with Pause or in the CAS. - * But if we read the bucket contents after the third step but had read the currentBucket - * before the second step, then we’d enqueue into the wrong round. After seeing the new - * bucket contents, this next read will need to see the incremented currentBucket so we - * can detect this race and retry. - */ - if (current != currentBucket) rec(t) - else contents match { - case Pause ⇒ - if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") - rec(t) - case tail ⇒ - t.next = tail - if (compareAndSet(bucket, tail, t)) t - else rec(t) - } - } - - rec(new TaskHolder(r, null, rounds, ec)) + val task = new TaskHolder(r, ticks, ec) + queue.add(task) + if (stopped.get != null && task.cancel()) + throw new SchedulerException("cannot enqueue after timer shutdown") + task } private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] @@ -359,18 +325,34 @@ class LightArrayRevolverScheduler(config: Config, } else Future.successful(Nil) } - private def clearAll(): immutable.Seq[TimerTask] = { - def collect(curr: TaskHolder, acc: Vector[TimerTask]): Vector[TimerTask] = { - curr match { - case null ⇒ acc - case x ⇒ collect(x.next, acc :+ x) - } - } - (0 until length()) flatMap (i ⇒ collect(getAndSet(i, Pause), Vector.empty)) - } - @volatile private var timerThread: Thread = threadFactory.newThread(new Runnable { + var tick = 0 + val wheel = Array.fill(WheelSize)(new TaskQueue) + + private def clearAll(): immutable.Seq[TimerTask] = { + def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = { + q.poll() match { + case null ⇒ acc + case x ⇒ collect(q, acc :+ x) + } + } + ((0 until WheelSize) flatMap (i ⇒ collect(wheel(i), Vector.empty))) ++ collect(queue, Vector.empty) + } + + @tailrec + private def checkQueue(): Unit = queue.pollNode() match { + case null ⇒ () + case node ⇒ + if (node.value.ticks == 0) { + node.value.executeTask() + } else { + val bucket = (tick + node.value.ticks) & wheelMask + wheel(bucket).addNode(node) + } + checkQueue() + } + override final def run = try nextTick() catch { @@ -382,7 +364,10 @@ class LightArrayRevolverScheduler(config: Config, log.info("starting new LARS thread") try thread.start() catch { - case e: Throwable ⇒ log.error(e, "LARS cannot start new thread, ship’s going down!") + case e: Throwable ⇒ + log.error(e, "LARS cannot start new thread, ship’s going down!") + stopped.set(Promise successful Nil) + clearAll() } timerThread = thread case p ⇒ @@ -391,53 +376,44 @@ class LightArrayRevolverScheduler(config: Config, } throw t } + @tailrec final def nextTick(): Unit = { val sleepTime = start + tick * tickNanos - clock() if (sleepTime > 0) { + // check the queue before taking a nap + checkQueue() waitNanos(sleepTime) } else { - // first get the list of tasks out and turn the wheel - val bucket = currentBucket - val tasks = getAndSet(bucket, Pause) - val next = (bucket + 1) & wheelMask - currentBucket = next - set(bucket, if (tasks eq null) Empty else null) + val bucket = tick & wheelMask + val tasks = wheel(bucket) + val putBack = new TaskQueue - // then process the tasks and keep the non-ripe ones in a list - var last: TaskHolder = null // the last element of the putBack list - @tailrec def rec1(task: TaskHolder, nonRipe: TaskHolder): TaskHolder = { - if ((task eq null) || (task eq Empty)) nonRipe - else if (task.isCancelled) rec1(task.next, nonRipe) - else if (task.rounds > 0) { - task.rounds -= 1 - - val next = task.next - task.next = nonRipe - - if (last == null) last = task - rec1(next, task) - } else { - task.executeTask() - rec1(task.next, nonRipe) - } + @tailrec def executeBucket(): Unit = tasks.pollNode() match { + case null ⇒ () + case node ⇒ + val task = node.value + if (!task.isCancelled) { + if (task.ticks > WheelSize) { + task.ticks -= WheelSize + putBack.addNode(node) + } else task.executeTask() + } + executeBucket() } - val putBack = rec1(tasks, null) + executeBucket() + wheel(bucket) = putBack - // finally put back the non-ripe ones, who had their rounds decremented - @tailrec def rec2() { - val tail = get(bucket) - last.next = tail - if (!compareAndSet(bucket, tail, putBack)) rec2() - } - if (last != null) rec2() + // check the queue now so that ticks==0 tasks can execute immediately + checkQueue() - // and off to the next tick tick += 1 } stopped.get match { case null ⇒ nextTick() - case x ⇒ x success clearAll() + case p ⇒ + assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS") + p success clearAll() } } }) @@ -448,6 +424,8 @@ class LightArrayRevolverScheduler(config: Config, object LightArrayRevolverScheduler { private val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task")) + private class TaskQueue extends AbstractNodeQueue[TaskHolder] + /** * INTERNAL API */ @@ -456,10 +434,9 @@ object LightArrayRevolverScheduler { /** * INTERNAL API */ - protected[actor] class TaskHolder(@volatile var task: Runnable, - @volatile var next: TaskHolder, - @volatile var rounds: Int, - executionContext: ExecutionContext) extends TimerTask { + protected[actor] class TaskHolder(@volatile var task: Runnable, var ticks: Int, executionContext: ExecutionContext) + extends TimerTask { + @tailrec private final def extractTask(cancel: Boolean): Runnable = { task match { @@ -507,10 +484,6 @@ object LightArrayRevolverScheduler { def cancel(): Boolean = false def isCancelled: Boolean = false } - // marker object during wheel movement - private val Pause = new TaskHolder(null, null, 0, null) - // we need two empty tokens so wheel passing can be detected in schedule() - private val Empty = new TaskHolder(null, null, 0, null) } /**