From c0cead3aad13ed178b4e2f0f6fd9ca031068db4f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 16 May 2012 16:16:31 +0200 Subject: [PATCH] Cleaning up Scheduler, rewriting ContinuousCancellable --- .../src/main/scala/akka/actor/Scheduler.scala | 56 +++++++------------ 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 827e511308..6155cab10c 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -9,6 +9,8 @@ import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer, Timeout ⇒ HWTi import akka.event.LoggingAdapter import akka.dispatch.MessageDispatcher import java.io.Closeable +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec //#scheduler /** @@ -188,11 +190,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, private trait ContinuousScheduling { this: TimerTask ⇒ def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) { - try { - delegator.swap(timeout.getTimer.newTimeout(this, delay)) - } catch { - case _: IllegalStateException ⇒ // stop recurring if timer is stopped - } + try delegator.swap(timeout.getTimer.newTimeout(this, delay)) catch { case _: IllegalStateException ⇒ } // stop recurring if timer is stopped } } @@ -203,7 +201,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, } } - def close() = { + def close(): Unit = { import scala.collection.JavaConverters._ hashedWheelTimer.stop().asScala foreach execDirectly } @@ -214,43 +212,31 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, * methods. Needed to be able to cancel continuous tasks, * since they create new Timeout for each tick. */ -private[akka] class ContinuousCancellable extends Cancellable { - @volatile - private var delegate: HWTimeout = _ - @volatile - private var cancelled = false - +private[akka] class ContinuousCancellable extends AtomicReference[HWTimeout] with Cancellable { private[akka] def init(initialTimeout: HWTimeout): this.type = { - delegate = initialTimeout + assert(compareAndSet(null, initialTimeout)) this } - private[akka] def swap(newTimeout: HWTimeout): Unit = { - val wasCancelled = isCancelled - delegate = newTimeout - if (wasCancelled || isCancelled) cancel() + @tailrec private[akka] final def swap(newTimeout: HWTimeout): Unit = get match { + case null ⇒ newTimeout.cancel() + case some if some.isCancelled ⇒ cancel(); newTimeout.cancel() + case some ⇒ if (!compareAndSet(some, newTimeout)) swap(newTimeout) } - def isCancelled(): Boolean = { - // delegate is initially null, but this object will not be exposed to the world until after init - cancelled || delegate.isCancelled() + def isCancelled(): Boolean = get match { + case null ⇒ true + case some ⇒ isCancelled() } - def cancel(): Unit = { - // the underlying Timeout will not become cancelled once the task has been started to run, - // therefore we keep a flag here to make sure that rescheduling doesn't occur when cancelled - cancelled = true - // delegate is initially null, but this object will not be exposed to the world until after init - delegate.cancel() - } + def cancel(): Unit = + getAndSet(null) match { + case null ⇒ + case some ⇒ some.cancel() + } } -class DefaultCancellable(val timeout: HWTimeout) extends Cancellable { - def cancel() { - timeout.cancel() - } - - def isCancelled: Boolean = { - timeout.isCancelled - } +private[akka] class DefaultCancellable(val timeout: HWTimeout) extends Cancellable { + override def cancel(): Unit = timeout.cancel() + override def isCancelled: Boolean = timeout.isCancelled }