diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java
new file mode 100644
index 0000000000..669a1a284e
--- /dev/null
+++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2017 Lightbend Inc.
+ */
+package akka.actor.typed;
+
+//#manual-scheduling-simple
+import java.util.concurrent.TimeUnit;
+import static com.typesafe.config.ConfigFactory.parseString;
+
+import scala.concurrent.duration.Duration;
+
+import akka.actor.typed.javadsl.Behaviors;
+
+import org.junit.Test;
+
+import akka.testkit.typed.TestKit;
+import akka.testkit.typed.ExplicitlyTriggeredScheduler;
+import akka.testkit.typed.javadsl.TestProbe;
+
+public class ManualTimerTest extends TestKit {
+ ExplicitlyTriggeredScheduler scheduler;
+
+ public ManualTimerTest() {
+ super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.ExplicitlyTriggeredScheduler\""));
+ this.scheduler = (ExplicitlyTriggeredScheduler) system().scheduler();
+ }
+
+ static final class Tick {}
+ static final class Tock {}
+
+ @Test
+ public void testScheduleNonRepeatedTicks() {
+ TestProbe probe = new TestProbe<>(system());
+ Behavior behavior = Behaviors.withTimers(timer -> {
+ timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS));
+ return Behaviors.immutable( (ctx, tick) -> {
+ probe.ref().tell(new Tock());
+ return Behaviors.same();
+ });
+ });
+
+ spawn(behavior);
+
+ scheduler.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe);
+
+ scheduler.timePasses(Duration.create(2, TimeUnit.MILLISECONDS));
+ probe.expectMsgType(Tock.class);
+
+ scheduler.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe);
+ }
+
+
+}
+//#manual-scheduling-simple
diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala
new file mode 100644
index 0000000000..e9799c3212
--- /dev/null
+++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala
@@ -0,0 +1,104 @@
+package akka.actor.typed
+
+//#manual-scheduling-simple
+import scala.concurrent.duration._
+
+import akka.actor.typed.scaladsl.Behaviors
+
+import org.scalatest.WordSpecLike
+
+import akka.testkit.typed.TestKit
+import akka.testkit.typed.scaladsl.{ ManualTime, TestProbe }
+
+class ManualTimerSpec extends TestKit(ManualTime.config) with ManualTime with WordSpecLike {
+
+ "A timer" must {
+ "schedule non-repeated ticks" in {
+ case object Tick
+ case object Tock
+
+ val probe = TestProbe[Tock.type]()
+ val behavior = Behaviors.withTimers[Tick.type] { timer ⇒
+ timer.startSingleTimer("T", Tick, 10.millis)
+ Behaviors.immutable { (ctx, Tick) ⇒
+ probe.ref ! Tock
+ Behaviors.same
+ }
+ }
+
+ spawn(behavior)
+
+ scheduler.expectNoMessageFor(9.millis, probe)
+
+ scheduler.timePasses(2.millis)
+ probe.expectMsg(Tock)
+
+ scheduler.expectNoMessageFor(10.seconds, probe)
+ }
+ //#manual-scheduling-simple
+
+ "schedule repeated ticks" in {
+ case object Tick
+ case object Tock
+
+ val probe = TestProbe[Tock.type]()
+ val behavior = Behaviors.withTimers[Tick.type] { timer ⇒
+ timer.startPeriodicTimer("T", Tick, 10.millis)
+ Behaviors.immutable { (ctx, Tick) ⇒
+ probe.ref ! Tock
+ Behaviors.same
+ }
+ }
+
+ spawn(behavior)
+
+ for (_ ← Range(0, 5)) {
+ scheduler.expectNoMessageFor(9.millis, probe)
+
+ scheduler.timePasses(1.milli)
+ probe.expectMsg(Tock)
+ }
+ }
+
+ "replace timer" in {
+ sealed trait Command
+ case class Tick(n: Int) extends Command
+ case class SlowThenBump(nextCount: Int) extends Command
+ sealed trait Event
+ case class Tock(n: Int) extends Event
+
+ val probe = TestProbe[Event]("evt")
+ val interval = 10.millis
+
+ val behavior = Behaviors.withTimers[Command] { timer ⇒
+ timer.startPeriodicTimer("T", Tick(1), interval)
+ Behaviors.immutable { (ctx, cmd) ⇒
+ cmd match {
+ case Tick(n) ⇒
+ probe.ref ! Tock(n)
+ Behaviors.same
+ case SlowThenBump(nextCount) ⇒
+ scheduler.timePasses(interval)
+ timer.startPeriodicTimer("T", Tick(nextCount), interval)
+ Behaviors.same
+ }
+ }
+ }
+
+ val ref = spawn(behavior)
+
+ scheduler.timePasses(11.millis)
+ probe.expectMsg(Tock(1))
+
+ // next Tock(1) enqueued in mailboxed, but should be discarded because of new timer
+ ref ! SlowThenBump(2)
+ scheduler.expectNoMessageFor(interval, probe)
+
+ scheduler.timePasses(interval)
+ probe.expectMsg(Tock(2))
+ }
+
+ //#manual-scheduling-simple
+ }
+}
+//#manual-scheduling-simple
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala
index 9b964dc3a6..554f793da9 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala
@@ -63,12 +63,15 @@ object AskPattern {
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*/
- def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
+ def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
+ // We do not currently use the implicit scheduler, but want to require it
+ // because it might be needed when we move to a 'native' typed runtime, see #24219
ref match {
case a: adapt.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f)
case a: adapt.ActorSystemAdapter[_] ⇒ askUntyped(ref, a.untyped.guardian, timeout, f)
case a ⇒ throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass)
}
+ }
}
private val onTimeout: String ⇒ Throwable = msg ⇒ new TimeoutException(msg)
diff --git a/akka-docs/src/main/paradox/typed/testing.md b/akka-docs/src/main/paradox/typed/testing.md
index 7fe53ae6e6..12d7b3954d 100644
--- a/akka-docs/src/main/paradox/typed/testing.md
+++ b/akka-docs/src/main/paradox/typed/testing.md
@@ -183,3 +183,16 @@ Scala
Java
: @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test-spawn-anonymous }
+### Controlling the scheduler
+
+It can be hard to reliably unit test specific scenario's when your actor relies on timing:
+especially when running many tests in parallel it can be hard to get the timing just right.
+Making such tests more reliable by using generous timeouts make the tests take a long time to run.
+
+For such situations, we provide a scheduler where you can manually, explicitly advance the clock.
+
+Scala
+: @@snip [ManualTimerSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala) { #manual-scheduling-simple }
+
+Java
+: @@snip [ManualTimerTest.scala]($akka$/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java) { #manual-scheduling-simple }
\ No newline at end of file
diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala
new file mode 100644
index 0000000000..979efe00a5
--- /dev/null
+++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala
@@ -0,0 +1,19 @@
+package akka.testkit.typed
+
+import java.util.concurrent.ThreadFactory
+
+import com.typesafe.config.Config
+
+import scala.annotation.varargs
+import scala.concurrent.duration.{ Duration, FiniteDuration }
+
+import akka.event.LoggingAdapter
+
+class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) {
+
+ @varargs
+ def expectNoMessageFor(duration: FiniteDuration, on: scaladsl.TestProbe[_]*): Unit = {
+ timePasses(duration)
+ on.foreach(_.expectNoMessage(Duration.Zero))
+ }
+}
diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala
index fd0a821f1d..28b4547ca8 100644
--- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala
+++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala
@@ -64,6 +64,7 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase {
def this(name: String) = this(name, None)
def this(config: Config) = this(TestKit.getCallerName(classOf[TestKit]), Some(config))
def this(name: String, config: Config) = this(name, Some(config))
+
import TestKit._
implicit val system = ActorSystem(testKitGuardian, name, config = config)
}
diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala
new file mode 100644
index 0000000000..7136e02838
--- /dev/null
+++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala
@@ -0,0 +1,12 @@
+package akka.testkit.typed.scaladsl
+
+import com.typesafe.config.{ Config, ConfigFactory }
+
+import akka.testkit.typed._
+
+object ManualTime {
+ val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.ExplicitlyTriggeredScheduler"""")
+}
+trait ManualTime { self: TestKit ⇒
+ override val scheduler: ExplicitlyTriggeredScheduler = self.system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
+}
diff --git a/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala b/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala
new file mode 100644
index 0000000000..4475947c3d
--- /dev/null
+++ b/akka-testkit/src/main/scala/akka/testkit/ExplicitlyTriggeredScheduler.scala
@@ -0,0 +1,112 @@
+package akka.testkit
+
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicLong
+
+import com.typesafe.config.Config
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.util.Try
+
+import akka.actor.Cancellable
+import akka.actor.Scheduler
+import akka.event.LoggingAdapter
+
+/**
+ * For testing: scheduler that does not look at the clock, but must be
+ * progressed manually by calling `timePasses`.
+ *
+ * This allows for faster and less timing-sensitive specs, as jobs will be
+ * executed on the test thread instead of using the original
+ * {ExecutionContext}. This means recreating specific scenario's becomes
+ * easier, but these tests might fail to catch race conditions that only
+ * happen when tasks are scheduled in parallel in 'real time'.
+ */
+class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler {
+
+ private case class Item(time: Long, interval: Option[FiniteDuration], runnable: Runnable)
+
+ private val currentTime = new AtomicLong()
+ private val scheduled = new ConcurrentHashMap[Item, Unit]()
+
+ override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
+ schedule(initialDelay, Some(interval), runnable)
+
+ override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
+ schedule(delay, None, runnable)
+
+ /**
+ * Advance the clock by the specified duration, executing all outstanding jobs on the calling thread before returning.
+ *
+ * We will not add a dilation factor to this amount, since the scheduler API also does not apply dilation.
+ * If you want the amount of time passed to be dilated, apply the dilation before passing the delay to
+ * this method.
+ */
+ def timePasses(amount: FiniteDuration) = {
+ // Give dispatchers time to clear :(. See
+ // https://github.com/akka/akka/pull/24243#discussion_r160985493
+ // for some discussion on how to deal with this properly.
+ Thread.sleep(100)
+
+ val newTime = currentTime.get + amount.toMillis
+ if (log.isDebugEnabled)
+ log.debug(s"Time proceeds from ${currentTime.get} to $newTime, currently scheduled for this period:" + scheduledTasks(newTime).map(item ⇒ s"\n- $item"))
+
+ executeTasks(newTime)
+ currentTime.set(newTime)
+ }
+
+ private def scheduledTasks(runTo: Long): Seq[Item] =
+ scheduled
+ .keySet()
+ .asScala
+ .filter(_.time <= runTo)
+ .toList
+ .sortBy(_.time)
+
+ @tailrec
+ private[testkit] final def executeTasks(runTo: Long): Unit = {
+ scheduledTasks(runTo).headOption match {
+ case Some(task) ⇒
+ currentTime.set(task.time)
+ val runResult = Try(task.runnable.run())
+ scheduled.remove(task)
+
+ if (runResult.isSuccess)
+ task.interval.foreach(i ⇒ scheduled.put(task.copy(time = task.time + i.toMillis), ()))
+
+ // running the runnable might have scheduled new events
+ executeTasks(runTo)
+ case _ ⇒ // Done
+ }
+ }
+
+ private def schedule(initialDelay: FiniteDuration, interval: Option[FiniteDuration], runnable: Runnable): Cancellable = {
+ val firstTime = currentTime.get + initialDelay.toMillis
+ val item = Item(firstTime, interval, runnable)
+ log.debug("Scheduled item for {}: {}", firstTime, item)
+ scheduled.put(item, ())
+
+ if (initialDelay <= Duration.Zero)
+ executeTasks(currentTime.get)
+
+ new Cancellable {
+ var cancelled = false
+
+ override def cancel(): Boolean = {
+ val before = scheduled.size
+ scheduled.remove(item)
+ cancelled = true
+ before > scheduled.size
+ }
+
+ override def isCancelled: Boolean = cancelled
+ }
+ }
+
+ override def maxFrequency: Double = 42
+}