diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala index feb50dc2cd..88e36e1157 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/EffectfulActorContext.scala @@ -13,7 +13,7 @@ import akka.annotation.InternalApi import akka.actor.testkit.typed.Effect import akka.actor.testkit.typed.Effect._ -import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import scala.compat.java8.FunctionConverters._ diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala index 800d8743bc..a7954ba02f 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala @@ -4,7 +4,6 @@ package akka.actor.testkit.typed.internal -import java.time import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque } import java.util.function.Supplier @@ -132,7 +131,7 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) expectMessage(max.asScala, hint, obj) private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = { - val o = receiveOne(max) + val o = receiveOne_internal(max) val hintOrEmptyString = hint.map(": " + _).getOrElse("") o match { case Some(m) if obj == m ⇒ m.asInstanceOf[T] @@ -141,13 +140,21 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) } } + override def receiveOne(): M = receiveOne(remainingOrDefault) + + override def receiveOne(max: java.time.Duration): M = receiveOne(max.asScala) + + def receiveOne(max: FiniteDuration): M = + receiveOne_internal(max.dilated). + getOrElse(assertFail(s"Timeout ($max) during receiveOne while waiting for message.")) + /** * Receive one message from the internal queue of the TestActor. If the given * duration is zero, the queue is polled (non-blocking). * * This method does NOT automatically scale its Duration parameter! */ - private def receiveOne(max: Duration): Option[M] = { + private def receiveOne_internal(max: Duration): Option[M] = { val message = Option( if (max == Duration.Zero) { queue.pollFirst @@ -172,20 +179,20 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated) private def expectNoMessage_internal(max: FiniteDuration): Unit = { - val o = receiveOne(max) + val o = receiveOne_internal(max) o match { case None ⇒ lastWasNoMessage = true - case Some(m) ⇒ assertFail(s"received unexpected message $m") + case Some(m) ⇒ assertFail(s"Received unexpected message $m") } } override protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = { - val o = receiveOne(max) + val o = receiveOne_internal(max) val bt = BoxedType(c) o match { case Some(m) if bt isInstance m ⇒ m.asInstanceOf[C] - case Some(m) ⇒ assertFail(s"expected $c, found ${m.getClass} ($m)") - case None ⇒ assertFail(s"timeout ($max) during expectMessageClass waiting for $c") + case Some(m) ⇒ assertFail(s"Expected $c, found ${m.getClass} ($m)") + case None ⇒ assertFail(s"Timeout ($max) during expectMessageClass waiting for $c") } } @@ -193,7 +200,7 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) val stop = max + now for (x ← 1 to n) yield { val timeout = stop - now - val o = receiveOne(timeout) + val o = receiveOne_internal(timeout) o match { case Some(m) ⇒ m case None ⇒ assertFail(s"timeout ($max) while expecting $n messages (got ${x - 1})") @@ -204,7 +211,7 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) override protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] = { @tailrec def loop(timeout: FiniteDuration, seen: List[M]): List[M] = { val start = System.nanoTime() - val maybeMsg = receiveOne(timeout) + val maybeMsg = receiveOne_internal(timeout) maybeMsg match { case Some(message) ⇒ val outcome = try fisher(message) catch { diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/Effects.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/Effects.scala index 35c138f51f..70597e4968 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/Effects.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/Effects.scala @@ -7,7 +7,6 @@ package akka.actor.testkit.typed.javadsl import java.time.Duration import akka.actor.typed.{ ActorRef, Behavior, Props } -import akka.actor.testkit.typed.Effect import akka.util.JavaDurationConverters._ /** diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala index 4396707b2a..69ee22095d 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala @@ -132,7 +132,8 @@ abstract class TestProbe[M] { @InternalApi protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T /** - * Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor. + * Same as `expectMessage(remainingOrDefault, obj)`, but using the + * default timeout as deadline. */ def expectMessage[T <: M](obj: T): T @@ -203,7 +204,8 @@ abstract class TestProbe[M] { // FIXME awaitAssert(Procedure): Unit would be nice for java people to not have to return null /** - * Same as `expectMessageType(clazz, remainingOrDefault)`, but correctly treating the timeFactor. + * Same as `expectMessageType(clazz, remainingOrDefault)`,but using the + * default timeout as deadline. */ def expectMessageClass[T](clazz: Class[T]): T = expectMessageClass_internal(getRemainingOrDefault.asScala, clazz) @@ -221,8 +223,18 @@ abstract class TestProbe[M] { @InternalApi protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C /** - * Same as `receiveN(n, remaining)` but correctly taking into account - * the timeFactor. + * Receive one message of type `M` within the default timeout as deadline. + */ + def receiveOne(): M + + /** + * Receive one message of type `M`. Wait time is bounded by the `max` duration, + * with an [[AssertionError]] raised in case of timeout. + */ + def receiveOne(max: Duration): M + + /** + * Same as `receiveMessages(n, remaining)` but using the default timeout as deadline. */ def receiveMessages(n: Int): JList[M] = receiveN_internal(n, getRemainingOrDefault.asScala).asJava diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala index 7d960ee258..568bee9c79 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala @@ -105,7 +105,7 @@ object TestProbe { protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T /** - * Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor. + * Same as `expectMessage(remainingOrDefault, obj)`, but using the default timeout as deadline. */ def expectMessage[T <: M](obj: T): T @@ -140,7 +140,7 @@ object TestProbe { def expectNoMessage(): Unit /** - * Same as `expectMessageType[T](remainingOrDefault)`, but correctly treating the timeFactor. + * Same as `expectMessageType[T](remainingOrDefault)`, but using the default timeout as deadline. */ def expectMessageType[T <: M](implicit t: ClassTag[T]): T = expectMessageClass_internal(remainingOrDefault, t.runtimeClass.asInstanceOf[Class[T]]) @@ -154,8 +154,18 @@ object TestProbe { protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C /** - * Same as `receiveN(n, remaining)` but correctly taking into account - * the timeFactor. + * Receive one message of type `M` within the default timeout as deadline. + */ + def receiveOne(): M + + /** + * Receive one message of type `M`. Wait time is bounded by the `max` duration, + * with an [[AssertionError]] raised in case of timeout. + */ + def receiveOne(max: FiniteDuration): M + + /** + * Same as `receiveN(n, remaining)` but using the default timeout as deadline. */ def receiveN(n: Int): immutable.Seq[M] = receiveN_internal(n, remainingOrDefault) diff --git a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java index b8dfe6ec1b..807fe7e806 100644 --- a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java +++ b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java @@ -4,52 +4,54 @@ package akka.actor.testkit.typed.javadsl; -import akka.actor.typed.ActorRef; -import akka.actor.typed.ActorSystem; - import java.time.Duration; import java.util.List; -public class TestProbeTest { +import akka.actor.testkit.typed.scaladsl.TestProbeSpec; +import akka.actor.testkit.typed.scaladsl.TestProbeSpec.*; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; - public static void compileOnlyApiTest() { - ActorSystem system = null; - TestProbe probe = TestProbe.create(system); - probe.ref(); - probe.awaitAssert(() -> { - // ... something ... - return null; +import static org.junit.Assert.*; + +public class TestProbeTest extends JUnitSuite { + + @ClassRule + public static TestKitJunitResource testKit = new TestKitJunitResource(); + + @Test + public void testReceiveOne() { + TestProbe probe = TestProbe.create(testKit.system()); + + List eventsT = akka.japi.Util.javaArrayList(TestProbeSpec.eventsT(10)); + + eventsT.forEach(e->{ + probe.getRef().tell(e); + assertEquals(probe.receiveOne(), e); }); - probe.awaitAssert(Duration.ofSeconds(3), () -> { - // ... something ... - return null; - }); - String awaitAssertResult = - probe.awaitAssert(Duration.ofSeconds(3), Duration.ofMillis(100), () -> { - // ... something ... - return "some result"; - }); - String messageResult = probe.expectMessage("message"); - String expectClassResult = probe.expectMessageClass(String.class); + probe.expectNoMessage(); - - ActorRef ref = null; - probe.expectTerminated(ref, Duration.ofSeconds(1)); - - Duration remaining = probe.getRemaining(); - probe.fishForMessage(Duration.ofSeconds(3), "hint", (message) -> { - if (message.equals("one")) return FishingOutcomes.continueAndIgnore(); - else if (message.equals("two")) return FishingOutcomes.complete(); - else return FishingOutcomes.fail("error"); - }); - - String withinResult = probe.within(Duration.ofSeconds(3), () -> { - // ... something ... - return "result"; - }); - - List messages1 = probe.receiveMessages(3); - List messages2 = probe.receiveMessages(3, Duration.ofSeconds(5)); - } + + @Test + public void testReceiveOneMaxDuration() { + TestProbe probe = TestProbe.create(testKit.system()); + + List eventsT = akka.japi.Util.javaArrayList(TestProbeSpec.eventsT(2)); + + eventsT.forEach(e->{ + probe.getRef().tell(e); + assertEquals(probe.receiveOne(Duration.ofMillis(100)), e); + }); + + probe.expectNoMessage(); + } + + @Test(expected = AssertionError.class) + public void testReceiveOneFailOnTimeout() { + TestProbe probe = TestProbe.create(testKit.system()); + probe.receiveOne(Duration.ofMillis(100)); + } + } diff --git a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala index e964956c22..68cd8582dc 100644 --- a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala +++ b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala @@ -5,11 +5,15 @@ package akka.actor.testkit.typed.scaladsl import akka.actor.typed.scaladsl.Behaviors +import com.typesafe.config.ConfigFactory + import scala.concurrent.duration._ import org.scalatest.WordSpecLike class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike { + import TestProbeSpec._ + def compileOnlyApiTest(): Unit = { val probe = TestProbe[AnyRef]() probe.fishForMessage(100.millis) { @@ -31,7 +35,6 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike { "The test probe" must { "allow probing for actor stop when actor already stopped" in { - case object Stop val probe = TestProbe() val ref = spawn(Behaviors.stopped) probe.expectTerminated(ref, 100.millis) @@ -142,6 +145,44 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike { } } - } + "allow receiving one message of type TestProbe[M]" in { + val probe = createTestProbe[EventT]() + eventsT(10).forall { e ⇒ + probe.ref ! e + probe.receiveOne == e + } should ===(true) + probe.expectNoMessage() + } + + "timeout if expected single message is not received by a provided timeout" in { + intercept[AssertionError](createTestProbe[EventT]().receiveOne(100.millis)) + } + } +} + +object TestProbeSpec { + + val timeoutConfig = ConfigFactory.parseString(""" + akka.actor.testkit.typed.default-timeout = 100ms + akka.test.default-timeout = 100ms""") + + /** Helper events for tests. */ + final case class EventT(id: Long) + + /** Creates the `expected` number of events to test. */ + def eventsT(expected: Int): Seq[EventT] = + for (n ← 1 to expected) yield EventT(n) +} + +class TestProbeTimeoutSpec extends ScalaTestWithActorTestKit(TestProbeSpec.timeoutConfig) with WordSpecLike { + + import TestProbeSpec._ + + "The test probe" must { + + "timeout if expected single message is not received by the default timeout" in { + intercept[AssertionError](createTestProbe[EventT]().receiveOne()) + } + } }