rework LARS, see #3428

- tasks are still enqueued without reading the clock
- in order to be resilient against timer thread over-sleeping the tasks
  are passed to the timer thread using an AbstractNodeQueue and the
  wheel itself is now private to the timer thread
- reuse queue Nodes along the way to minimize allocation costs

The problem with the old implementation was that the timer thread could
sleep too long, then wake up and run multiple buckets in quick
succession. Tasks enqueued just before that event could then get
executed basically immediately, i.e. before their allotted time.
This commit is contained in:
Roland Kuhn 2013-06-18 13:10:34 +02:00
parent 981bce5dd0
commit e14f22f2e3
2 changed files with 114 additions and 111 deletions

View file

@ -21,6 +21,9 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
set(n);
}
/*
* !!! There is a copy of this code in pollNode() !!!
*/
@SuppressWarnings("unchecked")
protected final Node<T> peekNode() {
for(;;) {
@ -40,6 +43,11 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
final Node<T> n = new Node<T>(value);
getAndSet(n).setNext(n);
}
public final void addNode(final Node<T> n) {
n.setNext(null);
getAndSet(n).setNext(n);
}
public final boolean isEmpty() {
return peek() == null;
@ -52,6 +60,9 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
return count;
}
/*
* !!! There is a copy of this code in pollNode() !!!
*/
@SuppressWarnings("unchecked")
public final T poll() {
final Node<T> next = peekNode();
@ -63,6 +74,25 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
return ret;
}
}
@SuppressWarnings("unchecked")
public final Node<T> pollNode() {
Node<T> tail;
Node<T> next;
for(;;) {
tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
next = tail.next();
if (next != null || get() == tail)
break;
}
if (next == null) return null;
else {
tail.value = next.value;
next.value = null;
Unsafe.instance.putOrderedObject(this, tailOffset, next);
return tail;
}
}
private final static long tailOffset;
@ -75,14 +105,14 @@ public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQ
}
public static class Node<T> {
T value;
public T value;
private volatile Node<T> _nextDoNotCallMeDirectly;
Node() {
public Node() {
this(null);
}
Node(final T value) {
public Node(final T value) {
this.value = value;
}