diff --git a/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java index 09a1dee833..be0ec835fb 100644 --- a/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java +++ b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java @@ -15,17 +15,19 @@ */ package akka.util.internal; -import akka.event.LoggingAdapter; -import scala.concurrent.util.Duration; - -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import scala.concurrent.util.Duration; +import akka.event.LoggingAdapter; +import akka.util.Unsafe; + /** * A {@link Timer} optimized for approximated I/O timeout scheduling. * @@ -83,7 +85,6 @@ public class HashedWheelTimer implements Timer { private final Worker worker = new Worker(); final Thread workerThread; boolean shutdown = false; - private final long roundDuration; final long tickDuration; final Set[] wheel; final ReusableIterator[] iterators; @@ -136,23 +137,19 @@ public class HashedWheelTimer implements Timer { throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + duration.unit()); } - roundDuration = tickDuration * wheel.length; workerThread = threadFactory.newThread(worker); } @SuppressWarnings("unchecked") - private static Set[] createWheel(int ticksPerWheel) { + private static Set[] createWheel(final int ticksPerWheel) { if (ticksPerWheel <= 0) { - throw new IllegalArgumentException( - "ticksPerWheel must be greater than 0: " + ticksPerWheel); + throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { - throw new IllegalArgumentException( - "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); + throw new IllegalArgumentException("ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); } - ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); - Set[] wheel = new Set[ticksPerWheel]; + final Set[] wheel = new Set[normalizeTicksPerWheel(ticksPerWheel)]; for (int i = 0; i < wheel.length; i ++) { wheel[i] = Collections.newSetFromMap(new ConcurrentIdentityHashMap(16, 0.95f, 4)); } @@ -241,7 +238,7 @@ public class HashedWheelTimer implements Timer { } public HashedWheelTimeout createTimeout(TimerTask task, long time) { - return new HashedWheelTimeout(task, time); + return new HashedWheelTimeout(this, task, time); } public Timeout newTimeout(TimerTask task, Duration delay) { @@ -275,7 +272,7 @@ public class HashedWheelTimer implements Timer { lock.readLock().lock(); try { if (shutdown) throw new IllegalStateException("cannot enqueue after shutdown"); - int stopIndex = (int) ((wheelCursor + relativeIndex) & mask); + final int stopIndex = (int) ((wheelCursor + relativeIndex) & mask); timeout.stopIndex = stopIndex; timeout.remainingRounds = remainingRounds; wheel[stopIndex].add(timeout); @@ -303,22 +300,17 @@ public class HashedWheelTimer implements Timer { } public void run() { - List expiredTimeouts = - new ArrayList(); - startTime = System.nanoTime(); tick = 1; while (!shutdown()) { final long deadline = waitForNextTick(); - if (deadline > 0) { - fetchExpiredTimeouts(expiredTimeouts, deadline); - notifyExpiredTimeouts(expiredTimeouts); - } + if (deadline > 0) + notifyExpiredTimeouts(fetchExpiredTimeouts(deadline)); } } - private void fetchExpiredTimeouts(List expiredTimeouts, long deadline) { + private ArrayList fetchExpiredTimeouts(long deadline) { // Find the expired timeouts and decrease the round counter // if necessary. Note that we don't send the notification @@ -326,18 +318,15 @@ public class HashedWheelTimer implements Timer { // an exclusive lock. lock.writeLock().lock(); try { - int newWheelCursor = wheelCursor = wheelCursor + 1 & mask; - ReusableIterator i = iterators[newWheelCursor]; - fetchExpiredTimeouts(expiredTimeouts, i, deadline); + final int newWheelCursor = wheelCursor = wheelCursor + 1 & mask; + return fetchExpiredTimeouts(iterators[newWheelCursor], deadline); } finally { lock.writeLock().unlock(); } } - private void fetchExpiredTimeouts( - List expiredTimeouts, - ReusableIterator i, long deadline) { - + private ArrayList fetchExpiredTimeouts(final ReusableIterator i, final long deadline) { + final ArrayList expiredTimeouts = new ArrayList(); List slipped = null; i.rewind(); while (i.hasNext()) { @@ -351,13 +340,13 @@ public class HashedWheelTimer implements Timer { // place, usually one tick earlier. For now, just add // it to a temporary list - we will reschedule it in a // separate loop. - if (slipped == null) { + if (slipped == null) slipped = new ArrayList(); - } + slipped.add(timeout); } } else { - timeout.remainingRounds --; + timeout.remainingRounds -= 1; } } @@ -367,10 +356,10 @@ public class HashedWheelTimer implements Timer { scheduleTimeout(timeout, timeout.deadline - deadline); } } + return expiredTimeouts; } - private void notifyExpiredTimeouts( - List expiredTimeouts) { + private void notifyExpiredTimeouts(ArrayList expiredTimeouts) { // Notify the expired timeouts. for (int i = expiredTimeouts.size() - 1; i >= 0; i --) { expiredTimeouts.get(i).expire(); @@ -413,69 +402,83 @@ public class HashedWheelTimer implements Timer { } } - private final class HashedWheelTimeout implements Timeout { + private static final class HashedWheelTimeout implements Timeout { + private static final long _stateOffset; + + static { + try { + _stateOffset = Unsafe.instance.objectFieldOffset(HashedWheelTimeout.class.getDeclaredField("_state")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } private static final int ST_INIT = 0; private static final int ST_CANCELLED = 1; private static final int ST_EXPIRED = 2; + private final HashedWheelTimer parent; private final TimerTask task; final long deadline; volatile int stopIndex; volatile long remainingRounds; - private final AtomicInteger state = new AtomicInteger(ST_INIT); + @SuppressWarnings("unused") + private volatile int _state = ST_INIT; - HashedWheelTimeout(TimerTask task, long deadline) { + HashedWheelTimeout(HashedWheelTimer parent, TimerTask task, long deadline) { + this.parent = parent; this.task = task; this.deadline = deadline; } public Timer getTimer() { - return HashedWheelTimer.this; + return parent; } public TimerTask getTask() { return task; } - public void cancel() { - if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) { - // TODO return false - return; - } + private final int state() { + return Unsafe.instance.getIntVolatile(this, _stateOffset); + } + private final boolean updateState(int old, int future) { + return Unsafe.instance.compareAndSwapInt(this, _stateOffset, old, future); + } - wheel[stopIndex].remove(this); + public void cancel() { + if (updateState(ST_INIT, ST_CANCELLED)) { + parent.wheel[stopIndex].remove(this); + } } public boolean isCancelled() { - return state.get() == ST_CANCELLED; + return state() == ST_CANCELLED; } public boolean isExpired() { - return state.get() != ST_INIT; + return state() != ST_INIT; } public void expire() { - if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) { - return; - } - - try { - task.run(this); - } catch (Throwable t) { - logger.warning( - "An exception was thrown by " + - TimerTask.class.getSimpleName() + ".", t); + if (updateState(ST_INIT, ST_EXPIRED)) { + try { + task.run(this); + } catch (Throwable t) { + parent.logger.warning( + "An exception was thrown by " + + TimerTask.class.getSimpleName() + ".", t); + } } } @Override public String toString() { - long currentTime = System.nanoTime(); - long remaining = deadline - currentTime; + final long currentTime = System.nanoTime(); + final long remaining = deadline - currentTime; StringBuilder buf = new StringBuilder(192); - buf.append(getClass().getSimpleName()); + buf.append("HashedWheelTimeout"); buf.append('('); buf.append("deadline: ");