Merge pull request #25801 from akka/wip-25794-ClusterShardingSpec-patriknw

Hardening of typed./ClusterShardingSpec, #25794
This commit is contained in:
Patrik Nordwall 2018-11-09 09:53:14 +01:00 committed by GitHub
commit 42adfd781c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 78 additions and 12 deletions

View file

@ -19,6 +19,7 @@ import akka.util.PrettyDuration._
import akka.util.{ BoxedType, Timeout }
import akka.util.JavaDurationConverters._
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NonFatal
@ -180,6 +181,16 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
o.asInstanceOf[C]
}
override protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M] = {
val stop = max + now
for (x 1 to n) yield {
val timeout = stop - now
val o = receiveOne(timeout)
assert(o != null, s"timeout ($max) while expecting $n messages (got ${x - 1})")
o
}
}
override protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M FishingOutcome): List[M] = {
// not tailrec but that should be ok
def loop(timeout: FiniteDuration, seen: List[M]): List[M] = {

View file

@ -6,6 +6,7 @@ package akka.actor.testkit.typed.javadsl
import java.time.Duration
import java.util.function.Supplier
import java.util.{ List JList }
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.annotation.DoNotInherit
@ -13,6 +14,7 @@ import akka.actor.testkit.typed.internal.TestProbeImpl
import akka.actor.testkit.typed.{ FishingOutcome, TestKitSettings }
import akka.actor.testkit.typed.scaladsl.TestDuration
import akka.util.JavaDurationConverters._
import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.concurrent.duration.FiniteDuration
@ -218,6 +220,22 @@ abstract class TestProbe[M] {
*/
@InternalApi protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C
/**
* Same as `receiveN(n, remaining)` but correctly taking into account
* the timeFactor.
*/
def receiveMessages(n: Int): JList[M] = receiveN_internal(n, getRemainingOrDefault.asScala).asJava
/**
* Receive `n` messages in a row before the given deadline.
*/
def receiveMessages(n: Int, max: Duration): JList[M] = receiveN_internal(n, max.asScala.dilated).asJava
/**
* INTERNAL API
*/
@InternalApi protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M]
/**
* Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming
* message, and returns one of the following effects to decide on what happens next:

View file

@ -153,6 +153,19 @@ object TestProbe {
protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C
/**
* Same as `receiveN(n, remaining)` but correctly taking into account
* the timeFactor.
*/
def receiveN(n: Int): immutable.Seq[M] = receiveN_internal(n, remainingOrDefault)
/**
* Receive `n` messages in a row before the given deadline.
*/
def receiveN(n: Int, max: FiniteDuration): immutable.Seq[M] = receiveN_internal(n, max.dilated)
protected def receiveN_internal(n: Int, max: FiniteDuration): immutable.Seq[M]
/**
* Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming
* message, and returns one of the following effects to decide on what happens next:

View file

@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import java.time.Duration;
import java.util.List;
public class TestProbeTest {
@ -47,5 +48,8 @@ public class TestProbeTest {
return "result";
});
List<String> messages1 = probe.receiveMessages(3);
List<String> messages2 = probe.receiveMessages(3, Duration.ofSeconds(5));
}
}

View file

@ -120,6 +120,28 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike {
}
}
"allow receiving N messages" in {
val probe = TestProbe[String]()
probe.ref ! "one"
probe.ref ! "two"
probe.ref ! "three"
val result = probe.receiveN(3)
result should ===(List("one", "two", "three"))
}
"time out when not receiving N messages" in {
val probe = TestProbe[String]()
probe.ref ! "one"
intercept[AssertionError] {
probe.receiveN(3, 50.millis)
}
}
}
}

View file

@ -420,13 +420,13 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
}
"use the stopMessage for leaving/rebalance" in {
var replies1 = Set.empty[String]
(1 to 10).foreach { n
val p = TestProbe[String]()
shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref))
replies1 += p.expectMessageType[String]
// use many entites to reduce the risk that all are hashed to the same shard/node
val numberOfEntities = 100
val probe1 = TestProbe[String]()
(1 to numberOfEntities).foreach { n
shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(probe1.ref))
}
replies1.size should ===(10)
val replies1 = probe1.receiveN(numberOfEntities, 10.seconds)
Cluster(system2).manager ! Leave(Cluster(system2).selfMember.address)
@ -435,13 +435,11 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
Cluster(system2).isTerminated should ===(true)
}
var replies2 = Set.empty[String]
(1 to 10).foreach { n
val p = TestProbe[String]()
shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref))
replies2 += p.expectMessageType[String](10.seconds)
val probe2 = TestProbe[String]()
(1 to numberOfEntities).foreach { n
shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(probe2.ref))
}
replies2.size should ===(10)
val replies2 = probe2.receiveN(numberOfEntities, 10.seconds)
replies2 should !==(replies1) // different addresses
}
}