TestProbe support for actor stop checking (#24365)

* TestProbe support for actor stop checking #24356
This commit is contained in:
Johan Andrén 2018-01-22 14:49:54 +01:00 committed by Christopher Batey
parent 719f0fb672
commit ab76b6b64d
3 changed files with 79 additions and 10 deletions

View file

@ -6,13 +6,11 @@ package akka.testkit.typed.scaladsl
import scala.concurrent.duration._ import scala.concurrent.duration._
import java.util.concurrent.BlockingDeque import java.util.concurrent.BlockingDeque
import akka.actor.typed.Behavior import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorSystem
import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor.typed.ActorRef
import akka.util.Timeout import akka.util.Timeout
import akka.util.PrettyDuration.PrettyPrintableDuration import akka.util.PrettyDuration.PrettyPrintableDuration
@ -34,9 +32,17 @@ object TestProbe {
def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M] = def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M] =
new TestProbe(name) new TestProbe(name)
private def testActor[M](queue: BlockingDeque[M]): Behavior[M] = Behaviors.immutable { (ctx, msg) private case class WatchActor[U](actor: ActorRef[U])
queue.offerLast(msg) private def testActor[M](queue: BlockingDeque[M], terminations: BlockingDeque[Terminated]): Behavior[M] = Behaviors.immutable[M] { (ctx, msg)
msg match {
case WatchActor(ref) ctx.watch(ref)
case other queue.offerLast(other)
}
Behaviors.same Behaviors.same
}.onSignal {
case (_, t: Terminated)
terminations.offerLast(t)
Behaviors.same
} }
} }
@ -45,6 +51,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
import TestProbe._ import TestProbe._
private implicit val settings = TestKitSettings(system) private implicit val settings = TestKitSettings(system)
private val queue = new LinkedBlockingDeque[M] private val queue = new LinkedBlockingDeque[M]
private val terminations = new LinkedBlockingDeque[Terminated]
private var end: Duration = Duration.Undefined private var end: Duration = Duration.Undefined
@ -58,7 +65,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
val testActor: ActorRef[M] = { val testActor: ActorRef[M] = {
implicit val timeout = Timeout(3.seconds) implicit val timeout = Timeout(3.seconds)
val futRef = system.systemActorOf(TestProbe.testActor(queue), s"$name-${testActorId.incrementAndGet()}") val futRef = system.systemActorOf(TestProbe.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}")
Await.result(futRef, timeout.duration + 1.second) Await.result(futRef, timeout.duration + 1.second)
} }
@ -157,7 +164,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
/** /**
* Receive one message from the test actor and assert that it equals the * Receive one message from the test actor and assert that it equals the
* given object. Wait time is bounded by the given duration, with an * given object. Wait time is bounded by the given duration, with an
* AssertionFailure being thrown in case of timeout. * [[AssertionError]] being thrown in case of timeout.
* *
* @return the received object * @return the received object
*/ */
@ -166,7 +173,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
/** /**
* Receive one message from the test actor and assert that it equals the * Receive one message from the test actor and assert that it equals the
* given object. Wait time is bounded by the given duration, with an * given object. Wait time is bounded by the given duration, with an
* AssertionFailure being thrown in case of timeout. * [[AssertionError]] being thrown in case of timeout.
* *
* @return the received object * @return the received object
*/ */
@ -188,7 +195,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
*/ */
private def receiveOne(max: Duration): M = { private def receiveOne(max: Duration): M = {
val message = val message =
if (max == 0.seconds) { if (max == Duration.Zero) {
queue.pollFirst queue.pollFirst
} else if (max.isFinite) { } else if (max.isFinite) {
queue.pollFirst(max.length, max.unit) queue.pollFirst(max.length, max.unit)
@ -227,7 +234,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
/** /**
* Receive one message from the test actor and assert that it conforms to the * Receive one message from the test actor and assert that it conforms to the
* given type (after erasure). Wait time is bounded by the given duration, * given type (after erasure). Wait time is bounded by the given duration,
* with an AssertionFailure being thrown in case of timeout. * with an [[AssertionError]] being thrown in case of timeout.
* *
* @return the received object * @return the received object
*/ */
@ -241,6 +248,24 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
o.asInstanceOf[C] o.asInstanceOf[C]
} }
/**
* Expect the given actor to be stopped or stop withing the given timeout or
* throw an [[AssertionError]].
*/
def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit = {
testActor.asInstanceOf[ActorRef[AnyRef]] ! WatchActor(actorRef)
val message =
if (max == Duration.Zero) {
terminations.pollFirst
} else if (max.isFinite) {
terminations.pollFirst(max.length, max.unit)
} else {
terminations.takeFirst
}
assert(message != null, s"timeout ($max) during expectStop waiting for actor [${actorRef.path}] to stop")
assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop")
}
/** /**
* Evaluate the given assert every `interval` until it does not throw an exception and return the * Evaluate the given assert every `interval` until it does not throw an exception and return the
* result. * result.

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.testkit.typed.scaladsl
import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.typed.TestKit
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import scala.concurrent.duration._
class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeAndAfterAll {
"The test probe" must {
"allow probing for actor stop when actor already stopped" in {
case object Stop
val probe = TestProbe()
val ref = spawn(Behaviors.stopped)
probe.expectTerminated(ref, 100.millis)
}
"allow probing for actor stop when actor has not stopped yet" in {
case object Stop
val probe = TestProbe()
val ref = spawn(Behaviors.immutable[Stop.type]((ctx, message)
Behaviors.withTimers { (timer)
timer.startSingleTimer("key", Stop, 300.millis)
Behaviors.immutable((ctx, stop)
Behaviors.stopped
)
}
))
ref ! Stop
// race, but not sure how to test in any other way
probe.expectTerminated(ref, 500.millis)
}
}
override protected def afterAll(): Unit = {
shutdown()
}
}