diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java index 6d31b327c6..5b19e1f2eb 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java @@ -80,7 +80,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class HashedWheelTimer implements Timer { private final Worker worker = new Worker(); final Thread workerThread; - final AtomicBoolean shutdown = new AtomicBoolean(); + boolean shutdown = false; private final long roundDuration; final long tickDuration; final Set[] wheel; @@ -181,12 +181,17 @@ public class HashedWheelTimer implements Timer { * {@linkplain #stop() stopped} already */ public synchronized void start() { - if (shutdown.get()) { - throw new IllegalStateException("cannot be started once stopped"); - } + lock.readLock().lock(); + try { + if (shutdown) { + throw new IllegalStateException("cannot be started once stopped"); + } - if (!workerThread.isAlive()) { - workerThread.start(); + if (!workerThread.isAlive()) { + workerThread.start(); + } + } finally { + lock.readLock().unlock(); } } @@ -198,8 +203,15 @@ public class HashedWheelTimer implements Timer { TimerTask.class.getSimpleName()); } - if (!shutdown.compareAndSet(false, true)) { - return Collections.emptySet(); + lock.writeLock().lock(); + try { + if (shutdown) { + return Collections.emptySet(); + } else { + shutdown = true; + } + } finally { + lock.writeLock().unlock(); } boolean interrupted = false; @@ -224,6 +236,10 @@ public class HashedWheelTimer implements Timer { return Collections.unmodifiableSet(unprocessedTimeouts); } + + public HashedWheelTimeout createTimeout(TimerTask task, long time) { + return new HashedWheelTimeout(task, time); + } public Timeout newTimeout(TimerTask task, Duration delay) { final long currentTime = System.nanoTime(); @@ -239,7 +255,7 @@ public class HashedWheelTimer implements Timer { start(); } - HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay.toNanos()); + HashedWheelTimeout timeout = createTimeout(task, currentTime + delay.toNanos()); scheduleTimeout(timeout, delay.toNanos()); return timeout; } @@ -260,6 +276,7 @@ public class HashedWheelTimer implements Timer { // Add the timeout to the wheel. lock.readLock().lock(); try { + if (shutdown) throw new IllegalStateException("cannot enqueue after shutdown"); int stopIndex = (int) (wheelCursor + relativeIndex & mask); timeout.stopIndex = stopIndex; timeout.remainingRounds = remainingRounds; @@ -277,6 +294,15 @@ public class HashedWheelTimer implements Timer { Worker() { super(); } + + private boolean shutdown() { + lock.readLock().lock(); + try { + return shutdown; + } finally { + lock.readLock().unlock(); + } + } public void run() { List expiredTimeouts = @@ -285,7 +311,7 @@ public class HashedWheelTimer implements Timer { startTime = System.nanoTime(); tick = 1; - while (!shutdown.get()) { + while (!shutdown()) { final long deadline = waitForNextTick(); if (deadline > 0) { fetchExpiredTimeouts(expiredTimeouts, deadline); @@ -372,7 +398,7 @@ public class HashedWheelTimer implements Timer { int nanoSeconds = (int) (sleepTime - (milliSeconds * 1000000)); Thread.sleep(milliSeconds, nanoSeconds); } catch (InterruptedException e) { - if (shutdown.get()) { + if (shutdown()) { return -1; } }