feat: Add SchedulerTask which will be notified once cancelled. (#1593)

This commit is contained in:
He-Pin(kerr) 2024-12-29 17:08:01 +08:00 committed by GitHub
parent fc09ac6344
commit db94dedf23
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 66 additions and 3 deletions

View file

@ -131,6 +131,17 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
task.isCancelled should ===(true)
}
"notify callback if cancel is performed before execution" taggedAs TimingTest in {
val latch = new CountDownLatch(1)
val task = system.scheduler.scheduleOnce(100 millis,
new SchedulerTask {
override def run(): Unit = ()
override def cancelled(): Unit = latch.countDown()
})
task.cancel()
latch.await(100, TimeUnit.MILLISECONDS) should ===(true)
}
"not be canceled if cancel is performed after execution" taggedAs TimingTest in {
val latch = TestLatch(1)
val task = collectCancellable(system.scheduler.scheduleOnce(10.millis)(latch.countDown()))
@ -334,6 +345,23 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
ticks.get should ===(1)
}
"notify callback if cancel is performed after initial delay" taggedAs TimingTest in {
val latch = new CountDownLatch(1)
val initialDelay = 90.millis.dilated
val delay = 500.millis.dilated
val task = system.scheduler.scheduleWithFixedDelay(
initialDelay,
delay)(
new SchedulerTask {
override def run(): Unit = ()
override def cancelled(): Unit = latch.countDown()
})
Thread.sleep((initialDelay + 200.millis.dilated).toMillis)
task.cancel()
latch.await(100, TimeUnit.MILLISECONDS) should ===(true)
}
/**
* ticket #307
*/

View file

@ -139,7 +139,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
final override protected def scheduledFirst(): Cancellable =
schedule(
executor,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
new AtomicLong(clock() + initialDelay.toNanos) with SchedulerTask {
override def run(): Unit = {
try {
runnable.run()
@ -150,6 +150,11 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
}
}
override def cancelled(): Unit = runnable match {
case task: SchedulerTask => task.cancelled()
case _ =>
}
},
roundUp(initialDelay))
}
@ -390,7 +395,18 @@ object LightArrayRevolverScheduler {
override def cancel(): Boolean = extractTask(CancelledTask) match {
case ExecutedTask | CancelledTask => false
case _ => true
case task: SchedulerTask =>
notifyCancellation(task)
true
case _ => true
}
private def notifyCancellation(task: SchedulerTask): Unit = {
try {
executionContext.execute(() => task.cancelled())
} catch {
case NonFatal(e) => executionContext.reportFailure(e)
}
}
override def isCancelled: Boolean = task eq CancelledTask

View file

@ -85,7 +85,7 @@ trait Scheduler {
final override protected def scheduledFirst(): Cancellable =
scheduleOnce(
initialDelay,
new Runnable {
new SchedulerTask {
override def run(): Unit = {
try {
runnable.run()
@ -97,6 +97,11 @@ trait Scheduler {
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
}
}
override def cancelled(): Unit = runnable match {
case task: SchedulerTask => task.cancelled()
case _ =>
}
})
}
@ -498,6 +503,20 @@ trait Scheduler {
// this one is just here so we can present a nice AbstractScheduler for Java
abstract class AbstractSchedulerBase extends Scheduler
/**
* A Task that will be notified when it is cancelled.
*
* @since 1.2.0
*/
trait SchedulerTask extends Runnable {
/**
* Called for [[SchedulerTask]]s that are successfully canceled via [[Cancellable#cancel]].
* Overriding this method allows to for example run some cleanup.
*/
def cancelled(): Unit = ()
}
/**
* Signifies something that can be cancelled
* There is no strict guarantee that the implementation is thread-safe,