Merge pull request #1547 from akka/wip-LARS-2.0-∂π

rework LARS, see #3428
This commit is contained in:
Roland Kuhn 2013-06-20 06:23:27 -07:00
commit 4ac9993c18
5 changed files with 133 additions and 114 deletions

View file

@ -387,6 +387,21 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
}
}
"properly defer jobs even when the timer thread oversleeps" in {
withScheduler() { (sched, driver)
implicit def ec = localEC
import driver._
sched.scheduleOnce(step * 3, probe.ref, "hello")
wakeUp(step * 5)
expectWait(step)
wakeUp(step * 2)
expectWait(step)
wakeUp(step)
probe.expectMsg("hello")
expectWait(step)
}
}
"correctly wrap around wheel rounds" in {
withScheduler(config = ConfigFactory.parseString("akka.scheduler.ticks-per-wheel=2")) { (sched, driver)
implicit def ec = localEC

View file

@ -32,7 +32,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
settings.SerializeAllMessages must equal(false)
getInt("akka.scheduler.ticks-per-wheel") must equal(512)
getMilliseconds("akka.scheduler.tick-duration") must equal(20)
getMilliseconds("akka.scheduler.tick-duration") must equal(10)
getString("akka.scheduler.implementation") must equal("akka.actor.LightArrayRevolverScheduler")
getBoolean("akka.daemonic") must be(false)

View file

@ -21,6 +21,9 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
set(n);
}
/*
* !!! There is a copy of this code in pollNode() !!!
*/
@SuppressWarnings("unchecked")
protected final Node<T> peekNode() {
for(;;) {
@ -40,6 +43,11 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
final Node<T> n = new Node<T>(value);
getAndSet(n).setNext(n);
}
public final void addNode(final Node<T> n) {
n.setNext(null);
getAndSet(n).setNext(n);
}
public final boolean isEmpty() {
return peek() == null;
@ -52,6 +60,9 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
return count;
}
/*
* !!! There is a copy of this code in pollNode() !!!
*/
@SuppressWarnings("unchecked")
public final T poll() {
final Node<T> next = peekNode();
@ -63,6 +74,25 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
return ret;
}
}
@SuppressWarnings("unchecked")
public final Node<T> pollNode() {
Node<T> tail;
Node<T> next;
for(;;) {
tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
next = tail.next();
if (next != null || get() == tail)
break;
}
if (next == null) return null;
else {
tail.value = next.value;
next.value = null;
Unsafe.instance.putOrderedObject(this, tailOffset, next);
return tail;
}
}
private final static long tailOffset;
@ -75,14 +105,14 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
}
public static class Node<T> {
T value;
public T value;
private volatile Node<T> _nextDoNotCallMeDirectly;
Node() {
public Node() {
this(null);
}
Node(final T value) {
public Node(final T value) {
this.value = value;
}

View file

@ -448,7 +448,7 @@ akka {
# Note that it might take up to 1 tick to stop the Timer, so setting the
# tick-duration to a high value will make shutting down the actor system
# take longer.
tick-duration = 20ms
tick-duration = 10ms
# The timer uses a circular wheel of buckets to store the timer tasks.
# This should be set such that the majority of scheduled timeouts (for high

View file

@ -7,19 +7,17 @@ package akka.actor
import java.io.Closeable
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.util.control.{ NoStackTrace, NonFatal }
import com.typesafe.config.Config
import akka.event.LoggingAdapter
import akka.util.Helpers
import akka.util.Unsafe.{ instance unsafe }
import akka.util.internal.{ HashedWheelTimer, Timeout HWTimeout, Timer HWTimer, TimerTask HWTimerTask }
import akka.dispatch.AbstractNodeQueue
/**
* This exception is thrown by Scheduler.schedule* when scheduling is not
@ -185,16 +183,18 @@ trait Cancellable {
class LightArrayRevolverScheduler(config: Config,
log: LoggingAdapter,
threadFactory: ThreadFactory)
extends {
val WheelShift = {
val ticks = config.getInt("akka.scheduler.ticks-per-wheel")
val shift = 31 - Integer.numberOfLeadingZeros(ticks)
if ((ticks & (ticks - 1)) != 0) throw new akka.ConfigurationException("ticks-per-wheel must be a power of 2")
shift
}
val TickDuration = Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS)
val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS)
} with AtomicReferenceArray[LightArrayRevolverScheduler.TaskHolder](1 << WheelShift) with Scheduler with Closeable {
extends Scheduler with Closeable {
import Helpers.Requiring
val WheelSize =
config.getInt("akka.scheduler.ticks-per-wheel")
.requiring(ticks (ticks & (ticks - 1)) == 0, "ticks-per-wheel must be a power of 2")
val TickDuration =
Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS)
.requiring(_ >= 10.millis || !Helpers.isWindows, "minimum supported akka.scheduler.tick-duration on Windows is 10ms")
.requiring(_ >= 1.millis, "minimum supported akka.scheduler.tick-duration is ms")
val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS)
import LightArrayRevolverScheduler._
@ -294,57 +294,23 @@ class LightArrayRevolverScheduler(config: Config,
private val start = clock()
private val tickNanos = TickDuration.toNanos
private val wheelMask = length() - 1
@volatile private var currentBucket = 0
private val wheelMask = WheelSize - 1
private val queue = new TaskQueue
private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask =
if (delay <= Duration.Zero) {
if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
ec.execute(r)
NotCancellable
} else if (stopped.get != null) {
throw new SchedulerException("cannot enqueue after timer shutdown")
} else {
val ticks = (delay.toNanos / tickNanos).toInt
val rounds = (ticks >> WheelShift).toInt
/*
* works as follows:
* - ticks are calculated to be never too early
* - base off of currentBucket, even after that was moved in the meantime
* - timer thread will swap in Pause, increment currentBucket, swap in null
* - hence spin on Pause, else normal CAS
* - stopping will set all buckets to Pause (in clearAll), so we need only check there
*/
@tailrec
def rec(t: TaskHolder): TimerTask = {
val current = currentBucket
val bucket = (current + ticks) & wheelMask
val contents = get(bucket)
/*
* The timer thread does the following:
* - swap Pause into the currentBucket
* - increment currentBucket
* - swap empty list into the Paused bucket (guaranteed != previous contents, even if empty)
*
* If we read the bucket contents before the last step, everything is fine,
* it will either succeed (all done before the Pause) or fail with Pause or in the CAS.
* But if we read the bucket contents after the third step but had read the currentBucket
* before the second step, then wed enqueue into the wrong round. After seeing the new
* bucket contents, this next read will need to see the incremented currentBucket so we
* can detect this race and retry.
*/
if (current != currentBucket) rec(t)
else contents match {
case Pause
if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
rec(t)
case tail
t.next = tail
if (compareAndSet(bucket, tail, t)) t
else rec(t)
}
}
rec(new TaskHolder(r, null, rounds, ec))
val task = new TaskHolder(r, ticks, ec)
queue.add(task)
if (stopped.get != null && task.cancel())
throw new SchedulerException("cannot enqueue after timer shutdown")
task
}
private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]]
@ -359,18 +325,34 @@ class LightArrayRevolverScheduler(config: Config,
} else Future.successful(Nil)
}
private def clearAll(): immutable.Seq[TimerTask] = {
def collect(curr: TaskHolder, acc: Vector[TimerTask]): Vector[TimerTask] = {
curr match {
case null acc
case x collect(x.next, acc :+ x)
}
}
(0 until length()) flatMap (i collect(getAndSet(i, Pause), Vector.empty))
}
@volatile private var timerThread: Thread = threadFactory.newThread(new Runnable {
var tick = 0
val wheel = Array.fill(WheelSize)(new TaskQueue)
private def clearAll(): immutable.Seq[TimerTask] = {
@tailrec def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = {
q.poll() match {
case null acc
case x collect(q, acc :+ x)
}
}
((0 until WheelSize) flatMap (i collect(wheel(i), Vector.empty))) ++ collect(queue, Vector.empty)
}
@tailrec
private def checkQueue(time: Long): Unit = queue.pollNode() match {
case null ()
case node
if (node.value.ticks == 0) {
node.value.executeTask()
} else {
val bucket = ((time + node.value.ticks * tickNanos - start) / tickNanos) & wheelMask
wheel(bucket.toInt).addNode(node)
}
checkQueue(time)
}
override final def run =
try nextTick()
catch {
@ -382,7 +364,10 @@ class LightArrayRevolverScheduler(config: Config,
log.info("starting new LARS thread")
try thread.start()
catch {
case e: Throwable log.error(e, "LARS cannot start new thread, ships going down!")
case e: Throwable
log.error(e, "LARS cannot start new thread, ships going down!")
stopped.set(Promise successful Nil)
clearAll()
}
timerThread = thread
case p
@ -391,53 +376,45 @@ class LightArrayRevolverScheduler(config: Config,
}
throw t
}
@tailrec final def nextTick(): Unit = {
val sleepTime = start + tick * tickNanos - clock()
val time = clock()
val sleepTime = start + tick * tickNanos - time
if (sleepTime > 0) {
// check the queue before taking a nap
checkQueue(time)
waitNanos(sleepTime)
} else {
// first get the list of tasks out and turn the wheel
val bucket = currentBucket
val tasks = getAndSet(bucket, Pause)
val next = (bucket + 1) & wheelMask
currentBucket = next
set(bucket, if (tasks eq null) Empty else null)
val bucket = tick & wheelMask
val tasks = wheel(bucket)
val putBack = new TaskQueue
// then process the tasks and keep the non-ripe ones in a list
var last: TaskHolder = null // the last element of the putBack list
@tailrec def rec1(task: TaskHolder, nonRipe: TaskHolder): TaskHolder = {
if ((task eq null) || (task eq Empty)) nonRipe
else if (task.isCancelled) rec1(task.next, nonRipe)
else if (task.rounds > 0) {
task.rounds -= 1
val next = task.next
task.next = nonRipe
if (last == null) last = task
rec1(next, task)
} else {
task.executeTask()
rec1(task.next, nonRipe)
}
@tailrec def executeBucket(): Unit = tasks.pollNode() match {
case null ()
case node
val task = node.value
if (!task.isCancelled) {
if (task.ticks > WheelSize) {
task.ticks -= WheelSize
putBack.addNode(node)
} else task.executeTask()
}
executeBucket()
}
val putBack = rec1(tasks, null)
executeBucket()
wheel(bucket) = putBack
// finally put back the non-ripe ones, who had their rounds decremented
@tailrec def rec2() {
val tail = get(bucket)
last.next = tail
if (!compareAndSet(bucket, tail, putBack)) rec2()
}
if (last != null) rec2()
// check the queue now so that ticks==0 tasks can execute immediately
checkQueue(clock())
// and off to the next tick
tick += 1
}
stopped.get match {
case null nextTick()
case x x success clearAll()
case p
assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS")
p success clearAll()
}
}
})
@ -448,6 +425,8 @@ class LightArrayRevolverScheduler(config: Config,
object LightArrayRevolverScheduler {
private val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task"))
private class TaskQueue extends AbstractNodeQueue[TaskHolder]
/**
* INTERNAL API
*/
@ -456,10 +435,9 @@ object LightArrayRevolverScheduler {
/**
* INTERNAL API
*/
protected[actor] class TaskHolder(@volatile var task: Runnable,
@volatile var next: TaskHolder,
@volatile var rounds: Int,
executionContext: ExecutionContext) extends TimerTask {
protected[actor] class TaskHolder(@volatile var task: Runnable, var ticks: Int, executionContext: ExecutionContext)
extends TimerTask {
@tailrec
private final def extractTask(cancel: Boolean): Runnable = {
task match {
@ -507,10 +485,6 @@ object LightArrayRevolverScheduler {
def cancel(): Boolean = false
def isCancelled: Boolean = false
}
// marker object during wheel movement
private val Pause = new TaskHolder(null, null, 0, null)
// we need two empty tokens so wheel passing can be detected in schedule()
private val Empty = new TaskHolder(null, null, 0, null)
}
/**