From 2008bab2ba37fb556734df0b1981c2409e2d6cbb Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 31 Jan 2013 22:40:15 +0100 Subject: [PATCH] fix two bugs in LARS, see #2950 - initial setting of the repeated task raced with first execution, when the latter won the task would not repeat - there was a race in task submission which could lead to enqueueing one round too late --- .../src/main/scala/akka/actor/Scheduler.scala | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index e3be799fb2..fba646fc4d 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -226,8 +226,8 @@ class LightArrayRevolverScheduler(config: Config, override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = - try new AtomicReference[Cancellable] with Cancellable { self ⇒ - set(schedule( + try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self ⇒ + compareAndSet(InitialRepeatMarker, schedule( new AtomicLong(clock() + initialDelay.toNanos) with Runnable { override def run(): Unit = { try { @@ -308,8 +308,24 @@ class LightArrayRevolverScheduler(config: Config, */ @tailrec def rec(t: TaskHolder): TimerTask = { - val bucket = (currentBucket + ticks) & wheelMask - get(bucket) match { + val current = currentBucket + val bucket = (current + ticks) & wheelMask + val contents = get(bucket) + /* + * The timer thread does the following: + * - swap Pause into the currentBucket + * - increment currentBucket + * - swap empty list into the Paused bucket (guaranteed != previous contents, even if empty) + * + * If we read the bucket contents before the last step, everything is fine, + * it will either succeed (all done before the Pause) or fail with Pause or in the CAS. + * But if we read the bucket contents after the third step but had read the currentBucket + * before the second step, then we’d enqueue into the wrong round. After seeing the new + * bucket contents, this next read will need to see the incremented currentBucket so we + * can detect this race and retry. + */ + if (current != currentBucket) rec(t) + else contents match { case Pause ⇒ if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") rec(t) @@ -471,6 +487,11 @@ object LightArrayRevolverScheduler { def isCancelled: Boolean = false def run(): Unit = () } + + private val InitialRepeatMarker = new Cancellable { + def cancel(): Boolean = false + def isCancelled: Boolean = false + } // marker object during wheel movement private val Pause = new TaskHolder(null, null, 0)(null) // we need two empty tokens so wheel passing can be detected in schedule()