diff --git a/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala b/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala index 4b9cd82200..da4c830716 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal import com.typesafe.config.Config import org.apache.pekko +import pekko.actor.Scheduler.AtomicCancellable import pekko.dispatch.AbstractNodeQueue import pekko.event.LoggingAdapter import pekko.util.Helpers @@ -111,49 +112,23 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)( implicit executor: ExecutionContext): Cancellable = { checkMaxDelay(roundUp(delay).toNanos) - try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self => - compareAndSet( - InitialRepeatMarker, - schedule( - executor, - new AtomicLong(clock() + initialDelay.toNanos) with Runnable { - override def run(): Unit = { - try { - runnable.run() - val driftNanos = clock() - getAndAdd(delay.toNanos) - if (self.get != null) - swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) - } catch { - case _: SchedulerException => // ignore failure to enqueue or terminated target actor - } + new AtomicCancellable(InitialRepeatMarker) { self => + final override protected def scheduledFirst(): Cancellable = + schedule( + executor, + new AtomicLong(clock() + initialDelay.toNanos) with Runnable { + override def run(): Unit = { + try { + runnable.run() + val driftNanos = clock() - getAndAdd(delay.toNanos) + if (self.get() != null) + swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1)))) + } catch { + case _: SchedulerException => // ignore failure to enqueue or terminated target actor } - }, - roundUp(initialDelay))) - - @tailrec private def swap(c: Cancellable): Unit = { - get match { - case null => if (c != null) c.cancel() - case old => if (!compareAndSet(old, c)) swap(c) - } - } - - final def cancel(): Boolean = { - @tailrec def tailrecCancel(): Boolean = { - get match { - case null => false - case c => - if (c.cancel()) compareAndSet(c, null) - else compareAndSet(c, null) || tailrecCancel() } - } - - tailrecCancel() - } - - override def isCancelled: Boolean = get == null - } - catch { - case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause) + }, + roundUp(initialDelay)) } } diff --git a/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala b/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala index 41abe832e4..e71d7bfbc9 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala @@ -15,14 +15,14 @@ package org.apache.pekko.actor import java.util.concurrent.atomic.AtomicReference +import scala.annotation.nowarn import scala.annotation.tailrec import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import scala.annotation.nowarn - import org.apache.pekko +import pekko.actor.Scheduler.AtomicCancellable import pekko.annotation.InternalApi import pekko.util.JavaDurationConverters @@ -81,51 +81,23 @@ trait Scheduler { * Note: For scheduling within actors `with Timers` should be preferred. */ def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)( - implicit executor: ExecutionContext): Cancellable = { - try new AtomicReference[Cancellable](Cancellable.initialNotCancelled) with Cancellable { self => - compareAndSet( - Cancellable.initialNotCancelled, - scheduleOnce( - initialDelay, - new Runnable { - override def run(): Unit = { - try { - runnable.run() - if (self.get != null) - swap(scheduleOnce(delay, this)) - } catch { - // ignore failure to enqueue or terminated target actor - case _: SchedulerException => - case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] => - } - } - })) - - @tailrec private def swap(c: Cancellable): Unit = { - get match { - case null => if (c != null) c.cancel() - case old => if (!compareAndSet(old, c)) swap(c) - } - } - - final def cancel(): Boolean = { - @tailrec def tailrecCancel(): Boolean = { - get match { - case null => false - case c => - if (c.cancel()) compareAndSet(c, null) - else compareAndSet(c, null) || tailrecCancel() + implicit executor: ExecutionContext): Cancellable = new AtomicCancellable(Cancellable.initialNotCancelled) { + final override protected def scheduledFirst(): Cancellable = + scheduleOnce( + initialDelay, + new Runnable { + override def run(): Unit = { + try { + runnable.run() + if (get != null) + swap(scheduleOnce(delay, this)) + } catch { + // ignore failure to enqueue or terminated target actor + case _: SchedulerException => + case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] => } } - - tailrecCancel() - } - - override def isCancelled: Boolean = get == null - } - catch { - case SchedulerException(msg) => throw new IllegalStateException(msg) - } + }) } /** @@ -574,4 +546,46 @@ object Scheduler { * a custom implementation of `Scheduler` must also implement this. */ trait TaskRunOnClose extends Runnable + + /** + * INTERNAL API + */ + @InternalApi + private[pekko] abstract class AtomicCancellable(initialValue: Cancellable) + extends AtomicReference[Cancellable](initialValue) + with Cancellable { + + try { + compareAndSet(initialValue, scheduledFirst()) + } catch { + case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause) + } + + protected def scheduledFirst(): Cancellable + + @tailrec final protected def swap(c: Cancellable): Unit = { + get match { + case null => if (c != null) c.cancel() + case old => + if (!compareAndSet(old, c)) + swap(c) + } + } + + final def cancel(): Boolean = { + @tailrec def tailrecCancel(): Boolean = { + get match { + case null => false + case c => + if (c.cancel()) compareAndSet(c, null) + else compareAndSet(c, null) || tailrecCancel() + } + } + + tailrecCancel() + } + + final override def isCancelled: Boolean = get == null + + } }