diff --git a/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala b/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala new file mode 100644 index 0000000000..6ee5a5df20 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/LightArrayRevolverScheduler.scala @@ -0,0 +1,354 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.actor + +import java.io.Closeable +import java.util.concurrent.ThreadFactory +import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray } +import scala.annotation.tailrec +import scala.collection.immutable +import scala.concurrent.{ Await, ExecutionContext, Future, Promise } +import scala.concurrent.duration._ +import scala.util.control.{ NoStackTrace, NonFatal } +import com.typesafe.config.Config +import akka.event.LoggingAdapter +import akka.util.Helpers +import akka.util.Unsafe.{ instance ⇒ unsafe } +import akka.dispatch.AbstractNodeQueue + +/** + * This scheduler implementation is based on a revolving wheel of buckets, + * like Netty’s HashedWheelTimer, which it advances at a fixed tick rate and + * dispatches tasks it finds in the current bucket to their respective + * ExecutionContexts. The tasks are held in TaskHolders, which upon + * cancellation null out their reference to the actual task, leaving only this + * shell to be cleaned up when the wheel reaches that bucket next time. This + * enables the use of a simple linked list to chain the TaskHolders off the + * wheel. + * + * Also noteworthy is that this scheduler does not obtain a current time stamp + * when scheduling single-shot tasks, instead it always rounds up the task + * delay to a full multiple of the TickDuration. This means that tasks are + * scheduled possibly one tick later than they could be (if checking that + * “now() + delay <= nextTick” were done). + */ +class LightArrayRevolverScheduler(config: Config, + log: LoggingAdapter, + threadFactory: ThreadFactory) + extends Scheduler with Closeable { + + import Helpers.Requiring + import Helpers.ConfigOps + + val WheelSize = + config.getInt("akka.scheduler.ticks-per-wheel") + .requiring(ticks ⇒ (ticks & (ticks - 1)) == 0, "ticks-per-wheel must be a power of 2") + val TickDuration = + config.getMillisDuration("akka.scheduler.tick-duration") + .requiring(_ >= 10.millis || !Helpers.isWindows, "minimum supported akka.scheduler.tick-duration on Windows is 10ms") + .requiring(_ >= 1.millis, "minimum supported akka.scheduler.tick-duration is 1ms") + val ShutdownTimeout = config.getMillisDuration("akka.scheduler.shutdown-timeout") + + import LightArrayRevolverScheduler._ + + private val oneNs = Duration.fromNanos(1l) + private def roundUp(d: FiniteDuration): FiniteDuration = + try { + ((d + TickDuration - oneNs) / TickDuration).toLong * TickDuration + } catch { + case _: IllegalArgumentException ⇒ d // rounding up Long.MaxValue.nanos overflows + } + + /** + * Clock implementation is replaceable (for testing); the implementation must + * return a monotonically increasing series of Long nanoseconds. + */ + protected def clock(): Long = System.nanoTime + + /** + * Overridable for tests + */ + protected def getShutdownTimeout: FiniteDuration = ShutdownTimeout + + /** + * Overridable for tests + */ + protected def waitNanos(nanos: Long): Unit = { + // see http://www.javamex.com/tutorials/threads/sleep_issues.shtml + val sleepMs = if (Helpers.isWindows) (nanos + 4999999) / 10000000 * 10 else (nanos + 999999) / 1000000 + try Thread.sleep(sleepMs) catch { + case _: InterruptedException ⇒ Thread.currentThread.interrupt() // we got woken up + } + } + + override def schedule(initialDelay: FiniteDuration, + delay: FiniteDuration, + runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { + checkMaxDelay(roundUp(delay).toNanos) + val preparedEC = executor.prepare() + try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self ⇒ + compareAndSet(InitialRepeatMarker, schedule( + preparedEC, + new AtomicLong(clock() + initialDelay.toNanos) with Runnable { + override def run(): Unit = { + try { + runnable.run() + val driftNanos = clock() - getAndAdd(delay.toNanos) + if (self.get != null) + swap(schedule(preparedEC, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) + } catch { + case _: SchedulerException ⇒ // ignore failure to enqueue or terminated target actor + } + } + }, roundUp(initialDelay))) + + @tailrec private def swap(c: Cancellable): Unit = { + get match { + case null ⇒ if (c != null) c.cancel() + case old ⇒ if (!compareAndSet(old, c)) swap(c) + } + } + + @tailrec final def cancel(): Boolean = { + get match { + case null ⇒ false + case c ⇒ + if (c.cancel()) compareAndSet(c, null) + else compareAndSet(c, null) || cancel() + } + } + + override def isCancelled: Boolean = get == null + } catch { + case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) + } + } + + override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = + try schedule(executor.prepare(), runnable, roundUp(delay)) + catch { + case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) + } + + override def close(): Unit = Await.result(stop(), getShutdownTimeout) foreach { + task ⇒ + try task.run() catch { + case e: InterruptedException ⇒ throw e + case _: SchedulerException ⇒ // ignore terminated actors + case NonFatal(e) ⇒ log.error(e, "exception while executing timer task") + } + } + + override val maxFrequency: Double = 1.second / TickDuration + + /* + * BELOW IS THE ACTUAL TIMER IMPLEMENTATION + */ + + private val start = clock() + private val tickNanos = TickDuration.toNanos + private val wheelMask = WheelSize - 1 + private val queue = new TaskQueue + + private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask = + if (delay <= Duration.Zero) { + if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") + ec.execute(r) + NotCancellable + } else if (stopped.get != null) { + throw new SchedulerException("cannot enqueue after timer shutdown") + } else { + val delayNanos = delay.toNanos + checkMaxDelay(delayNanos) + + val ticks = (delayNanos / tickNanos).toInt + val task = new TaskHolder(r, ticks, ec) + queue.add(task) + if (stopped.get != null && task.cancel()) + throw new SchedulerException("cannot enqueue after timer shutdown") + task + } + + private def checkMaxDelay(delayNanos: Long): Unit = + if (delayNanos / tickNanos > Int.MaxValue) + // 1 second margin in the error message due to rounding + throw new IllegalArgumentException(s"Task scheduled with [${delayNanos.nanos.toSeconds}] seconds delay, " + + s"which is too far in future, maximum delay is [${(tickNanos * Int.MaxValue).nanos.toSeconds - 1}] seconds") + + private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] + private def stop(): Future[immutable.Seq[TimerTask]] = { + val p = Promise[immutable.Seq[TimerTask]]() + if (stopped.compareAndSet(null, p)) { + // Interrupting the timer thread to make it shut down faster is not good since + // it could be in the middle of executing the scheduled tasks, which might not + // respond well to being interrupted. + // Instead we just wait one more tick for it to finish. + p.future + } else Future.successful(Nil) + } + + @volatile private var timerThread: Thread = threadFactory.newThread(new Runnable { + + var tick = 0 + val wheel = Array.fill(WheelSize)(new TaskQueue) + + private def clearAll(): immutable.Seq[TimerTask] = { + @tailrec def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = { + q.poll() match { + case null ⇒ acc + case x ⇒ collect(q, acc :+ x) + } + } + ((0 until WheelSize) flatMap (i ⇒ collect(wheel(i), Vector.empty))) ++ collect(queue, Vector.empty) + } + + @tailrec + private def checkQueue(time: Long): Unit = queue.pollNode() match { + case null ⇒ () + case node ⇒ + node.value.ticks match { + case 0 ⇒ node.value.executeTask() + case ticks ⇒ + val futureTick = (( + time - start + // calculate the nanos since timer start + (ticks * tickNanos) + // adding the desired delay + tickNanos - 1 // rounding up + ) / tickNanos).toInt // and converting to slot number + // tick is an Int that will wrap around, but toInt of futureTick gives us modulo operations + // and the difference (offset) will be correct in any case + val offset = futureTick - tick + val bucket = futureTick & wheelMask + node.value.ticks = offset + wheel(bucket).addNode(node) + } + checkQueue(time) + } + + override final def run = + try nextTick() + catch { + case t: Throwable ⇒ + log.error(t, "exception on LARS’ timer thread") + stopped.get match { + case null ⇒ + val thread = threadFactory.newThread(this) + log.info("starting new LARS thread") + try thread.start() + catch { + case e: Throwable ⇒ + log.error(e, "LARS cannot start new thread, ship’s going down!") + stopped.set(Promise successful Nil) + clearAll() + } + timerThread = thread + case p ⇒ + assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS") + p success clearAll() + } + throw t + } + + @tailrec final def nextTick(): Unit = { + val time = clock() + val sleepTime = start + (tick * tickNanos) - time + + if (sleepTime > 0) { + // check the queue before taking a nap + checkQueue(time) + waitNanos(sleepTime) + } else { + val bucket = tick & wheelMask + val tasks = wheel(bucket) + val putBack = new TaskQueue + + @tailrec def executeBucket(): Unit = tasks.pollNode() match { + case null ⇒ () + case node ⇒ + val task = node.value + if (!task.isCancelled) { + if (task.ticks >= WheelSize) { + task.ticks -= WheelSize + putBack.addNode(node) + } else task.executeTask() + } + executeBucket() + } + executeBucket() + wheel(bucket) = putBack + + tick += 1 + } + stopped.get match { + case null ⇒ nextTick() + case p ⇒ + assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS") + p success clearAll() + } + } + }) + + timerThread.start() +} + +object LightArrayRevolverScheduler { + private[this] val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task")) + + private class TaskQueue extends AbstractNodeQueue[TaskHolder] + + /** + * INTERNAL API + */ + protected[actor] trait TimerTask extends Runnable with Cancellable + + /** + * INTERNAL API + */ + protected[actor] class TaskHolder(@volatile var task: Runnable, var ticks: Int, executionContext: ExecutionContext) + extends TimerTask { + + @tailrec + private final def extractTask(replaceWith: Runnable): Runnable = + task match { + case t @ (ExecutedTask | CancelledTask) ⇒ t + case x ⇒ if (unsafe.compareAndSwapObject(this, taskOffset, x, replaceWith)) x else extractTask(replaceWith) + } + + private[akka] final def executeTask(): Boolean = extractTask(ExecutedTask) match { + case ExecutedTask | CancelledTask ⇒ false + case other ⇒ + try { + executionContext execute other + true + } catch { + case _: InterruptedException ⇒ { Thread.currentThread.interrupt(); false } + case NonFatal(e) ⇒ { executionContext.reportFailure(e); false } + } + } + + // This should only be called in execDirectly + override def run(): Unit = extractTask(ExecutedTask).run() + + override def cancel(): Boolean = extractTask(CancelledTask) match { + case ExecutedTask | CancelledTask ⇒ false + case _ ⇒ true + } + + override def isCancelled: Boolean = task eq CancelledTask + } + + private[this] val CancelledTask = new Runnable { def run = () } + private[this] val ExecutedTask = new Runnable { def run = () } + + private val NotCancellable: TimerTask = new TimerTask { + def cancel(): Boolean = false + def isCancelled: Boolean = false + def run(): Unit = () + } + + private val InitialRepeatMarker: Cancellable = new Cancellable { + def cancel(): Boolean = false + def isCancelled: Boolean = false + } +} diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 8fb6c7e884..426d094b6b 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -4,19 +4,9 @@ package akka.actor -import java.io.Closeable -import java.util.concurrent.ThreadFactory -import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicReferenceArray } -import scala.annotation.tailrec -import scala.collection.immutable -import scala.concurrent.{ Await, ExecutionContext, Future, Promise } +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import scala.util.control.{ NoStackTrace, NonFatal } -import com.typesafe.config.Config -import akka.event.LoggingAdapter -import akka.util.Helpers -import akka.util.Unsafe.{ instance ⇒ unsafe } -import akka.dispatch.AbstractNodeQueue +import scala.util.control.NoStackTrace /** * This exception is thrown by Scheduler.schedule* when scheduling is not @@ -177,343 +167,3 @@ trait Cancellable { def isCancelled: Boolean } //#cancellable - -/** - * This scheduler implementation is based on a revolving wheel of buckets, - * like Netty’s HashedWheelTimer, which it advances at a fixed tick rate and - * dispatches tasks it finds in the current bucket to their respective - * ExecutionContexts. The tasks are held in TaskHolders, which upon - * cancellation null out their reference to the actual task, leaving only this - * shell to be cleaned up when the wheel reaches that bucket next time. This - * enables the use of a simple linked list to chain the TaskHolders off the - * wheel. - * - * Also noteworthy is that this scheduler does not obtain a current time stamp - * when scheduling single-shot tasks, instead it always rounds up the task - * delay to a full multiple of the TickDuration. This means that tasks are - * scheduled possibly one tick later than they could be (if checking that - * “now() + delay <= nextTick” were done). - */ -class LightArrayRevolverScheduler(config: Config, - log: LoggingAdapter, - threadFactory: ThreadFactory) - extends Scheduler with Closeable { - - import Helpers.Requiring - import Helpers.ConfigOps - - val WheelSize = - config.getInt("akka.scheduler.ticks-per-wheel") - .requiring(ticks ⇒ (ticks & (ticks - 1)) == 0, "ticks-per-wheel must be a power of 2") - val TickDuration = - config.getMillisDuration("akka.scheduler.tick-duration") - .requiring(_ >= 10.millis || !Helpers.isWindows, "minimum supported akka.scheduler.tick-duration on Windows is 10ms") - .requiring(_ >= 1.millis, "minimum supported akka.scheduler.tick-duration is 1ms") - val ShutdownTimeout = config.getMillisDuration("akka.scheduler.shutdown-timeout") - - import LightArrayRevolverScheduler._ - - private val oneNs = Duration.fromNanos(1l) - private def roundUp(d: FiniteDuration): FiniteDuration = - try { - ((d + TickDuration - oneNs) / TickDuration).toLong * TickDuration - } catch { - case _: IllegalArgumentException ⇒ d // rounding up Long.MaxValue.nanos overflows - } - - /** - * Clock implementation is replaceable (for testing); the implementation must - * return a monotonically increasing series of Long nanoseconds. - */ - protected def clock(): Long = System.nanoTime - - /** - * Overridable for tests - */ - protected def getShutdownTimeout: FiniteDuration = ShutdownTimeout - - /** - * Overridable for tests - */ - protected def waitNanos(nanos: Long): Unit = { - // see http://www.javamex.com/tutorials/threads/sleep_issues.shtml - val sleepMs = if (Helpers.isWindows) (nanos + 4999999) / 10000000 * 10 else (nanos + 999999) / 1000000 - try Thread.sleep(sleepMs) catch { - case _: InterruptedException ⇒ Thread.currentThread.interrupt() // we got woken up - } - } - - override def schedule(initialDelay: FiniteDuration, - delay: FiniteDuration, - runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = { - checkMaxDelay(roundUp(delay).toNanos) - val preparedEC = executor.prepare() - try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self ⇒ - compareAndSet(InitialRepeatMarker, schedule( - preparedEC, - new AtomicLong(clock() + initialDelay.toNanos) with Runnable { - override def run(): Unit = { - try { - runnable.run() - val driftNanos = clock() - getAndAdd(delay.toNanos) - if (self.get != null) - swap(schedule(preparedEC, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) - } catch { - case _: SchedulerException ⇒ // ignore failure to enqueue or terminated target actor - } - } - }, roundUp(initialDelay))) - - @tailrec private def swap(c: Cancellable): Unit = { - get match { - case null ⇒ if (c != null) c.cancel() - case old ⇒ if (!compareAndSet(old, c)) swap(c) - } - } - - @tailrec final def cancel(): Boolean = { - get match { - case null ⇒ false - case c ⇒ - if (c.cancel()) compareAndSet(c, null) - else compareAndSet(c, null) || cancel() - } - } - - override def isCancelled: Boolean = get == null - } catch { - case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) - } - } - - override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = - try schedule(executor.prepare(), runnable, roundUp(delay)) - catch { - case SchedulerException(msg) ⇒ throw new IllegalStateException(msg) - } - - /** - * Shutdown the scheduler. All scheduled task will be executed when the scheduler closed, - * i.e. the task may execute before its timeout. - */ - override def close(): Unit = Await.result(stop(), getShutdownTimeout) foreach { - task ⇒ - try task.run() catch { - case e: InterruptedException ⇒ throw e - case _: SchedulerException ⇒ // ignore terminated actors - case NonFatal(e) ⇒ log.error(e, "exception while executing timer task") - } - } - - override val maxFrequency: Double = 1.second / TickDuration - - /* - * BELOW IS THE ACTUAL TIMER IMPLEMENTATION - */ - - private val start = clock() - private val tickNanos = TickDuration.toNanos - private val wheelMask = WheelSize - 1 - private val queue = new TaskQueue - - private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask = - if (delay <= Duration.Zero) { - if (stopped.get != null) throw new SchedulerException("cannot enqueue after timer shutdown") - ec.execute(r) - NotCancellable - } else if (stopped.get != null) { - throw new SchedulerException("cannot enqueue after timer shutdown") - } else { - val delayNanos = delay.toNanos - checkMaxDelay(delayNanos) - - val ticks = (delayNanos / tickNanos).toInt - val task = new TaskHolder(r, ticks, ec) - queue.add(task) - if (stopped.get != null && task.cancel()) - throw new SchedulerException("cannot enqueue after timer shutdown") - task - } - - private def checkMaxDelay(delayNanos: Long): Unit = - if (delayNanos / tickNanos > Int.MaxValue) - // 1 second margin in the error message due to rounding - throw new IllegalArgumentException(s"Task scheduled with [${delayNanos.nanos.toSeconds}] seconds delay, " + - s"which is too far in future, maximum delay is [${(tickNanos * Int.MaxValue).nanos.toSeconds - 1}] seconds") - - private val stopped = new AtomicReference[Promise[immutable.Seq[TimerTask]]] - private def stop(): Future[immutable.Seq[TimerTask]] = { - val p = Promise[immutable.Seq[TimerTask]]() - if (stopped.compareAndSet(null, p)) { - // Interrupting the timer thread to make it shut down faster is not good since - // it could be in the middle of executing the scheduled tasks, which might not - // respond well to being interrupted. - // Instead we just wait one more tick for it to finish. - p.future - } else Future.successful(Nil) - } - - @volatile private var timerThread: Thread = threadFactory.newThread(new Runnable { - - var tick = 0 - val wheel = Array.fill(WheelSize)(new TaskQueue) - - private def clearAll(): immutable.Seq[TimerTask] = { - @tailrec def collect(q: TaskQueue, acc: Vector[TimerTask]): Vector[TimerTask] = { - q.poll() match { - case null ⇒ acc - case x ⇒ collect(q, acc :+ x) - } - } - ((0 until WheelSize) flatMap (i ⇒ collect(wheel(i), Vector.empty))) ++ collect(queue, Vector.empty) - } - - @tailrec - private def checkQueue(time: Long): Unit = queue.pollNode() match { - case null ⇒ () - case node ⇒ - node.value.ticks match { - case 0 ⇒ node.value.executeTask() - case ticks ⇒ - val futureTick = (( - time - start + // calculate the nanos since timer start - (ticks * tickNanos) + // adding the desired delay - tickNanos - 1 // rounding up - ) / tickNanos).toInt // and converting to slot number - // tick is an Int that will wrap around, but toInt of futureTick gives us modulo operations - // and the difference (offset) will be correct in any case - val offset = futureTick - tick - val bucket = futureTick & wheelMask - node.value.ticks = offset - wheel(bucket).addNode(node) - } - checkQueue(time) - } - - override final def run = - try nextTick() - catch { - case t: Throwable ⇒ - log.error(t, "exception on LARS’ timer thread") - stopped.get match { - case null ⇒ - val thread = threadFactory.newThread(this) - log.info("starting new LARS thread") - try thread.start() - catch { - case e: Throwable ⇒ - log.error(e, "LARS cannot start new thread, ship’s going down!") - stopped.set(Promise successful Nil) - clearAll() - } - timerThread = thread - case p ⇒ - assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS") - p success clearAll() - } - throw t - } - - @tailrec final def nextTick(): Unit = { - val time = clock() - val sleepTime = start + (tick * tickNanos) - time - - if (sleepTime > 0) { - // check the queue before taking a nap - checkQueue(time) - waitNanos(sleepTime) - } else { - val bucket = tick & wheelMask - val tasks = wheel(bucket) - val putBack = new TaskQueue - - @tailrec def executeBucket(): Unit = tasks.pollNode() match { - case null ⇒ () - case node ⇒ - val task = node.value - if (!task.isCancelled) { - if (task.ticks >= WheelSize) { - task.ticks -= WheelSize - putBack.addNode(node) - } else task.executeTask() - } - executeBucket() - } - executeBucket() - wheel(bucket) = putBack - - tick += 1 - } - stopped.get match { - case null ⇒ nextTick() - case p ⇒ - assert(stopped.compareAndSet(p, Promise successful Nil), "Stop signal violated in LARS") - p success clearAll() - } - } - }) - - timerThread.start() -} - -object LightArrayRevolverScheduler { - private[this] val taskOffset = unsafe.objectFieldOffset(classOf[TaskHolder].getDeclaredField("task")) - - private class TaskQueue extends AbstractNodeQueue[TaskHolder] - - /** - * INTERNAL API - */ - protected[actor] trait TimerTask extends Runnable with Cancellable - - /** - * INTERNAL API - */ - protected[actor] class TaskHolder(@volatile var task: Runnable, var ticks: Int, executionContext: ExecutionContext) - extends TimerTask { - - @tailrec - private final def extractTask(replaceWith: Runnable): Runnable = - task match { - case t @ (ExecutedTask | CancelledTask) ⇒ t - case x ⇒ if (unsafe.compareAndSwapObject(this, taskOffset, x, replaceWith)) x else extractTask(replaceWith) - } - - private[akka] final def executeTask(): Boolean = extractTask(ExecutedTask) match { - case ExecutedTask | CancelledTask ⇒ false - case other ⇒ - try { - executionContext execute other - true - } catch { - case _: InterruptedException ⇒ { Thread.currentThread.interrupt(); false } - case NonFatal(e) ⇒ { executionContext.reportFailure(e); false } - } - } - - // This should only be called in execDirectly - override def run(): Unit = extractTask(ExecutedTask).run() - - override def cancel(): Boolean = extractTask(CancelledTask) match { - case ExecutedTask | CancelledTask ⇒ false - case _ ⇒ true - } - - override def isCancelled: Boolean = task eq CancelledTask - } - - private[this] val CancelledTask = new Runnable { def run = () } - private[this] val ExecutedTask = new Runnable { def run = () } - - private val NotCancellable: TimerTask = new TimerTask { - def cancel(): Boolean = false - def isCancelled: Boolean = false - def run(): Unit = () - } - - private val InitialRepeatMarker: Cancellable = new Cancellable { - def cancel(): Boolean = false - def isCancelled: Boolean = false - } -} -