diff --git a/akka-typed-testkit/src/main/resources/reference.conf b/akka-typed-testkit/src/main/resources/reference.conf new file mode 100644 index 0000000000..1230df0591 --- /dev/null +++ b/akka-typed-testkit/src/main/resources/reference.conf @@ -0,0 +1,20 @@ +############################################ +# Akka Typed Testkit Reference Config File # +############################################ + +# This is the reference config file that contains all the default settings. +# Make your edits/overrides in your application.conf. + +akka.typed.test { + # factor by which to scale timeouts during tests, e.g. to account for shared + # build system load + timefactor = 1.0 + + # duration to wait in expectMsg and friends outside of within() block + # by default + single-expect-default = 3s + + # The timeout that is added as an implicit by DefaultTimeout trait + default-timeout = 5s + +} diff --git a/akka-typed-testkit/src/main/scala/akka/typed/testkit/Effects.scala b/akka-typed-testkit/src/main/scala/akka/typed/testkit/Effects.scala index 56076032a2..165d8f8506 100644 --- a/akka-typed-testkit/src/main/scala/akka/typed/testkit/Effects.scala +++ b/akka-typed-testkit/src/main/scala/akka/typed/testkit/Effects.scala @@ -9,7 +9,10 @@ import akka.typed.{ ActorContext, ActorRef, ActorSystem, Behavior, DeploymentCon import scala.annotation.tailrec import scala.collection.immutable +import scala.util.control.Exception.Catcher +import scala.util.control.NonFatal import scala.concurrent.duration.{ Duration, FiniteDuration } +import akka.typed.PostStop /** * All tracked effects must extend implement this type. It is deliberately @@ -56,12 +59,23 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma def currentBehavior: Behavior[T] = current def isAlive: Boolean = Behavior.isAlive(current) + private def handleException: Catcher[Unit] = { + case NonFatal(e) ⇒ + try Behavior.canonicalize(Behavior.interpretSignal(current, this, PostStop), current, this) // TODO why canonicalize here? + catch { case NonFatal(ex) ⇒ /* ignore, real is logging */ } + throw e + } + def run(msg: T): Unit = { - current = Behavior.canonicalize(Behavior.interpretMessage(current, this, msg), current, this) + try { + current = Behavior.canonicalize(Behavior.interpretMessage(current, this, msg), current, this) + } catch handleException } def signal(signal: Signal): Unit = { - current = Behavior.canonicalize(Behavior.interpretSignal(current, this, signal), current, this) + try { + current = Behavior.canonicalize(Behavior.interpretSignal(current, this, signal), current, this) + } catch handleException } override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = { diff --git a/akka-typed-testkit/src/main/scala/akka/typed/testkit/TestKitSettings.scala b/akka-typed-testkit/src/main/scala/akka/typed/testkit/TestKitSettings.scala new file mode 100644 index 0000000000..3cfa588d65 --- /dev/null +++ b/akka-typed-testkit/src/main/scala/akka/typed/testkit/TestKitSettings.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.testkit + +import com.typesafe.config.Config +import scala.concurrent.duration.FiniteDuration +import akka.util.Timeout +import akka.typed.ActorSystem + +object TestKitSettings { + /** + * Reads configuration settings from `akka.typed.test` section. + */ + def apply(system: ActorSystem[_]): TestKitSettings = + apply(system.settings.config) + + /** + * Reads configuration settings from given `Config` that + * must have the same layout as the `akka.typed.test` section. + */ + def apply(config: Config): TestKitSettings = + new TestKitSettings(config) +} + +class TestKitSettings(val config: Config) { + + import akka.util.Helpers._ + + val TestTimeFactor = config.getDouble("akka.typed.test.timefactor"). + requiring(tf ⇒ !tf.isInfinite && tf > 0, "akka.typed.test.timefactor must be positive finite double") + val SingleExpectDefaultTimeout: FiniteDuration = config.getMillisDuration("akka.typed.test.single-expect-default") + val DefaultTimeout: Timeout = Timeout(config.getMillisDuration("akka.typed.test.default-timeout")) +} diff --git a/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/TestProbe.scala b/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/TestProbe.scala new file mode 100644 index 0000000000..a1850a1c57 --- /dev/null +++ b/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/TestProbe.scala @@ -0,0 +1,229 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.testkit.scaladsl + +import scala.concurrent.duration._ +import java.util.concurrent.BlockingDeque +import akka.typed.Behavior +import akka.typed.scaladsl.Actor._ +import akka.typed.ActorSystem +import java.util.concurrent.LinkedBlockingDeque +import java.util.concurrent.atomic.AtomicInteger +import akka.typed.ActorRef +import akka.util.Timeout +import akka.util.PrettyDuration.PrettyPrintableDuration +import scala.concurrent.Await +import com.typesafe.config.Config +import akka.typed.testkit.TestKitSettings +import akka.util.BoxedType +import scala.reflect.ClassTag + +object TestProbe { + private val testActorId = new AtomicInteger(0) + + def apply[M]()(implicit system: ActorSystem[_], settings: TestKitSettings): TestProbe[M] = + apply(name = "testProbe") + + def apply[M](name: String)(implicit system: ActorSystem[_], settings: TestKitSettings): TestProbe[M] = + new TestProbe(name) + + private def testActor[M](queue: BlockingDeque[M]): Behavior[M] = Immutable { (ctx, msg) ⇒ + queue.offerLast(msg) + Same + } +} + +class TestProbe[M](name: String)(implicit val system: ActorSystem[_], val settings: TestKitSettings) { + import TestProbe._ + private val queue = new LinkedBlockingDeque[M] + + private var end: Duration = Duration.Undefined + + /** + * if last assertion was expectNoMsg, disable timing failure upon within() + * block end. + */ + private var lastWasNoMsg = false + + private var lastMessage: Option[M] = None + + val testActor: ActorRef[M] = { + implicit val timeout = Timeout(3.seconds) + val futRef = system.systemActorOf(TestProbe.testActor(queue), s"$name-${testActorId.incrementAndGet()}") + Await.result(futRef, timeout.duration + 1.second) + } + + /** + * Shorthand to get the `testActor`. + */ + def ref: ActorRef[M] = testActor + + /** + * Obtain current time (`System.nanoTime`) as Duration. + */ + protected def now: FiniteDuration = System.nanoTime.nanos + + /** + * Obtain time remaining for execution of the innermost enclosing `within` + * block or missing that it returns the properly dilated default for this + * case from settings (key "akka.typed.test.single-expect-default"). + */ + def remainingOrDefault = remainingOr(settings.SingleExpectDefaultTimeout.dilated) + + /** + * Obtain time remaining for execution of the innermost enclosing `within` + * block or throw an [[AssertionError]] if no `within` block surrounds this + * call. + */ + def remaining: FiniteDuration = end match { + case f: FiniteDuration ⇒ f - now + case _ ⇒ throw new AssertionError("`remaining` may not be called outside of `within`") + } + + /** + * Obtain time remaining for execution of the innermost enclosing `within` + * block or missing that it returns the given duration. + */ + def remainingOr(duration: FiniteDuration): FiniteDuration = end match { + case x if x eq Duration.Undefined ⇒ duration + case x if !x.isFinite ⇒ throw new IllegalArgumentException("`end` cannot be infinite") + case f: FiniteDuration ⇒ f - now + } + + private def remainingOrDilated(max: Duration): FiniteDuration = max match { + case x if x eq Duration.Undefined ⇒ remainingOrDefault + case x if !x.isFinite ⇒ throw new IllegalArgumentException("max duration cannot be infinite") + case f: FiniteDuration ⇒ f.dilated + } + + /** + * Execute code block while bounding its execution time between `min` and + * `max`. `within` blocks may be nested. All methods in this trait which + * take maximum wait times are available in a version which implicitly uses + * the remaining time governed by the innermost enclosing `within` block. + * + * Note that the timeout is scaled using Duration.dilated, which uses the + * configuration entry "akka.typed.test.timefactor", while the min Duration is not. + * + * {{{ + * val ret = within(50 millis) { + * test ! Ping + * expectMsgType[Pong] + * } + * }}} + */ + def within[T](min: FiniteDuration, max: FiniteDuration)(f: ⇒ T): T = { + val _max = max.dilated + val start = now + val rem = if (end == Duration.Undefined) Duration.Inf else end - start + assert(rem >= min, s"required min time $min not possible, only ${rem.pretty} left") + + lastWasNoMsg = false + + val max_diff = _max min rem + val prev_end = end + end = start + max_diff + + val ret = try f finally end = prev_end + + val diff = now - start + assert(min <= diff, s"block took ${diff.pretty}, should at least have been $min") + if (!lastWasNoMsg) { + assert(diff <= max_diff, s"block took ${diff.pretty}, exceeding ${max_diff.pretty}") + } + + ret + } + + /** + * Same as calling `within(0 seconds, max)(f)`. + */ + def within[T](max: FiniteDuration)(f: ⇒ T): T = within(Duration.Zero, max)(f) + + /** + * Same as `expectMsg(remainingOrDefault, obj)`, but correctly treating the timeFactor. + */ + def expectMsg[T <: M](obj: T): T = expectMsg_internal(remainingOrDefault, obj) + + /** + * Receive one message from the test actor and assert that it equals the + * given object. Wait time is bounded by the given duration, with an + * AssertionFailure being thrown in case of timeout. + * + * @return the received object + */ + def expectMsg[T <: M](max: FiniteDuration, obj: T): T = expectMsg_internal(max.dilated, obj) + + /** + * Receive one message from the test actor and assert that it equals the + * given object. Wait time is bounded by the given duration, with an + * AssertionFailure being thrown in case of timeout. + * + * @return the received object + */ + def expectMsg[T <: M](max: FiniteDuration, hint: String, obj: T): T = expectMsg_internal(max.dilated, obj, Some(hint)) + + private def expectMsg_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = { + val o = receiveOne(max) + val hintOrEmptyString = hint.map(": " + _).getOrElse("") + assert(o != null, s"timeout ($max) during expectMsg while waiting for $obj" + hintOrEmptyString) + assert(obj == o, s"expected $obj, found $o" + hintOrEmptyString) + o.asInstanceOf[T] + } + + /** + * Receive one message from the internal queue of the TestActor. If the given + * duration is zero, the queue is polled (non-blocking). + * + * This method does NOT automatically scale its Duration parameter! + */ + private def receiveOne(max: Duration): M = { + val message = + if (max == 0.seconds) { + queue.pollFirst + } else if (max.isFinite) { + queue.pollFirst(max.length, max.unit) + } else { + queue.takeFirst + } + lastWasNoMsg = false + lastMessage = if (message == null) None else Some(message) + message + } + + /** + * Assert that no message is received for the specified time. + */ + def expectNoMsg(max: FiniteDuration) { expectNoMsg_internal(max.dilated) } + + private def expectNoMsg_internal(max: FiniteDuration) { + val o = receiveOne(max) + assert(o == null, s"received unexpected message $o") + lastWasNoMsg = true + } + + /** + * Same as `expectMsgType[T](remainingOrDefault)`, but correctly treating the timeFactor. + */ + def expectMsgType[T <: M](implicit t: ClassTag[T]): T = + expectMsgClass_internal(remainingOrDefault, t.runtimeClass.asInstanceOf[Class[T]]) + + /** + * Receive one message from the test actor and assert that it conforms to the + * given type (after erasure). Wait time is bounded by the given duration, + * with an AssertionFailure being thrown in case of timeout. + * + * @return the received object + */ + def expectMsgType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T = + expectMsgClass_internal(max.dilated, t.runtimeClass.asInstanceOf[Class[T]]) + + private def expectMsgClass_internal[C](max: FiniteDuration, c: Class[C]): C = { + val o = receiveOne(max) + assert(o != null, s"timeout ($max) during expectMsgClass waiting for $c") + assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)") + o.asInstanceOf[C] + } + +} diff --git a/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/package.scala b/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/package.scala new file mode 100644 index 0000000000..81053e8c94 --- /dev/null +++ b/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/package.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.testkit + +import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.reflect.ClassTag +import scala.collection.immutable +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.typed.ActorSystem + +package object scaladsl { + + /** + * Scala API. Scale timeouts (durations) during tests with the configured + * 'akka.test.timefactor'. + * Implicit class providing `dilated` method. + * + * {{{ + * import scala.concurrent.duration._ + * import akka.typed.testkit.scaladsl._ + * 10.milliseconds.dilated + * }}} + * + * Uses the scaling factor from the `TestTimeFactor` in the [[TestKitSettings]] + * (in implicit scope). + * + */ + implicit class TestDuration(val duration: FiniteDuration) extends AnyVal { + def dilated(implicit settings: TestKitSettings): FiniteDuration = + (duration * settings.TestTimeFactor).asInstanceOf[FiniteDuration] + } + +}