Scheduler that manually advances time (##24150)
* Don't apply dilation to scheduler parameter * Clarify ExecutionContext usage * Clarify comment on timePasses * Make ExplicitlyTriggeredScheduler internals private * List currently scheduled tasks in one log message * Execute immediately if (initialDelay <= Duration.Zero) * Don't reschedule if scheduled task fails * Be more efficient about logging * Widen `timePasses` delay for now https://github.com/akka/akka/pull/24243#discussion_r160985493 for some discussion on what to do instead * Remove mechanism for mixing in config from a test trait
This commit is contained in:
parent
978d927133
commit
fe5a42586c
8 changed files with 319 additions and 1 deletions
|
|
@ -0,0 +1,54 @@
|
|||
/**
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package akka.actor.typed;
|
||||
|
||||
//#manual-scheduling-simple
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import static com.typesafe.config.ConfigFactory.parseString;
|
||||
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import akka.testkit.typed.TestKit;
|
||||
import akka.testkit.typed.ExplicitlyTriggeredScheduler;
|
||||
import akka.testkit.typed.javadsl.TestProbe;
|
||||
|
||||
public class ManualTimerTest extends TestKit {
|
||||
ExplicitlyTriggeredScheduler scheduler;
|
||||
|
||||
public ManualTimerTest() {
|
||||
super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.ExplicitlyTriggeredScheduler\""));
|
||||
this.scheduler = (ExplicitlyTriggeredScheduler) system().scheduler();
|
||||
}
|
||||
|
||||
static final class Tick {}
|
||||
static final class Tock {}
|
||||
|
||||
@Test
|
||||
public void testScheduleNonRepeatedTicks() {
|
||||
TestProbe<Tock> probe = new TestProbe<>(system());
|
||||
Behavior<Tick> behavior = Behaviors.withTimers(timer -> {
|
||||
timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS));
|
||||
return Behaviors.immutable( (ctx, tick) -> {
|
||||
probe.ref().tell(new Tock());
|
||||
return Behaviors.same();
|
||||
});
|
||||
});
|
||||
|
||||
spawn(behavior);
|
||||
|
||||
scheduler.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe);
|
||||
|
||||
scheduler.timePasses(Duration.create(2, TimeUnit.MILLISECONDS));
|
||||
probe.expectMsgType(Tock.class);
|
||||
|
||||
scheduler.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
//#manual-scheduling-simple
|
||||
|
|
@ -0,0 +1,104 @@
|
|||
package akka.actor.typed
|
||||
|
||||
//#manual-scheduling-simple
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
import akka.testkit.typed.TestKit
|
||||
import akka.testkit.typed.scaladsl.{ ManualTime, TestProbe }
|
||||
|
||||
class ManualTimerSpec extends TestKit(ManualTime.config) with ManualTime with WordSpecLike {
|
||||
|
||||
"A timer" must {
|
||||
"schedule non-repeated ticks" in {
|
||||
case object Tick
|
||||
case object Tock
|
||||
|
||||
val probe = TestProbe[Tock.type]()
|
||||
val behavior = Behaviors.withTimers[Tick.type] { timer ⇒
|
||||
timer.startSingleTimer("T", Tick, 10.millis)
|
||||
Behaviors.immutable { (ctx, Tick) ⇒
|
||||
probe.ref ! Tock
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
spawn(behavior)
|
||||
|
||||
scheduler.expectNoMessageFor(9.millis, probe)
|
||||
|
||||
scheduler.timePasses(2.millis)
|
||||
probe.expectMsg(Tock)
|
||||
|
||||
scheduler.expectNoMessageFor(10.seconds, probe)
|
||||
}
|
||||
//#manual-scheduling-simple
|
||||
|
||||
"schedule repeated ticks" in {
|
||||
case object Tick
|
||||
case object Tock
|
||||
|
||||
val probe = TestProbe[Tock.type]()
|
||||
val behavior = Behaviors.withTimers[Tick.type] { timer ⇒
|
||||
timer.startPeriodicTimer("T", Tick, 10.millis)
|
||||
Behaviors.immutable { (ctx, Tick) ⇒
|
||||
probe.ref ! Tock
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
spawn(behavior)
|
||||
|
||||
for (_ ← Range(0, 5)) {
|
||||
scheduler.expectNoMessageFor(9.millis, probe)
|
||||
|
||||
scheduler.timePasses(1.milli)
|
||||
probe.expectMsg(Tock)
|
||||
}
|
||||
}
|
||||
|
||||
"replace timer" in {
|
||||
sealed trait Command
|
||||
case class Tick(n: Int) extends Command
|
||||
case class SlowThenBump(nextCount: Int) extends Command
|
||||
sealed trait Event
|
||||
case class Tock(n: Int) extends Event
|
||||
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val interval = 10.millis
|
||||
|
||||
val behavior = Behaviors.withTimers[Command] { timer ⇒
|
||||
timer.startPeriodicTimer("T", Tick(1), interval)
|
||||
Behaviors.immutable { (ctx, cmd) ⇒
|
||||
cmd match {
|
||||
case Tick(n) ⇒
|
||||
probe.ref ! Tock(n)
|
||||
Behaviors.same
|
||||
case SlowThenBump(nextCount) ⇒
|
||||
scheduler.timePasses(interval)
|
||||
timer.startPeriodicTimer("T", Tick(nextCount), interval)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val ref = spawn(behavior)
|
||||
|
||||
scheduler.timePasses(11.millis)
|
||||
probe.expectMsg(Tock(1))
|
||||
|
||||
// next Tock(1) enqueued in mailboxed, but should be discarded because of new timer
|
||||
ref ! SlowThenBump(2)
|
||||
scheduler.expectNoMessageFor(interval, probe)
|
||||
|
||||
scheduler.timePasses(interval)
|
||||
probe.expectMsg(Tock(2))
|
||||
}
|
||||
|
||||
//#manual-scheduling-simple
|
||||
}
|
||||
}
|
||||
//#manual-scheduling-simple
|
||||
|
|
@ -63,12 +63,15 @@ object AskPattern {
|
|||
* val f: Future[Reply] = target ? (Request("hello", _))
|
||||
* }}}
|
||||
*/
|
||||
def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
|
||||
def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
|
||||
// We do not currently use the implicit scheduler, but want to require it
|
||||
// because it might be needed when we move to a 'native' typed runtime, see #24219
|
||||
ref match {
|
||||
case a: adapt.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f)
|
||||
case a: adapt.ActorSystemAdapter[_] ⇒ askUntyped(ref, a.untyped.guardian, timeout, f)
|
||||
case a ⇒ throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val onTimeout: String ⇒ Throwable = msg ⇒ new TimeoutException(msg)
|
||||
|
|
|
|||
|
|
@ -183,3 +183,16 @@ Scala
|
|||
Java
|
||||
: @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test-spawn-anonymous }
|
||||
|
||||
### Controlling the scheduler
|
||||
|
||||
It can be hard to reliably unit test specific scenario's when your actor relies on timing:
|
||||
especially when running many tests in parallel it can be hard to get the timing just right.
|
||||
Making such tests more reliable by using generous timeouts make the tests take a long time to run.
|
||||
|
||||
For such situations, we provide a scheduler where you can manually, explicitly advance the clock.
|
||||
|
||||
Scala
|
||||
: @@snip [ManualTimerSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala) { #manual-scheduling-simple }
|
||||
|
||||
Java
|
||||
: @@snip [ManualTimerTest.scala]($akka$/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java) { #manual-scheduling-simple }
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
package akka.testkit.typed
|
||||
|
||||
import java.util.concurrent.ThreadFactory
|
||||
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import scala.annotation.varargs
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
|
||||
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) {
|
||||
|
||||
@varargs
|
||||
def expectNoMessageFor(duration: FiniteDuration, on: scaladsl.TestProbe[_]*): Unit = {
|
||||
timePasses(duration)
|
||||
on.foreach(_.expectNoMessage(Duration.Zero))
|
||||
}
|
||||
}
|
||||
|
|
@ -64,6 +64,7 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase {
|
|||
def this(name: String) = this(name, None)
|
||||
def this(config: Config) = this(TestKit.getCallerName(classOf[TestKit]), Some(config))
|
||||
def this(name: String, config: Config) = this(name, Some(config))
|
||||
|
||||
import TestKit._
|
||||
implicit val system = ActorSystem(testKitGuardian, name, config = config)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,12 @@
|
|||
package akka.testkit.typed.scaladsl
|
||||
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
|
||||
import akka.testkit.typed._
|
||||
|
||||
object ManualTime {
|
||||
val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.ExplicitlyTriggeredScheduler"""")
|
||||
}
|
||||
trait ManualTime { self: TestKit ⇒
|
||||
override val scheduler: ExplicitlyTriggeredScheduler = self.system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
|
||||
}
|
||||
|
|
@ -0,0 +1,112 @@
|
|||
package akka.testkit
|
||||
|
||||
import java.util.concurrent.ThreadFactory
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.util.Try
|
||||
|
||||
import akka.actor.Cancellable
|
||||
import akka.actor.Scheduler
|
||||
import akka.event.LoggingAdapter
|
||||
|
||||
/**
|
||||
* For testing: scheduler that does not look at the clock, but must be
|
||||
* progressed manually by calling `timePasses`.
|
||||
*
|
||||
* This allows for faster and less timing-sensitive specs, as jobs will be
|
||||
* executed on the test thread instead of using the original
|
||||
* {ExecutionContext}. This means recreating specific scenario's becomes
|
||||
* easier, but these tests might fail to catch race conditions that only
|
||||
* happen when tasks are scheduled in parallel in 'real time'.
|
||||
*/
|
||||
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler {
|
||||
|
||||
private case class Item(time: Long, interval: Option[FiniteDuration], runnable: Runnable)
|
||||
|
||||
private val currentTime = new AtomicLong()
|
||||
private val scheduled = new ConcurrentHashMap[Item, Unit]()
|
||||
|
||||
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
|
||||
schedule(initialDelay, Some(interval), runnable)
|
||||
|
||||
override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
|
||||
schedule(delay, None, runnable)
|
||||
|
||||
/**
|
||||
* Advance the clock by the specified duration, executing all outstanding jobs on the calling thread before returning.
|
||||
*
|
||||
* We will not add a dilation factor to this amount, since the scheduler API also does not apply dilation.
|
||||
* If you want the amount of time passed to be dilated, apply the dilation before passing the delay to
|
||||
* this method.
|
||||
*/
|
||||
def timePasses(amount: FiniteDuration) = {
|
||||
// Give dispatchers time to clear :(. See
|
||||
// https://github.com/akka/akka/pull/24243#discussion_r160985493
|
||||
// for some discussion on how to deal with this properly.
|
||||
Thread.sleep(100)
|
||||
|
||||
val newTime = currentTime.get + amount.toMillis
|
||||
if (log.isDebugEnabled)
|
||||
log.debug(s"Time proceeds from ${currentTime.get} to $newTime, currently scheduled for this period:" + scheduledTasks(newTime).map(item ⇒ s"\n- $item"))
|
||||
|
||||
executeTasks(newTime)
|
||||
currentTime.set(newTime)
|
||||
}
|
||||
|
||||
private def scheduledTasks(runTo: Long): Seq[Item] =
|
||||
scheduled
|
||||
.keySet()
|
||||
.asScala
|
||||
.filter(_.time <= runTo)
|
||||
.toList
|
||||
.sortBy(_.time)
|
||||
|
||||
@tailrec
|
||||
private[testkit] final def executeTasks(runTo: Long): Unit = {
|
||||
scheduledTasks(runTo).headOption match {
|
||||
case Some(task) ⇒
|
||||
currentTime.set(task.time)
|
||||
val runResult = Try(task.runnable.run())
|
||||
scheduled.remove(task)
|
||||
|
||||
if (runResult.isSuccess)
|
||||
task.interval.foreach(i ⇒ scheduled.put(task.copy(time = task.time + i.toMillis), ()))
|
||||
|
||||
// running the runnable might have scheduled new events
|
||||
executeTasks(runTo)
|
||||
case _ ⇒ // Done
|
||||
}
|
||||
}
|
||||
|
||||
private def schedule(initialDelay: FiniteDuration, interval: Option[FiniteDuration], runnable: Runnable): Cancellable = {
|
||||
val firstTime = currentTime.get + initialDelay.toMillis
|
||||
val item = Item(firstTime, interval, runnable)
|
||||
log.debug("Scheduled item for {}: {}", firstTime, item)
|
||||
scheduled.put(item, ())
|
||||
|
||||
if (initialDelay <= Duration.Zero)
|
||||
executeTasks(currentTime.get)
|
||||
|
||||
new Cancellable {
|
||||
var cancelled = false
|
||||
|
||||
override def cancel(): Boolean = {
|
||||
val before = scheduled.size
|
||||
scheduled.remove(item)
|
||||
cancelled = true
|
||||
before > scheduled.size
|
||||
}
|
||||
|
||||
override def isCancelled: Boolean = cancelled
|
||||
}
|
||||
}
|
||||
|
||||
override def maxFrequency: Double = 42
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue