diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala index 5c1ad5b0d0..2baec5a30f 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala @@ -19,6 +19,7 @@ import akka.util.PrettyDuration._ import akka.util.{ BoxedType, Timeout } import akka.util.JavaDurationConverters._ import scala.annotation.tailrec +import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NonFatal @@ -180,6 +181,16 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) o.asInstanceOf[C] } + override protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M] = { + val stop = max + now + for (x ← 1 to n) yield { + val timeout = stop - now + val o = receiveOne(timeout) + assert(o != null, s"timeout ($max) while expecting $n messages (got ${x - 1})") + o + } + } + override protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] = { // not tailrec but that should be ok def loop(timeout: FiniteDuration, seen: List[M]): List[M] = { diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala index 41170c2f67..4396707b2a 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala @@ -6,6 +6,7 @@ package akka.actor.testkit.typed.javadsl import java.time.Duration import java.util.function.Supplier +import java.util.{ List ⇒ JList } import akka.actor.typed.{ ActorRef, ActorSystem } import akka.annotation.DoNotInherit @@ -13,6 +14,7 @@ import akka.actor.testkit.typed.internal.TestProbeImpl import akka.actor.testkit.typed.{ FishingOutcome, TestKitSettings } import akka.actor.testkit.typed.scaladsl.TestDuration import akka.util.JavaDurationConverters._ +import scala.collection.immutable import scala.collection.JavaConverters._ import scala.concurrent.duration.FiniteDuration @@ -218,6 +220,22 @@ abstract class TestProbe[M] { */ @InternalApi protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C + /** + * Same as `receiveN(n, remaining)` but correctly taking into account + * the timeFactor. + */ + def receiveMessages(n: Int): JList[M] = receiveN_internal(n, getRemainingOrDefault.asScala).asJava + + /** + * Receive `n` messages in a row before the given deadline. + */ + def receiveMessages(n: Int, max: Duration): JList[M] = receiveN_internal(n, max.asScala.dilated).asJava + + /** + * INTERNAL API + */ + @InternalApi protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M] + /** * Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming * message, and returns one of the following effects to decide on what happens next: diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala index 30a77d58a1..79fd383705 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala @@ -153,6 +153,19 @@ object TestProbe { protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C + /** + * Same as `receiveN(n, remaining)` but correctly taking into account + * the timeFactor. + */ + def receiveN(n: Int): immutable.Seq[M] = receiveN_internal(n, remainingOrDefault) + + /** + * Receive `n` messages in a row before the given deadline. + */ + def receiveN(n: Int, max: FiniteDuration): immutable.Seq[M] = receiveN_internal(n, max.dilated) + + protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M] + /** * Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming * message, and returns one of the following effects to decide on what happens next: diff --git a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java index f535f36d1f..b8dfe6ec1b 100644 --- a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java +++ b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java @@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import java.time.Duration; +import java.util.List; public class TestProbeTest { @@ -47,5 +48,8 @@ public class TestProbeTest { return "result"; }); + List messages1 = probe.receiveMessages(3); + List messages2 = probe.receiveMessages(3, Duration.ofSeconds(5)); + } } diff --git a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala index 256b1dbf3e..e964956c22 100644 --- a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala +++ b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala @@ -120,6 +120,28 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike { } } + "allow receiving N messages" in { + val probe = TestProbe[String]() + + probe.ref ! "one" + probe.ref ! "two" + probe.ref ! "three" + + val result = probe.receiveN(3) + + result should ===(List("one", "two", "three")) + } + + "time out when not receiving N messages" in { + val probe = TestProbe[String]() + + probe.ref ! "one" + + intercept[AssertionError] { + probe.receiveN(3, 50.millis) + } + } + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index c4256d47a5..71f6e858fc 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -420,13 +420,13 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. } "use the stopMessage for leaving/rebalance" in { - var replies1 = Set.empty[String] - (1 to 10).foreach { n ⇒ - val p = TestProbe[String]() - shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref)) - replies1 += p.expectMessageType[String] + // use many entites to reduce the risk that all are hashed to the same shard/node + val numberOfEntities = 100 + val probe1 = TestProbe[String]() + (1 to numberOfEntities).foreach { n ⇒ + shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(probe1.ref)) } - replies1.size should ===(10) + val replies1 = probe1.receiveN(numberOfEntities, 10.seconds) Cluster(system2).manager ! Leave(Cluster(system2).selfMember.address) @@ -435,13 +435,11 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. Cluster(system2).isTerminated should ===(true) } - var replies2 = Set.empty[String] - (1 to 10).foreach { n ⇒ - val p = TestProbe[String]() - shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref)) - replies2 += p.expectMessageType[String](10.seconds) + val probe2 = TestProbe[String]() + (1 to numberOfEntities).foreach { n ⇒ + shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(probe2.ref)) } - replies2.size should ===(10) + val replies2 = probe2.receiveN(numberOfEntities, 10.seconds) replies2 should !==(replies1) // different addresses } }