Merge pull request #869 from akka/wip-2686-HWT-overflow-∂π
make HashedWheelTimer tolerate wrap-arounds, see #2686
This commit is contained in:
commit
44bf8a5d82
2 changed files with 38 additions and 7 deletions
|
|
@ -8,10 +8,31 @@ import language.postfixOps
|
|||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Await
|
||||
|
||||
import java.util.concurrent.TimeUnit._
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.TestLatch
|
||||
import java.util.concurrent.TimeoutException
|
||||
import akka.testkit.LongRunningTest
|
||||
|
||||
class DurationSpec extends WordSpec with MustMatchers {
|
||||
class DurationSpec extends AkkaSpec {
|
||||
|
||||
"A HashedWheelTimer" must {
|
||||
|
||||
"not mess up long timeouts" taggedAs LongRunningTest in {
|
||||
val longish = Long.MaxValue.nanos
|
||||
val barrier = TestLatch()
|
||||
import system.dispatcher
|
||||
val job = system.scheduler.scheduleOnce(longish)(barrier.countDown())
|
||||
intercept[TimeoutException] {
|
||||
// this used to fire after 46 seconds due to wrap-around
|
||||
Await.ready(barrier, 90 seconds)
|
||||
}
|
||||
job.cancel()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"Duration" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -263,8 +263,11 @@ public class HashedWheelTimer implements Timer {
|
|||
|
||||
void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
|
||||
// Prepare the required parameters to schedule the timeout object.
|
||||
final long relativeIndex = Math.max(1, (delay + tickDuration - 1) / tickDuration); // If relative index < 1 then it should be 1
|
||||
|
||||
long relativeIndex = (delay + tickDuration - 1) / tickDuration;
|
||||
// if the previous line had an overflow going on, then we’ll just schedule this timeout
|
||||
// one tick early; that shouldn’t matter since we’re talking 270 years here
|
||||
if (relativeIndex < 0) relativeIndex = delay / tickDuration;
|
||||
if (relativeIndex == 0) relativeIndex = 1;
|
||||
final long remainingRounds = relativeIndex / wheel.length;
|
||||
|
||||
// Add the timeout to the wheel.
|
||||
|
|
@ -304,7 +307,7 @@ public class HashedWheelTimer implements Timer {
|
|||
|
||||
while (!shutdown()) {
|
||||
final long deadline = waitForNextTick();
|
||||
if (deadline > 0)
|
||||
if (deadline > Long.MIN_VALUE)
|
||||
notifyExpiredTimeouts(fetchExpiredTimeouts(deadline));
|
||||
}
|
||||
}
|
||||
|
|
@ -332,7 +335,7 @@ public class HashedWheelTimer implements Timer {
|
|||
HashedWheelTimeout timeout = i.next();
|
||||
if (timeout.remainingRounds <= 0) {
|
||||
i.remove();
|
||||
if (timeout.deadline <= deadline) {
|
||||
if (timeout.deadline - deadline <= 0) {
|
||||
expiredTimeouts.add(timeout);
|
||||
} else {
|
||||
// Handle the case where the timeout is put into a wrong
|
||||
|
|
@ -368,6 +371,12 @@ public class HashedWheelTimer implements Timer {
|
|||
expiredTimeouts.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* calculate goal nanoTime from startTime and current tick number,
|
||||
* then wait until that goal has been reached.
|
||||
*
|
||||
* @return Long.MIN_VALUE if received a shutdown request, current time otherwise (with Long.MIN_VALUE changed by +1)
|
||||
*/
|
||||
private long waitForNextTick() {
|
||||
long deadline = startTime + tickDuration * tick;
|
||||
|
||||
|
|
@ -378,7 +387,8 @@ public class HashedWheelTimer implements Timer {
|
|||
|
||||
if (sleepTimeMs <= 0) {
|
||||
tick += 1;
|
||||
return currentTime;
|
||||
if (currentTime == Long.MIN_VALUE) return -Long.MAX_VALUE;
|
||||
else return currentTime;
|
||||
}
|
||||
|
||||
// Check if we run on windows, as if thats the case we will need
|
||||
|
|
@ -394,7 +404,7 @@ public class HashedWheelTimer implements Timer {
|
|||
Thread.sleep(sleepTimeMs);
|
||||
} catch (InterruptedException e) {
|
||||
if (shutdown()) {
|
||||
return -1;
|
||||
return Long.MIN_VALUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue