fix two bugs in LARS, see #2950

- initial setting of the repeated task raced with first execution, when
  the latter won the task would not repeat
- there was a race in task submission which could lead to enqueueing one
  round too late
This commit is contained in:
Roland 2013-01-31 22:40:15 +01:00
parent 5164e2e08e
commit 2008bab2ba

View file

@ -226,8 +226,8 @@ class LightArrayRevolverScheduler(config: Config,
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
try new AtomicReference[Cancellable] with Cancellable { self
set(schedule(
try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self
compareAndSet(InitialRepeatMarker, schedule(
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
override def run(): Unit = {
try {
@ -308,8 +308,24 @@ class LightArrayRevolverScheduler(config: Config,
*/
@tailrec
def rec(t: TaskHolder): TimerTask = {
val bucket = (currentBucket + ticks) & wheelMask
get(bucket) match {
val current = currentBucket
val bucket = (current + ticks) & wheelMask
val contents = get(bucket)
/*
* The timer thread does the following:
* - swap Pause into the currentBucket
* - increment currentBucket
* - swap empty list into the Paused bucket (guaranteed != previous contents, even if empty)
*
* If we read the bucket contents before the last step, everything is fine,
* it will either succeed (all done before the Pause) or fail with Pause or in the CAS.
* But if we read the bucket contents after the third step but had read the currentBucket
* before the second step, then wed enqueue into the wrong round. After seeing the new
* bucket contents, this next read will need to see the incremented currentBucket so we
* can detect this race and retry.
*/
if (current != currentBucket) rec(t)
else contents match {
case Pause
if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown")
rec(t)
@ -471,6 +487,11 @@ object LightArrayRevolverScheduler {
def isCancelled: Boolean = false
def run(): Unit = ()
}
private val InitialRepeatMarker = new Cancellable {
def cancel(): Boolean = false
def isCancelled: Boolean = false
}
// marker object during wheel movement
private val Pause = new TaskHolder(null, null, 0)(null)
// we need two empty tokens so wheel passing can be detected in schedule()