Cleaning up Scheduler, rewriting ContinuousCancellable

This commit is contained in:
Viktor Klang 2012-05-16 16:16:31 +02:00
parent cd31b4b103
commit c0cead3aad

View file

@ -9,6 +9,8 @@ import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer, Timeout ⇒ HWTi
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import java.io.Closeable import java.io.Closeable
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
//#scheduler //#scheduler
/** /**
@ -188,11 +190,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer,
private trait ContinuousScheduling { this: TimerTask private trait ContinuousScheduling { this: TimerTask
def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) { def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) {
try { try delegator.swap(timeout.getTimer.newTimeout(this, delay)) catch { case _: IllegalStateException } // stop recurring if timer is stopped
delegator.swap(timeout.getTimer.newTimeout(this, delay))
} catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
} }
} }
@ -203,7 +201,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer,
} }
} }
def close() = { def close(): Unit = {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
hashedWheelTimer.stop().asScala foreach execDirectly hashedWheelTimer.stop().asScala foreach execDirectly
} }
@ -214,43 +212,31 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer,
* methods. Needed to be able to cancel continuous tasks, * methods. Needed to be able to cancel continuous tasks,
* since they create new Timeout for each tick. * since they create new Timeout for each tick.
*/ */
private[akka] class ContinuousCancellable extends Cancellable { private[akka] class ContinuousCancellable extends AtomicReference[HWTimeout] with Cancellable {
@volatile
private var delegate: HWTimeout = _
@volatile
private var cancelled = false
private[akka] def init(initialTimeout: HWTimeout): this.type = { private[akka] def init(initialTimeout: HWTimeout): this.type = {
delegate = initialTimeout assert(compareAndSet(null, initialTimeout))
this this
} }
private[akka] def swap(newTimeout: HWTimeout): Unit = { @tailrec private[akka] final def swap(newTimeout: HWTimeout): Unit = get match {
val wasCancelled = isCancelled case null newTimeout.cancel()
delegate = newTimeout case some if some.isCancelled cancel(); newTimeout.cancel()
if (wasCancelled || isCancelled) cancel() case some if (!compareAndSet(some, newTimeout)) swap(newTimeout)
} }
def isCancelled(): Boolean = { def isCancelled(): Boolean = get match {
// delegate is initially null, but this object will not be exposed to the world until after init case null true
cancelled || delegate.isCancelled() case some isCancelled()
} }
def cancel(): Unit = { def cancel(): Unit =
// the underlying Timeout will not become cancelled once the task has been started to run, getAndSet(null) match {
// therefore we keep a flag here to make sure that rescheduling doesn't occur when cancelled case null
cancelled = true case some some.cancel()
// delegate is initially null, but this object will not be exposed to the world until after init
delegate.cancel()
} }
} }
class DefaultCancellable(val timeout: HWTimeout) extends Cancellable { private[akka] class DefaultCancellable(val timeout: HWTimeout) extends Cancellable {
def cancel() { override def cancel(): Unit = timeout.cancel()
timeout.cancel() override def isCancelled: Boolean = timeout.isCancelled
}
def isCancelled: Boolean = {
timeout.isCancelled
}
} }