diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala index b4336306ba..af0afc67ba 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.TimerScheduler +import akka.testkit.TimingTest import akka.testkit.typed.TestKitSettings import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl._ @@ -32,6 +33,7 @@ class TimerSpec extends TestKit("TimerSpec") case class Tock(n: Int) extends Event case class GotPostStop(timerActive: Boolean) extends Event case class GotPreRestart(timerActive: Boolean) extends Event + case object Cancelled extends Event class Exc extends RuntimeException("simulated exc") with NoStackTrace @@ -60,6 +62,7 @@ class TimerSpec extends TestKit("TimerSpec") Behaviors.stopped case Cancel ⇒ timer.cancel("T") + monitor ! Cancelled Behaviors.same case Throw(e) ⇒ throw e @@ -78,7 +81,7 @@ class TimerSpec extends TestKit("TimerSpec") } "A timer" must { - "schedule non-repeated ticks" in { + "schedule non-repeated ticks" taggedAs TimingTest in { val probe = TestProbe[Event]("evt") val behv = Behaviors.withTimers[Command] { timer ⇒ timer.startSingleTimer("T", Tick(1), 10.millis) @@ -93,7 +96,7 @@ class TimerSpec extends TestKit("TimerSpec") probe.expectMessage(GotPostStop(false)) } - "schedule repeated ticks" in { + "schedule repeated ticks" taggedAs TimingTest in { val probe = TestProbe[Event]("evt") val behv = Behaviors.withTimers[Command] { timer ⇒ timer.startPeriodicTimer("T", Tick(1), interval) @@ -111,7 +114,7 @@ class TimerSpec extends TestKit("TimerSpec") probe.expectMessage(GotPostStop(false)) } - "replace timer" in { + "replace timer" taggedAs TimingTest in { val probe = TestProbe[Event]("evt") val behv = Behaviors.withTimers[Command] { timer ⇒ timer.startPeriodicTimer("T", Tick(1), interval) @@ -131,7 +134,7 @@ class TimerSpec extends TestKit("TimerSpec") probe.expectMessage(GotPostStop(false)) } - "cancel timer" in { + "cancel timer" taggedAs TimingTest in { val probe = TestProbe[Event]("evt") val behv = Behaviors.withTimers[Command] { timer ⇒ timer.startPeriodicTimer("T", Tick(1), interval) @@ -141,13 +144,20 @@ class TimerSpec extends TestKit("TimerSpec") val ref = spawn(behv) probe.expectMessage(Tock(1)) ref ! Cancel + probe.fishForMessage(3.seconds) { + // we don't know that we will see exactly one tock + case _: Tock ⇒ FishingOutcomes.Continue + // but we know that after we saw Cancelled we won't see any more + case Cancelled ⇒ FishingOutcomes.Complete + case msg ⇒ FishingOutcomes.Fail(s"unexpected msg: $msg") + } probe.expectNoMessage(interval + 100.millis.dilated) ref ! End probe.expectMessage(GotPostStop(false)) } - "discard timers from old incarnation after restart, alt 1" in { + "discard timers from old incarnation after restart, alt 1" taggedAs TimingTest in { val probe = TestProbe[Event]("evt") val startCounter = new AtomicInteger(0) val behv = Behaviors.supervise(Behaviors.withTimers[Command] { timer ⇒ @@ -171,7 +181,7 @@ class TimerSpec extends TestKit("TimerSpec") probe.expectMessage(GotPostStop(false)) } - "discard timers from old incarnation after restart, alt 2" in { + "discard timers from old incarnation after restart, alt 2" taggedAs TimingTest in { val probe = TestProbe[Event]("evt") val behv = Behaviors.supervise(Behaviors.withTimers[Command] { timer ⇒ timer.startPeriodicTimer("T", Tick(1), interval) @@ -197,7 +207,7 @@ class TimerSpec extends TestKit("TimerSpec") probe.expectMessage(GotPostStop(false)) } - "cancel timers when stopped from exception" in { + "cancel timers when stopped from exception" taggedAs TimingTest in { val probe = TestProbe[Event]() val behv = Behaviors.withTimers[Command] { timer ⇒ timer.startPeriodicTimer("T", Tick(1), interval) @@ -208,7 +218,7 @@ class TimerSpec extends TestKit("TimerSpec") probe.expectMessage(GotPostStop(false)) } - "cancel timers when stopped voluntarily" in { + "cancel timers when stopped voluntarily" taggedAs TimingTest in { val probe = TestProbe[Event]() val behv = Behaviors.withTimers[Command] { timer ⇒ timer.startPeriodicTimer("T", Tick(1), interval) diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala index ff720fb57a..9e2a4a6539 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala @@ -3,7 +3,7 @@ package akka.testkit.typed import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } -import akka.annotation.ApiMayChange +import akka.annotation.{ ApiMayChange, DoNotInherit } import akka.testkit.typed.TestKit._ import akka.util.Timeout import com.typesafe.config.Config @@ -17,6 +17,14 @@ import scala.util.control.NoStackTrace */ case class TE(message: String) extends RuntimeException(message) with NoStackTrace +/** + * Not for user extension. + * + * Instances are available from `FishingOutcomes` in the respective dsls: [[akka.testkit.typed.scaladsl.FishingOutcomes]] + * and [[akka.testkit.typed.javadsl.FishingOutcomes]] + */ +@DoNotInherit abstract class FishingOutcome private[akka] () + object TestKit { private[akka] sealed trait TestKitCommand diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala index 673f7d47ae..acfb3b8de3 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala @@ -4,6 +4,28 @@ package akka.testkit.typed.javadsl import akka.actor.typed.ActorSystem +import akka.testkit.typed.FishingOutcome + +import scala.concurrent.duration.FiniteDuration +import scala.collection.JavaConverters._ + +object FishingOutcomes { + + /** + * Consume this message and continue with the next + */ + def continue(): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Continue + + /** + * Complete fishing and return this message + */ + def complete(): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Complete + + /** + * Fail fishing with a custom error message + */ + def fail(error: String): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Fail(error) +} /** * Java API: @@ -18,4 +40,31 @@ class TestProbe[M](name: String, system: ActorSystem[_]) extends akka.testkit.ty def expectMessageType[T <: M](t: Class[T]): T = expectMessageClass_internal(remainingOrDefault, t) + /** + * Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming + * message, and returns one of the following effects to decide on what happens next: + * + * * [[FishingOutcomes.continue()]] - continue with the next message given that the timeout has not been reached + * * [[FishingOutcomes.complete()]] - successfully complete and return the message + * * [[FishingOutcomes.fail(errorMsg)]] - fail the test with a custom message + * + * Additionally failures includes the list of messages consumed. If a message of type `M` but not of type `T` is + * received this will also fail the test, additionally if the `fisher` function throws a match error the error + * is decorated with some fishing details and the test is failed (making it convenient to use this method with a + * partial function). + * + * @param max Max total time without the fisher function returning `CompleteFishing` before failing + * The timeout is dilated. + * @return The messages accepted in the order they arrived + */ + // FIXME same name would cause ambiguity but I'm out of ideas how to fix, separate Scala/Java TestProbe APIs? + def fishForMessageJava(max: FiniteDuration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = + fishForMessage(max)(fisher.apply).asJava + + /** + * Same as the other `fishForMessageJava` but includes the provided hint in all error messages + */ + def fishForMessageJava(max: FiniteDuration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = + fishForMessage(max, hint)(fisher.apply).asJava + } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala index 965fb13bf6..a3bc3fefa1 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala @@ -11,11 +11,12 @@ import akka.actor.typed.scaladsl.Behaviors import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicInteger +import akka.annotation.DoNotInherit import akka.util.Timeout import akka.util.PrettyDuration.PrettyPrintableDuration import scala.concurrent.Await -import akka.testkit.typed.TestKitSettings +import akka.testkit.typed.{ FishingOutcome, TestKitSettings } import akka.util.BoxedType import scala.annotation.tailrec @@ -45,6 +46,29 @@ object TestProbe { } } +object FishingOutcomes { + + /** + * Consume this message, collect it into the result, and continue with the next message + */ + case object Continue extends FishingOutcome + + /** + * Consume this message, but do not collect it into the result, and continue with the next message + */ + case object ContinueAndIgnore extends FishingOutcome + + /** + * Complete fishing and return this message + */ + case object Complete extends FishingOutcome + + /** + * Fail fishing with a custom error message + */ + case class Fail(error: String) extends FishingOutcome +} + class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { import TestProbe._ @@ -247,6 +271,56 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { o.asInstanceOf[C] } + /** + * Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming + * message, and returns one of the following effects to decide on what happens next: + * + * * [[FishingOutcomes.Continue]] - continue with the next message given that the timeout has not been reached + * * [[FishingOutcomes.Complete]] - successfully complete and return the message + * * [[FishingOutcomes.Fail]] - fail the test with a custom message + * + * Additionally failures includes the list of messages consumed. If a message of type `M` but not of type `T` is + * received this will also fail the test, additionally if the `fisher` function throws a match error the error + * is decorated with some fishing details and the test is failed (making it convenient to use this method with a + * partial function). + * + * @param max Max total time without the fisher function returning `CompleteFishing` before failing + * The timeout is dilated. + * @return The messages accepted in the order they arrived + */ + def fishForMessage(max: FiniteDuration, hint: String = "")(fisher: M ⇒ FishingOutcome): List[M] = { + // not tailrec but that should be ok + def loop(timeout: FiniteDuration, seen: List[M]): List[M] = { + val start = System.nanoTime() + val msg = receiveOne(timeout) + try { + fisher(msg) match { + case FishingOutcomes.Complete ⇒ (msg :: seen).reverse + case FishingOutcomes.Fail(error) ⇒ throw new AssertionError(s"$error, hint: $hint") + case continue ⇒ + val newTimeout = + if (timeout.isFinite()) timeout - (System.nanoTime() - start).nanos + else timeout + if (newTimeout.toMillis <= 0) { + throw new AssertionError(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint") + } else { + continue match { + case FishingOutcomes.Continue ⇒ loop(newTimeout, msg :: seen) + case FishingOutcomes.ContinueAndIgnore ⇒ loop(newTimeout, seen) + } + + } + } + } catch { + case ex: MatchError ⇒ throw new AssertionError( + s"Unexpected message $msg while fishing for messages, " + + s"seen messages ${seen.reverse}, hint: $hint", ex) + } + } + + loop(max.dilated, Nil) + } + /** * Expect the given actor to be stopped or stop withing the given timeout or * throw an [[AssertionError]]. diff --git a/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala b/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala index f270ed9eeb..ca701a2ecb 100644 --- a/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala +++ b/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala @@ -36,6 +36,61 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA probe.expectTerminated(ref, 500.millis) } + "allow fishing for message" in { + + val probe = TestProbe[String]() + + probe.ref ! "one" + probe.ref ! "two" + + val result = probe.fishForMessage(300.millis) { + case "one" ⇒ FishingOutcomes.Continue + case "two" ⇒ FishingOutcomes.Complete + } + + result should ===(List("one", "two")) + } + + "allow failing when fishing for message" in { + + val probe = TestProbe[String]() + + probe.ref ! "one" + probe.ref ! "two" + + intercept[AssertionError] { + probe.fishForMessage(300.millis) { + case "one" ⇒ FishingOutcomes.Continue + case "two" ⇒ FishingOutcomes.Fail("not the fish I'm looking for") + } + } + } + + "fail for unknown message when fishing for messages" in { + val probe = TestProbe[String]() + + probe.ref ! "one" + probe.ref ! "two" + + intercept[AssertionError] { + probe.fishForMessage(300.millis) { + case "one" ⇒ FishingOutcomes.Continue + } + } + } + + "time out when fishing for messages" in { + val probe = TestProbe[String]() + + probe.ref ! "one" + + intercept[AssertionError] { + probe.fishForMessage(300.millis) { + case "one" ⇒ FishingOutcomes.Continue + } + } + } + } override protected def afterAll(): Unit = {