From 18cd126fd5f9092d0e17da2936a69d7b1d469b87 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 26 Jun 2013 16:10:27 +0200 Subject: [PATCH 1/2] Removing the use of 'null' in LARS --- .../src/main/scala/akka/actor/Scheduler.scala | 81 +++++++++---------- 1 file changed, 38 insertions(+), 43 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index d88391f510..270b163cb5 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -276,16 +276,15 @@ class LightArrayRevolverScheduler(config: Config, case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) } - private def execDirectly(t: TimerTask): Unit = { - try t.run() catch { - case e: InterruptedException ⇒ throw e - case _: SchedulerException ⇒ // ignore terminated actors - case NonFatal(e) ⇒ log.error(e, "exception while executing timer task") - } + override def close(): Unit = Await.result(stop(), getShutdownTimeout) foreach { + task ⇒ + try task.run() catch { + case e: InterruptedException ⇒ throw e + case _: SchedulerException ⇒ // ignore terminated actors + case NonFatal(e) ⇒ log.error(e, "exception while executing timer task") + } } - override def close(): Unit = Await.result(stop(), getShutdownTimeout) foreach execDirectly - override val maxFrequency: Double = 1.second / TickDuration /* @@ -314,7 +313,7 @@ class LightArrayRevolverScheduler(config: Config, } private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] - def stop(): Future[immutable.Seq[TimerTask]] = { + private def stop(): Future[immutable.Seq[TimerTask]] = { val p = Promise[immutable.Seq[TimerTask]]() if (stopped.compareAndSet(null, p)) { // Interrupting the timer thread to make it shut down faster is not good since @@ -344,20 +343,20 @@ class LightArrayRevolverScheduler(config: Config, private def checkQueue(time: Long, pastExec: Boolean): Unit = queue.pollNode() match { case null ⇒ () case node ⇒ - if (node.value.ticks == 0) { - node.value.executeTask() - } else { - val futureTick = ((time + node.value.ticks * tickNanos - start) / tickNanos).toInt - val offset = futureTick - tick - val bucket = futureTick & wheelMask - val isCurrent = (offset & wheelMask) == 0 - /* - * if we enqueue into the current bucket after having executed its contents, - * we must correct for this because otherwise the task would execute one - * rotation later than requested - */ - node.value.ticks = if (pastExec && isCurrent) offset - WheelSize else offset - wheel(bucket).addNode(node) + node.value.ticks match { + case 0 ⇒ node.value.executeTask() + case ticks ⇒ + val futureTick = ((time + ticks * tickNanos - start) / tickNanos).toInt + val offset = futureTick - tick + val bucket = futureTick & wheelMask + val isCurrent = (offset & wheelMask) == 0 + /* + * if we enqueue into the current bucket after having executed its contents, + * we must correct for this because otherwise the task would execute one + * rotation later than requested + */ + node.value.ticks = if (pastExec && isCurrent) offset - WheelSize else offset + wheel(bucket).addNode(node) } checkQueue(time, pastExec) } @@ -432,7 +431,7 @@ class LightArrayRevolverScheduler(config: Config, } object LightArrayRevolverScheduler { - private val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task")) + private[this] val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task")) private class TaskQueue extends AbstractNodeQueue[TaskHolder] @@ -448,17 +447,14 @@ object LightArrayRevolverScheduler { extends TimerTask { @tailrec - private final def extractTask(cancel: Boolean): Runnable = { + private final def extractTask(replaceWith: Runnable): Runnable = task match { - case null | CancelledTask ⇒ null // null means expired - case x ⇒ - if (unsafe.compareAndSwapObject(this, taskOffset, x, if (cancel) CancelledTask else null)) x - else extractTask(cancel) + case t @ (ExecutedTask | CancelledTask) ⇒ t + case x ⇒ if (unsafe.compareAndSwapObject(this, taskOffset, x, replaceWith)) x else extractTask(replaceWith) } - } - private[akka] final def executeTask(): Boolean = extractTask(cancel = false) match { - case null | CancelledTask ⇒ false + private[akka] final def executeTask(): Boolean = extractTask(ExecutedTask) match { + case ExecutedTask | CancelledTask ⇒ false case other ⇒ try { executionContext execute other @@ -469,28 +465,27 @@ object LightArrayRevolverScheduler { } } - /** - * utility method to directly run the task, e.g. as clean-up action - */ - def run(): Unit = extractTask(cancel = false) match { - case null ⇒ - case r ⇒ r.run() - } + // This should only be called in execDirectly + override def run(): Unit = extractTask(ExecutedTask).run() - override def cancel(): Boolean = extractTask(cancel = true) != null + override def cancel(): Boolean = extractTask(CancelledTask) match { + case ExecutedTask | CancelledTask ⇒ false + case _ ⇒ true + } override def isCancelled: Boolean = task eq CancelledTask } - private val CancelledTask = new Runnable { def run = () } + private[this] val CancelledTask = new Runnable { def run = () } + private[this] val ExecutedTask = new Runnable { def run = () } - private val NotCancellable = new TimerTask { + private val NotCancellable: TimerTask = new TimerTask { def cancel(): Boolean = false def isCancelled: Boolean = false def run(): Unit = () } - private val InitialRepeatMarker = new Cancellable { + private val InitialRepeatMarker: Cancellable = new Cancellable { def cancel(): Boolean = false def isCancelled: Boolean = false } From a359e1b88e3b56f0ee3a9f614da8387fb6a750cf Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Wed, 26 Jun 2013 17:01:29 +0200 Subject: [PATCH 2/2] fix yet another bug in LARS, see #3471 - the scheduler should round up the calculated slot instead of rounding down - SchedulerSpec did not catch this because it added 5ms leeway (which is wrong and now removed) - InboxActor exposed it by being really picky with early-firing timers; change that to be more robust --- .../test/scala/akka/actor/SchedulerSpec.scala | 2 +- .../src/main/scala/akka/actor/Scheduler.scala | 25 ++++++++----------- .../src/main/scala/akka/actor/dsl/Inbox.scala | 3 ++- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 0c2983dd98..c40f9b02ff 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -202,7 +202,7 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit case Msg(ts) ⇒ val now = System.nanoTime // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred - if (now - ts < 5.millis.toNanos) throw new RuntimeException("Interval is too small: " + (now - ts)) + if (now < ts) throw new RuntimeException("Interval is too small: " + (now - ts)) ticks.countDown() } })) diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 270b163cb5..87567b113c 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -340,25 +340,23 @@ class LightArrayRevolverScheduler(config: Config, } @tailrec - private def checkQueue(time: Long, pastExec: Boolean): Unit = queue.pollNode() match { + private def checkQueue(time: Long): Unit = queue.pollNode() match { case null ⇒ () case node ⇒ node.value.ticks match { case 0 ⇒ node.value.executeTask() case ticks ⇒ - val futureTick = ((time + ticks * tickNanos - start) / tickNanos).toInt + val futureTick = (( + time - start + // calculate the nanos since timer start + (ticks * tickNanos) + // adding the desired delay + tickNanos - 1 // rounding up + ) / tickNanos).toInt // and converting to slot number val offset = futureTick - tick val bucket = futureTick & wheelMask - val isCurrent = (offset & wheelMask) == 0 - /* - * if we enqueue into the current bucket after having executed its contents, - * we must correct for this because otherwise the task would execute one - * rotation later than requested - */ - node.value.ticks = if (pastExec && isCurrent) offset - WheelSize else offset + node.value.ticks = offset wheel(bucket).addNode(node) } - checkQueue(time, pastExec) + checkQueue(time) } override final def run = @@ -387,11 +385,11 @@ class LightArrayRevolverScheduler(config: Config, @tailrec final def nextTick(): Unit = { val time = clock() - val sleepTime = start + tick * tickNanos - time + val sleepTime = start + (tick * tickNanos) - time if (sleepTime > 0) { // check the queue before taking a nap - checkQueue(time, pastExec = false) + checkQueue(time) waitNanos(sleepTime) } else { val bucket = tick & wheelMask @@ -413,9 +411,6 @@ class LightArrayRevolverScheduler(config: Config, executeBucket() wheel(bucket) = putBack - // check the queue now so that ticks==0 tasks can execute immediately - checkQueue(clock(), pastExec = true) - tick += 1 } stopped.get match { diff --git a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala index 463d771dae..0dd8afe1c2 100644 --- a/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala +++ b/akka-actor/src/main/scala/akka/actor/dsl/Inbox.scala @@ -130,7 +130,8 @@ trait Inbox { this: ActorDSL.type ⇒ import context.dispatcher if (currentDeadline.isEmpty) { currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) - } else if (currentDeadline.get._1 != next) { + } else { + // must not rely on the Scheduler to not fire early (for robustness) currentDeadline.get._2.cancel() currentDeadline = Some((next, context.system.scheduler.scheduleOnce(next.timeLeft, self, Kick))) }