Merge pull request #1570 from akka/wip-the-great-un-nulling-√

Removing the use of 'null' in LARS
This commit is contained in:
Roland Kuhn 2013-06-26 10:48:51 -07:00
commit 1cca2b85e3
3 changed files with 43 additions and 52 deletions

View file

@ -202,7 +202,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
case Msg(ts) case Msg(ts)
val now = System.nanoTime val now = System.nanoTime
// Make sure that no message has been dispatched before the scheduled time (10ms) has occurred // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred
if (now - ts < 5.millis.toNanos) throw new RuntimeException("Interval is too small: " + (now - ts)) if (now < ts) throw new RuntimeException("Interval is too small: " + (now - ts))
ticks.countDown() ticks.countDown()
} }
})) }))

View file

@ -276,16 +276,15 @@ class LightArrayRevolverScheduler(config: Config,
case SchedulerException(msg) throw new IllegalStateException(msg) case SchedulerException(msg) throw new IllegalStateException(msg)
} }
private def execDirectly(t: TimerTask): Unit = { override def close(): Unit = Await.result(stop(), getShutdownTimeout) foreach {
try t.run() catch { task
case e: InterruptedException throw e try task.run() catch {
case _: SchedulerException // ignore terminated actors case e: InterruptedException throw e
case NonFatal(e) log.error(e, "exception while executing timer task") case _: SchedulerException // ignore terminated actors
} case NonFatal(e) log.error(e, "exception while executing timer task")
}
} }
override def close(): Unit = Await.result(stop(), getShutdownTimeout) foreach execDirectly
override val maxFrequency: Double = 1.second / TickDuration override val maxFrequency: Double = 1.second / TickDuration
/* /*
@ -314,7 +313,7 @@ class LightArrayRevolverScheduler(config: Config,
} }
private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]]
def stop(): Future[immutable.Seq[TimerTask]] = { private def stop(): Future[immutable.Seq[TimerTask]] = {
val p = Promise[immutable.Seq[TimerTask]]() val p = Promise[immutable.Seq[TimerTask]]()
if (stopped.compareAndSet(null, p)) { if (stopped.compareAndSet(null, p)) {
// Interrupting the timer thread to make it shut down faster is not good since // Interrupting the timer thread to make it shut down faster is not good since
@ -341,25 +340,23 @@ class LightArrayRevolverScheduler(config: Config,
} }
@tailrec @tailrec
private def checkQueue(time: Long, pastExec: Boolean): Unit = queue.pollNode() match { private def checkQueue(time: Long): Unit = queue.pollNode() match {
case null () case null ()
case node case node
if (node.value.ticks == 0) { node.value.ticks match {
node.value.executeTask() case 0 node.value.executeTask()
} else { case ticks
val futureTick = ((time + node.value.ticks * tickNanos - start) / tickNanos).toInt val futureTick = ((
val offset = futureTick - tick time - start + // calculate the nanos since timer start
val bucket = futureTick & wheelMask (ticks * tickNanos) + // adding the desired delay
val isCurrent = (offset & wheelMask) == 0 tickNanos - 1 // rounding up
/* ) / tickNanos).toInt // and converting to slot number
* if we enqueue into the current bucket after having executed its contents, val offset = futureTick - tick
* we must correct for this because otherwise the task would execute one val bucket = futureTick & wheelMask
* rotation later than requested node.value.ticks = offset
*/ wheel(bucket).addNode(node)
node.value.ticks = if (pastExec && isCurrent) offset - WheelSize else offset
wheel(bucket).addNode(node)
} }
checkQueue(time, pastExec) checkQueue(time)
} }
override final def run = override final def run =
@ -388,11 +385,11 @@ class LightArrayRevolverScheduler(config: Config,
@tailrec final def nextTick(): Unit = { @tailrec final def nextTick(): Unit = {
val time = clock() val time = clock()
val sleepTime = start + tick * tickNanos - time val sleepTime = start + (tick * tickNanos) - time
if (sleepTime > 0) { if (sleepTime > 0) {
// check the queue before taking a nap // check the queue before taking a nap
checkQueue(time, pastExec = false) checkQueue(time)
waitNanos(sleepTime) waitNanos(sleepTime)
} else { } else {
val bucket = tick & wheelMask val bucket = tick & wheelMask
@ -414,9 +411,6 @@ class LightArrayRevolverScheduler(config: Config,
executeBucket() executeBucket()
wheel(bucket) = putBack wheel(bucket) = putBack
// check the queue now so that ticks==0 tasks can execute immediately
checkQueue(clock(), pastExec = true)
tick += 1 tick += 1
} }
stopped.get match { stopped.get match {
@ -432,7 +426,7 @@ class LightArrayRevolverScheduler(config: Config,
} }
object LightArrayRevolverScheduler { object LightArrayRevolverScheduler {
private val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task")) private[this] val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task"))
private class TaskQueue extends AbstractNodeQueue[TaskHolder] private class TaskQueue extends AbstractNodeQueue[TaskHolder]
@ -448,17 +442,14 @@ object LightArrayRevolverScheduler {
extends TimerTask { extends TimerTask {
@tailrec @tailrec
private final def extractTask(cancel: Boolean): Runnable = { private final def extractTask(replaceWith: Runnable): Runnable =
task match { task match {
case null | CancelledTask null // null means expired case t @ (ExecutedTask | CancelledTask) t
case x case x if (unsafe.compareAndSwapObject(this, taskOffset, x, replaceWith)) x else extractTask(replaceWith)
if (unsafe.compareAndSwapObject(this, taskOffset, x, if (cancel) CancelledTask else null)) x
else extractTask(cancel)
} }
}
private[akka] final def executeTask(): Boolean = extractTask(cancel = false) match { private[akka] final def executeTask(): Boolean = extractTask(ExecutedTask) match {
case null | CancelledTask false case ExecutedTask | CancelledTask false
case other case other
try { try {
executionContext execute other executionContext execute other
@ -469,28 +460,27 @@ object LightArrayRevolverScheduler {
} }
} }
/** // This should only be called in execDirectly
* utility method to directly run the task, e.g. as clean-up action override def run(): Unit = extractTask(ExecutedTask).run()
*/
def run(): Unit = extractTask(cancel = false) match {
case null
case r r.run()
}
override def cancel(): Boolean = extractTask(cancel = true) != null override def cancel(): Boolean = extractTask(CancelledTask) match {
case ExecutedTask | CancelledTask false
case _ true
}
override def isCancelled: Boolean = task eq CancelledTask override def isCancelled: Boolean = task eq CancelledTask
} }
private val CancelledTask = new Runnable { def run = () } private[this] val CancelledTask = new Runnable { def run = () }
private[this] val ExecutedTask = new Runnable { def run = () }
private val NotCancellable = new TimerTask { private val NotCancellable: TimerTask = new TimerTask {
def cancel(): Boolean = false def cancel(): Boolean = false
def isCancelled: Boolean = false def isCancelled: Boolean = false
def run(): Unit = () def run(): Unit = ()
} }
private val InitialRepeatMarker = new Cancellable { private val InitialRepeatMarker: Cancellable = new Cancellable {
def cancel(): Boolean = false def cancel(): Boolean = false
def isCancelled: Boolean = false def isCancelled: Boolean = false
} }

View file

@ -130,7 +130,8 @@ trait Inbox { this: ActorDSL.type ⇒
import context.dispatcher import context.dispatcher
if (currentDeadline.isEmpty) { if (currentDeadline.isEmpty) {
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
} else if (currentDeadline.get._1 != next) { } else {
// must not rely on the Scheduler to not fire early (for robustness)
currentDeadline.get._2.cancel() currentDeadline.get._2.cancel()
currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick)))
} }