Major cleanup in HWT, Making the Timeout a static class to be able to use Unsafe instead of AtomicInteger. Sprinkling finals and changing code formatting.

(RK: cherry-picked from Viktor’s branch and cleaned up two “unused”
warnings)
This commit is contained in:
Viktor Klang 2012-08-06 13:44:07 +02:00 committed by Roland
parent a5068a50a3
commit c491ccfbf8

View file

@ -15,17 +15,19 @@
*/
package akka.util.internal;
import akka.event.LoggingAdapter;
import scala.concurrent.util.Duration;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import scala.concurrent.util.Duration;
import akka.event.LoggingAdapter;
import akka.util.Unsafe;
/**
* A {@link Timer} optimized for approximated I/O timeout scheduling.
*
@ -83,7 +85,6 @@ public class HashedWheelTimer implements Timer {
private final Worker worker = new Worker();
final Thread workerThread;
boolean shutdown = false;
private final long roundDuration;
final long tickDuration;
final Set<HashedWheelTimeout>[] wheel;
final ReusableIterator<HashedWheelTimeout>[] iterators;
@ -136,23 +137,19 @@ public class HashedWheelTimer implements Timer {
throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + duration.unit());
}
roundDuration = tickDuration * wheel.length;
workerThread = threadFactory.newThread(worker);
}
@SuppressWarnings("unchecked")
private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
private static Set<HashedWheelTimeout>[] createWheel(final int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
final Set<HashedWheelTimeout>[] wheel = new Set[normalizeTicksPerWheel(ticksPerWheel)];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = Collections.newSetFromMap(new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
}
@ -241,7 +238,7 @@ public class HashedWheelTimer implements Timer {
}
public HashedWheelTimeout createTimeout(TimerTask task, long time) {
return new HashedWheelTimeout(task, time);
return new HashedWheelTimeout(this, task, time);
}
public Timeout newTimeout(TimerTask task, Duration delay) {
@ -275,7 +272,7 @@ public class HashedWheelTimer implements Timer {
lock.readLock().lock();
try {
if (shutdown) throw new IllegalStateException("cannot enqueue after shutdown");
int stopIndex = (int) ((wheelCursor + relativeIndex) & mask);
final int stopIndex = (int) ((wheelCursor + relativeIndex) & mask);
timeout.stopIndex = stopIndex;
timeout.remainingRounds = remainingRounds;
wheel[stopIndex].add(timeout);
@ -303,22 +300,17 @@ public class HashedWheelTimer implements Timer {
}
public void run() {
List<HashedWheelTimeout> expiredTimeouts =
new ArrayList<HashedWheelTimeout>();
startTime = System.nanoTime();
tick = 1;
while (!shutdown()) {
final long deadline = waitForNextTick();
if (deadline > 0) {
fetchExpiredTimeouts(expiredTimeouts, deadline);
notifyExpiredTimeouts(expiredTimeouts);
}
if (deadline > 0)
notifyExpiredTimeouts(fetchExpiredTimeouts(deadline));
}
}
private void fetchExpiredTimeouts(List<HashedWheelTimeout> expiredTimeouts, long deadline) {
private ArrayList<HashedWheelTimeout> fetchExpiredTimeouts(long deadline) {
// Find the expired timeouts and decrease the round counter
// if necessary. Note that we don't send the notification
@ -326,18 +318,15 @@ public class HashedWheelTimer implements Timer {
// an exclusive lock.
lock.writeLock().lock();
try {
int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
fetchExpiredTimeouts(expiredTimeouts, i, deadline);
final int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
return fetchExpiredTimeouts(iterators[newWheelCursor], deadline);
} finally {
lock.writeLock().unlock();
}
}
private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts,
ReusableIterator<HashedWheelTimeout> i, long deadline) {
private ArrayList<HashedWheelTimeout> fetchExpiredTimeouts(final ReusableIterator<HashedWheelTimeout> i, final long deadline) {
final ArrayList<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>();
List<HashedWheelTimeout> slipped = null;
i.rewind();
while (i.hasNext()) {
@ -351,13 +340,13 @@ public class HashedWheelTimer implements Timer {
// place, usually one tick earlier. For now, just add
// it to a temporary list - we will reschedule it in a
// separate loop.
if (slipped == null) {
if (slipped == null)
slipped = new ArrayList<HashedWheelTimeout>();
}
slipped.add(timeout);
}
} else {
timeout.remainingRounds --;
timeout.remainingRounds -= 1;
}
}
@ -367,10 +356,10 @@ public class HashedWheelTimer implements Timer {
scheduleTimeout(timeout, timeout.deadline - deadline);
}
}
return expiredTimeouts;
}
private void notifyExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts) {
private void notifyExpiredTimeouts(ArrayList<HashedWheelTimeout> expiredTimeouts) {
// Notify the expired timeouts.
for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
expiredTimeouts.get(i).expire();
@ -413,69 +402,83 @@ public class HashedWheelTimer implements Timer {
}
}
private final class HashedWheelTimeout implements Timeout {
private static final class HashedWheelTimeout implements Timeout {
private static final long _stateOffset;
static {
try {
_stateOffset = Unsafe.instance.objectFieldOffset(HashedWheelTimeout.class.getDeclaredField("_state"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}
}
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private final HashedWheelTimer parent;
private final TimerTask task;
final long deadline;
volatile int stopIndex;
volatile long remainingRounds;
private final AtomicInteger state = new AtomicInteger(ST_INIT);
@SuppressWarnings("unused")
private volatile int _state = ST_INIT;
HashedWheelTimeout(TimerTask task, long deadline) {
HashedWheelTimeout(HashedWheelTimer parent, TimerTask task, long deadline) {
this.parent = parent;
this.task = task;
this.deadline = deadline;
}
public Timer getTimer() {
return HashedWheelTimer.this;
return parent;
}
public TimerTask getTask() {
return task;
}
public void cancel() {
if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
// TODO return false
return;
}
private final int state() {
return Unsafe.instance.getIntVolatile(this, _stateOffset);
}
private final boolean updateState(int old, int future) {
return Unsafe.instance.compareAndSwapInt(this, _stateOffset, old, future);
}
wheel[stopIndex].remove(this);
public void cancel() {
if (updateState(ST_INIT, ST_CANCELLED)) {
parent.wheel[stopIndex].remove(this);
}
}
public boolean isCancelled() {
return state.get() == ST_CANCELLED;
return state() == ST_CANCELLED;
}
public boolean isExpired() {
return state.get() != ST_INIT;
return state() != ST_INIT;
}
public void expire() {
if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
logger.warning(
"An exception was thrown by " +
TimerTask.class.getSimpleName() + ".", t);
if (updateState(ST_INIT, ST_EXPIRED)) {
try {
task.run(this);
} catch (Throwable t) {
parent.logger.warning(
"An exception was thrown by " +
TimerTask.class.getSimpleName() + ".", t);
}
}
}
@Override
public String toString() {
long currentTime = System.nanoTime();
long remaining = deadline - currentTime;
final long currentTime = System.nanoTime();
final long remaining = deadline - currentTime;
StringBuilder buf = new StringBuilder(192);
buf.append(getClass().getSimpleName());
buf.append("HashedWheelTimeout");
buf.append('(');
buf.append("deadline: ");