support stop of TestProbe
This commit is contained in:
parent
e5c1fc02a9
commit
850a10443b
4 changed files with 40 additions and 11 deletions
|
|
@ -27,18 +27,26 @@ import scala.util.control.NonFatal
|
|||
private[akka] object TestProbeImpl {
|
||||
private val testActorId = new AtomicInteger(0)
|
||||
|
||||
private case class WatchActor[U](actor: ActorRef[U])
|
||||
private def testActor[M](queue: BlockingDeque[M], terminations: BlockingDeque[Terminated]): Behavior[M] = Behaviors.receive[M] { (context, message) ⇒
|
||||
message match {
|
||||
case WatchActor(ref) ⇒ context.watch(ref)
|
||||
case other ⇒ queue.offerLast(other)
|
||||
private final case class WatchActor[U](actor: ActorRef[U])
|
||||
private case object Stop
|
||||
|
||||
private def testActor[M](queue: BlockingDeque[M], terminations: BlockingDeque[Terminated]): Behavior[M] =
|
||||
Behaviors.receive[M] { (context, msg) ⇒
|
||||
msg match {
|
||||
case WatchActor(ref) ⇒
|
||||
context.watch(ref)
|
||||
Behaviors.same
|
||||
case Stop ⇒
|
||||
Behaviors.stopped
|
||||
case other ⇒
|
||||
queue.offerLast(other)
|
||||
Behaviors.same
|
||||
}
|
||||
}.receiveSignal {
|
||||
case (_, t: Terminated) ⇒
|
||||
terminations.offerLast(t)
|
||||
Behaviors.same
|
||||
}
|
||||
Behaviors.same
|
||||
}.receiveSignal {
|
||||
case (_, t: Terminated) ⇒
|
||||
terminations.offerLast(t)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
@InternalApi
|
||||
|
|
@ -290,4 +298,8 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
|
|||
|
||||
private def assertFail(msg: String): Nothing = throw new AssertionError(msg)
|
||||
|
||||
override def stop(): Unit = {
|
||||
testActor.asInstanceOf[ActorRef[AnyRef]] ! Stop
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -279,4 +279,9 @@ abstract class TestProbe[M] {
|
|||
*/
|
||||
@InternalApi protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M]
|
||||
|
||||
/**
|
||||
* Stops the [[TestProbe.getRef]], which is useful when testing watch and termination.
|
||||
*/
|
||||
def stop(): Unit
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -224,4 +224,9 @@ object TestProbe {
|
|||
* which uses the configuration entry "akka.test.timefactor".
|
||||
*/
|
||||
def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A
|
||||
|
||||
/**
|
||||
* Stops the [[TestProbe.ref]], which is useful when testing watch and termination.
|
||||
*/
|
||||
def stop(): Unit
|
||||
}
|
||||
|
|
|
|||
|
|
@ -158,6 +158,13 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
"timeout if expected single message is not received by a provided timeout" in {
|
||||
intercept[AssertionError](createTestProbe[EventT]().receiveOne(100.millis))
|
||||
}
|
||||
|
||||
"support watch and stop of probe" in {
|
||||
val probe1 = TestProbe[String]()
|
||||
val probe2 = TestProbe[String]()
|
||||
probe1.stop()
|
||||
probe2.expectTerminated(probe1.ref, probe2.remainingOrDefault)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue