=act Extract AtomicCancellable in Scheduler.

Signed-off-by: He-Pin <hepin1989@gmail.com>
This commit is contained in:
He-Pin 2023-08-15 20:19:06 +08:00 committed by kerr
parent a855e58bfc
commit cbdc8d866c
2 changed files with 75 additions and 86 deletions

View file

@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import com.typesafe.config.Config
import org.apache.pekko
import pekko.actor.Scheduler.AtomicCancellable
import pekko.dispatch.AbstractNodeQueue
import pekko.event.LoggingAdapter
import pekko.util.Helpers
@ -111,49 +112,23 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable = {
checkMaxDelay(roundUp(delay).toNanos)
try new AtomicReference[Cancellable](InitialRepeatMarker) with Cancellable { self =>
compareAndSet(
InitialRepeatMarker,
schedule(
executor,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
override def run(): Unit = {
try {
runnable.run()
val driftNanos = clock() - getAndAdd(delay.toNanos)
if (self.get != null)
swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
} catch {
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
}
new AtomicCancellable(InitialRepeatMarker) { self =>
final override protected def scheduledFirst(): Cancellable =
schedule(
executor,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
override def run(): Unit = {
try {
runnable.run()
val driftNanos = clock() - getAndAdd(delay.toNanos)
if (self.get() != null)
swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
} catch {
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
}
},
roundUp(initialDelay)))
@tailrec private def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old => if (!compareAndSet(old, c)) swap(c)
}
}
final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
}
}
tailrecCancel()
}
override def isCancelled: Boolean = get == null
}
catch {
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
},
roundUp(initialDelay))
}
}

View file

@ -15,14 +15,14 @@ package org.apache.pekko.actor
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.nowarn
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import scala.annotation.nowarn
import org.apache.pekko
import pekko.actor.Scheduler.AtomicCancellable
import pekko.annotation.InternalApi
import pekko.util.JavaDurationConverters
@ -81,51 +81,23 @@ trait Scheduler {
* Note: For scheduling within actors `with Timers` should be preferred.
*/
def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable = {
try new AtomicReference[Cancellable](Cancellable.initialNotCancelled) with Cancellable { self =>
compareAndSet(
Cancellable.initialNotCancelled,
scheduleOnce(
initialDelay,
new Runnable {
override def run(): Unit = {
try {
runnable.run()
if (self.get != null)
swap(scheduleOnce(delay, this))
} catch {
// ignore failure to enqueue or terminated target actor
case _: SchedulerException =>
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
}
}
}))
@tailrec private def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old => if (!compareAndSet(old, c)) swap(c)
}
}
final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
implicit executor: ExecutionContext): Cancellable = new AtomicCancellable(Cancellable.initialNotCancelled) {
final override protected def scheduledFirst(): Cancellable =
scheduleOnce(
initialDelay,
new Runnable {
override def run(): Unit = {
try {
runnable.run()
if (get != null)
swap(scheduleOnce(delay, this))
} catch {
// ignore failure to enqueue or terminated target actor
case _: SchedulerException =>
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
}
}
tailrecCancel()
}
override def isCancelled: Boolean = get == null
}
catch {
case SchedulerException(msg) => throw new IllegalStateException(msg)
}
})
}
/**
@ -574,4 +546,46 @@ object Scheduler {
* a custom implementation of `Scheduler` must also implement this.
*/
trait TaskRunOnClose extends Runnable
/**
* INTERNAL API
*/
@InternalApi
private[pekko] abstract class AtomicCancellable(initialValue: Cancellable)
extends AtomicReference[Cancellable](initialValue)
with Cancellable {
try {
compareAndSet(initialValue, scheduledFirst())
} catch {
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
}
protected def scheduledFirst(): Cancellable
@tailrec final protected def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case old =>
if (!compareAndSet(old, c))
swap(c)
}
}
final def cancel(): Boolean = {
@tailrec def tailrecCancel(): Boolean = {
get match {
case null => false
case c =>
if (c.cancel()) compareAndSet(c, null)
else compareAndSet(c, null) || tailrecCancel()
}
}
tailrecCancel()
}
final override def isCancelled: Boolean = get == null
}
}