diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java index 33954553be..46c0d08e1f 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java @@ -21,6 +21,9 @@ public abstract class AbstractNodeQueue extends AtomicReference peekNode() { for(;;) { @@ -40,6 +43,11 @@ public abstract class AbstractNodeQueue extends AtomicReference n = new Node(value); getAndSet(n).setNext(n); } + + public final void addNode(final Node n) { + n.setNext(null); + getAndSet(n).setNext(n); + } public final boolean isEmpty() { return peek() == null; @@ -52,6 +60,9 @@ public abstract class AbstractNodeQueue extends AtomicReference next = peekNode(); @@ -63,6 +74,25 @@ public abstract class AbstractNodeQueue extends AtomicReference pollNode() { + Node tail; + Node next; + for(;;) { + tail = ((Node)Unsafe.instance.getObjectVolatile(this, tailOffset)); + next = tail.next(); + if (next != null || get() == tail) + break; + } + if (next == null) return null; + else { + tail.value = next.value; + next.value = null; + Unsafe.instance.putOrderedObject(this, tailOffset, next); + return tail; + } + } private final static long tailOffset; @@ -75,14 +105,14 @@ public abstract class AbstractNodeQueue extends AtomicReference { - T value; + public T value; private volatile Node _nextDoNotCallMeDirectly; - Node() { + public Node() { this(null); } - Node(final T value) { + public Node(final T value) { this.value = value; } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 935eec5030..4ffdf6bbad 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -7,19 +7,17 @@ package akka.actor import java.io.Closeable import java.util.concurrent.ThreadFactory import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray } - import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.concurrent.duration._ import scala.util.control.{ NoStackTrace, NonFatal } - import com.typesafe.config.Config - import akka.event.LoggingAdapter import akka.util.Helpers import akka.util.Unsafe.{ instance ⇒ unsafe } import akka.util.internal.{ HashedWheelTimer, Timeout ⇒ HWTimeout, Timer ⇒ HWTimer, TimerTask ⇒ HWTimerTask } +import akka.dispatch.AbstractNodeQueue /** * This exception is thrown by Scheduler.schedule* when scheduling is not @@ -185,16 +183,18 @@ trait Cancellable { class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFactory: ThreadFactory) - extends { - val WheelShift = { - val ticks = config.getInt("akka.scheduler.ticks-per-wheel") - val shift = 31 - Integer.numberOfLeadingZeros(ticks) - if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2") - shift - } - val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS) - val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS) - } with AtomicReferenceArray[LightArrayRevolverScheduler.TaskHolder](1 << WheelShift) with Scheduler with Closeable { + extends Scheduler with Closeable { + + import Helpers.Requiring + + val WheelSize = + config.getInt("akka.scheduler.ticks-per-wheel") + .requiring(ticks ⇒ (ticks & (ticks - 1)) == 0, "ticks-per-wheel must be a power of 2") + val TickDuration = + Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS) + .requiring(_ >= 10.millis || !Helpers.isWindows, "minimum supported akka.scheduler.tick-duration on Windows is 10ms") + .requiring(_ >= 1.millis, "minimum supported akka.scheduler.tick-duration is ms") + val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS) import LightArrayRevolverScheduler._ @@ -294,57 +294,23 @@ class LightArrayRevolverScheduler(config: Config, private val start = clock() private val tickNanos = TickDuration.toNanos - private val wheelMask = length() - 1 - @volatile private var currentBucket = 0 + private val wheelMask = WheelSize - 1 + private val queue = new TaskQueue private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask = if (delay <= Duration.Zero) { if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") ec.execute(r) NotCancellable + } else if (stopped.get != null) { + throw new SchedulerException("cannot enqueue after timer shutdown") } else { val ticks = (delay.toNanos / tickNanos).toInt - val rounds = (ticks >> WheelShift).toInt - - /* - * works as follows: - * - ticks are calculated to be never “too early” - * - base off of currentBucket, even after that was moved in the meantime - * - timer thread will swap in Pause, increment currentBucket, swap in null - * - hence spin on Pause, else normal CAS - * - stopping will set all buckets to Pause (in clearAll), so we need only check there - */ - @tailrec - def rec(t: TaskHolder): TimerTask = { - val current = currentBucket - val bucket = (current + ticks) & wheelMask - val contents = get(bucket) - /* - * The timer thread does the following: - * - swap Pause into the currentBucket - * - increment currentBucket - * - swap empty list into the Paused bucket (guaranteed != previous contents, even if empty) - * - * If we read the bucket contents before the last step, everything is fine, - * it will either succeed (all done before the Pause) or fail with Pause or in the CAS. - * But if we read the bucket contents after the third step but had read the currentBucket - * before the second step, then we’d enqueue into the wrong round. After seeing the new - * bucket contents, this next read will need to see the incremented currentBucket so we - * can detect this race and retry. - */ - if (current != currentBucket) rec(t) - else contents match { - case Pause ⇒ - if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") - rec(t) - case tail ⇒ - t.next = tail - if (compareAndSet(bucket, tail, t)) t - else rec(t) - } - } - - rec(new TaskHolder(r, null, rounds, ec)) + val task = new TaskHolder(r, ticks, ec) + queue.add(task) + if (stopped.get != null && task.cancel()) + throw new SchedulerException("cannot enqueue after timer shutdown") + task } private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] @@ -359,18 +325,34 @@ class LightArrayRevolverScheduler(config: Config, } else Future.successful(Nil) } - private def clearAll(): immutable.Seq[TimerTask] = { - def collect(curr: TaskHolder, acc: Vector[TimerTask]): Vector[TimerTask] = { - curr match { - case null ⇒ acc - case x ⇒ collect(x.next, acc :+ x) - } - } - (0 until length()) flatMap (i ⇒ collect(getAndSet(i, Pause), Vector.empty)) - } - @volatile private var timerThread: Thread = threadFactory.newThread(new Runnable { + var tick = 0 + val wheel = Array.fill(WheelSize)(new TaskQueue) + + private def clearAll(): immutable.Seq[TimerTask] = { + def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = { + q.poll() match { + case null ⇒ acc + case x ⇒ collect(q, acc :+ x) + } + } + ((0 until WheelSize) flatMap (i ⇒ collect(wheel(i), Vector.empty))) ++ collect(queue, Vector.empty) + } + + @tailrec + private def checkQueue(): Unit = queue.pollNode() match { + case null ⇒ () + case node ⇒ + if (node.value.ticks == 0) { + node.value.executeTask() + } else { + val bucket = (tick + node.value.ticks) & wheelMask + wheel(bucket).addNode(node) + } + checkQueue() + } + override final def run = try nextTick() catch { @@ -382,7 +364,10 @@ class LightArrayRevolverScheduler(config: Config, log.info("starting new LARS thread") try thread.start() catch { - case e: Throwable ⇒ log.error(e, "LARS cannot start new thread, ship’s going down!") + case e: Throwable ⇒ + log.error(e, "LARS cannot start new thread, ship’s going down!") + stopped.set(Promise successful Nil) + clearAll() } timerThread = thread case p ⇒ @@ -391,53 +376,44 @@ class LightArrayRevolverScheduler(config: Config, } throw t } + @tailrec final def nextTick(): Unit = { val sleepTime = start + tick * tickNanos - clock() if (sleepTime > 0) { + // check the queue before taking a nap + checkQueue() waitNanos(sleepTime) } else { - // first get the list of tasks out and turn the wheel - val bucket = currentBucket - val tasks = getAndSet(bucket, Pause) - val next = (bucket + 1) & wheelMask - currentBucket = next - set(bucket, if (tasks eq null) Empty else null) + val bucket = tick & wheelMask + val tasks = wheel(bucket) + val putBack = new TaskQueue - // then process the tasks and keep the non-ripe ones in a list - var last: TaskHolder = null // the last element of the putBack list - @tailrec def rec1(task: TaskHolder, nonRipe: TaskHolder): TaskHolder = { - if ((task eq null) || (task eq Empty)) nonRipe - else if (task.isCancelled) rec1(task.next, nonRipe) - else if (task.rounds > 0) { - task.rounds -= 1 - - val next = task.next - task.next = nonRipe - - if (last == null) last = task - rec1(next, task) - } else { - task.executeTask() - rec1(task.next, nonRipe) - } + @tailrec def executeBucket(): Unit = tasks.pollNode() match { + case null ⇒ () + case node ⇒ + val task = node.value + if (!task.isCancelled) { + if (task.ticks > WheelSize) { + task.ticks -= WheelSize + putBack.addNode(node) + } else task.executeTask() + } + executeBucket() } - val putBack = rec1(tasks, null) + executeBucket() + wheel(bucket) = putBack - // finally put back the non-ripe ones, who had their rounds decremented - @tailrec def rec2() { - val tail = get(bucket) - last.next = tail - if (!compareAndSet(bucket, tail, putBack)) rec2() - } - if (last != null) rec2() + // check the queue now so that ticks==0 tasks can execute immediately + checkQueue() - // and off to the next tick tick += 1 } stopped.get match { case null ⇒ nextTick() - case x ⇒ x success clearAll() + case p ⇒ + assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS") + p success clearAll() } } }) @@ -448,6 +424,8 @@ class LightArrayRevolverScheduler(config: Config, object LightArrayRevolverScheduler { private val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task")) + private class TaskQueue extends AbstractNodeQueue[TaskHolder] + /** * INTERNAL API */ @@ -456,10 +434,9 @@ object LightArrayRevolverScheduler { /** * INTERNAL API */ - protected[actor] class TaskHolder(@volatile var task: Runnable, - @volatile var next: TaskHolder, - @volatile var rounds: Int, - executionContext: ExecutionContext) extends TimerTask { + protected[actor] class TaskHolder(@volatile var task: Runnable, var ticks: Int, executionContext: ExecutionContext) + extends TimerTask { + @tailrec private final def extractTask(cancel: Boolean): Runnable = { task match { @@ -507,10 +484,6 @@ object LightArrayRevolverScheduler { def cancel(): Boolean = false def isCancelled: Boolean = false } - // marker object during wheel movement - private val Pause = new TaskHolder(null, null, 0, null) - // we need two empty tokens so wheel passing can be detected in schedule() - private val Empty = new TaskHolder(null, null, 0, null) } /**