parent
3ea59b1e76
commit
650490f68a
19 changed files with 660 additions and 318 deletions
|
|
@ -14,14 +14,14 @@ import akka.actor.typed.javadsl.Behaviors;
|
|||
import org.junit.Test;
|
||||
|
||||
import akka.testkit.typed.TestKit;
|
||||
import akka.testkit.typed.ExplicitlyTriggeredScheduler;
|
||||
import akka.testkit.typed.javadsl.ExplicitlyTriggeredScheduler;
|
||||
import akka.testkit.typed.javadsl.TestProbe;
|
||||
|
||||
public class ManualTimerTest extends TestKit {
|
||||
ExplicitlyTriggeredScheduler scheduler;
|
||||
|
||||
public ManualTimerTest() {
|
||||
super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.ExplicitlyTriggeredScheduler\""));
|
||||
super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.javadsl.ExplicitlyTriggeredScheduler\""));
|
||||
this.scheduler = (ExplicitlyTriggeredScheduler) system().scheduler();
|
||||
}
|
||||
|
||||
|
|
@ -30,7 +30,7 @@ public class ManualTimerTest extends TestKit {
|
|||
|
||||
@Test
|
||||
public void testScheduleNonRepeatedTicks() {
|
||||
TestProbe<Tock> probe = new TestProbe<>(system());
|
||||
TestProbe<Tock> probe = TestProbe.create(system());
|
||||
Behavior<Tick> behavior = Behaviors.withTimers(timer -> {
|
||||
timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS));
|
||||
return Behaviors.immutable( (ctx, tick) -> {
|
||||
|
|
@ -44,7 +44,7 @@ public class ManualTimerTest extends TestKit {
|
|||
scheduler.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe);
|
||||
|
||||
scheduler.timePasses(Duration.create(2, TimeUnit.MILLISECONDS));
|
||||
probe.expectMessageType(Tock.class);
|
||||
probe.expectMessageClass(Tock.class);
|
||||
|
||||
scheduler.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ public class ActorContextAskTest extends JUnitSuite {
|
|||
final ActorRef<Ping> pingPong = Adapter.spawnAnonymous(system, pingPongBehavior);
|
||||
|
||||
|
||||
final TestProbe<Object> probe = new TestProbe<>(Adapter.toTyped(system));
|
||||
final TestProbe<Object> probe = TestProbe.create(Adapter.toTyped(system));
|
||||
|
||||
final Behavior<Object> snitch = Behaviors.setup((ActorContext<Object> ctx) -> {
|
||||
ctx.ask(Pong.class,
|
||||
|
|
@ -62,7 +62,7 @@ public class ActorContextAskTest extends JUnitSuite {
|
|||
|
||||
Adapter.spawnAnonymous(system, snitch);
|
||||
|
||||
probe.expectMessageType(Pong.class);
|
||||
probe.expectMessageClass(Pong.class);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import akka.actor.typed.ActorSystem;
|
|||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.Props;
|
||||
import akka.actor.typed.javadsl.*;
|
||||
import akka.testkit.typed.scaladsl.TestProbe;
|
||||
import akka.testkit.typed.javadsl.TestProbe;
|
||||
import akka.util.Timeout;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
|
@ -323,7 +323,7 @@ public class InteractionPatternsTest extends JUnitSuite {
|
|||
@Test
|
||||
public void timers() throws Exception {
|
||||
final ActorSystem<Object> system = ActorSystem.create(Behaviors.empty(), "timers-sample");
|
||||
TestProbe<Batch> probe = new TestProbe<>("batcher", system);
|
||||
TestProbe<Batch> probe = TestProbe.create("batcher", system);
|
||||
ActorRef<Msg> bufferer = Await.result(system.systemActorOf(
|
||||
behavior(probe.ref(), new FiniteDuration(1, TimeUnit.SECONDS), 10),
|
||||
"batcher", Props.empty(), akka.util.Timeout.apply(1, TimeUnit.SECONDS)),
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ public class BasicAsyncTestingTest extends TestKit {
|
|||
@Test
|
||||
public void testVerifyingAResponse() {
|
||||
//#test-spawn
|
||||
TestProbe<Pong> probe = new TestProbe<>(system());
|
||||
TestProbe<Pong> probe = TestProbe.create(system());
|
||||
ActorRef<Ping> pinger = spawn(echoActor, "ping");
|
||||
pinger.tell(new Ping("hello", probe.ref()));
|
||||
probe.expectMessage(new Pong("hello"));
|
||||
|
|
@ -59,7 +59,7 @@ public class BasicAsyncTestingTest extends TestKit {
|
|||
@Test
|
||||
public void testVerifyingAResponseAnonymous() {
|
||||
//#test-spawn-anonymous
|
||||
TestProbe<Pong> probe = new TestProbe<>(system());
|
||||
TestProbe<Pong> probe = TestProbe.create(system());
|
||||
ActorRef<Ping> pinger = spawn(echoActor);
|
||||
pinger.tell(new Ping("hello", probe.ref()));
|
||||
probe.expectMessage(new Pong("hello"));
|
||||
|
|
|
|||
|
|
@ -146,10 +146,10 @@ class TimerSpec extends TestKit("TimerSpec")
|
|||
ref ! Cancel
|
||||
probe.fishForMessage(3.seconds) {
|
||||
// we don't know that we will see exactly one tock
|
||||
case _: Tock ⇒ FishingOutcomes.Continue
|
||||
case _: Tock ⇒ FishingOutcomes.continue
|
||||
// but we know that after we saw Cancelled we won't see any more
|
||||
case Cancelled ⇒ FishingOutcomes.Complete
|
||||
case msg ⇒ FishingOutcomes.Fail(s"unexpected msg: $msg")
|
||||
case Cancelled ⇒ FishingOutcomes.complete
|
||||
case msg ⇒ FishingOutcomes.fail(s"unexpected msg: $msg")
|
||||
}
|
||||
probe.expectNoMessage(interval + 100.millis.dilated)
|
||||
|
||||
|
|
|
|||
|
|
@ -35,22 +35,22 @@ public class ClusterApiTest extends JUnitSuite {
|
|||
Cluster cluster1 = Cluster.get(system1);
|
||||
Cluster cluster2 = Cluster.get(system2);
|
||||
|
||||
TestProbe<ClusterEvent.ClusterDomainEvent> probe1 = new TestProbe<>(system1);
|
||||
TestProbe<ClusterEvent.ClusterDomainEvent> probe1 = TestProbe.create(system1);
|
||||
|
||||
cluster1.subscriptions().tell(new Subscribe<>(probe1.ref().narrow(), SelfUp.class));
|
||||
cluster1.manager().tell(new Join(cluster1.selfMember().address()));
|
||||
probe1.expectMessageType(SelfUp.class);
|
||||
probe1.expectMessageClass(SelfUp.class);
|
||||
|
||||
TestProbe<ClusterEvent.ClusterDomainEvent> probe2 = new TestProbe<>(system2);
|
||||
TestProbe<ClusterEvent.ClusterDomainEvent> probe2 = TestProbe.create(system2);
|
||||
cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfUp.class));
|
||||
cluster2.manager().tell(new Join(cluster1.selfMember().address()));
|
||||
probe2.expectMessageType(SelfUp.class);
|
||||
probe2.expectMessageClass(SelfUp.class);
|
||||
|
||||
|
||||
cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfRemoved.class));
|
||||
cluster2.manager().tell(new Leave(cluster2.selfMember().address()));
|
||||
|
||||
probe2.expectMessageType(SelfRemoved.class);
|
||||
probe2.expectMessageClass(SelfRemoved.class);
|
||||
} finally {
|
||||
// TODO no java API to terminate actor system
|
||||
Await.result(system1.terminate().zip(system2.terminate()), Duration.create("5 seconds"));
|
||||
|
|
|
|||
|
|
@ -58,15 +58,15 @@ public class BasicClusterExampleTest { // extends JUnitSuite {
|
|||
Cluster cluster2 = Cluster.get(system2);
|
||||
|
||||
//#cluster-subscribe
|
||||
TestProbe<ClusterEvent.MemberEvent> testProbe = new TestProbe<>(system);
|
||||
TestProbe<ClusterEvent.MemberEvent> testProbe = TestProbe.create(system);
|
||||
cluster.subscriptions().tell(Subscribe.create(testProbe.ref(), ClusterEvent.MemberEvent.class));
|
||||
//#cluster-subscribe
|
||||
|
||||
//#cluster-leave-example
|
||||
cluster.manager().tell(Leave.create(cluster2.selfMember().address()));
|
||||
testProbe.expectMessageType(ClusterEvent.MemberLeft.class);
|
||||
testProbe.expectMessageType(ClusterEvent.MemberExited.class);
|
||||
testProbe.expectMessageType(ClusterEvent.MemberRemoved.class);
|
||||
testProbe.expectMessageClass(ClusterEvent.MemberLeft.class);
|
||||
testProbe.expectMessageClass(ClusterEvent.MemberExited.class);
|
||||
testProbe.expectMessageClass(ClusterEvent.MemberRemoved.class);
|
||||
//#cluster-leave-example
|
||||
|
||||
} finally {
|
||||
|
|
|
|||
|
|
@ -145,14 +145,14 @@ public class PersistentActorTest extends TestKit {
|
|||
|
||||
|
||||
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId, ActorRef<Pair<State, Incremented>> probe) {
|
||||
ActorRef<String> loggingProbe = new TestProbe<String>(system()).ref();
|
||||
ActorRef<String> loggingProbe = TestProbe.create(String.class, system()).ref();
|
||||
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false);
|
||||
}
|
||||
|
||||
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId) {
|
||||
return counter(persistenceId,
|
||||
new TestProbe<Pair<State, Incremented>>(system()).ref(),
|
||||
new TestProbe<String>(system()).ref(),
|
||||
TestProbe.<Pair<State, Incremented>>create(system()).ref(),
|
||||
TestProbe.<String>create(system()).ref(),
|
||||
(s, i, l) -> false);
|
||||
}
|
||||
|
||||
|
|
@ -161,8 +161,8 @@ public class PersistentActorTest extends TestKit {
|
|||
Function3<State, Incremented, Long, Boolean> snapshot
|
||||
) {
|
||||
return counter(persistenceId,
|
||||
new TestProbe<Pair<State, Incremented>>(system()).ref(),
|
||||
new TestProbe<String>(system()).ref(), snapshot);
|
||||
TestProbe.<Pair<State, Incremented>>create(system()).ref(),
|
||||
TestProbe.<String>create(system()).ref(), snapshot);
|
||||
}
|
||||
|
||||
private PersistentBehavior<Command, Incremented, State> counter(
|
||||
|
|
@ -176,7 +176,7 @@ public class PersistentActorTest extends TestKit {
|
|||
String persistentId,
|
||||
ActorRef<Pair<State, Incremented>> eventProbe,
|
||||
Function3<State, Incremented, Long, Boolean> snapshot) {
|
||||
return counter(persistentId, eventProbe, new TestProbe<String>(system()).ref(), snapshot);
|
||||
return counter(persistentId, eventProbe, TestProbe.<String>create(system()).ref(), snapshot);
|
||||
}
|
||||
|
||||
private PersistentBehavior<Command, Incremented, State> counter(
|
||||
|
|
@ -253,7 +253,7 @@ public class PersistentActorTest extends TestKit {
|
|||
@Test
|
||||
public void persistEvents() {
|
||||
ActorRef<Command> c = spawn(counter("c2"));
|
||||
TestProbe<State> probe = new TestProbe<>(system());
|
||||
TestProbe<State> probe = TestProbe.create(system());
|
||||
c.tell(Increment.instance);
|
||||
c.tell(new GetValue(probe.ref()));
|
||||
probe.expectMessage(new State(1, singletonList(0)));
|
||||
|
|
@ -262,7 +262,7 @@ public class PersistentActorTest extends TestKit {
|
|||
@Test
|
||||
public void replyStoredEvents() {
|
||||
ActorRef<Command> c = spawn(counter("c2"));
|
||||
TestProbe<State> probe = new TestProbe<>(system());
|
||||
TestProbe<State> probe = TestProbe.create(system());
|
||||
c.tell(Increment.instance);
|
||||
c.tell(Increment.instance);
|
||||
c.tell(Increment.instance);
|
||||
|
|
@ -279,7 +279,7 @@ public class PersistentActorTest extends TestKit {
|
|||
|
||||
@Test
|
||||
public void handleTerminatedSignal() {
|
||||
TestProbe<Pair<State, Incremented>> eventHandlerProbe = new TestProbe<>(system());
|
||||
TestProbe<Pair<State, Incremented>> eventHandlerProbe = TestProbe.create(system());
|
||||
ActorRef<Command> c = spawn(counter("c2", eventHandlerProbe.ref()));
|
||||
c.tell(Increment.instance);
|
||||
c.tell(new IncrementLater());
|
||||
|
|
@ -289,7 +289,7 @@ public class PersistentActorTest extends TestKit {
|
|||
|
||||
@Test
|
||||
public void handleReceiveTimeout() {
|
||||
TestProbe<Pair<State, Incremented>> eventHandlerProbe = new TestProbe<>(system());
|
||||
TestProbe<Pair<State, Incremented>> eventHandlerProbe = TestProbe.create(system());
|
||||
ActorRef<Command> c = spawn(counter("c1", eventHandlerProbe.ref()));
|
||||
c.tell(new Increment100OnTimeout());
|
||||
eventHandlerProbe.expectMessage(Pair.create(emptyState, timeoutEvent));
|
||||
|
|
@ -297,8 +297,8 @@ public class PersistentActorTest extends TestKit {
|
|||
|
||||
@Test
|
||||
public void chainableSideEffectsWithEvents() {
|
||||
TestProbe<Pair<State, Incremented>> eventHandlerProbe = new TestProbe<>(system());
|
||||
TestProbe<String> loggingProbe = new TestProbe<>(system());
|
||||
TestProbe<Pair<State, Incremented>> eventHandlerProbe = TestProbe.create(system());
|
||||
TestProbe<String> loggingProbe = TestProbe.create(system());
|
||||
ActorRef<Command> c = spawn(counter("c1", eventHandlerProbe.ref(), loggingProbe.ref()));
|
||||
c.tell(new EmptyEventsListAndThenLog());
|
||||
loggingProbe.expectMessage(loggingOne);
|
||||
|
|
@ -311,11 +311,11 @@ public class PersistentActorTest extends TestKit {
|
|||
c.tell(Increment.instance);
|
||||
c.tell(Increment.instance);
|
||||
c.tell(Increment.instance);
|
||||
TestProbe<State> probe = new TestProbe<>(system());
|
||||
TestProbe<State> probe = TestProbe.create(system());
|
||||
c.tell(new GetValue(probe.ref()));
|
||||
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
|
||||
|
||||
TestProbe<Pair<State, Incremented>> eventProbe = new TestProbe<>(system());
|
||||
TestProbe<Pair<State, Incremented>> eventProbe = TestProbe.create(system());
|
||||
snapshoter = counter("c11", eventProbe.ref(), (s, e, l) -> s.value % 2 == 0);
|
||||
ActorRef<Command> c2 = spawn(snapshoter);
|
||||
// First 2 are snapshot
|
||||
|
|
@ -326,7 +326,7 @@ public class PersistentActorTest extends TestKit {
|
|||
|
||||
@Test
|
||||
public void stopThenLog() {
|
||||
TestProbe<State> probe = new TestProbe<>(system());
|
||||
TestProbe<State> probe = TestProbe.create(system());
|
||||
ActorRef<Command> c = spawn(counter("c12"));
|
||||
c.tell(new StopThenLog());
|
||||
probe.expectTerminated(c, FiniteDuration.create(1, TimeUnit.SECONDS));
|
||||
|
|
|
|||
|
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.testkit.typed
|
||||
|
||||
import akka.annotation.DoNotInherit
|
||||
|
||||
/**
|
||||
* Not for user extension.
|
||||
*
|
||||
* Instances are available from `FishingOutcomes` in the respective dsls: [[akka.testkit.typed.scaladsl.FishingOutcomes]]
|
||||
* and [[akka.testkit.typed.javadsl.FishingOutcomes]]
|
||||
*/
|
||||
@DoNotInherit sealed trait FishingOutcome
|
||||
|
||||
object FishingOutcome {
|
||||
|
||||
case object Continue extends FishingOutcome
|
||||
case object ContinueAndIgnore extends FishingOutcome
|
||||
case object Complete extends FishingOutcome
|
||||
case class Fail(error: String) extends FishingOutcome
|
||||
}
|
||||
|
|
@ -148,7 +148,7 @@ final case class CapturedLogEvent(logLevel: LogLevel, message: String, cause: Op
|
|||
new FunctionRef[U](
|
||||
self.path / i.ref.path.name,
|
||||
(msg, _) ⇒ { val m = f(msg); if (m != null) { selfInbox.ref ! m; i.ref ! msg } },
|
||||
(self) ⇒ selfInbox.ref.sorry.sendSystem(internal.DeathWatchNotification(self, null)))
|
||||
(self) ⇒ selfInbox.ref.sorry.sendSystem(DeathWatchNotification(self, null)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -15,15 +15,7 @@ import scala.util.control.NoStackTrace
|
|||
/**
|
||||
* Exception without stack trace to use for verifying exceptions in tests
|
||||
*/
|
||||
case class TE(message: String) extends RuntimeException(message) with NoStackTrace
|
||||
|
||||
/**
|
||||
* Not for user extension.
|
||||
*
|
||||
* Instances are available from `FishingOutcomes` in the respective dsls: [[akka.testkit.typed.scaladsl.FishingOutcomes]]
|
||||
* and [[akka.testkit.typed.javadsl.FishingOutcomes]]
|
||||
*/
|
||||
@DoNotInherit abstract class FishingOutcome private[akka] ()
|
||||
final case class TE(message: String) extends RuntimeException(message) with NoStackTrace
|
||||
|
||||
object TestKit {
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,245 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.testkit.typed.internal
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque }
|
||||
import java.util.function.Supplier
|
||||
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.testkit.typed.javadsl.{ TestProbe ⇒ JavaTestProbe }
|
||||
import akka.testkit.typed.scaladsl.{ TestDuration, TestProbe ⇒ ScalaTestProbe }
|
||||
import akka.testkit.typed.{ FishingOutcome, TestKitSettings }
|
||||
import akka.util.PrettyDuration._
|
||||
import akka.util.{ BoxedType, Timeout }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
@InternalApi
|
||||
private[akka] object TestProbeImpl {
|
||||
private val testActorId = new AtomicInteger(0)
|
||||
|
||||
private case class WatchActor[U](actor: ActorRef[U])
|
||||
private def testActor[M](queue: BlockingDeque[M], terminations: BlockingDeque[Terminated]): Behavior[M] = Behaviors.immutable[M] { (ctx, msg) ⇒
|
||||
msg match {
|
||||
case WatchActor(ref) ⇒ ctx.watch(ref)
|
||||
case other ⇒ queue.offerLast(other)
|
||||
}
|
||||
Behaviors.same
|
||||
}.onSignal {
|
||||
case (_, t: Terminated) ⇒
|
||||
terminations.offerLast(t)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
@InternalApi
|
||||
private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) extends JavaTestProbe[M] with ScalaTestProbe[M] {
|
||||
|
||||
import TestProbeImpl._
|
||||
protected implicit val settings = TestKitSettings(system)
|
||||
private val queue = new LinkedBlockingDeque[M]
|
||||
private val terminations = new LinkedBlockingDeque[Terminated]
|
||||
|
||||
private var end: Duration = Duration.Undefined
|
||||
|
||||
/**
|
||||
* if last assertion was expectNoMessage, disable timing failure upon within()
|
||||
* block end.
|
||||
*/
|
||||
private var lastWasNoMessage = false
|
||||
|
||||
private var lastMessage: Option[M] = None
|
||||
|
||||
private val testActor: ActorRef[M] = {
|
||||
// FIXME arbitrary timeout?
|
||||
implicit val timeout = Timeout(3.seconds)
|
||||
val futRef = system.systemActorOf(TestProbeImpl.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}")
|
||||
Await.result(futRef, timeout.duration + 1.second)
|
||||
}
|
||||
|
||||
override def ref = testActor
|
||||
|
||||
override def remainingOrDefault = remainingOr(settings.SingleExpectDefaultTimeout.dilated)
|
||||
|
||||
override def remaining: FiniteDuration = end match {
|
||||
case f: FiniteDuration ⇒ f - now
|
||||
case _ ⇒ throw new AssertionError("`remaining` may not be called outside of `within`")
|
||||
}
|
||||
|
||||
override def remainingOr(duration: FiniteDuration): FiniteDuration = end match {
|
||||
case x if x eq Duration.Undefined ⇒ duration
|
||||
case x if !x.isFinite ⇒ throw new IllegalArgumentException("`end` cannot be infinite")
|
||||
case f: FiniteDuration ⇒ f - now
|
||||
}
|
||||
|
||||
private def remainingOrDilated(max: Duration): FiniteDuration = max match {
|
||||
case x if x eq Duration.Undefined ⇒ remainingOrDefault
|
||||
case x if !x.isFinite ⇒ throw new IllegalArgumentException("max duration cannot be infinite")
|
||||
case f: FiniteDuration ⇒ f.dilated
|
||||
}
|
||||
|
||||
override protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T = {
|
||||
val _max = max.dilated
|
||||
val start = now
|
||||
val rem = if (end == Duration.Undefined) Duration.Inf else end - start
|
||||
assert(rem >= min, s"required min time $min not possible, only ${rem.pretty} left")
|
||||
|
||||
lastWasNoMessage = false
|
||||
|
||||
val max_diff = _max min rem
|
||||
val prev_end = end
|
||||
end = start + max_diff
|
||||
|
||||
val ret = try f finally end = prev_end
|
||||
|
||||
val diff = now - start
|
||||
assert(min <= diff, s"block took ${diff.pretty}, should at least have been $min")
|
||||
if (!lastWasNoMessage) {
|
||||
assert(diff <= max_diff, s"block took ${diff.pretty}, exceeding ${max_diff.pretty}")
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
override def expectMessage[T <: M](obj: T): T = expectMessage_internal(remainingOrDefault, obj)
|
||||
|
||||
override def expectMessage[T <: M](max: FiniteDuration, obj: T): T = expectMessage_internal(max.dilated, obj)
|
||||
|
||||
override def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T =
|
||||
expectMessage_internal(max.dilated, obj, Some(hint))
|
||||
|
||||
private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = {
|
||||
val o = receiveOne(max)
|
||||
val hintOrEmptyString = hint.map(": " + _).getOrElse("")
|
||||
assert(o != null, s"timeout ($max) during expectMessage while waiting for $obj" + hintOrEmptyString)
|
||||
assert(obj == o, s"expected $obj, found $o" + hintOrEmptyString)
|
||||
o.asInstanceOf[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive one message from the internal queue of the TestActor. If the given
|
||||
* duration is zero, the queue is polled (non-blocking).
|
||||
*
|
||||
* This method does NOT automatically scale its Duration parameter!
|
||||
*/
|
||||
private def receiveOne(max: Duration): M = {
|
||||
val message =
|
||||
if (max == Duration.Zero) {
|
||||
queue.pollFirst
|
||||
} else if (max.isFinite) {
|
||||
queue.pollFirst(max.length, max.unit)
|
||||
} else {
|
||||
queue.takeFirst
|
||||
}
|
||||
lastWasNoMessage = false
|
||||
lastMessage = if (message == null) None else Some(message)
|
||||
message
|
||||
}
|
||||
|
||||
override def expectNoMessage(max: FiniteDuration): Unit = { expectNoMessage_internal(max) }
|
||||
|
||||
override def expectNoMessage(): Unit = { expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated) }
|
||||
|
||||
private def expectNoMessage_internal(max: FiniteDuration) {
|
||||
val o = receiveOne(max)
|
||||
assert(o == null, s"received unexpected message $o")
|
||||
lastWasNoMessage = true
|
||||
}
|
||||
|
||||
override protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = {
|
||||
val o = receiveOne(max)
|
||||
assert(o != null, s"timeout ($max) during expectMessageClass waiting for $c")
|
||||
assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)")
|
||||
o.asInstanceOf[C]
|
||||
}
|
||||
|
||||
override protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] = {
|
||||
// not tailrec but that should be ok
|
||||
def loop(timeout: FiniteDuration, seen: List[M]): List[M] = {
|
||||
val start = System.nanoTime()
|
||||
val msg = receiveOne(timeout)
|
||||
try {
|
||||
fisher(msg) match {
|
||||
case FishingOutcome.Complete ⇒ (msg :: seen).reverse
|
||||
case FishingOutcome.Fail(error) ⇒ throw new AssertionError(s"$error, hint: $hint")
|
||||
case continue ⇒
|
||||
val newTimeout =
|
||||
if (timeout.isFinite()) timeout - (System.nanoTime() - start).nanos
|
||||
else timeout
|
||||
if (newTimeout.toMillis <= 0) {
|
||||
throw new AssertionError(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint")
|
||||
} else {
|
||||
|
||||
continue match {
|
||||
case FishingOutcome.Continue ⇒ loop(newTimeout, msg :: seen)
|
||||
case FishingOutcome.ContinueAndIgnore ⇒ loop(newTimeout, seen)
|
||||
case _ ⇒ ??? // cannot happen
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case ex: MatchError ⇒ throw new AssertionError(
|
||||
s"Unexpected message $msg while fishing for messages, " +
|
||||
s"seen messages ${seen.reverse}, hint: $hint", ex)
|
||||
}
|
||||
}
|
||||
|
||||
loop(max.dilated, Nil)
|
||||
}
|
||||
|
||||
override def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit = {
|
||||
testActor.asInstanceOf[ActorRef[AnyRef]] ! WatchActor(actorRef)
|
||||
val message =
|
||||
if (max == Duration.Zero) {
|
||||
terminations.pollFirst
|
||||
} else if (max.isFinite) {
|
||||
terminations.pollFirst(max.length, max.unit)
|
||||
} else {
|
||||
terminations.takeFirst
|
||||
}
|
||||
assert(message != null, s"timeout ($max) during expectStop waiting for actor [${actorRef.path}] to stop")
|
||||
assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop")
|
||||
}
|
||||
|
||||
override def awaitAssert[A](max: Duration, interval: Duration, supplier: Supplier[A]): A =
|
||||
awaitAssert(supplier.get(), max, interval)
|
||||
|
||||
override def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = {
|
||||
val _max = remainingOrDilated(max)
|
||||
val stop = now + _max
|
||||
|
||||
@tailrec
|
||||
def poll(t: Duration): A = {
|
||||
val result: A =
|
||||
try {
|
||||
a
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
if ((now + t) >= stop) throw e
|
||||
else null.asInstanceOf[A]
|
||||
}
|
||||
|
||||
if (result != null) result
|
||||
else {
|
||||
Thread.sleep(t.toMillis)
|
||||
poll((stop - now) min interval)
|
||||
}
|
||||
}
|
||||
|
||||
poll(_max min interval)
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain current time (`System.nanoTime`) as Duration.
|
||||
*/
|
||||
private def now: FiniteDuration = System.nanoTime.nanos
|
||||
|
||||
}
|
||||
|
|
@ -1,19 +1,22 @@
|
|||
package akka.testkit.typed
|
||||
/**
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
package akka.testkit.typed.javadsl
|
||||
|
||||
import java.util.concurrent.ThreadFactory
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import scala.annotation.varargs
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
|
||||
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) {
|
||||
|
||||
@varargs
|
||||
def expectNoMessageFor(duration: FiniteDuration, on: scaladsl.TestProbe[_]*): Unit = {
|
||||
def expectNoMessageFor(duration: FiniteDuration, on: TestProbe[_]*): Unit = {
|
||||
timePasses(duration)
|
||||
on.foreach(_.expectNoMessage(Duration.Zero))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -3,42 +3,205 @@
|
|||
*/
|
||||
package akka.testkit.typed.javadsl
|
||||
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.testkit.typed.FishingOutcome
|
||||
import java.util.function.Supplier
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem }
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.testkit.typed.internal.TestProbeImpl
|
||||
import akka.testkit.typed.{ FishingOutcome, TestKitSettings }
|
||||
import akka.testkit.typed.scaladsl.TestDuration
|
||||
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object FishingOutcomes {
|
||||
/**
|
||||
* Consume this message and continue with the next
|
||||
*/
|
||||
def continue(): FishingOutcome = FishingOutcome.Continue
|
||||
|
||||
/**
|
||||
* Consume this message and continue with the next
|
||||
*/
|
||||
def continue(): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Continue
|
||||
def continueAndIgnore(): FishingOutcome = akka.testkit.typed.FishingOutcome.ContinueAndIgnore
|
||||
|
||||
/**
|
||||
* Complete fishing and return this message
|
||||
*/
|
||||
def complete(): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Complete
|
||||
def complete(): FishingOutcome = akka.testkit.typed.FishingOutcome.Complete
|
||||
|
||||
/**
|
||||
* Fail fishing with a custom error message
|
||||
*/
|
||||
def fail(error: String): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Fail(error)
|
||||
def fail(error: String): FishingOutcome = akka.testkit.typed.FishingOutcome.Fail(error)
|
||||
}
|
||||
|
||||
object TestProbe {
|
||||
|
||||
def create[M](system: ActorSystem[_]): TestProbe[M] =
|
||||
create(name = "testProbe", system)
|
||||
|
||||
def create[M](clazz: Class[M], system: ActorSystem[_]): TestProbe[M] =
|
||||
create(system)
|
||||
|
||||
def create[M](name: String, system: ActorSystem[_]): TestProbe[M] =
|
||||
new TestProbeImpl[M](name, system)
|
||||
|
||||
def create[M](name: String, clazz: Class[M], system: ActorSystem[_]): TestProbe[M] =
|
||||
new TestProbeImpl[M](name, system)
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API:
|
||||
* Java API: * Create instances through the `create` factories in the [[TestProbe]] companion.
|
||||
*
|
||||
* A test probe is essentially a queryable mailbox which can be used in place of an actor and the received
|
||||
* messages can then be asserted etc.
|
||||
*
|
||||
* Not for user extension
|
||||
*/
|
||||
class TestProbe[M](name: String, system: ActorSystem[_]) extends akka.testkit.typed.scaladsl.TestProbe[M](name)(system) {
|
||||
@DoNotInherit
|
||||
abstract class TestProbe[M] {
|
||||
|
||||
def this(system: ActorSystem[_]) = this("testProbe", system)
|
||||
implicit protected def settings: TestKitSettings
|
||||
|
||||
/**
|
||||
* Same as `expectMsgType[T](remainingOrDefault)`, but correctly treating the timeFactor.
|
||||
* ActorRef for this TestProbe
|
||||
*/
|
||||
def expectMessageType[T <: M](t: Class[T]): T =
|
||||
expectMessageClass_internal(remainingOrDefault, t)
|
||||
def ref: ActorRef[M]
|
||||
|
||||
/**
|
||||
* Obtain time remaining for execution of the innermost enclosing `within`
|
||||
* block or missing that it returns the properly dilated default for this
|
||||
* case from settings (key "akka.actor.typed.test.single-expect-default").
|
||||
*/
|
||||
def remainingOrDefault: FiniteDuration
|
||||
|
||||
/**
|
||||
* Obtain time remaining for execution of the innermost enclosing `within`
|
||||
* block or throw an [[AssertionError]] if no `within` block surrounds this
|
||||
* call.
|
||||
*/
|
||||
def remaining: FiniteDuration
|
||||
|
||||
/**
|
||||
* Obtain time remaining for execution of the innermost enclosing `within`
|
||||
* block or missing that it returns the given duration.
|
||||
*/
|
||||
def remainingOr(duration: FiniteDuration): FiniteDuration
|
||||
|
||||
/**
|
||||
* Execute code block while bounding its execution time between `min` and
|
||||
* `max`. `within` blocks may be nested. All methods in this trait which
|
||||
* take maximum wait times are available in a version which implicitly uses
|
||||
* the remaining time governed by the innermost enclosing `within` block.
|
||||
*
|
||||
* Note that the timeout is scaled using Duration.dilated, which uses the
|
||||
* configuration entry "akka.actor.typed.test.timefactor", while the min Duration is not.
|
||||
*
|
||||
* {{{
|
||||
* val ret = within(50 millis) {
|
||||
* test ! Ping
|
||||
* expectMessageType[Pong]
|
||||
* }
|
||||
* }}}
|
||||
*/
|
||||
def within[T](min: FiniteDuration, max: FiniteDuration)(f: Supplier[T]): T =
|
||||
within_internal(min, max, f.get())
|
||||
|
||||
/**
|
||||
* Same as calling `within(0 seconds, max)(f)`.
|
||||
*/
|
||||
def within[T](max: FiniteDuration)(f: Supplier[T]): T =
|
||||
within_internal(Duration.Zero, max, f.get())
|
||||
|
||||
protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T
|
||||
|
||||
/**
|
||||
* Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor.
|
||||
*/
|
||||
def expectMessage[T <: M](obj: T): T
|
||||
|
||||
/**
|
||||
* Receive one message from the test actor and assert that it equals the
|
||||
* given object. Wait time is bounded by the given duration, with an
|
||||
* [[AssertionError]] being thrown in case of timeout.
|
||||
*
|
||||
* @return the received object
|
||||
*/
|
||||
def expectMessage[T <: M](max: FiniteDuration, obj: T): T
|
||||
|
||||
/**
|
||||
* Receive one message from the test actor and assert that it equals the
|
||||
* given object. Wait time is bounded by the given duration, with an
|
||||
* [[AssertionError]] being thrown in case of timeout.
|
||||
*
|
||||
* @return the received object
|
||||
*/
|
||||
def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T
|
||||
|
||||
/**
|
||||
* Assert that no message is received for the specified time.
|
||||
* Supplied value is not dilated.
|
||||
*/
|
||||
def expectNoMessage(max: FiniteDuration): Unit
|
||||
|
||||
/**
|
||||
* Assert that no message is received. Waits for the default period configured as `akka.actor.typed.test.expect-no-message-default`
|
||||
* That value is dilated.
|
||||
*/
|
||||
def expectNoMessage(): Unit
|
||||
|
||||
/**
|
||||
* Expect the given actor to be stopped or stop withing the given timeout or
|
||||
* throw an [[AssertionError]].
|
||||
*/
|
||||
def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit
|
||||
|
||||
/**
|
||||
* Evaluate the given assert every `interval` until it does not throw an exception and return the
|
||||
* result.
|
||||
*
|
||||
* If the `max` timeout expires the last exception is thrown.
|
||||
*
|
||||
* Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "akka.test.timefactor".
|
||||
*/
|
||||
def awaitAssert[A](max: Duration, interval: Duration, supplier: Supplier[A]): A
|
||||
|
||||
/**
|
||||
* Evaluate the given assert every 100 milliseconds until it does not throw an exception and return the
|
||||
* result.
|
||||
*
|
||||
* If the `max` timeout expires the last exception is thrown.
|
||||
*
|
||||
* Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "akka.test.timefactor".
|
||||
*/
|
||||
def awaitAssert[A](max: Duration, supplier: Supplier[A]): A =
|
||||
awaitAssert(max, 100.millis, supplier)
|
||||
|
||||
/**
|
||||
* Evaluate the given assert every 100 milliseconds until it does not throw an exception and return the
|
||||
* result. A max time is taken it from the innermost enclosing `within` block.
|
||||
*/
|
||||
def awaitAssert[A](supplier: Supplier[A]): A =
|
||||
awaitAssert(Duration.Undefined, supplier)
|
||||
|
||||
// FIXME awaitAssert(Procedure): Unit would be nice for java people to not have to return null
|
||||
|
||||
/**
|
||||
* Same as `expectMessageType(clazz, remainingOrDefault)`, but correctly treating the timeFactor.
|
||||
*/
|
||||
def expectMessageClass[T](clazz: Class[T]): T =
|
||||
expectMessageClass_internal(remainingOrDefault, clazz)
|
||||
|
||||
/**
|
||||
* Wait for a message of type M and return it when it arrives, or fail if the `max` timeout is hit.
|
||||
* The timeout is dilated.
|
||||
*/
|
||||
def expectMessageClass[T](clazz: Class[T], max: FiniteDuration): T =
|
||||
expectMessageClass_internal(max.dilated, clazz)
|
||||
|
||||
protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C
|
||||
|
||||
/**
|
||||
* Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming
|
||||
|
|
@ -57,14 +220,15 @@ class TestProbe[M](name: String, system: ActorSystem[_]) extends akka.testkit.ty
|
|||
* The timeout is dilated.
|
||||
* @return The messages accepted in the order they arrived
|
||||
*/
|
||||
// FIXME same name would cause ambiguity but I'm out of ideas how to fix, separate Scala/Java TestProbe APIs?
|
||||
def fishForMessageJava(max: FiniteDuration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] =
|
||||
fishForMessage(max)(fisher.apply).asJava
|
||||
def fishForMessage(max: FiniteDuration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] =
|
||||
fishForMessage(max, "", fisher)
|
||||
|
||||
/**
|
||||
* Same as the other `fishForMessageJava` but includes the provided hint in all error messages
|
||||
*/
|
||||
def fishForMessageJava(max: FiniteDuration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] =
|
||||
fishForMessage(max, hint)(fisher.apply).asJava
|
||||
def fishForMessage(max: FiniteDuration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] =
|
||||
fishForMessage_internal(max, hint, fisher.apply).asJava
|
||||
|
||||
protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M]
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
package akka.testkit.typed.scaladsl
|
||||
|
||||
import java.util.concurrent.ThreadFactory
|
||||
|
||||
import akka.event.LoggingAdapter
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import scala.annotation.varargs
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
|
||||
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) {
|
||||
|
||||
@varargs
|
||||
def expectNoMessageFor(duration: FiniteDuration, on: TestProbe[_]*): Unit = {
|
||||
timePasses(duration)
|
||||
on.foreach(_.expectNoMessage(Duration.Zero))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -5,7 +5,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
|
|||
import akka.testkit.typed._
|
||||
|
||||
object ManualTime {
|
||||
val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.ExplicitlyTriggeredScheduler"""")
|
||||
val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.scaladsl.ExplicitlyTriggeredScheduler"""")
|
||||
}
|
||||
trait ManualTime { self: TestKit ⇒
|
||||
override val scheduler: ExplicitlyTriggeredScheduler = self.system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
|
||||
|
|
|
|||
|
|
@ -3,137 +3,79 @@
|
|||
*/
|
||||
package akka.testkit.typed.scaladsl
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import java.util.concurrent.BlockingDeque
|
||||
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import java.util.concurrent.LinkedBlockingDeque
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem }
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.util.Timeout
|
||||
import akka.util.PrettyDuration.PrettyPrintableDuration
|
||||
|
||||
import scala.concurrent.Await
|
||||
import akka.testkit.typed.internal.TestProbeImpl
|
||||
import akka.testkit.typed.{ FishingOutcome, TestKitSettings }
|
||||
import akka.util.BoxedType
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.duration._
|
||||
import scala.reflect.ClassTag
|
||||
import scala.util.control.NonFatal
|
||||
import scala.collection.immutable
|
||||
|
||||
object FishingOutcomes {
|
||||
/**
|
||||
* Complete fishing and return all messages up until this
|
||||
*/
|
||||
val complete = FishingOutcome.Complete
|
||||
/**
|
||||
* Consume this message, collect it into the result, and continue with the next message
|
||||
*/
|
||||
val continue = FishingOutcome.Continue
|
||||
/**
|
||||
* Consume this message, but do not collect it into the result, and continue with the next message
|
||||
*/
|
||||
val continueAndIgnore = FishingOutcome.ContinueAndIgnore
|
||||
/**
|
||||
* Fail fishing with a custom error message
|
||||
*/
|
||||
def fail(msg: String) = FishingOutcome.Fail(msg)
|
||||
}
|
||||
|
||||
object TestProbe {
|
||||
private val testActorId = new AtomicInteger(0)
|
||||
|
||||
def apply[M]()(implicit system: ActorSystem[_]): TestProbe[M] =
|
||||
apply(name = "testProbe")
|
||||
|
||||
def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M] =
|
||||
new TestProbe(name)
|
||||
new TestProbeImpl[M](name, system)
|
||||
|
||||
private case class WatchActor[U](actor: ActorRef[U])
|
||||
private def testActor[M](queue: BlockingDeque[M], terminations: BlockingDeque[Terminated]): Behavior[M] = Behaviors.immutable[M] { (ctx, msg) ⇒
|
||||
msg match {
|
||||
case WatchActor(ref) ⇒ ctx.watch(ref)
|
||||
case other ⇒ queue.offerLast(other)
|
||||
}
|
||||
Behaviors.same
|
||||
}.onSignal {
|
||||
case (_, t: Terminated) ⇒
|
||||
terminations.offerLast(t)
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
|
||||
object FishingOutcomes {
|
||||
/**
|
||||
* Create instances through the factories in the [[TestProbe]] companion.
|
||||
*
|
||||
* A test probe is essentially a queryable mailbox which can be used in place of an actor and the received
|
||||
* messages can then be asserted
|
||||
*
|
||||
* Not for user extension
|
||||
*/
|
||||
@DoNotInherit trait TestProbe[M] {
|
||||
|
||||
implicit protected def settings: TestKitSettings
|
||||
|
||||
/**
|
||||
* Consume this message, collect it into the result, and continue with the next message
|
||||
* ActorRef for this TestProbe
|
||||
*/
|
||||
case object Continue extends FishingOutcome
|
||||
|
||||
/**
|
||||
* Consume this message, but do not collect it into the result, and continue with the next message
|
||||
*/
|
||||
case object ContinueAndIgnore extends FishingOutcome
|
||||
|
||||
/**
|
||||
* Complete fishing and return this message
|
||||
*/
|
||||
case object Complete extends FishingOutcome
|
||||
|
||||
/**
|
||||
* Fail fishing with a custom error message
|
||||
*/
|
||||
case class Fail(error: String) extends FishingOutcome
|
||||
}
|
||||
|
||||
class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
|
||||
|
||||
import TestProbe._
|
||||
private implicit val settings = TestKitSettings(system)
|
||||
private val queue = new LinkedBlockingDeque[M]
|
||||
private val terminations = new LinkedBlockingDeque[Terminated]
|
||||
|
||||
private var end: Duration = Duration.Undefined
|
||||
|
||||
/**
|
||||
* if last assertion was expectNoMessage, disable timing failure upon within()
|
||||
* block end.
|
||||
*/
|
||||
private var lastWasNoMessage = false
|
||||
|
||||
private var lastMessage: Option[M] = None
|
||||
|
||||
val testActor: ActorRef[M] = {
|
||||
implicit val timeout = Timeout(3.seconds)
|
||||
val futRef = system.systemActorOf(TestProbe.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}")
|
||||
Await.result(futRef, timeout.duration + 1.second)
|
||||
}
|
||||
|
||||
/**
|
||||
* Shorthand to get the `testActor`.
|
||||
*/
|
||||
def ref: ActorRef[M] = testActor
|
||||
|
||||
/**
|
||||
* Obtain current time (`System.nanoTime`) as Duration.
|
||||
*/
|
||||
protected def now: FiniteDuration = System.nanoTime.nanos
|
||||
def ref: ActorRef[M]
|
||||
|
||||
/**
|
||||
* Obtain time remaining for execution of the innermost enclosing `within`
|
||||
* block or missing that it returns the properly dilated default for this
|
||||
* case from settings (key "akka.actor.typed.test.single-expect-default").
|
||||
*/
|
||||
def remainingOrDefault = remainingOr(settings.SingleExpectDefaultTimeout.dilated)
|
||||
def remainingOrDefault: FiniteDuration
|
||||
|
||||
/**
|
||||
* Obtain time remaining for execution of the innermost enclosing `within`
|
||||
* block or throw an [[AssertionError]] if no `within` block surrounds this
|
||||
* call.
|
||||
*/
|
||||
def remaining: FiniteDuration = end match {
|
||||
case f: FiniteDuration ⇒ f - now
|
||||
case _ ⇒ throw new AssertionError("`remaining` may not be called outside of `within`")
|
||||
}
|
||||
def remaining: FiniteDuration
|
||||
|
||||
/**
|
||||
* Obtain time remaining for execution of the innermost enclosing `within`
|
||||
* block or missing that it returns the given duration.
|
||||
*/
|
||||
def remainingOr(duration: FiniteDuration): FiniteDuration = end match {
|
||||
case x if x eq Duration.Undefined ⇒ duration
|
||||
case x if !x.isFinite ⇒ throw new IllegalArgumentException("`end` cannot be infinite")
|
||||
case f: FiniteDuration ⇒ f - now
|
||||
}
|
||||
|
||||
private def remainingOrDilated(max: Duration): FiniteDuration = max match {
|
||||
case x if x eq Duration.Undefined ⇒ remainingOrDefault
|
||||
case x if !x.isFinite ⇒ throw new IllegalArgumentException("max duration cannot be infinite")
|
||||
case f: FiniteDuration ⇒ f.dilated
|
||||
}
|
||||
def remainingOr(duration: FiniteDuration): FiniteDuration
|
||||
|
||||
/**
|
||||
* Execute code block while bounding its execution time between `min` and
|
||||
|
|
@ -151,38 +93,20 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
|
|||
* }
|
||||
* }}}
|
||||
*/
|
||||
def within[T](min: FiniteDuration, max: FiniteDuration)(f: ⇒ T): T = {
|
||||
val _max = max.dilated
|
||||
val start = now
|
||||
val rem = if (end == Duration.Undefined) Duration.Inf else end - start
|
||||
assert(rem >= min, s"required min time $min not possible, only ${rem.pretty} left")
|
||||
|
||||
lastWasNoMessage = false
|
||||
|
||||
val max_diff = _max min rem
|
||||
val prev_end = end
|
||||
end = start + max_diff
|
||||
|
||||
val ret = try f finally end = prev_end
|
||||
|
||||
val diff = now - start
|
||||
assert(min <= diff, s"block took ${diff.pretty}, should at least have been $min")
|
||||
if (!lastWasNoMessage) {
|
||||
assert(diff <= max_diff, s"block took ${diff.pretty}, exceeding ${max_diff.pretty}")
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
def within[T](min: FiniteDuration, max: FiniteDuration)(f: ⇒ T): T =
|
||||
within_internal(min, max, f)
|
||||
/**
|
||||
* Same as calling `within(0 seconds, max)(f)`.
|
||||
*/
|
||||
def within[T](max: FiniteDuration)(f: ⇒ T): T = within(Duration.Zero, max)(f)
|
||||
def within[T](max: FiniteDuration)(f: ⇒ T): T =
|
||||
within_internal(Duration.Zero, max, f)
|
||||
|
||||
protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T
|
||||
|
||||
/**
|
||||
* Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor.
|
||||
*/
|
||||
def expectMessage[T <: M](obj: T): T = expectMessage_internal(remainingOrDefault, obj)
|
||||
def expectMessage[T <: M](obj: T): T
|
||||
|
||||
/**
|
||||
* Receive one message from the test actor and assert that it equals the
|
||||
|
|
@ -191,7 +115,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
|
|||
*
|
||||
* @return the received object
|
||||
*/
|
||||
def expectMessage[T <: M](max: FiniteDuration, obj: T): T = expectMessage_internal(max.dilated, obj)
|
||||
def expectMessage[T <: M](max: FiniteDuration, obj: T): T
|
||||
|
||||
/**
|
||||
* Receive one message from the test actor and assert that it equals the
|
||||
|
|
@ -200,53 +124,19 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
|
|||
*
|
||||
* @return the received object
|
||||
*/
|
||||
def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T = expectMessage_internal(max.dilated, obj, Some(hint))
|
||||
|
||||
private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = {
|
||||
val o = receiveOne(max)
|
||||
val hintOrEmptyString = hint.map(": " + _).getOrElse("")
|
||||
assert(o != null, s"timeout ($max) during expectMessage while waiting for $obj" + hintOrEmptyString)
|
||||
assert(obj == o, s"expected $obj, found $o" + hintOrEmptyString)
|
||||
o.asInstanceOf[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* Receive one message from the internal queue of the TestActor. If the given
|
||||
* duration is zero, the queue is polled (non-blocking).
|
||||
*
|
||||
* This method does NOT automatically scale its Duration parameter!
|
||||
*/
|
||||
private def receiveOne(max: Duration): M = {
|
||||
val message =
|
||||
if (max == Duration.Zero) {
|
||||
queue.pollFirst
|
||||
} else if (max.isFinite) {
|
||||
queue.pollFirst(max.length, max.unit)
|
||||
} else {
|
||||
queue.takeFirst
|
||||
}
|
||||
lastWasNoMessage = false
|
||||
lastMessage = if (message == null) None else Some(message)
|
||||
message
|
||||
}
|
||||
def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T
|
||||
|
||||
/**
|
||||
* Assert that no message is received for the specified time.
|
||||
* Supplied value is not dilated.
|
||||
*/
|
||||
def expectNoMessage(max: FiniteDuration) { expectNoMessage_internal(max) }
|
||||
def expectNoMessage(max: FiniteDuration): Unit
|
||||
|
||||
/**
|
||||
* Assert that no message is received. Waits for the default period configured as `akka.actor.typed.test.expect-no-message-default`
|
||||
* That value is dilated.
|
||||
*/
|
||||
def expectNoMessage() { expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated) }
|
||||
|
||||
private def expectNoMessage_internal(max: FiniteDuration) {
|
||||
val o = receiveOne(max)
|
||||
assert(o == null, s"received unexpected message $o")
|
||||
lastWasNoMessage = true
|
||||
}
|
||||
def expectNoMessage(): Unit
|
||||
|
||||
/**
|
||||
* Same as `expectMessageType[T](remainingOrDefault)`, but correctly treating the timeFactor.
|
||||
|
|
@ -255,29 +145,21 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
|
|||
expectMessageClass_internal(remainingOrDefault, t.runtimeClass.asInstanceOf[Class[T]])
|
||||
|
||||
/**
|
||||
* Receive one message from the test actor and assert that it conforms to the
|
||||
* given type (after erasure). Wait time is bounded by the given duration,
|
||||
* with an [[AssertionError]] being thrown in case of timeout.
|
||||
*
|
||||
* @return the received object
|
||||
* Expect a message of type T to arrive within `max` or fail. `max` is dilated.
|
||||
*/
|
||||
def expectMessageType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T =
|
||||
expectMessageClass_internal(max.dilated, t.runtimeClass.asInstanceOf[Class[T]])
|
||||
|
||||
private[akka] def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = {
|
||||
val o = receiveOne(max)
|
||||
assert(o != null, s"timeout ($max) during expectMessageClass waiting for $c")
|
||||
assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)")
|
||||
o.asInstanceOf[C]
|
||||
}
|
||||
protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C
|
||||
|
||||
/**
|
||||
* Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming
|
||||
* message, and returns one of the following effects to decide on what happens next:
|
||||
*
|
||||
* * [[FishingOutcomes.Continue]] - continue with the next message given that the timeout has not been reached
|
||||
* * [[FishingOutcomes.Complete]] - successfully complete and return the message
|
||||
* * [[FishingOutcomes.Fail]] - fail the test with a custom message
|
||||
* * [[FishingOutcomes.continue]] - continue with the next message given that the timeout has not been reached
|
||||
* * [[FishingOutcomes.continueAndIgnore]] - continue and do not save the message in the returned list
|
||||
* * [[FishingOutcomes.complete]] - successfully complete and return the message
|
||||
* * [[FishingOutcomes.fail]] - fail the test with a custom message
|
||||
*
|
||||
* Additionally failures includes the list of messages consumed. If a message of type `M` but not of type `T` is
|
||||
* received this will also fail the test, additionally if the `fisher` function throws a match error the error
|
||||
|
|
@ -288,56 +170,22 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
|
|||
* The timeout is dilated.
|
||||
* @return The messages accepted in the order they arrived
|
||||
*/
|
||||
def fishForMessage(max: FiniteDuration, hint: String = "")(fisher: M ⇒ FishingOutcome): List[M] = {
|
||||
// not tailrec but that should be ok
|
||||
def loop(timeout: FiniteDuration, seen: List[M]): List[M] = {
|
||||
val start = System.nanoTime()
|
||||
val msg = receiveOne(timeout)
|
||||
try {
|
||||
fisher(msg) match {
|
||||
case FishingOutcomes.Complete ⇒ (msg :: seen).reverse
|
||||
case FishingOutcomes.Fail(error) ⇒ throw new AssertionError(s"$error, hint: $hint")
|
||||
case continue ⇒
|
||||
val newTimeout =
|
||||
if (timeout.isFinite()) timeout - (System.nanoTime() - start).nanos
|
||||
else timeout
|
||||
if (newTimeout.toMillis <= 0) {
|
||||
throw new AssertionError(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint")
|
||||
} else {
|
||||
continue match {
|
||||
case FishingOutcomes.Continue ⇒ loop(newTimeout, msg :: seen)
|
||||
case FishingOutcomes.ContinueAndIgnore ⇒ loop(newTimeout, seen)
|
||||
}
|
||||
def fishForMessage(max: FiniteDuration, hint: String)(fisher: M ⇒ FishingOutcome): immutable.Seq[M] =
|
||||
fishForMessage_internal(max, hint, fisher)
|
||||
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case ex: MatchError ⇒ throw new AssertionError(
|
||||
s"Unexpected message $msg while fishing for messages, " +
|
||||
s"seen messages ${seen.reverse}, hint: $hint", ex)
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Same as the other `fishForMessage` but with no hint
|
||||
*/
|
||||
def fishForMessage(max: FiniteDuration)(fisher: M ⇒ FishingOutcome): immutable.Seq[M] =
|
||||
fishForMessage(max, "")(fisher)
|
||||
|
||||
loop(max.dilated, Nil)
|
||||
}
|
||||
protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): immutable.Seq[M]
|
||||
|
||||
/**
|
||||
* Expect the given actor to be stopped or stop withing the given timeout or
|
||||
* throw an [[AssertionError]].
|
||||
*/
|
||||
def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit = {
|
||||
testActor.asInstanceOf[ActorRef[AnyRef]] ! WatchActor(actorRef)
|
||||
val message =
|
||||
if (max == Duration.Zero) {
|
||||
terminations.pollFirst
|
||||
} else if (max.isFinite) {
|
||||
terminations.pollFirst(max.length, max.unit)
|
||||
} else {
|
||||
terminations.takeFirst
|
||||
}
|
||||
assert(message != null, s"timeout ($max) during expectStop waiting for actor [${actorRef.path}] to stop")
|
||||
assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop")
|
||||
}
|
||||
def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit
|
||||
|
||||
/**
|
||||
* Evaluate the given assert every `interval` until it does not throw an exception and return the
|
||||
|
|
@ -351,29 +199,5 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
|
|||
* Note that the timeout is scaled using Duration.dilated,
|
||||
* which uses the configuration entry "akka.test.timefactor".
|
||||
*/
|
||||
def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = {
|
||||
val _max = remainingOrDilated(max)
|
||||
val stop = now + _max
|
||||
|
||||
@tailrec
|
||||
def poll(t: Duration): A = {
|
||||
val result: A =
|
||||
try {
|
||||
a
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
if ((now + t) >= stop) throw e
|
||||
else null.asInstanceOf[A]
|
||||
}
|
||||
|
||||
if (result != null) result
|
||||
else {
|
||||
Thread.sleep(t.toMillis)
|
||||
poll((stop - now) min interval)
|
||||
}
|
||||
}
|
||||
|
||||
poll(_max min interval)
|
||||
}
|
||||
|
||||
def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.testkit.typed.javadsl;
|
||||
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.ActorSystem;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class TestProbeTest {
|
||||
|
||||
public static void compileOnlyApiTest() {
|
||||
ActorSystem<Object> system = null;
|
||||
TestProbe<String> probe = TestProbe.create(system);
|
||||
probe.ref();
|
||||
probe.awaitAssert(() -> {
|
||||
// ... something ...
|
||||
return null;
|
||||
});
|
||||
probe.awaitAssert(FiniteDuration.apply(3, TimeUnit.SECONDS), () -> {
|
||||
// ... something ...
|
||||
return null;
|
||||
});
|
||||
String awaitAssertResult =
|
||||
probe.awaitAssert(FiniteDuration.apply(3, TimeUnit.SECONDS), FiniteDuration.apply(100, TimeUnit.MILLISECONDS), () -> {
|
||||
// ... something ...
|
||||
return "some result";
|
||||
});
|
||||
String messageResult = probe.expectMessage("message");
|
||||
String expectClassResult = probe.expectMessageClass(String.class);
|
||||
probe.expectNoMessage();
|
||||
|
||||
ActorRef<String> ref = null;
|
||||
probe.expectTerminated(ref, FiniteDuration.apply(1, TimeUnit.SECONDS));
|
||||
|
||||
FiniteDuration remaining = probe.remaining();
|
||||
probe.fishForMessage(FiniteDuration.apply(3, TimeUnit.SECONDS), "hint", (msg) -> {
|
||||
if (msg.equals("one")) return FishingOutcomes.continueAndIgnore();
|
||||
else if (msg.equals("two")) return FishingOutcomes.complete();
|
||||
else return FishingOutcomes.fail("error");
|
||||
});
|
||||
|
||||
String withinResult = probe.within(FiniteDuration.apply(3, TimeUnit.SECONDS), () -> {
|
||||
// ... something ...
|
||||
return "result";
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -10,6 +10,24 @@ import scala.concurrent.duration._
|
|||
|
||||
class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeAndAfterAll {
|
||||
|
||||
def compileOnlyApiTest(): Unit = {
|
||||
val probe = TestProbe[AnyRef]()
|
||||
probe.fishForMessage(100.millis) {
|
||||
case _ ⇒ FishingOutcomes.complete
|
||||
}
|
||||
probe.awaitAssert({
|
||||
"result"
|
||||
})
|
||||
probe.expectMessageType[String]
|
||||
probe.expectMessage("whoa")
|
||||
probe.expectNoMessage()
|
||||
probe.expectNoMessage(300.millis)
|
||||
probe.expectTerminated(system.deadLetters, 100.millis)
|
||||
probe.within(100.millis) {
|
||||
"result"
|
||||
}
|
||||
}
|
||||
|
||||
"The test probe" must {
|
||||
|
||||
"allow probing for actor stop when actor already stopped" in {
|
||||
|
|
@ -44,8 +62,8 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA
|
|||
probe.ref ! "two"
|
||||
|
||||
val result = probe.fishForMessage(300.millis) {
|
||||
case "one" ⇒ FishingOutcomes.Continue
|
||||
case "two" ⇒ FishingOutcomes.Complete
|
||||
case "one" ⇒ FishingOutcomes.continue
|
||||
case "two" ⇒ FishingOutcomes.complete
|
||||
}
|
||||
|
||||
result should ===(List("one", "two"))
|
||||
|
|
@ -60,8 +78,8 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA
|
|||
|
||||
intercept[AssertionError] {
|
||||
probe.fishForMessage(300.millis) {
|
||||
case "one" ⇒ FishingOutcomes.Continue
|
||||
case "two" ⇒ FishingOutcomes.Fail("not the fish I'm looking for")
|
||||
case "one" ⇒ FishingOutcomes.continue
|
||||
case "two" ⇒ FishingOutcomes.fail("not the fish I'm looking for")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -74,7 +92,7 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA
|
|||
|
||||
intercept[AssertionError] {
|
||||
probe.fishForMessage(300.millis) {
|
||||
case "one" ⇒ FishingOutcomes.Continue
|
||||
case "one" ⇒ FishingOutcomes.continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -86,7 +104,7 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA
|
|||
|
||||
intercept[AssertionError] {
|
||||
probe.fishForMessage(300.millis) {
|
||||
case "one" ⇒ FishingOutcomes.Continue
|
||||
case "one" ⇒ FishingOutcomes.continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue