From fe5a42586c1328b9a16215b34bb476c292fa9afa Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Wed, 31 Jan 2018 14:48:05 +0100 Subject: [PATCH] Scheduler that manually advances time (##24150) * Don't apply dilation to scheduler parameter * Clarify ExecutionContext usage * Clarify comment on timePasses * Make ExplicitlyTriggeredScheduler internals private * List currently scheduled tasks in one log message * Execute immediately if (initialDelay <= Duration.Zero) * Don't reschedule if scheduled task fails * Be more efficient about logging * Widen `timePasses` delay for now https://github.com/akka/akka/pull/24243#discussion_r160985493 for some discussion on what to do instead * Remove mechanism for mixing in config from a test trait --- .../akka/actor/typed/ManualTimerTest.java | 54 +++++++++ .../akka/actor/typed/ManualTimerSpec.scala | 104 ++++++++++++++++ .../actor/typed/scaladsl/AskPattern.scala | 5 +- akka-docs/src/main/paradox/typed/testing.md | 13 ++ .../typed/ExplicitlyTriggeredScheduler.scala | 19 +++ .../scala/akka/testkit/typed/TestKit.scala | 1 + .../testkit/typed/scaladsl/ManualTime.scala | 12 ++ .../ExplicitlyTriggeredScheduler.scala | 112 ++++++++++++++++++ 8 files changed, 319 insertions(+), 1 deletion(-) create mode 100644 akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala create mode 100644 akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala create mode 100644 akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala create mode 100644 akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java new file mode 100644 index 0000000000..669a1a284e --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.actor.typed; + +//#manual-scheduling-simple +import java.util.concurrent.TimeUnit; +import static com.typesafe.config.ConfigFactory.parseString; + +import scala.concurrent.duration.Duration; + +import akka.actor.typed.javadsl.Behaviors; + +import org.junit.Test; + +import akka.testkit.typed.TestKit; +import akka.testkit.typed.ExplicitlyTriggeredScheduler; +import akka.testkit.typed.javadsl.TestProbe; + +public class ManualTimerTest extends TestKit { + ExplicitlyTriggeredScheduler scheduler; + + public ManualTimerTest() { + super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.ExplicitlyTriggeredScheduler\"")); + this.scheduler = (ExplicitlyTriggeredScheduler) system().scheduler(); + } + + static final class Tick {} + static final class Tock {} + + @Test + public void testScheduleNonRepeatedTicks() { + TestProbe probe = new TestProbe<>(system()); + Behavior behavior = Behaviors.withTimers(timer -> { + timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS)); + return Behaviors.immutable( (ctx, tick) -> { + probe.ref().tell(new Tock()); + return Behaviors.same(); + }); + }); + + spawn(behavior); + + scheduler.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe); + + scheduler.timePasses(Duration.create(2, TimeUnit.MILLISECONDS)); + probe.expectMsgType(Tock.class); + + scheduler.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe); + } + + +} +//#manual-scheduling-simple diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala new file mode 100644 index 0000000000..e9799c3212 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala @@ -0,0 +1,104 @@ +package akka.actor.typed + +//#manual-scheduling-simple +import scala.concurrent.duration._ + +import akka.actor.typed.scaladsl.Behaviors + +import org.scalatest.WordSpecLike + +import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.{ ManualTime, TestProbe } + +class ManualTimerSpec extends TestKit(ManualTime.config) with ManualTime with WordSpecLike { + + "A timer" must { + "schedule non-repeated ticks" in { + case object Tick + case object Tock + + val probe = TestProbe[Tock.type]() + val behavior = Behaviors.withTimers[Tick.type] { timer ⇒ + timer.startSingleTimer("T", Tick, 10.millis) + Behaviors.immutable { (ctx, Tick) ⇒ + probe.ref ! Tock + Behaviors.same + } + } + + spawn(behavior) + + scheduler.expectNoMessageFor(9.millis, probe) + + scheduler.timePasses(2.millis) + probe.expectMsg(Tock) + + scheduler.expectNoMessageFor(10.seconds, probe) + } + //#manual-scheduling-simple + + "schedule repeated ticks" in { + case object Tick + case object Tock + + val probe = TestProbe[Tock.type]() + val behavior = Behaviors.withTimers[Tick.type] { timer ⇒ + timer.startPeriodicTimer("T", Tick, 10.millis) + Behaviors.immutable { (ctx, Tick) ⇒ + probe.ref ! Tock + Behaviors.same + } + } + + spawn(behavior) + + for (_ ← Range(0, 5)) { + scheduler.expectNoMessageFor(9.millis, probe) + + scheduler.timePasses(1.milli) + probe.expectMsg(Tock) + } + } + + "replace timer" in { + sealed trait Command + case class Tick(n: Int) extends Command + case class SlowThenBump(nextCount: Int) extends Command + sealed trait Event + case class Tock(n: Int) extends Event + + val probe = TestProbe[Event]("evt") + val interval = 10.millis + + val behavior = Behaviors.withTimers[Command] { timer ⇒ + timer.startPeriodicTimer("T", Tick(1), interval) + Behaviors.immutable { (ctx, cmd) ⇒ + cmd match { + case Tick(n) ⇒ + probe.ref ! Tock(n) + Behaviors.same + case SlowThenBump(nextCount) ⇒ + scheduler.timePasses(interval) + timer.startPeriodicTimer("T", Tick(nextCount), interval) + Behaviors.same + } + } + } + + val ref = spawn(behavior) + + scheduler.timePasses(11.millis) + probe.expectMsg(Tock(1)) + + // next Tock(1) enqueued in mailboxed, but should be discarded because of new timer + ref ! SlowThenBump(2) + scheduler.expectNoMessageFor(interval, probe) + + scheduler.timePasses(interval) + probe.expectMsg(Tock(2)) + } + + //#manual-scheduling-simple + } +} +//#manual-scheduling-simple diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala index 9b964dc3a6..554f793da9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala @@ -63,12 +63,15 @@ object AskPattern { * val f: Future[Reply] = target ? (Request("hello", _)) * }}} */ - def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = + def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { + // We do not currently use the implicit scheduler, but want to require it + // because it might be needed when we move to a 'native' typed runtime, see #24219 ref match { case a: adapt.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f) case a: adapt.ActorSystemAdapter[_] ⇒ askUntyped(ref, a.untyped.guardian, timeout, f) case a ⇒ throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass) } + } } private val onTimeout: String ⇒ Throwable = msg ⇒ new TimeoutException(msg) diff --git a/akka-docs/src/main/paradox/typed/testing.md b/akka-docs/src/main/paradox/typed/testing.md index 7fe53ae6e6..12d7b3954d 100644 --- a/akka-docs/src/main/paradox/typed/testing.md +++ b/akka-docs/src/main/paradox/typed/testing.md @@ -183,3 +183,16 @@ Scala Java : @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test-spawn-anonymous } +### Controlling the scheduler + +It can be hard to reliably unit test specific scenario's when your actor relies on timing: +especially when running many tests in parallel it can be hard to get the timing just right. +Making such tests more reliable by using generous timeouts make the tests take a long time to run. + +For such situations, we provide a scheduler where you can manually, explicitly advance the clock. + +Scala +: @@snip [ManualTimerSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala) { #manual-scheduling-simple } + +Java +: @@snip [ManualTimerTest.scala]($akka$/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java) { #manual-scheduling-simple } \ No newline at end of file diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala new file mode 100644 index 0000000000..979efe00a5 --- /dev/null +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala @@ -0,0 +1,19 @@ +package akka.testkit.typed + +import java.util.concurrent.ThreadFactory + +import com.typesafe.config.Config + +import scala.annotation.varargs +import scala.concurrent.duration.{ Duration, FiniteDuration } + +import akka.event.LoggingAdapter + +class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) { + + @varargs + def expectNoMessageFor(duration: FiniteDuration, on: scaladsl.TestProbe[_]*): Unit = { + timePasses(duration) + on.foreach(_.expectNoMessage(Duration.Zero)) + } +} diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala index fd0a821f1d..28b4547ca8 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala @@ -64,6 +64,7 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase { def this(name: String) = this(name, None) def this(config: Config) = this(TestKit.getCallerName(classOf[TestKit]), Some(config)) def this(name: String, config: Config) = this(name, Some(config)) + import TestKit._ implicit val system = ActorSystem(testKitGuardian, name, config = config) } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala new file mode 100644 index 0000000000..7136e02838 --- /dev/null +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala @@ -0,0 +1,12 @@ +package akka.testkit.typed.scaladsl + +import com.typesafe.config.{ Config, ConfigFactory } + +import akka.testkit.typed._ + +object ManualTime { + val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.ExplicitlyTriggeredScheduler"""") +} +trait ManualTime { self: TestKit ⇒ + override val scheduler: ExplicitlyTriggeredScheduler = self.system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler] +} diff --git a/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala b/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala new file mode 100644 index 0000000000..4475947c3d --- /dev/null +++ b/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala @@ -0,0 +1,112 @@ +package akka.testkit + +import java.util.concurrent.ThreadFactory +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +import com.typesafe.config.Config + +import scala.annotation.tailrec +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.util.Try + +import akka.actor.Cancellable +import akka.actor.Scheduler +import akka.event.LoggingAdapter + +/** + * For testing: scheduler that does not look at the clock, but must be + * progressed manually by calling `timePasses`. + * + * This allows for faster and less timing-sensitive specs, as jobs will be + * executed on the test thread instead of using the original + * {ExecutionContext}. This means recreating specific scenario's becomes + * easier, but these tests might fail to catch race conditions that only + * happen when tasks are scheduled in parallel in 'real time'. + */ +class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler { + + private case class Item(time: Long, interval: Option[FiniteDuration], runnable: Runnable) + + private val currentTime = new AtomicLong() + private val scheduled = new ConcurrentHashMap[Item, Unit]() + + override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = + schedule(initialDelay, Some(interval), runnable) + + override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = + schedule(delay, None, runnable) + + /** + * Advance the clock by the specified duration, executing all outstanding jobs on the calling thread before returning. + * + * We will not add a dilation factor to this amount, since the scheduler API also does not apply dilation. + * If you want the amount of time passed to be dilated, apply the dilation before passing the delay to + * this method. + */ + def timePasses(amount: FiniteDuration) = { + // Give dispatchers time to clear :(. See + // https://github.com/akka/akka/pull/24243#discussion_r160985493 + // for some discussion on how to deal with this properly. + Thread.sleep(100) + + val newTime = currentTime.get + amount.toMillis + if (log.isDebugEnabled) + log.debug(s"Time proceeds from ${currentTime.get} to $newTime, currently scheduled for this period:" + scheduledTasks(newTime).map(item ⇒ s"\n- $item")) + + executeTasks(newTime) + currentTime.set(newTime) + } + + private def scheduledTasks(runTo: Long): Seq[Item] = + scheduled + .keySet() + .asScala + .filter(_.time <= runTo) + .toList + .sortBy(_.time) + + @tailrec + private[testkit] final def executeTasks(runTo: Long): Unit = { + scheduledTasks(runTo).headOption match { + case Some(task) ⇒ + currentTime.set(task.time) + val runResult = Try(task.runnable.run()) + scheduled.remove(task) + + if (runResult.isSuccess) + task.interval.foreach(i ⇒ scheduled.put(task.copy(time = task.time + i.toMillis), ())) + + // running the runnable might have scheduled new events + executeTasks(runTo) + case _ ⇒ // Done + } + } + + private def schedule(initialDelay: FiniteDuration, interval: Option[FiniteDuration], runnable: Runnable): Cancellable = { + val firstTime = currentTime.get + initialDelay.toMillis + val item = Item(firstTime, interval, runnable) + log.debug("Scheduled item for {}: {}", firstTime, item) + scheduled.put(item, ()) + + if (initialDelay <= Duration.Zero) + executeTasks(currentTime.get) + + new Cancellable { + var cancelled = false + + override def cancel(): Boolean = { + val before = scheduled.size + scheduled.remove(item) + cancelled = true + before > scheduled.size + } + + override def isCancelled: Boolean = cancelled + } + } + + override def maxFrequency: Double = 42 +}