Add receiveOne to TestProbe in Typed (#25914)
This commit is contained in:
parent
f6200cc07f
commit
e940643c12
7 changed files with 134 additions and 63 deletions
|
|
@ -13,7 +13,7 @@ import akka.annotation.InternalApi
|
|||
import akka.actor.testkit.typed.Effect
|
||||
import akka.actor.testkit.typed.Effect._
|
||||
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.reflect.ClassTag
|
||||
import scala.compat.java8.FunctionConverters._
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
|
||||
package akka.actor.testkit.typed.internal
|
||||
|
||||
import java.time
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque }
|
||||
import java.util.function.Supplier
|
||||
|
|
@ -132,7 +131,7 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
|
|||
expectMessage(max.asScala, hint, obj)
|
||||
|
||||
private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = {
|
||||
val o = receiveOne(max)
|
||||
val o = receiveOne_internal(max)
|
||||
val hintOrEmptyString = hint.map(": " + _).getOrElse("")
|
||||
o match {
|
||||
case Some(m) if obj == m ⇒ m.asInstanceOf[T]
|
||||
|
|
@ -141,13 +140,21 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
|
|||
}
|
||||
}
|
||||
|
||||
override def receiveOne(): M = receiveOne(remainingOrDefault)
|
||||
|
||||
override def receiveOne(max: java.time.Duration): M = receiveOne(max.asScala)
|
||||
|
||||
def receiveOne(max: FiniteDuration): M =
|
||||
receiveOne_internal(max.dilated).
|
||||
getOrElse(assertFail(s"Timeout ($max) during receiveOne while waiting for message."))
|
||||
|
||||
/**
|
||||
* Receive one message from the internal queue of the TestActor. If the given
|
||||
* duration is zero, the queue is polled (non-blocking).
|
||||
*
|
||||
* This method does NOT automatically scale its Duration parameter!
|
||||
*/
|
||||
private def receiveOne(max: Duration): Option[M] = {
|
||||
private def receiveOne_internal(max: Duration): Option[M] = {
|
||||
val message = Option(
|
||||
if (max == Duration.Zero) {
|
||||
queue.pollFirst
|
||||
|
|
@ -172,20 +179,20 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
|
|||
expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated)
|
||||
|
||||
private def expectNoMessage_internal(max: FiniteDuration): Unit = {
|
||||
val o = receiveOne(max)
|
||||
val o = receiveOne_internal(max)
|
||||
o match {
|
||||
case None ⇒ lastWasNoMessage = true
|
||||
case Some(m) ⇒ assertFail(s"received unexpected message $m")
|
||||
case Some(m) ⇒ assertFail(s"Received unexpected message $m")
|
||||
}
|
||||
}
|
||||
|
||||
override protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = {
|
||||
val o = receiveOne(max)
|
||||
val o = receiveOne_internal(max)
|
||||
val bt = BoxedType(c)
|
||||
o match {
|
||||
case Some(m) if bt isInstance m ⇒ m.asInstanceOf[C]
|
||||
case Some(m) ⇒ assertFail(s"expected $c, found ${m.getClass} ($m)")
|
||||
case None ⇒ assertFail(s"timeout ($max) during expectMessageClass waiting for $c")
|
||||
case Some(m) ⇒ assertFail(s"Expected $c, found ${m.getClass} ($m)")
|
||||
case None ⇒ assertFail(s"Timeout ($max) during expectMessageClass waiting for $c")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -193,7 +200,7 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
|
|||
val stop = max + now
|
||||
for (x ← 1 to n) yield {
|
||||
val timeout = stop - now
|
||||
val o = receiveOne(timeout)
|
||||
val o = receiveOne_internal(timeout)
|
||||
o match {
|
||||
case Some(m) ⇒ m
|
||||
case None ⇒ assertFail(s"timeout ($max) while expecting $n messages (got ${x - 1})")
|
||||
|
|
@ -204,7 +211,7 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
|
|||
override protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] = {
|
||||
@tailrec def loop(timeout: FiniteDuration, seen: List[M]): List[M] = {
|
||||
val start = System.nanoTime()
|
||||
val maybeMsg = receiveOne(timeout)
|
||||
val maybeMsg = receiveOne_internal(timeout)
|
||||
maybeMsg match {
|
||||
case Some(message) ⇒
|
||||
val outcome = try fisher(message) catch {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ package akka.actor.testkit.typed.javadsl
|
|||
import java.time.Duration
|
||||
|
||||
import akka.actor.typed.{ ActorRef, Behavior, Props }
|
||||
import akka.actor.testkit.typed.Effect
|
||||
import akka.util.JavaDurationConverters._
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -132,7 +132,8 @@ abstract class TestProbe[M] {
|
|||
@InternalApi protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T
|
||||
|
||||
/**
|
||||
* Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor.
|
||||
* Same as `expectMessage(remainingOrDefault, obj)`, but using the
|
||||
* default timeout as deadline.
|
||||
*/
|
||||
def expectMessage[T <: M](obj: T): T
|
||||
|
||||
|
|
@ -203,7 +204,8 @@ abstract class TestProbe[M] {
|
|||
// FIXME awaitAssert(Procedure): Unit would be nice for java people to not have to return null
|
||||
|
||||
/**
|
||||
* Same as `expectMessageType(clazz, remainingOrDefault)`, but correctly treating the timeFactor.
|
||||
* Same as `expectMessageType(clazz, remainingOrDefault)`,but using the
|
||||
* default timeout as deadline.
|
||||
*/
|
||||
def expectMessageClass[T](clazz: Class[T]): T =
|
||||
expectMessageClass_internal(getRemainingOrDefault.asScala, clazz)
|
||||
|
|
@ -221,8 +223,18 @@ abstract class TestProbe[M] {
|
|||
@InternalApi protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C
|
||||
|
||||
/**
|
||||
* Same as `receiveN(n, remaining)` but correctly taking into account
|
||||
* the timeFactor.
|
||||
* Receive one message of type `M` within the default timeout as deadline.
|
||||
*/
|
||||
def receiveOne(): M
|
||||
|
||||
/**
|
||||
* Receive one message of type `M`. Wait time is bounded by the `max` duration,
|
||||
* with an [[AssertionError]] raised in case of timeout.
|
||||
*/
|
||||
def receiveOne(max: Duration): M
|
||||
|
||||
/**
|
||||
* Same as `receiveMessages(n, remaining)` but using the default timeout as deadline.
|
||||
*/
|
||||
def receiveMessages(n: Int): JList[M] = receiveN_internal(n, getRemainingOrDefault.asScala).asJava
|
||||
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ object TestProbe {
|
|||
protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T
|
||||
|
||||
/**
|
||||
* Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor.
|
||||
* Same as `expectMessage(remainingOrDefault, obj)`, but using the default timeout as deadline.
|
||||
*/
|
||||
def expectMessage[T <: M](obj: T): T
|
||||
|
||||
|
|
@ -140,7 +140,7 @@ object TestProbe {
|
|||
def expectNoMessage(): Unit
|
||||
|
||||
/**
|
||||
* Same as `expectMessageType[T](remainingOrDefault)`, but correctly treating the timeFactor.
|
||||
* Same as `expectMessageType[T](remainingOrDefault)`, but using the default timeout as deadline.
|
||||
*/
|
||||
def expectMessageType[T <: M](implicit t: ClassTag[T]): T =
|
||||
expectMessageClass_internal(remainingOrDefault, t.runtimeClass.asInstanceOf[Class[T]])
|
||||
|
|
@ -154,8 +154,18 @@ object TestProbe {
|
|||
protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C
|
||||
|
||||
/**
|
||||
* Same as `receiveN(n, remaining)` but correctly taking into account
|
||||
* the timeFactor.
|
||||
* Receive one message of type `M` within the default timeout as deadline.
|
||||
*/
|
||||
def receiveOne(): M
|
||||
|
||||
/**
|
||||
* Receive one message of type `M`. Wait time is bounded by the `max` duration,
|
||||
* with an [[AssertionError]] raised in case of timeout.
|
||||
*/
|
||||
def receiveOne(max: FiniteDuration): M
|
||||
|
||||
/**
|
||||
* Same as `receiveN(n, remaining)` but using the default timeout as deadline.
|
||||
*/
|
||||
def receiveN(n: Int): immutable.Seq[M] = receiveN_internal(n, remainingOrDefault)
|
||||
|
||||
|
|
|
|||
|
|
@ -4,52 +4,54 @@
|
|||
|
||||
package akka.actor.testkit.typed.javadsl;
|
||||
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.ActorSystem;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
||||
public class TestProbeTest {
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbeSpec;
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbeSpec.*;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
public static void compileOnlyApiTest() {
|
||||
ActorSystem<Object> system = null;
|
||||
TestProbe<String> probe = TestProbe.create(system);
|
||||
probe.ref();
|
||||
probe.awaitAssert(() -> {
|
||||
// ... something ...
|
||||
return null;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class TestProbeTest extends JUnitSuite {
|
||||
|
||||
@ClassRule
|
||||
public static TestKitJunitResource testKit = new TestKitJunitResource();
|
||||
|
||||
@Test
|
||||
public void testReceiveOne() {
|
||||
TestProbe<EventT> probe = TestProbe.create(testKit.system());
|
||||
|
||||
List<EventT> eventsT = akka.japi.Util.javaArrayList(TestProbeSpec.eventsT(10));
|
||||
|
||||
eventsT.forEach(e->{
|
||||
probe.getRef().tell(e);
|
||||
assertEquals(probe.receiveOne(), e);
|
||||
});
|
||||
probe.awaitAssert(Duration.ofSeconds(3), () -> {
|
||||
// ... something ...
|
||||
return null;
|
||||
});
|
||||
String awaitAssertResult =
|
||||
probe.awaitAssert(Duration.ofSeconds(3), Duration.ofMillis(100), () -> {
|
||||
// ... something ...
|
||||
return "some result";
|
||||
});
|
||||
String messageResult = probe.expectMessage("message");
|
||||
String expectClassResult = probe.expectMessageClass(String.class);
|
||||
|
||||
probe.expectNoMessage();
|
||||
|
||||
ActorRef<String> ref = null;
|
||||
probe.expectTerminated(ref, Duration.ofSeconds(1));
|
||||
|
||||
Duration remaining = probe.getRemaining();
|
||||
probe.fishForMessage(Duration.ofSeconds(3), "hint", (message) -> {
|
||||
if (message.equals("one")) return FishingOutcomes.continueAndIgnore();
|
||||
else if (message.equals("two")) return FishingOutcomes.complete();
|
||||
else return FishingOutcomes.fail("error");
|
||||
});
|
||||
|
||||
String withinResult = probe.within(Duration.ofSeconds(3), () -> {
|
||||
// ... something ...
|
||||
return "result";
|
||||
});
|
||||
|
||||
List<String> messages1 = probe.receiveMessages(3);
|
||||
List<String> messages2 = probe.receiveMessages(3, Duration.ofSeconds(5));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReceiveOneMaxDuration() {
|
||||
TestProbe<EventT> probe = TestProbe.create(testKit.system());
|
||||
|
||||
List<EventT> eventsT = akka.japi.Util.javaArrayList(TestProbeSpec.eventsT(2));
|
||||
|
||||
eventsT.forEach(e->{
|
||||
probe.getRef().tell(e);
|
||||
assertEquals(probe.receiveOne(Duration.ofMillis(100)), e);
|
||||
});
|
||||
|
||||
probe.expectNoMessage();
|
||||
}
|
||||
|
||||
@Test(expected = AssertionError.class)
|
||||
public void testReceiveOneFailOnTimeout() {
|
||||
TestProbe<EventT> probe = TestProbe.create(testKit.system());
|
||||
probe.receiveOne(Duration.ofMillis(100));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,11 +5,15 @@
|
|||
package akka.actor.testkit.typed.scaladsl
|
||||
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||
|
||||
import TestProbeSpec._
|
||||
|
||||
def compileOnlyApiTest(): Unit = {
|
||||
val probe = TestProbe[AnyRef]()
|
||||
probe.fishForMessage(100.millis) {
|
||||
|
|
@ -31,7 +35,6 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
"The test probe" must {
|
||||
|
||||
"allow probing for actor stop when actor already stopped" in {
|
||||
case object Stop
|
||||
val probe = TestProbe()
|
||||
val ref = spawn(Behaviors.stopped)
|
||||
probe.expectTerminated(ref, 100.millis)
|
||||
|
|
@ -142,6 +145,44 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
}
|
||||
}
|
||||
|
||||
}
|
||||
"allow receiving one message of type TestProbe[M]" in {
|
||||
val probe = createTestProbe[EventT]()
|
||||
eventsT(10).forall { e ⇒
|
||||
probe.ref ! e
|
||||
probe.receiveOne == e
|
||||
} should ===(true)
|
||||
|
||||
probe.expectNoMessage()
|
||||
}
|
||||
|
||||
"timeout if expected single message is not received by a provided timeout" in {
|
||||
intercept[AssertionError](createTestProbe[EventT]().receiveOne(100.millis))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object TestProbeSpec {
|
||||
|
||||
val timeoutConfig = ConfigFactory.parseString("""
|
||||
akka.actor.testkit.typed.default-timeout = 100ms
|
||||
akka.test.default-timeout = 100ms""")
|
||||
|
||||
/** Helper events for tests. */
|
||||
final case class EventT(id: Long)
|
||||
|
||||
/** Creates the `expected` number of events to test. */
|
||||
def eventsT(expected: Int): Seq[EventT] =
|
||||
for (n ← 1 to expected) yield EventT(n)
|
||||
}
|
||||
|
||||
class TestProbeTimeoutSpec extends ScalaTestWithActorTestKit(TestProbeSpec.timeoutConfig) with WordSpecLike {
|
||||
|
||||
import TestProbeSpec._
|
||||
|
||||
"The test probe" must {
|
||||
|
||||
"timeout if expected single message is not received by the default timeout" in {
|
||||
intercept[AssertionError](createTestProbe[EventT]().receiveOne())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue