From 923a27cb6086461ad209dcaf9115057bb4b5baee Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 16 Oct 2018 15:20:05 +0200 Subject: [PATCH 1/2] Add receiveN to typed TestProbe * had to use a different name in javadsl because TestProbeImpl extends both and it would conflict with different return types --- .../typed/internal/TestProbeImpl.scala | 11 ++++++++++ .../testkit/typed/javadsl/TestProbe.scala | 18 +++++++++++++++ .../testkit/typed/scaladsl/TestProbe.scala | 13 +++++++++++ .../testkit/typed/javadsl/TestProbeTest.java | 4 ++++ .../typed/scaladsl/TestProbeSpec.scala | 22 +++++++++++++++++++ 5 files changed, 68 insertions(+) diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala index 7ec80dede8..8cca226e6b 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestProbeImpl.scala @@ -19,6 +19,7 @@ import akka.util.PrettyDuration._ import akka.util.{ BoxedType, Timeout } import akka.util.JavaDurationConverters._ import scala.annotation.tailrec +import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NonFatal @@ -180,6 +181,16 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) o.asInstanceOf[C] } + override protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M] = { + val stop = max + now + for (x ← 1 to n) yield { + val timeout = stop - now + val o = receiveOne(timeout) + assert(o != null, s"timeout ($max) while expecting $n messages (got ${x - 1})") + o + } + } + override protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] = { // not tailrec but that should be ok def loop(timeout: FiniteDuration, seen: List[M]): List[M] = { diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala index fff86d8f3e..fb824b616b 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/javadsl/TestProbe.scala @@ -6,6 +6,7 @@ package akka.actor.testkit.typed.javadsl import java.time.Duration import java.util.function.Supplier +import java.util.{ List ⇒ JList } import akka.actor.typed.{ ActorRef, ActorSystem } import akka.annotation.DoNotInherit @@ -13,6 +14,7 @@ import akka.actor.testkit.typed.internal.TestProbeImpl import akka.actor.testkit.typed.{ FishingOutcome, TestKitSettings } import akka.actor.testkit.typed.scaladsl.TestDuration import akka.util.JavaDurationConverters._ +import scala.collection.immutable import scala.collection.JavaConverters._ import scala.concurrent.duration.FiniteDuration @@ -218,6 +220,22 @@ abstract class TestProbe[M] { */ @InternalApi protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C + /** + * Same as `receiveN(n, remaining)` but correctly taking into account + * the timeFactor. + */ + def receiveMessages(n: Int): JList[M] = receiveN_internal(n, getRemainingOrDefault.asScala).asJava + + /** + * Receive `n` messages in a row before the given deadline. + */ + def receiveMessages(n: Int, max: Duration): JList[M] = receiveN_internal(n, max.asScala.dilated).asJava + + /** + * INTERNAL API + */ + @InternalApi protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M] + /** * Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming * message, and returns one of the following effects to decide on what happens next: diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala index 3e47167a82..87b8884366 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/TestProbe.scala @@ -153,6 +153,19 @@ object TestProbe { protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C + /** + * Same as `receiveN(n, remaining)` but correctly taking into account + * the timeFactor. + */ + def receiveN(n: Int): immutable.Seq[M] = receiveN_internal(n, remainingOrDefault) + + /** + * Receive `n` messages in a row before the given deadline. + */ + def receiveN(n: Int, max: FiniteDuration): immutable.Seq[M] = receiveN_internal(n, max.dilated) + + protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M] + /** * Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming * message, and returns one of the following effects to decide on what happens next: diff --git a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java index 1e573c16ea..60154b4fc4 100644 --- a/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java +++ b/akka-actor-testkit-typed/src/test/java/akka/actor/testkit/typed/javadsl/TestProbeTest.java @@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import java.time.Duration; +import java.util.List; public class TestProbeTest { @@ -47,5 +48,8 @@ public class TestProbeTest { return "result"; }); + List messages1 = probe.receiveMessages(3); + List messages2 = probe.receiveMessages(3, Duration.ofSeconds(5)); + } } diff --git a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala index 40da1e9e2f..e27178e4cc 100644 --- a/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala +++ b/akka-actor-testkit-typed/src/test/scala/akka/actor/testkit/typed/scaladsl/TestProbeSpec.scala @@ -120,6 +120,28 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike { } } + "allow receiving N messages" in { + val probe = TestProbe[String]() + + probe.ref ! "one" + probe.ref ! "two" + probe.ref ! "three" + + val result = probe.receiveN(3) + + result should ===(List("one", "two", "three")) + } + + "time out when not receiving N messages" in { + val probe = TestProbe[String]() + + probe.ref ! "one" + + intercept[AssertionError] { + probe.receiveN(3, 50.millis) + } + } + } } From baf14589d2a76bdf149c2106270b1ca018998078 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 16 Oct 2018 15:28:11 +0200 Subject: [PATCH 2/2] Hardening of typed./ClusterShardingSpec, #25794 * The replies didn't change after the leaving * I see two reason why it could have failed * The test is sending the same messages as the very first thing earlier in the test and then sharding might not now about the two nodes and therefore allocated all to one node * All messages are hashed to the same node/shard --- .../typed/scaladsl/ClusterShardingSpec.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index bf409f2b98..1d021293ca 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -393,13 +393,13 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. } "use the stopMessage for leaving/rebalance" in { - var replies1 = Set.empty[String] - (1 to 10).foreach { n ⇒ - val p = TestProbe[String]() - shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref)) - replies1 += p.expectMessageType[String] + // use many entites to reduce the risk that all are hashed to the same shard/node + val numberOfEntities = 100 + val probe1 = TestProbe[String]() + (1 to numberOfEntities).foreach { n ⇒ + shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(probe1.ref)) } - replies1.size should ===(10) + val replies1 = probe1.receiveN(numberOfEntities, 10.seconds) Cluster(system2).manager ! Leave(Cluster(system2).selfMember.address) @@ -408,13 +408,11 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec. Cluster(system2).isTerminated should ===(true) } - var replies2 = Set.empty[String] - (1 to 10).foreach { n ⇒ - val p = TestProbe[String]() - shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref)) - replies2 += p.expectMessageType[String](10.seconds) + val probe2 = TestProbe[String]() + (1 to numberOfEntities).foreach { n ⇒ + shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(probe2.ref)) } - replies2.size should ===(10) + val replies2 = probe2.receiveN(numberOfEntities, 10.seconds) replies2 should !==(replies1) // different addresses } }