diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java index 33fa91ee7e..606d1b7b9d 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ManualTimerTest.java @@ -14,14 +14,14 @@ import akka.actor.typed.javadsl.Behaviors; import org.junit.Test; import akka.testkit.typed.TestKit; -import akka.testkit.typed.ExplicitlyTriggeredScheduler; +import akka.testkit.typed.javadsl.ExplicitlyTriggeredScheduler; import akka.testkit.typed.javadsl.TestProbe; public class ManualTimerTest extends TestKit { ExplicitlyTriggeredScheduler scheduler; public ManualTimerTest() { - super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.ExplicitlyTriggeredScheduler\"")); + super(parseString("akka.scheduler.implementation = \"akka.testkit.typed.javadsl.ExplicitlyTriggeredScheduler\"")); this.scheduler = (ExplicitlyTriggeredScheduler) system().scheduler(); } @@ -30,7 +30,7 @@ public class ManualTimerTest extends TestKit { @Test public void testScheduleNonRepeatedTicks() { - TestProbe probe = new TestProbe<>(system()); + TestProbe probe = TestProbe.create(system()); Behavior behavior = Behaviors.withTimers(timer -> { timer.startSingleTimer("T", new Tick(), Duration.create(10, TimeUnit.MILLISECONDS)); return Behaviors.immutable( (ctx, tick) -> { @@ -44,7 +44,7 @@ public class ManualTimerTest extends TestKit { scheduler.expectNoMessageFor(Duration.create(9, TimeUnit.MILLISECONDS), probe); scheduler.timePasses(Duration.create(2, TimeUnit.MILLISECONDS)); - probe.expectMessageType(Tock.class); + probe.expectMessageClass(Tock.class); scheduler.expectNoMessageFor(Duration.create(10, TimeUnit.SECONDS), probe); } diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java index 99ea9cc784..029136ed92 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java @@ -42,7 +42,7 @@ public class ActorContextAskTest extends JUnitSuite { final ActorRef pingPong = Adapter.spawnAnonymous(system, pingPongBehavior); - final TestProbe probe = new TestProbe<>(Adapter.toTyped(system)); + final TestProbe probe = TestProbe.create(Adapter.toTyped(system)); final Behavior snitch = Behaviors.setup((ActorContext ctx) -> { ctx.ask(Pong.class, @@ -62,7 +62,7 @@ public class ActorContextAskTest extends JUnitSuite { Adapter.spawnAnonymous(system, snitch); - probe.expectMessageType(Pong.class); + probe.expectMessageClass(Pong.class); } diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java index cfd9424157..980df6550f 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java @@ -8,7 +8,7 @@ import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.Props; import akka.actor.typed.javadsl.*; -import akka.testkit.typed.scaladsl.TestProbe; +import akka.testkit.typed.javadsl.TestProbe; import akka.util.Timeout; import org.junit.Test; import org.scalatest.junit.JUnitSuite; @@ -323,7 +323,7 @@ public class InteractionPatternsTest extends JUnitSuite { @Test public void timers() throws Exception { final ActorSystem system = ActorSystem.create(Behaviors.empty(), "timers-sample"); - TestProbe probe = new TestProbe<>("batcher", system); + TestProbe probe = TestProbe.create("batcher", system); ActorRef bufferer = Await.result(system.systemActorOf( behavior(probe.ref(), new FiniteDuration(1, TimeUnit.SECONDS), 10), "batcher", Props.empty(), akka.util.Timeout.apply(1, TimeUnit.SECONDS)), diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java index e584ef890f..1467c4e6c2 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java @@ -49,7 +49,7 @@ public class BasicAsyncTestingTest extends TestKit { @Test public void testVerifyingAResponse() { //#test-spawn - TestProbe probe = new TestProbe<>(system()); + TestProbe probe = TestProbe.create(system()); ActorRef pinger = spawn(echoActor, "ping"); pinger.tell(new Ping("hello", probe.ref())); probe.expectMessage(new Pong("hello")); @@ -59,7 +59,7 @@ public class BasicAsyncTestingTest extends TestKit { @Test public void testVerifyingAResponseAnonymous() { //#test-spawn-anonymous - TestProbe probe = new TestProbe<>(system()); + TestProbe probe = TestProbe.create(system()); ActorRef pinger = spawn(echoActor); pinger.tell(new Ping("hello", probe.ref())); probe.expectMessage(new Pong("hello")); diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala index af0afc67ba..7cb1b814b9 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala @@ -146,10 +146,10 @@ class TimerSpec extends TestKit("TimerSpec") ref ! Cancel probe.fishForMessage(3.seconds) { // we don't know that we will see exactly one tock - case _: Tock ⇒ FishingOutcomes.Continue + case _: Tock ⇒ FishingOutcomes.continue // but we know that after we saw Cancelled we won't see any more - case Cancelled ⇒ FishingOutcomes.Complete - case msg ⇒ FishingOutcomes.Fail(s"unexpected msg: $msg") + case Cancelled ⇒ FishingOutcomes.complete + case msg ⇒ FishingOutcomes.fail(s"unexpected msg: $msg") } probe.expectNoMessage(interval + 100.millis.dilated) diff --git a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java index c1a5709d7b..a9f29a5e6a 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java @@ -35,22 +35,22 @@ public class ClusterApiTest extends JUnitSuite { Cluster cluster1 = Cluster.get(system1); Cluster cluster2 = Cluster.get(system2); - TestProbe probe1 = new TestProbe<>(system1); + TestProbe probe1 = TestProbe.create(system1); cluster1.subscriptions().tell(new Subscribe<>(probe1.ref().narrow(), SelfUp.class)); cluster1.manager().tell(new Join(cluster1.selfMember().address())); - probe1.expectMessageType(SelfUp.class); + probe1.expectMessageClass(SelfUp.class); - TestProbe probe2 = new TestProbe<>(system2); + TestProbe probe2 = TestProbe.create(system2); cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfUp.class)); cluster2.manager().tell(new Join(cluster1.selfMember().address())); - probe2.expectMessageType(SelfUp.class); + probe2.expectMessageClass(SelfUp.class); cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfRemoved.class)); cluster2.manager().tell(new Leave(cluster2.selfMember().address())); - probe2.expectMessageType(SelfRemoved.class); + probe2.expectMessageClass(SelfRemoved.class); } finally { // TODO no java API to terminate actor system Await.result(system1.terminate().zip(system2.terminate()), Duration.create("5 seconds")); diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java index ea4941d62f..deb638e8b9 100644 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java @@ -58,15 +58,15 @@ public class BasicClusterExampleTest { // extends JUnitSuite { Cluster cluster2 = Cluster.get(system2); //#cluster-subscribe - TestProbe testProbe = new TestProbe<>(system); + TestProbe testProbe = TestProbe.create(system); cluster.subscriptions().tell(Subscribe.create(testProbe.ref(), ClusterEvent.MemberEvent.class)); //#cluster-subscribe //#cluster-leave-example cluster.manager().tell(Leave.create(cluster2.selfMember().address())); - testProbe.expectMessageType(ClusterEvent.MemberLeft.class); - testProbe.expectMessageType(ClusterEvent.MemberExited.class); - testProbe.expectMessageType(ClusterEvent.MemberRemoved.class); + testProbe.expectMessageClass(ClusterEvent.MemberLeft.class); + testProbe.expectMessageClass(ClusterEvent.MemberExited.class); + testProbe.expectMessageClass(ClusterEvent.MemberRemoved.class); //#cluster-leave-example } finally { diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java index 69c592a96a..83aa57e940 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java @@ -145,14 +145,14 @@ public class PersistentActorTest extends TestKit { private PersistentBehavior counter(String persistenceId, ActorRef> probe) { - ActorRef loggingProbe = new TestProbe(system()).ref(); + ActorRef loggingProbe = TestProbe.create(String.class, system()).ref(); return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false); } private PersistentBehavior counter(String persistenceId) { return counter(persistenceId, - new TestProbe>(system()).ref(), - new TestProbe(system()).ref(), + TestProbe.>create(system()).ref(), + TestProbe.create(system()).ref(), (s, i, l) -> false); } @@ -161,8 +161,8 @@ public class PersistentActorTest extends TestKit { Function3 snapshot ) { return counter(persistenceId, - new TestProbe>(system()).ref(), - new TestProbe(system()).ref(), snapshot); + TestProbe.>create(system()).ref(), + TestProbe.create(system()).ref(), snapshot); } private PersistentBehavior counter( @@ -176,7 +176,7 @@ public class PersistentActorTest extends TestKit { String persistentId, ActorRef> eventProbe, Function3 snapshot) { - return counter(persistentId, eventProbe, new TestProbe(system()).ref(), snapshot); + return counter(persistentId, eventProbe, TestProbe.create(system()).ref(), snapshot); } private PersistentBehavior counter( @@ -253,7 +253,7 @@ public class PersistentActorTest extends TestKit { @Test public void persistEvents() { ActorRef c = spawn(counter("c2")); - TestProbe probe = new TestProbe<>(system()); + TestProbe probe = TestProbe.create(system()); c.tell(Increment.instance); c.tell(new GetValue(probe.ref())); probe.expectMessage(new State(1, singletonList(0))); @@ -262,7 +262,7 @@ public class PersistentActorTest extends TestKit { @Test public void replyStoredEvents() { ActorRef c = spawn(counter("c2")); - TestProbe probe = new TestProbe<>(system()); + TestProbe probe = TestProbe.create(system()); c.tell(Increment.instance); c.tell(Increment.instance); c.tell(Increment.instance); @@ -279,7 +279,7 @@ public class PersistentActorTest extends TestKit { @Test public void handleTerminatedSignal() { - TestProbe> eventHandlerProbe = new TestProbe<>(system()); + TestProbe> eventHandlerProbe = TestProbe.create(system()); ActorRef c = spawn(counter("c2", eventHandlerProbe.ref())); c.tell(Increment.instance); c.tell(new IncrementLater()); @@ -289,7 +289,7 @@ public class PersistentActorTest extends TestKit { @Test public void handleReceiveTimeout() { - TestProbe> eventHandlerProbe = new TestProbe<>(system()); + TestProbe> eventHandlerProbe = TestProbe.create(system()); ActorRef c = spawn(counter("c1", eventHandlerProbe.ref())); c.tell(new Increment100OnTimeout()); eventHandlerProbe.expectMessage(Pair.create(emptyState, timeoutEvent)); @@ -297,8 +297,8 @@ public class PersistentActorTest extends TestKit { @Test public void chainableSideEffectsWithEvents() { - TestProbe> eventHandlerProbe = new TestProbe<>(system()); - TestProbe loggingProbe = new TestProbe<>(system()); + TestProbe> eventHandlerProbe = TestProbe.create(system()); + TestProbe loggingProbe = TestProbe.create(system()); ActorRef c = spawn(counter("c1", eventHandlerProbe.ref(), loggingProbe.ref())); c.tell(new EmptyEventsListAndThenLog()); loggingProbe.expectMessage(loggingOne); @@ -311,11 +311,11 @@ public class PersistentActorTest extends TestKit { c.tell(Increment.instance); c.tell(Increment.instance); c.tell(Increment.instance); - TestProbe probe = new TestProbe<>(system()); + TestProbe probe = TestProbe.create(system()); c.tell(new GetValue(probe.ref())); probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); - TestProbe> eventProbe = new TestProbe<>(system()); + TestProbe> eventProbe = TestProbe.create(system()); snapshoter = counter("c11", eventProbe.ref(), (s, e, l) -> s.value % 2 == 0); ActorRef c2 = spawn(snapshoter); // First 2 are snapshot @@ -326,7 +326,7 @@ public class PersistentActorTest extends TestKit { @Test public void stopThenLog() { - TestProbe probe = new TestProbe<>(system()); + TestProbe probe = TestProbe.create(system()); ActorRef c = spawn(counter("c12")); c.tell(new StopThenLog()); probe.expectTerminated(c, FiniteDuration.create(1, TimeUnit.SECONDS)); diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/FishingOutcome.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/FishingOutcome.scala new file mode 100644 index 0000000000..ad74b4ccc1 --- /dev/null +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/FishingOutcome.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.testkit.typed + +import akka.annotation.DoNotInherit + +/** + * Not for user extension. + * + * Instances are available from `FishingOutcomes` in the respective dsls: [[akka.testkit.typed.scaladsl.FishingOutcomes]] + * and [[akka.testkit.typed.javadsl.FishingOutcomes]] + */ +@DoNotInherit sealed trait FishingOutcome + +object FishingOutcome { + + case object Continue extends FishingOutcome + case object ContinueAndIgnore extends FishingOutcome + case object Complete extends FishingOutcome + case class Fail(error: String) extends FishingOutcome +} diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala index be6149b2a2..ca308d2a10 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala @@ -148,7 +148,7 @@ final case class CapturedLogEvent(logLevel: LogLevel, message: String, cause: Op new FunctionRef[U]( self.path / i.ref.path.name, (msg, _) ⇒ { val m = f(msg); if (m != null) { selfInbox.ref ! m; i.ref ! msg } }, - (self) ⇒ selfInbox.ref.sorry.sendSystem(internal.DeathWatchNotification(self, null))) + (self) ⇒ selfInbox.ref.sorry.sendSystem(DeathWatchNotification(self, null))) } /** diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala index 9e2a4a6539..b943e08126 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala @@ -15,15 +15,7 @@ import scala.util.control.NoStackTrace /** * Exception without stack trace to use for verifying exceptions in tests */ -case class TE(message: String) extends RuntimeException(message) with NoStackTrace - -/** - * Not for user extension. - * - * Instances are available from `FishingOutcomes` in the respective dsls: [[akka.testkit.typed.scaladsl.FishingOutcomes]] - * and [[akka.testkit.typed.javadsl.FishingOutcomes]] - */ -@DoNotInherit abstract class FishingOutcome private[akka] () +final case class TE(message: String) extends RuntimeException(message) with NoStackTrace object TestKit { diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestProbeImpl.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestProbeImpl.scala new file mode 100644 index 0000000000..41aadd12f0 --- /dev/null +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestProbeImpl.scala @@ -0,0 +1,245 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.testkit.typed.internal + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque } +import java.util.function.Supplier + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated } +import akka.annotation.InternalApi +import akka.testkit.typed.javadsl.{ TestProbe ⇒ JavaTestProbe } +import akka.testkit.typed.scaladsl.{ TestDuration, TestProbe ⇒ ScalaTestProbe } +import akka.testkit.typed.{ FishingOutcome, TestKitSettings } +import akka.util.PrettyDuration._ +import akka.util.{ BoxedType, Timeout } + +import scala.annotation.tailrec +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.control.NonFatal + +@InternalApi +private[akka] object TestProbeImpl { + private val testActorId = new AtomicInteger(0) + + private case class WatchActor[U](actor: ActorRef[U]) + private def testActor[M](queue: BlockingDeque[M], terminations: BlockingDeque[Terminated]): Behavior[M] = Behaviors.immutable[M] { (ctx, msg) ⇒ + msg match { + case WatchActor(ref) ⇒ ctx.watch(ref) + case other ⇒ queue.offerLast(other) + } + Behaviors.same + }.onSignal { + case (_, t: Terminated) ⇒ + terminations.offerLast(t) + Behaviors.same + } +} + +@InternalApi +private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_]) extends JavaTestProbe[M] with ScalaTestProbe[M] { + + import TestProbeImpl._ + protected implicit val settings = TestKitSettings(system) + private val queue = new LinkedBlockingDeque[M] + private val terminations = new LinkedBlockingDeque[Terminated] + + private var end: Duration = Duration.Undefined + + /** + * if last assertion was expectNoMessage, disable timing failure upon within() + * block end. + */ + private var lastWasNoMessage = false + + private var lastMessage: Option[M] = None + + private val testActor: ActorRef[M] = { + // FIXME arbitrary timeout? + implicit val timeout = Timeout(3.seconds) + val futRef = system.systemActorOf(TestProbeImpl.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}") + Await.result(futRef, timeout.duration + 1.second) + } + + override def ref = testActor + + override def remainingOrDefault = remainingOr(settings.SingleExpectDefaultTimeout.dilated) + + override def remaining: FiniteDuration = end match { + case f: FiniteDuration ⇒ f - now + case _ ⇒ throw new AssertionError("`remaining` may not be called outside of `within`") + } + + override def remainingOr(duration: FiniteDuration): FiniteDuration = end match { + case x if x eq Duration.Undefined ⇒ duration + case x if !x.isFinite ⇒ throw new IllegalArgumentException("`end` cannot be infinite") + case f: FiniteDuration ⇒ f - now + } + + private def remainingOrDilated(max: Duration): FiniteDuration = max match { + case x if x eq Duration.Undefined ⇒ remainingOrDefault + case x if !x.isFinite ⇒ throw new IllegalArgumentException("max duration cannot be infinite") + case f: FiniteDuration ⇒ f.dilated + } + + override protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T = { + val _max = max.dilated + val start = now + val rem = if (end == Duration.Undefined) Duration.Inf else end - start + assert(rem >= min, s"required min time $min not possible, only ${rem.pretty} left") + + lastWasNoMessage = false + + val max_diff = _max min rem + val prev_end = end + end = start + max_diff + + val ret = try f finally end = prev_end + + val diff = now - start + assert(min <= diff, s"block took ${diff.pretty}, should at least have been $min") + if (!lastWasNoMessage) { + assert(diff <= max_diff, s"block took ${diff.pretty}, exceeding ${max_diff.pretty}") + } + + ret + } + + override def expectMessage[T <: M](obj: T): T = expectMessage_internal(remainingOrDefault, obj) + + override def expectMessage[T <: M](max: FiniteDuration, obj: T): T = expectMessage_internal(max.dilated, obj) + + override def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T = + expectMessage_internal(max.dilated, obj, Some(hint)) + + private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = { + val o = receiveOne(max) + val hintOrEmptyString = hint.map(": " + _).getOrElse("") + assert(o != null, s"timeout ($max) during expectMessage while waiting for $obj" + hintOrEmptyString) + assert(obj == o, s"expected $obj, found $o" + hintOrEmptyString) + o.asInstanceOf[T] + } + + /** + * Receive one message from the internal queue of the TestActor. If the given + * duration is zero, the queue is polled (non-blocking). + * + * This method does NOT automatically scale its Duration parameter! + */ + private def receiveOne(max: Duration): M = { + val message = + if (max == Duration.Zero) { + queue.pollFirst + } else if (max.isFinite) { + queue.pollFirst(max.length, max.unit) + } else { + queue.takeFirst + } + lastWasNoMessage = false + lastMessage = if (message == null) None else Some(message) + message + } + + override def expectNoMessage(max: FiniteDuration): Unit = { expectNoMessage_internal(max) } + + override def expectNoMessage(): Unit = { expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated) } + + private def expectNoMessage_internal(max: FiniteDuration) { + val o = receiveOne(max) + assert(o == null, s"received unexpected message $o") + lastWasNoMessage = true + } + + override protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = { + val o = receiveOne(max) + assert(o != null, s"timeout ($max) during expectMessageClass waiting for $c") + assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)") + o.asInstanceOf[C] + } + + override protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] = { + // not tailrec but that should be ok + def loop(timeout: FiniteDuration, seen: List[M]): List[M] = { + val start = System.nanoTime() + val msg = receiveOne(timeout) + try { + fisher(msg) match { + case FishingOutcome.Complete ⇒ (msg :: seen).reverse + case FishingOutcome.Fail(error) ⇒ throw new AssertionError(s"$error, hint: $hint") + case continue ⇒ + val newTimeout = + if (timeout.isFinite()) timeout - (System.nanoTime() - start).nanos + else timeout + if (newTimeout.toMillis <= 0) { + throw new AssertionError(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint") + } else { + + continue match { + case FishingOutcome.Continue ⇒ loop(newTimeout, msg :: seen) + case FishingOutcome.ContinueAndIgnore ⇒ loop(newTimeout, seen) + case _ ⇒ ??? // cannot happen + } + + } + } + } catch { + case ex: MatchError ⇒ throw new AssertionError( + s"Unexpected message $msg while fishing for messages, " + + s"seen messages ${seen.reverse}, hint: $hint", ex) + } + } + + loop(max.dilated, Nil) + } + + override def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit = { + testActor.asInstanceOf[ActorRef[AnyRef]] ! WatchActor(actorRef) + val message = + if (max == Duration.Zero) { + terminations.pollFirst + } else if (max.isFinite) { + terminations.pollFirst(max.length, max.unit) + } else { + terminations.takeFirst + } + assert(message != null, s"timeout ($max) during expectStop waiting for actor [${actorRef.path}] to stop") + assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop") + } + + override def awaitAssert[A](max: Duration, interval: Duration, supplier: Supplier[A]): A = + awaitAssert(supplier.get(), max, interval) + + override def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = { + val _max = remainingOrDilated(max) + val stop = now + _max + + @tailrec + def poll(t: Duration): A = { + val result: A = + try { + a + } catch { + case NonFatal(e) ⇒ + if ((now + t) >= stop) throw e + else null.asInstanceOf[A] + } + + if (result != null) result + else { + Thread.sleep(t.toMillis) + poll((stop - now) min interval) + } + } + + poll(_max min interval) + } + + /** + * Obtain current time (`System.nanoTime`) as Duration. + */ + private def now: FiniteDuration = System.nanoTime.nanos + +} diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ExplicitlyTriggeredScheduler.scala similarity index 70% rename from akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala rename to akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ExplicitlyTriggeredScheduler.scala index 979efe00a5..6ac97b80cc 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/ExplicitlyTriggeredScheduler.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/ExplicitlyTriggeredScheduler.scala @@ -1,19 +1,22 @@ -package akka.testkit.typed +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.testkit.typed.javadsl import java.util.concurrent.ThreadFactory +import akka.event.LoggingAdapter import com.typesafe.config.Config import scala.annotation.varargs import scala.concurrent.duration.{ Duration, FiniteDuration } -import akka.event.LoggingAdapter - class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) { @varargs - def expectNoMessageFor(duration: FiniteDuration, on: scaladsl.TestProbe[_]*): Unit = { + def expectNoMessageFor(duration: FiniteDuration, on: TestProbe[_]*): Unit = { timePasses(duration) on.foreach(_.expectNoMessage(Duration.Zero)) } + } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala index acfb3b8de3..1c8095453a 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala @@ -3,42 +3,205 @@ */ package akka.testkit.typed.javadsl -import akka.actor.typed.ActorSystem -import akka.testkit.typed.FishingOutcome +import java.util.function.Supplier -import scala.concurrent.duration.FiniteDuration +import akka.actor.typed.{ ActorRef, ActorSystem } +import akka.annotation.DoNotInherit +import akka.testkit.typed.internal.TestProbeImpl +import akka.testkit.typed.{ FishingOutcome, TestKitSettings } +import akka.testkit.typed.scaladsl.TestDuration + +import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.collection.JavaConverters._ +import scala.concurrent.duration._ object FishingOutcomes { + /** + * Consume this message and continue with the next + */ + def continue(): FishingOutcome = FishingOutcome.Continue /** * Consume this message and continue with the next */ - def continue(): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Continue + def continueAndIgnore(): FishingOutcome = akka.testkit.typed.FishingOutcome.ContinueAndIgnore /** * Complete fishing and return this message */ - def complete(): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Complete + def complete(): FishingOutcome = akka.testkit.typed.FishingOutcome.Complete /** * Fail fishing with a custom error message */ - def fail(error: String): FishingOutcome = akka.testkit.typed.scaladsl.FishingOutcomes.Fail(error) + def fail(error: String): FishingOutcome = akka.testkit.typed.FishingOutcome.Fail(error) +} + +object TestProbe { + + def create[M](system: ActorSystem[_]): TestProbe[M] = + create(name = "testProbe", system) + + def create[M](clazz: Class[M], system: ActorSystem[_]): TestProbe[M] = + create(system) + + def create[M](name: String, system: ActorSystem[_]): TestProbe[M] = + new TestProbeImpl[M](name, system) + + def create[M](name: String, clazz: Class[M], system: ActorSystem[_]): TestProbe[M] = + new TestProbeImpl[M](name, system) } /** - * Java API: + * Java API: * Create instances through the `create` factories in the [[TestProbe]] companion. + * + * A test probe is essentially a queryable mailbox which can be used in place of an actor and the received + * messages can then be asserted etc. + * + * Not for user extension */ -class TestProbe[M](name: String, system: ActorSystem[_]) extends akka.testkit.typed.scaladsl.TestProbe[M](name)(system) { +@DoNotInherit +abstract class TestProbe[M] { - def this(system: ActorSystem[_]) = this("testProbe", system) + implicit protected def settings: TestKitSettings /** - * Same as `expectMsgType[T](remainingOrDefault)`, but correctly treating the timeFactor. + * ActorRef for this TestProbe */ - def expectMessageType[T <: M](t: Class[T]): T = - expectMessageClass_internal(remainingOrDefault, t) + def ref: ActorRef[M] + + /** + * Obtain time remaining for execution of the innermost enclosing `within` + * block or missing that it returns the properly dilated default for this + * case from settings (key "akka.actor.typed.test.single-expect-default"). + */ + def remainingOrDefault: FiniteDuration + + /** + * Obtain time remaining for execution of the innermost enclosing `within` + * block or throw an [[AssertionError]] if no `within` block surrounds this + * call. + */ + def remaining: FiniteDuration + + /** + * Obtain time remaining for execution of the innermost enclosing `within` + * block or missing that it returns the given duration. + */ + def remainingOr(duration: FiniteDuration): FiniteDuration + + /** + * Execute code block while bounding its execution time between `min` and + * `max`. `within` blocks may be nested. All methods in this trait which + * take maximum wait times are available in a version which implicitly uses + * the remaining time governed by the innermost enclosing `within` block. + * + * Note that the timeout is scaled using Duration.dilated, which uses the + * configuration entry "akka.actor.typed.test.timefactor", while the min Duration is not. + * + * {{{ + * val ret = within(50 millis) { + * test ! Ping + * expectMessageType[Pong] + * } + * }}} + */ + def within[T](min: FiniteDuration, max: FiniteDuration)(f: Supplier[T]): T = + within_internal(min, max, f.get()) + + /** + * Same as calling `within(0 seconds, max)(f)`. + */ + def within[T](max: FiniteDuration)(f: Supplier[T]): T = + within_internal(Duration.Zero, max, f.get()) + + protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T + + /** + * Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor. + */ + def expectMessage[T <: M](obj: T): T + + /** + * Receive one message from the test actor and assert that it equals the + * given object. Wait time is bounded by the given duration, with an + * [[AssertionError]] being thrown in case of timeout. + * + * @return the received object + */ + def expectMessage[T <: M](max: FiniteDuration, obj: T): T + + /** + * Receive one message from the test actor and assert that it equals the + * given object. Wait time is bounded by the given duration, with an + * [[AssertionError]] being thrown in case of timeout. + * + * @return the received object + */ + def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T + + /** + * Assert that no message is received for the specified time. + * Supplied value is not dilated. + */ + def expectNoMessage(max: FiniteDuration): Unit + + /** + * Assert that no message is received. Waits for the default period configured as `akka.actor.typed.test.expect-no-message-default` + * That value is dilated. + */ + def expectNoMessage(): Unit + + /** + * Expect the given actor to be stopped or stop withing the given timeout or + * throw an [[AssertionError]]. + */ + def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit + + /** + * Evaluate the given assert every `interval` until it does not throw an exception and return the + * result. + * + * If the `max` timeout expires the last exception is thrown. + * + * Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "akka.test.timefactor". + */ + def awaitAssert[A](max: Duration, interval: Duration, supplier: Supplier[A]): A + + /** + * Evaluate the given assert every 100 milliseconds until it does not throw an exception and return the + * result. + * + * If the `max` timeout expires the last exception is thrown. + * + * Note that the timeout is scaled using Duration.dilated, which uses the configuration entry "akka.test.timefactor". + */ + def awaitAssert[A](max: Duration, supplier: Supplier[A]): A = + awaitAssert(max, 100.millis, supplier) + + /** + * Evaluate the given assert every 100 milliseconds until it does not throw an exception and return the + * result. A max time is taken it from the innermost enclosing `within` block. + */ + def awaitAssert[A](supplier: Supplier[A]): A = + awaitAssert(Duration.Undefined, supplier) + + // FIXME awaitAssert(Procedure): Unit would be nice for java people to not have to return null + + /** + * Same as `expectMessageType(clazz, remainingOrDefault)`, but correctly treating the timeFactor. + */ + def expectMessageClass[T](clazz: Class[T]): T = + expectMessageClass_internal(remainingOrDefault, clazz) + + /** + * Wait for a message of type M and return it when it arrives, or fail if the `max` timeout is hit. + * The timeout is dilated. + */ + def expectMessageClass[T](clazz: Class[T], max: FiniteDuration): T = + expectMessageClass_internal(max.dilated, clazz) + + protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C /** * Java API: Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming @@ -57,14 +220,15 @@ class TestProbe[M](name: String, system: ActorSystem[_]) extends akka.testkit.ty * The timeout is dilated. * @return The messages accepted in the order they arrived */ - // FIXME same name would cause ambiguity but I'm out of ideas how to fix, separate Scala/Java TestProbe APIs? - def fishForMessageJava(max: FiniteDuration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = - fishForMessage(max)(fisher.apply).asJava + def fishForMessage(max: FiniteDuration, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = + fishForMessage(max, "", fisher) /** * Same as the other `fishForMessageJava` but includes the provided hint in all error messages */ - def fishForMessageJava(max: FiniteDuration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = - fishForMessage(max, hint)(fisher.apply).asJava + def fishForMessage(max: FiniteDuration, hint: String, fisher: java.util.function.Function[M, FishingOutcome]): java.util.List[M] = + fishForMessage_internal(max, hint, fisher.apply).asJava + + protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): List[M] } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ExplicitlyTriggeredScheduler.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ExplicitlyTriggeredScheduler.scala new file mode 100644 index 0000000000..b41b908d8b --- /dev/null +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ExplicitlyTriggeredScheduler.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.testkit.typed.scaladsl + +import java.util.concurrent.ThreadFactory + +import akka.event.LoggingAdapter +import com.typesafe.config.Config + +import scala.annotation.varargs +import scala.concurrent.duration.{ Duration, FiniteDuration } + +class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) { + + @varargs + def expectNoMessageFor(duration: FiniteDuration, on: TestProbe[_]*): Unit = { + timePasses(duration) + on.foreach(_.expectNoMessage(Duration.Zero)) + } + +} diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala index 7136e02838..5625ef3f3f 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/ManualTime.scala @@ -5,7 +5,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import akka.testkit.typed._ object ManualTime { - val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.ExplicitlyTriggeredScheduler"""") + val config: Config = ConfigFactory.parseString("""akka.scheduler.implementation = "akka.testkit.typed.scaladsl.ExplicitlyTriggeredScheduler"""") } trait ManualTime { self: TestKit ⇒ override val scheduler: ExplicitlyTriggeredScheduler = self.system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler] diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala index a3bc3fefa1..0de9b6f20f 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala @@ -3,137 +3,79 @@ */ package akka.testkit.typed.scaladsl -import scala.concurrent.duration._ -import java.util.concurrent.BlockingDeque - -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated } -import akka.actor.typed.scaladsl.Behaviors -import java.util.concurrent.LinkedBlockingDeque -import java.util.concurrent.atomic.AtomicInteger - +import akka.actor.typed.{ ActorRef, ActorSystem } import akka.annotation.DoNotInherit -import akka.util.Timeout -import akka.util.PrettyDuration.PrettyPrintableDuration - -import scala.concurrent.Await +import akka.testkit.typed.internal.TestProbeImpl import akka.testkit.typed.{ FishingOutcome, TestKitSettings } -import akka.util.BoxedType -import scala.annotation.tailrec +import scala.concurrent.duration._ import scala.reflect.ClassTag -import scala.util.control.NonFatal +import scala.collection.immutable + +object FishingOutcomes { + /** + * Complete fishing and return all messages up until this + */ + val complete = FishingOutcome.Complete + /** + * Consume this message, collect it into the result, and continue with the next message + */ + val continue = FishingOutcome.Continue + /** + * Consume this message, but do not collect it into the result, and continue with the next message + */ + val continueAndIgnore = FishingOutcome.ContinueAndIgnore + /** + * Fail fishing with a custom error message + */ + def fail(msg: String) = FishingOutcome.Fail(msg) +} object TestProbe { - private val testActorId = new AtomicInteger(0) - def apply[M]()(implicit system: ActorSystem[_]): TestProbe[M] = apply(name = "testProbe") def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M] = - new TestProbe(name) + new TestProbeImpl[M](name, system) - private case class WatchActor[U](actor: ActorRef[U]) - private def testActor[M](queue: BlockingDeque[M], terminations: BlockingDeque[Terminated]): Behavior[M] = Behaviors.immutable[M] { (ctx, msg) ⇒ - msg match { - case WatchActor(ref) ⇒ ctx.watch(ref) - case other ⇒ queue.offerLast(other) - } - Behaviors.same - }.onSignal { - case (_, t: Terminated) ⇒ - terminations.offerLast(t) - Behaviors.same - } } -object FishingOutcomes { +/** + * Create instances through the factories in the [[TestProbe]] companion. + * + * A test probe is essentially a queryable mailbox which can be used in place of an actor and the received + * messages can then be asserted + * + * Not for user extension + */ +@DoNotInherit trait TestProbe[M] { + + implicit protected def settings: TestKitSettings /** - * Consume this message, collect it into the result, and continue with the next message + * ActorRef for this TestProbe */ - case object Continue extends FishingOutcome - - /** - * Consume this message, but do not collect it into the result, and continue with the next message - */ - case object ContinueAndIgnore extends FishingOutcome - - /** - * Complete fishing and return this message - */ - case object Complete extends FishingOutcome - - /** - * Fail fishing with a custom error message - */ - case class Fail(error: String) extends FishingOutcome -} - -class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { - - import TestProbe._ - private implicit val settings = TestKitSettings(system) - private val queue = new LinkedBlockingDeque[M] - private val terminations = new LinkedBlockingDeque[Terminated] - - private var end: Duration = Duration.Undefined - - /** - * if last assertion was expectNoMessage, disable timing failure upon within() - * block end. - */ - private var lastWasNoMessage = false - - private var lastMessage: Option[M] = None - - val testActor: ActorRef[M] = { - implicit val timeout = Timeout(3.seconds) - val futRef = system.systemActorOf(TestProbe.testActor(queue, terminations), s"$name-${testActorId.incrementAndGet()}") - Await.result(futRef, timeout.duration + 1.second) - } - - /** - * Shorthand to get the `testActor`. - */ - def ref: ActorRef[M] = testActor - - /** - * Obtain current time (`System.nanoTime`) as Duration. - */ - protected def now: FiniteDuration = System.nanoTime.nanos + def ref: ActorRef[M] /** * Obtain time remaining for execution of the innermost enclosing `within` * block or missing that it returns the properly dilated default for this * case from settings (key "akka.actor.typed.test.single-expect-default"). */ - def remainingOrDefault = remainingOr(settings.SingleExpectDefaultTimeout.dilated) + def remainingOrDefault: FiniteDuration /** * Obtain time remaining for execution of the innermost enclosing `within` * block or throw an [[AssertionError]] if no `within` block surrounds this * call. */ - def remaining: FiniteDuration = end match { - case f: FiniteDuration ⇒ f - now - case _ ⇒ throw new AssertionError("`remaining` may not be called outside of `within`") - } + def remaining: FiniteDuration /** * Obtain time remaining for execution of the innermost enclosing `within` * block or missing that it returns the given duration. */ - def remainingOr(duration: FiniteDuration): FiniteDuration = end match { - case x if x eq Duration.Undefined ⇒ duration - case x if !x.isFinite ⇒ throw new IllegalArgumentException("`end` cannot be infinite") - case f: FiniteDuration ⇒ f - now - } - - private def remainingOrDilated(max: Duration): FiniteDuration = max match { - case x if x eq Duration.Undefined ⇒ remainingOrDefault - case x if !x.isFinite ⇒ throw new IllegalArgumentException("max duration cannot be infinite") - case f: FiniteDuration ⇒ f.dilated - } + def remainingOr(duration: FiniteDuration): FiniteDuration /** * Execute code block while bounding its execution time between `min` and @@ -151,38 +93,20 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { * } * }}} */ - def within[T](min: FiniteDuration, max: FiniteDuration)(f: ⇒ T): T = { - val _max = max.dilated - val start = now - val rem = if (end == Duration.Undefined) Duration.Inf else end - start - assert(rem >= min, s"required min time $min not possible, only ${rem.pretty} left") - - lastWasNoMessage = false - - val max_diff = _max min rem - val prev_end = end - end = start + max_diff - - val ret = try f finally end = prev_end - - val diff = now - start - assert(min <= diff, s"block took ${diff.pretty}, should at least have been $min") - if (!lastWasNoMessage) { - assert(diff <= max_diff, s"block took ${diff.pretty}, exceeding ${max_diff.pretty}") - } - - ret - } - + def within[T](min: FiniteDuration, max: FiniteDuration)(f: ⇒ T): T = + within_internal(min, max, f) /** * Same as calling `within(0 seconds, max)(f)`. */ - def within[T](max: FiniteDuration)(f: ⇒ T): T = within(Duration.Zero, max)(f) + def within[T](max: FiniteDuration)(f: ⇒ T): T = + within_internal(Duration.Zero, max, f) + + protected def within_internal[T](min: FiniteDuration, max: FiniteDuration, f: ⇒ T): T /** * Same as `expectMessage(remainingOrDefault, obj)`, but correctly treating the timeFactor. */ - def expectMessage[T <: M](obj: T): T = expectMessage_internal(remainingOrDefault, obj) + def expectMessage[T <: M](obj: T): T /** * Receive one message from the test actor and assert that it equals the @@ -191,7 +115,7 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { * * @return the received object */ - def expectMessage[T <: M](max: FiniteDuration, obj: T): T = expectMessage_internal(max.dilated, obj) + def expectMessage[T <: M](max: FiniteDuration, obj: T): T /** * Receive one message from the test actor and assert that it equals the @@ -200,53 +124,19 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { * * @return the received object */ - def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T = expectMessage_internal(max.dilated, obj, Some(hint)) - - private def expectMessage_internal[T <: M](max: Duration, obj: T, hint: Option[String] = None): T = { - val o = receiveOne(max) - val hintOrEmptyString = hint.map(": " + _).getOrElse("") - assert(o != null, s"timeout ($max) during expectMessage while waiting for $obj" + hintOrEmptyString) - assert(obj == o, s"expected $obj, found $o" + hintOrEmptyString) - o.asInstanceOf[T] - } - - /** - * Receive one message from the internal queue of the TestActor. If the given - * duration is zero, the queue is polled (non-blocking). - * - * This method does NOT automatically scale its Duration parameter! - */ - private def receiveOne(max: Duration): M = { - val message = - if (max == Duration.Zero) { - queue.pollFirst - } else if (max.isFinite) { - queue.pollFirst(max.length, max.unit) - } else { - queue.takeFirst - } - lastWasNoMessage = false - lastMessage = if (message == null) None else Some(message) - message - } + def expectMessage[T <: M](max: FiniteDuration, hint: String, obj: T): T /** * Assert that no message is received for the specified time. * Supplied value is not dilated. */ - def expectNoMessage(max: FiniteDuration) { expectNoMessage_internal(max) } + def expectNoMessage(max: FiniteDuration): Unit /** * Assert that no message is received. Waits for the default period configured as `akka.actor.typed.test.expect-no-message-default` * That value is dilated. */ - def expectNoMessage() { expectNoMessage_internal(settings.ExpectNoMessageDefaultTimeout.dilated) } - - private def expectNoMessage_internal(max: FiniteDuration) { - val o = receiveOne(max) - assert(o == null, s"received unexpected message $o") - lastWasNoMessage = true - } + def expectNoMessage(): Unit /** * Same as `expectMessageType[T](remainingOrDefault)`, but correctly treating the timeFactor. @@ -255,29 +145,21 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { expectMessageClass_internal(remainingOrDefault, t.runtimeClass.asInstanceOf[Class[T]]) /** - * Receive one message from the test actor and assert that it conforms to the - * given type (after erasure). Wait time is bounded by the given duration, - * with an [[AssertionError]] being thrown in case of timeout. - * - * @return the received object + * Expect a message of type T to arrive within `max` or fail. `max` is dilated. */ def expectMessageType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T = expectMessageClass_internal(max.dilated, t.runtimeClass.asInstanceOf[Class[T]]) - private[akka] def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C = { - val o = receiveOne(max) - assert(o != null, s"timeout ($max) during expectMessageClass waiting for $c") - assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)") - o.asInstanceOf[C] - } + protected def expectMessageClass_internal[C](max: FiniteDuration, c: Class[C]): C /** * Allows for flexible matching of multiple messages within a timeout, the fisher function is fed each incoming * message, and returns one of the following effects to decide on what happens next: * - * * [[FishingOutcomes.Continue]] - continue with the next message given that the timeout has not been reached - * * [[FishingOutcomes.Complete]] - successfully complete and return the message - * * [[FishingOutcomes.Fail]] - fail the test with a custom message + * * [[FishingOutcomes.continue]] - continue with the next message given that the timeout has not been reached + * * [[FishingOutcomes.continueAndIgnore]] - continue and do not save the message in the returned list + * * [[FishingOutcomes.complete]] - successfully complete and return the message + * * [[FishingOutcomes.fail]] - fail the test with a custom message * * Additionally failures includes the list of messages consumed. If a message of type `M` but not of type `T` is * received this will also fail the test, additionally if the `fisher` function throws a match error the error @@ -288,56 +170,22 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { * The timeout is dilated. * @return The messages accepted in the order they arrived */ - def fishForMessage(max: FiniteDuration, hint: String = "")(fisher: M ⇒ FishingOutcome): List[M] = { - // not tailrec but that should be ok - def loop(timeout: FiniteDuration, seen: List[M]): List[M] = { - val start = System.nanoTime() - val msg = receiveOne(timeout) - try { - fisher(msg) match { - case FishingOutcomes.Complete ⇒ (msg :: seen).reverse - case FishingOutcomes.Fail(error) ⇒ throw new AssertionError(s"$error, hint: $hint") - case continue ⇒ - val newTimeout = - if (timeout.isFinite()) timeout - (System.nanoTime() - start).nanos - else timeout - if (newTimeout.toMillis <= 0) { - throw new AssertionError(s"timeout ($max) during fishForMessage, seen messages ${seen.reverse}, hint: $hint") - } else { - continue match { - case FishingOutcomes.Continue ⇒ loop(newTimeout, msg :: seen) - case FishingOutcomes.ContinueAndIgnore ⇒ loop(newTimeout, seen) - } + def fishForMessage(max: FiniteDuration, hint: String)(fisher: M ⇒ FishingOutcome): immutable.Seq[M] = + fishForMessage_internal(max, hint, fisher) - } - } - } catch { - case ex: MatchError ⇒ throw new AssertionError( - s"Unexpected message $msg while fishing for messages, " + - s"seen messages ${seen.reverse}, hint: $hint", ex) - } - } + /** + * Same as the other `fishForMessage` but with no hint + */ + def fishForMessage(max: FiniteDuration)(fisher: M ⇒ FishingOutcome): immutable.Seq[M] = + fishForMessage(max, "")(fisher) - loop(max.dilated, Nil) - } + protected def fishForMessage_internal(max: FiniteDuration, hint: String, fisher: M ⇒ FishingOutcome): immutable.Seq[M] /** * Expect the given actor to be stopped or stop withing the given timeout or * throw an [[AssertionError]]. */ - def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit = { - testActor.asInstanceOf[ActorRef[AnyRef]] ! WatchActor(actorRef) - val message = - if (max == Duration.Zero) { - terminations.pollFirst - } else if (max.isFinite) { - terminations.pollFirst(max.length, max.unit) - } else { - terminations.takeFirst - } - assert(message != null, s"timeout ($max) during expectStop waiting for actor [${actorRef.path}] to stop") - assert(message.ref == actorRef, s"expected [${actorRef.path}] to stop, but saw [${message.ref.path}] stop") - } + def expectTerminated[U](actorRef: ActorRef[U], max: FiniteDuration): Unit /** * Evaluate the given assert every `interval` until it does not throw an exception and return the @@ -351,29 +199,5 @@ class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { * Note that the timeout is scaled using Duration.dilated, * which uses the configuration entry "akka.test.timefactor". */ - def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = { - val _max = remainingOrDilated(max) - val stop = now + _max - - @tailrec - def poll(t: Duration): A = { - val result: A = - try { - a - } catch { - case NonFatal(e) ⇒ - if ((now + t) >= stop) throw e - else null.asInstanceOf[A] - } - - if (result != null) result - else { - Thread.sleep(t.toMillis) - poll((stop - now) min interval) - } - } - - poll(_max min interval) - } - + def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A } diff --git a/akka-testkit-typed/src/test/scala/akka/testkit/typed/javadsl/TestProbeTest.java b/akka-testkit-typed/src/test/scala/akka/testkit/typed/javadsl/TestProbeTest.java new file mode 100644 index 0000000000..f9096c3613 --- /dev/null +++ b/akka-testkit-typed/src/test/scala/akka/testkit/typed/javadsl/TestProbeTest.java @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.testkit.typed.javadsl; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +public class TestProbeTest { + + public static void compileOnlyApiTest() { + ActorSystem system = null; + TestProbe probe = TestProbe.create(system); + probe.ref(); + probe.awaitAssert(() -> { + // ... something ... + return null; + }); + probe.awaitAssert(FiniteDuration.apply(3, TimeUnit.SECONDS), () -> { + // ... something ... + return null; + }); + String awaitAssertResult = + probe.awaitAssert(FiniteDuration.apply(3, TimeUnit.SECONDS), FiniteDuration.apply(100, TimeUnit.MILLISECONDS), () -> { + // ... something ... + return "some result"; + }); + String messageResult = probe.expectMessage("message"); + String expectClassResult = probe.expectMessageClass(String.class); + probe.expectNoMessage(); + + ActorRef ref = null; + probe.expectTerminated(ref, FiniteDuration.apply(1, TimeUnit.SECONDS)); + + FiniteDuration remaining = probe.remaining(); + probe.fishForMessage(FiniteDuration.apply(3, TimeUnit.SECONDS), "hint", (msg) -> { + if (msg.equals("one")) return FishingOutcomes.continueAndIgnore(); + else if (msg.equals("two")) return FishingOutcomes.complete(); + else return FishingOutcomes.fail("error"); + }); + + String withinResult = probe.within(FiniteDuration.apply(3, TimeUnit.SECONDS), () -> { + // ... something ... + return "result"; + }); + + } +} diff --git a/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala b/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala index ca701a2ecb..5b9ba704f3 100644 --- a/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala +++ b/akka-testkit-typed/src/test/scala/akka/testkit/typed/scaladsl/TestProbeSpec.scala @@ -10,6 +10,24 @@ import scala.concurrent.duration._ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeAndAfterAll { + def compileOnlyApiTest(): Unit = { + val probe = TestProbe[AnyRef]() + probe.fishForMessage(100.millis) { + case _ ⇒ FishingOutcomes.complete + } + probe.awaitAssert({ + "result" + }) + probe.expectMessageType[String] + probe.expectMessage("whoa") + probe.expectNoMessage() + probe.expectNoMessage(300.millis) + probe.expectTerminated(system.deadLetters, 100.millis) + probe.within(100.millis) { + "result" + } + } + "The test probe" must { "allow probing for actor stop when actor already stopped" in { @@ -44,8 +62,8 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA probe.ref ! "two" val result = probe.fishForMessage(300.millis) { - case "one" ⇒ FishingOutcomes.Continue - case "two" ⇒ FishingOutcomes.Complete + case "one" ⇒ FishingOutcomes.continue + case "two" ⇒ FishingOutcomes.complete } result should ===(List("one", "two")) @@ -60,8 +78,8 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA intercept[AssertionError] { probe.fishForMessage(300.millis) { - case "one" ⇒ FishingOutcomes.Continue - case "two" ⇒ FishingOutcomes.Fail("not the fish I'm looking for") + case "one" ⇒ FishingOutcomes.continue + case "two" ⇒ FishingOutcomes.fail("not the fish I'm looking for") } } } @@ -74,7 +92,7 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA intercept[AssertionError] { probe.fishForMessage(300.millis) { - case "one" ⇒ FishingOutcomes.Continue + case "one" ⇒ FishingOutcomes.continue } } } @@ -86,7 +104,7 @@ class TestProbeSpec extends TestKit with WordSpecLike with Matchers with BeforeA intercept[AssertionError] { probe.fishForMessage(300.millis) { - case "one" ⇒ FishingOutcomes.Continue + case "one" ⇒ FishingOutcomes.continue } } }