diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala index 912110bebd..99996ee46a 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala @@ -6,13 +6,11 @@ package akka.testkit.typed.scaladsl import scala.concurrent.duration._ import java.util.concurrent.BlockingDeque -import akka.actor.typed.Behavior +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated } import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.ActorSystem import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicInteger -import akka.actor.typed.ActorRef import akka.util.Timeout import akka.util.PrettyDuration.PrettyPrintableDuration @@ -34,9 +32,17 @@ object TestProbe { def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M] = new TestProbe(name) - private def testActor[M](queue: BlockingDeque[M]): Behavior[M] = Behaviors.immutable { (ctx, msg) ⇒ - queue.offerLast(msg) + private case class WatchActor[U](actor: ActorRef[U]) + private def testActor[M](queue: BlockingDeque[M], terminations: BlockingDeque[Terminated]): Behavior[M] = Behaviors.immutable[M] { (ctx, msg) ⇒ + msg match { + case WatchActor(ref) ⇒ ctx.watch(ref) + case other ⇒ queue.offerLast(other) + } Behaviors.same + }.onSignal { + case (_, t: Terminated) ⇒ + terminations.offerLast(t) + Behaviors.same } } @@ -45,6 +51,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { import TestProbe._ private implicit val settings = TestKitSettings(system) private val queue = new LinkedBlockingDeque[M] + private val terminations = new LinkedBlockingDeque[Terminated] private var end: Duration = Duration.Undefined @@ -58,7 +65,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { val testActor: ActorRef[M] = { implicit val timeout = Timeout(3.seconds) - val futRef = system.systemActorOf(TestProbe.testActor(queue), s"$name-${testActorId.incrementAndGet()}") + val futRef = system.systemActorOf(TestProbe.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}") Await.result(futRef, timeout.duration + 1.second) } @@ -157,7 +164,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { /** * Receive one message from the test actor and assert that it equals the * given object. Wait time is bounded by the given duration, with an - * AssertionFailure being thrown in case of timeout. + * [[AssertionError]] being thrown in case of timeout. * * @return the received object */ @@ -166,7 +173,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { /** * Receive one message from the test actor and assert that it equals the * given object. Wait time is bounded by the given duration, with an - * AssertionFailure being thrown in case of timeout. + * [[AssertionError]] being thrown in case of timeout. * * @return the received object */ @@ -188,7 +195,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { */ private def receiveOne(max: Duration): M = { val message = - if (max == 0.seconds) { + if (max == Duration.Zero) { queue.pollFirst } else if (max.isFinite) { queue.pollFirst(max.length, max.unit) @@ -227,7 +234,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { /** * Receive one message from the test actor and assert that it conforms to the * given type (after erasure). Wait time is bounded by the given duration, - * with an AssertionFailure being thrown in case of timeout. + * with an [[AssertionError]] being thrown in case of timeout. * * @return the received object */ @@ -241,6 +248,24 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { o.asInstanceOf[C] } + /** + * Expect the given actor to be stopped or stop withing the given timeout or + * throw an [[AssertionError]]. + */ + def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit = { + testActor.asInstanceOf[ActorRef[AnyRef]] ! WatchActor(actorRef) + val message = + if (max == Duration.Zero) { + terminations.pollFirst + } else if (max.isFinite) { + terminations.pollFirst(max.length, max.unit) + } else { + terminations.takeFirst + } + assert(message != null, s"timeout ($max) during expectStop waiting for actor [${actorRef.path}] to stop") + assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop") + } + /** * Evaluate the given assert every `interval` until it does not throw an exception and return the * result. diff --git a/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala b/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala new file mode 100644 index 0000000000..f270ed9eeb --- /dev/null +++ b/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.testkit.typed.scaladsl + +import akka.actor.typed.scaladsl.Behaviors +import akka.testkit.typed.TestKit +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import scala.concurrent.duration._ + +class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeAndAfterAll { + + "The test probe" must { + + "allow probing for actor stop when actor already stopped" in { + case object Stop + val probe = TestProbe() + val ref = spawn(Behaviors.stopped) + probe.expectTerminated(ref, 100.millis) + } + + "allow probing for actor stop when actor has not stopped yet" in { + case object Stop + val probe = TestProbe() + val ref = spawn(Behaviors.immutable[Stop.type]((ctx, message) ⇒ + Behaviors.withTimers { (timer) ⇒ + timer.startSingleTimer("key", Stop, 300.millis) + + Behaviors.immutable((ctx, stop) ⇒ + Behaviors.stopped + ) + } + )) + ref ! Stop + // race, but not sure how to test in any other way + probe.expectTerminated(ref, 500.millis) + } + + } + + override protected def afterAll(): Unit = { + shutdown() + } +} diff --git a/akka-testkit-typed/src/test/scala/akka/typed/testkit/BehaviorTestkitSpec.scala b/akka-testkit-typed/src/test/scala/akka/typed/testkit/BehaviorTestkitSpec.scala deleted file mode 100644 index e69de29bb2..0000000000