java.time.Duration APIs in javadsl.TestKit #24646

This commit is contained in:
Jimin Hsieh 2018-05-29 18:17:48 +08:00 committed by Johan Andrén
parent e078e5a747
commit 7c3a8a8ed5
9 changed files with 96 additions and 65 deletions

View file

@ -9,6 +9,8 @@ import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
@ -316,7 +318,7 @@ public class AdapterTest extends JUnitSuite {
} finally { } finally {
system.eventStream().setLogLevel(originalLogLevel); system.eventStream().setLogLevel(originalLogLevel);
} }
probe.expectNoMsg(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); // no pong probe.expectNoMessage(Duration.ofMillis(100)); // no pong
} }
@Test @Test

View file

@ -81,7 +81,7 @@ out, in which case they use the default value from configuration item
`akka.test.single-expect-default` which itself defaults to 3 seconds (or they `akka.test.single-expect-default` which itself defaults to 3 seconds (or they
obey the innermost enclosing `Within` as detailed [below](#testkit-within)). The full signatures are: obey the innermost enclosing `Within` as detailed [below](#testkit-within)). The full signatures are:
* @scala[`expectMsg[T](d: Duration, msg: T): T`]@java[`public <T> T expectMsgEquals(FiniteDuration max, T msg)`] * @scala[`expectMsg[T](d: Duration, msg: T): T`]@java[`public <T> T expectMsgEquals(Duration max, T msg)`]
The given message object must be received within the specified time; the The given message object must be received within the specified time; the
object will be returned. object will be returned.
* @scala[`expectMsgPF[T](d: Duration)(pf: PartialFunction[Any, T]): T`]@java[`public <T> T expectMsgPF(Duration max, String hint, Function<Object, T> f)`] * @scala[`expectMsgPF[T](d: Duration)(pf: PartialFunction[Any, T]): T`]@java[`public <T> T expectMsgPF(Duration max, String hint, Function<Object, T> f)`]
@ -91,7 +91,7 @@ the @scala[partial] function to the received message is returned. @scala[The dur
be left unspecified (empty parentheses are required in this case) to use be left unspecified (empty parentheses are required in this case) to use
the deadline from the innermost enclosing [within](#testkit-within) the deadline from the innermost enclosing [within](#testkit-within)
block instead.] block instead.]
* @scala[`expectMsgClass[T](d: Duration, c: Class[T]): T`]@java[`public <T> T expectMsgClass(FiniteDuration max, Class<T> c)`] * @scala[`expectMsgClass[T](d: Duration, c: Class[T]): T`]@java[`public <T> T expectMsgClass(Duration max, Class<T> c)`]
An object which is an instance of the given `Class` must be received An object which is an instance of the given `Class` must be received
within the allotted time frame; the object will be returned. Note that this within the allotted time frame; the object will be returned. Note that this
does a conformance check; if you need the class to be equal, @scala[have a look at does a conformance check; if you need the class to be equal, @scala[have a look at
@ -111,12 +111,12 @@ method is approximately equivalent to
An object must be received within the given time, and it must be equal ( An object must be received within the given time, and it must be equal (
compared with @scala[`==`]@java[`equals()`]) to at least one of the passed reference objects; the compared with @scala[`==`]@java[`equals()`]) to at least one of the passed reference objects; the
received object will be returned. received object will be returned.
* @scala[`expectMsgAnyClassOf[T](d: Duration, obj: Class[_ <: T]*): T`]@java[`public <T> T expectMsgAnyClassOf(FiniteDuration max, Class<? extends T>... c)`] * @scala[`expectMsgAnyClassOf[T](d: Duration, obj: Class[_ <: T]*): T`]@java[`public <T> T expectMsgAnyClassOf(Duration max, Class<? extends T>... c)`]
An object must be received within the given time, and it must be an An object must be received within the given time, and it must be an
instance of at least one of the supplied `Class` objects; the instance of at least one of the supplied `Class` objects; the
received object will be returned. Note that this does a conformance check, received object will be returned. Note that this does a conformance check,
if you need the class to be equal you need to verify that afterwards. if you need the class to be equal you need to verify that afterwards.
* @scala[`expectMsgAllOf[T](d: Duration, obj: T*): Seq[T]`]@java[`public List<Object> expectMsgAllOf(FiniteDuration max, Object... msg)`] * @scala[`expectMsgAllOf[T](d: Duration, obj: T*): Seq[T]`]@java[`public List<Object> expectMsgAllOf(Duration max, Object... msg)`]
A number of objects matching the size of the supplied object array must be A number of objects matching the size of the supplied object array must be
received within the given time, and for each of the given objects there received within the given time, and for each of the given objects there
must exist at least one among the received ones which equals (compared with must exist at least one among the received ones which equals (compared with
@ -139,11 +139,11 @@ instance of this class. The full sequence of received objects is returned.
@@@ @@@
* @scala[`expectNoMsg(d: Duration)`]@java[`public void expectNoMsg(FiniteDuration max)`] * @scala[`expectNoMessage(d: Duration)`]@java[`public void expectNoMessage(Duration max)`]
No message must be received within the given time. This also fails if a No message must be received within the given time. This also fails if a
message has been received before calling this method which has not been message has been received before calling this method which has not been
removed from the queue using one of the other methods. removed from the queue using one of the other methods.
* @scala[`receiveN(n: Int, d: Duration): Seq[AnyRef]`]@java[`List<Object> receiveN(int n, FiniteDuration max)`] * @scala[`receiveN(n: Int, d: Duration): Seq[AnyRef]`]@java[`List<Object> receiveN(int n, Duration max)`]
`n` messages must be received within the given time; the received `n` messages must be received within the given time; the received
messages are returned. messages are returned.
@ -272,11 +272,7 @@ checked external to the examination, which is facilitated by a new construct
for managing time constraints: for managing time constraints:
Scala Scala
: ```scala : @@snip [TestkitDocSpec.scala]($code$/scala/docs/testkit/TestkitDocSpec.scala) { #test-within }
within([min, ]max) {
...
}
```
Java Java
: @@snip [TestKitDocTest.java]($code$/java/jdocs/testkit/TestKitDocTest.java) { #test-within } : @@snip [TestKitDocTest.java]($code$/java/jdocs/testkit/TestKitDocTest.java) { #test-within }
@ -289,13 +285,11 @@ you do not specify it, it is inherited from the innermost enclosing
`within` block. `within` block.
It should be noted that if the last message-receiving assertion of the block is It should be noted that if the last message-receiving assertion of the block is
`expectNoMsg` or `receiveWhile`, the final check of the `expectNoMessage` or `receiveWhile`, the final check of the
`within` is skipped in order to avoid false positives due to wake-up `within` is skipped in order to avoid false positives due to wake-up
latencies. This means that while individual contained assertions still use the latencies. This means that while individual contained assertions still use the
maximum time bound, the overall block may take arbitrarily longer in this case. maximum time bound, the overall block may take arbitrarily longer in this case.
Scala
: @@snip [TestkitDocSpec.scala]($code$/scala/docs/testkit/TestkitDocSpec.scala) { #test-within }
@@@ note @@@ note

View file

@ -31,16 +31,12 @@ import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.Terminated; import akka.actor.Terminated;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.AbstractActor.Receive;
import akka.testkit.TestActor.AutoPilot; import akka.testkit.TestActor.AutoPilot;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.time.Duration;
public class TestKitDocTest extends AbstractJavaTest { public class TestKitDocTest extends AbstractJavaTest {
@ -92,8 +88,7 @@ public class TestKitDocTest extends AbstractJavaTest {
void triggerScheduling() { void triggerScheduling() {
getTimers().startSingleTimer( getTimers().startSingleTimer(
SCHED_KEY, SCHED_KEY,
new ScheduledMessage(), new ScheduledMessage(), Duration.ofMillis(500)
Duration.create(500, TimeUnit.MILLISECONDS)
); );
} }
} }
@ -129,7 +124,7 @@ public class TestKitDocTest extends AbstractJavaTest {
//#test-within //#test-within
new TestKit(system) {{ new TestKit(system) {{
getRef().tell(42, ActorRef.noSender()); getRef().tell(42, ActorRef.noSender());
within(java.time.Duration.ZERO, java.time.Duration.ofSeconds(1), () -> { within(Duration.ZERO, Duration.ofSeconds(1), () -> {
assertEquals((Integer) 42, expectMsgClass(Integer.class)); assertEquals((Integer) 42, expectMsgClass(Integer.class));
return null; return null;
}); });
@ -162,7 +157,7 @@ public class TestKitDocTest extends AbstractJavaTest {
getRef().tell(43, ActorRef.noSender()); getRef().tell(43, ActorRef.noSender());
getRef().tell("hello", ActorRef.noSender()); getRef().tell("hello", ActorRef.noSender());
final List<String> out = receiveWhile(duration("1 second"), in -> { final List<String> out = receiveWhile(Duration.ofSeconds(1), in -> {
if (in instanceof Integer) { if (in instanceof Integer) {
return in.toString(); return in.toString();
} else { } else {
@ -176,7 +171,7 @@ public class TestKitDocTest extends AbstractJavaTest {
//#test-receivewhile //#test-receivewhile
new TestKit(system) {{ new TestKit(system) {{
//#test-receivewhile-full //#test-receivewhile-full
receiveWhile(duration("100 millis"), duration("50 millis"), 12, in -> { receiveWhile(Duration.ofMillis(100), Duration.ofMillis(50), 12, in -> {
//#match-elided //#match-elided
throw JavaPartialFunction.noMatch(); throw JavaPartialFunction.noMatch();
//#match-elided //#match-elided
@ -190,7 +185,7 @@ public class TestKitDocTest extends AbstractJavaTest {
//#test-awaitCond //#test-awaitCond
new TestKit(system) {{ new TestKit(system) {{
getRef().tell(42, ActorRef.noSender()); getRef().tell(42, ActorRef.noSender());
awaitCond(java.time.Duration.ofSeconds(1), java.time.Duration.ofMillis(100), this::msgAvailable); awaitCond(Duration.ofSeconds(1), Duration.ofMillis(100), this::msgAvailable);
}}; }};
//#test-awaitCond //#test-awaitCond
} }
@ -200,7 +195,7 @@ public class TestKitDocTest extends AbstractJavaTest {
//#test-awaitAssert //#test-awaitAssert
new TestKit(system) {{ new TestKit(system) {{
getRef().tell(42, ActorRef.noSender()); getRef().tell(42, ActorRef.noSender());
awaitAssert(duration("1 second"), duration("100 millis"), () -> { awaitAssert(Duration.ofSeconds(1), Duration.ofMillis(100), () -> {
assertEquals(msgAvailable(), true); assertEquals(msgAvailable(), true);
return null; return null;
}); });
@ -260,8 +255,8 @@ public class TestKitDocTest extends AbstractJavaTest {
public void demonstrateDilated() { public void demonstrateDilated() {
//#duration-dilation //#duration-dilation
new TestKit(system) {{ new TestKit(system) {{
final java.time.Duration original = java.time.Duration.ofSeconds(1); final Duration original = Duration.ofSeconds(1);
final java.time.Duration stretched = dilated(original); final Duration stretched = dilated(original);
assertTrue("dilated", stretched.compareTo(original) >= 0); assertTrue("dilated", stretched.compareTo(original) >= 0);
}}; }};
//#duration-dilation //#duration-dilation
@ -399,7 +394,7 @@ public class TestKitDocTest extends AbstractJavaTest {
//#test-within-probe //#test-within-probe
new TestKit(system) {{ new TestKit(system) {{
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);
within(java.time.Duration.ofSeconds(1), () -> probe.expectMsgEquals("hello")); within(Duration.ofSeconds(1), () -> probe.expectMsgEquals("hello"));
}}; }};
//#test-within-probe //#test-within-probe
} catch (AssertionError e) { } catch (AssertionError e) {

View file

@ -16,7 +16,8 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import scala.concurrent.duration.Duration;
import java.time.Duration;
public class TestKitSampleTest extends AbstractJavaTest { public class TestKitSampleTest extends AbstractJavaTest {
@ -67,10 +68,10 @@ public class TestKitSampleTest extends AbstractJavaTest {
// like a real resource would be passed in production // like a real resource would be passed in production
subject.tell(probe.getRef(), getRef()); subject.tell(probe.getRef(), getRef());
// await the correct response // await the correct response
expectMsg(java.time.Duration.ofSeconds(1), "done"); expectMsg(Duration.ofSeconds(1), "done");
// the run() method needs to finish within 3 seconds // the run() method needs to finish within 3 seconds
within(java.time.Duration.ofSeconds(3), () -> { within(Duration.ofSeconds(3), () -> {
subject.tell("hello", getRef()); subject.tell("hello", getRef());
// This is a demo: would normally use expectMsgEquals(). // This is a demo: would normally use expectMsgEquals().
@ -78,13 +79,13 @@ public class TestKitSampleTest extends AbstractJavaTest {
awaitCond(probe::msgAvailable); awaitCond(probe::msgAvailable);
// response must have been enqueued to us before probe // response must have been enqueued to us before probe
expectMsg(java.time.Duration.ZERO, "world"); expectMsg(Duration.ZERO, "world");
// check that the probe we injected earlier got the msg // check that the probe we injected earlier got the msg
probe.expectMsg(java.time.Duration.ZERO, "hello"); probe.expectMsg(Duration.ZERO, "hello");
Assert.assertEquals(getRef(), probe.getLastSender()); Assert.assertEquals(getRef(), probe.getLastSender());
// Will wait for the rest of the 3 seconds // Will wait for the rest of the 3 seconds
expectNoMsg(); expectNoMessage();
return null; return null;
}); });
}}; }};

View file

@ -18,11 +18,11 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.time.Duration;
import akka.persistence.fsm.PersistentFSM.CurrentState; import akka.persistence.fsm.PersistentFSM.CurrentState;
import org.junit.Test; import org.junit.Test;
import org.scalatest.junit.JUnitSuite; import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.Duration;
import static akka.persistence.fsm.PersistentFSM.FSMState; import static akka.persistence.fsm.PersistentFSM.FSMState;
@ -129,13 +129,13 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING);
within(duration("0.9 seconds"), remainingOrDefault(), () -> { within(Duration.ofMillis(900), getRemainingOrDefault(), () -> {
PersistentFSM.Transition st = expectMsgClass(PersistentFSM.Transition.class); PersistentFSM.Transition st = expectMsgClass(PersistentFSM.Transition.class);
assertTransition(st, fsmRef, UserState.SHOPPING, UserState.INACTIVE); assertTransition(st, fsmRef, UserState.SHOPPING, UserState.INACTIVE);
return null; return null;
}); });
within(duration("1.9 seconds"), remainingOrDefault(), () -> expectTerminated(fsmRef)); within(Duration.ofMillis(1900), getRemainingOrDefault(), () -> expectTerminated(fsmRef));
}}; }};
} }
@ -296,7 +296,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class); PersistentFSM.Transition stateTransition = expectMsgClass(PersistentFSM.Transition.class);
assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING); assertTransition(stateTransition, fsmRef, UserState.LOOKING_AROUND, UserState.SHOPPING);
expectNoMsg(duration("0.6seconds")); //randomly chosen delay, less than the timeout, before stopping the FSM expectNoMessage(Duration.ofMillis(600)); //randomly chosen delay, less than the timeout, before stopping the FSM
fsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); fsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
expectTerminated(fsmRef); expectTerminated(fsmRef);
@ -308,13 +308,13 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class);
assertEquals(currentState.state(), UserState.SHOPPING); assertEquals(currentState.state(), UserState.SHOPPING);
within(duration("0.9 seconds"), remainingOrDefault(), () -> { within(Duration.ofMillis(900), getRemainingOrDefault(), () -> {
PersistentFSM.Transition st = expectMsgClass(PersistentFSM.Transition.class); PersistentFSM.Transition st = expectMsgClass(PersistentFSM.Transition.class);
assertTransition(st, recoveredFsmRef, UserState.SHOPPING, UserState.INACTIVE); assertTransition(st, recoveredFsmRef, UserState.SHOPPING, UserState.INACTIVE);
return null; return null;
}); });
expectNoMsg(duration("0.9 seconds")); //randomly chosen delay, less than the timeout, before stopping the FSM expectNoMessage(Duration.ofMillis(900)); //randomly chosen delay, less than the timeout, before stopping the FSM
recoveredFsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); recoveredFsmRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
expectTerminated(recoveredFsmRef); expectTerminated(recoveredFsmRef);
@ -325,7 +325,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class); currentState = expectMsgClass(akka.persistence.fsm.PersistentFSM.CurrentState.class);
assertEquals(currentState.state(), UserState.INACTIVE); assertEquals(currentState.state(), UserState.INACTIVE);
within(duration("1.9 seconds"), remainingOrDefault(), () -> expectTerminated(recoveredFsmRef2)); within(Duration.ofMillis(1900), getRemainingOrDefault(), () -> expectTerminated(recoveredFsmRef2));
}}; }};
} }
@ -511,7 +511,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
matchEvent(AddItem.class, matchEvent(AddItem.class,
(event, data) -> (event, data) ->
goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem())) goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem()))
.forMax(Duration.create(1, TimeUnit.SECONDS)) .forMax(scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS))
) )
.event(GetCurrentCart.class, (event, data) -> stay().replying(data)) .event(GetCurrentCart.class, (event, data) -> stay().replying(data))
); );
@ -520,7 +520,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
matchEvent(AddItem.class, matchEvent(AddItem.class,
(event, data) -> (event, data) ->
stay().applying(new ItemAdded(event.getItem())) stay().applying(new ItemAdded(event.getItem()))
.forMax(Duration.create(1, TimeUnit.SECONDS))) .forMax(scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS)))
.event(Buy.class, .event(Buy.class,
//#customer-andthen-example //#customer-andthen-example
(event, data) -> (event, data) ->
@ -544,7 +544,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
.event(GetCurrentCart.class, (event, data) -> stay().replying(data)) .event(GetCurrentCart.class, (event, data) -> stay().replying(data))
.event(StateTimeout$.class, .event(StateTimeout$.class,
(event, data) -> (event, data) ->
goTo(UserState.INACTIVE).forMax(Duration.create(2, TimeUnit.SECONDS))) goTo(UserState.INACTIVE).forMax(scala.concurrent.duration.Duration.create(2, TimeUnit.SECONDS)))
); );
@ -552,7 +552,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
matchEvent(AddItem.class, matchEvent(AddItem.class,
(event, data) -> (event, data) ->
goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem())) goTo(UserState.SHOPPING).applying(new ItemAdded(event.getItem()))
.forMax(Duration.create(1, TimeUnit.SECONDS))) .forMax(scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS)))
.event(GetCurrentCart.class, (event, data) -> stay().replying(data)) .event(GetCurrentCart.class, (event, data) -> stay().replying(data))
.event(StateTimeout$.class, .event(StateTimeout$.class,
(event, data) -> (event, data) ->
@ -641,7 +641,7 @@ public class AbstractPersistentFSMTest extends JUnitSuite {
new TestKit(system) {{ new TestKit(system) {{
ActorRef persistentActor = system.actorOf(Props.create(PFSMwithLog.class)); ActorRef persistentActor = system.actorOf(Props.create(PFSMwithLog.class));
persistentActor.tell("check", getRef()); persistentActor.tell("check", getRef());
expectMsg(duration("1000 millis"), "started"); expectMsg(Duration.ofSeconds(1), "started");
}}; }};
} }
} }

View file

@ -7,7 +7,7 @@ package akka.stream.io;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.concurrent.TimeUnit; import java.time.Duration;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -22,7 +22,6 @@ import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters; import akka.stream.javadsl.StreamConverters;
import akka.stream.testkit.Utils; import akka.stream.testkit.Utils;
import akka.util.ByteString; import akka.util.ByteString;
import scala.concurrent.duration.FiniteDuration;
public class OutputStreamSourceTest extends StreamTest { public class OutputStreamSourceTest extends StreamTest {
public OutputStreamSourceTest() { public OutputStreamSourceTest() {
@ -34,8 +33,8 @@ public class OutputStreamSourceTest extends StreamTest {
Utils.UnboundedMailboxConfig()); Utils.UnboundedMailboxConfig());
@Test @Test
public void mustSendEventsViaOutputStream() throws Exception { public void mustSendEventsViaOutputStream() throws Exception {
final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS);
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);
final Duration timeout = Duration.ofSeconds(3);
final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout); final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout);
final OutputStream s = source.to(Sink.foreach(new Procedure<ByteString>() { final OutputStream s = source.to(Sink.foreach(new Procedure<ByteString>() {

View file

@ -22,8 +22,6 @@ import akka.testkit.javadsl.TestKit;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import java.util.*; import java.util.*;
@ -34,6 +32,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.time.Duration;
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -190,9 +189,9 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals(0); probe.expectMsgEquals(0);
probe.expectMsgEquals(1); probe.expectMsgEquals(1);
FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS); Duration duration = Duration.ofMillis(200);
probe.expectNoMsg(duration); probe.expectNoMessage(duration);
future.toCompletableFuture().get(3, TimeUnit.SECONDS); future.toCompletableFuture().get(3, TimeUnit.SECONDS);
} }
@ -1013,7 +1012,7 @@ public class FlowTest extends StreamTest {
public void mustBeAbleToUseInitialTimeout() throws Throwable { public void mustBeAbleToUseInitialTimeout() throws Throwable {
try { try {
try { try {
Source.<Integer> maybe().via(Flow.of(Integer.class).initialTimeout(Duration.create(1, "second"))) Source.<Integer> maybe().via(Flow.of(Integer.class).initialTimeout(Duration.ofSeconds(1)))
.runWith(Sink.<Integer> head(), materializer).toCompletableFuture().get(3, TimeUnit.SECONDS); .runWith(Sink.<Integer> head(), materializer).toCompletableFuture().get(3, TimeUnit.SECONDS);
org.junit.Assert.fail("A TimeoutException was expected"); org.junit.Assert.fail("A TimeoutException was expected");
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -1029,7 +1028,7 @@ public class FlowTest extends StreamTest {
public void mustBeAbleToUseCompletionTimeout() throws Throwable { public void mustBeAbleToUseCompletionTimeout() throws Throwable {
try { try {
try { try {
Source.<Integer> maybe().via(Flow.of(Integer.class).completionTimeout(Duration.create(1, "second"))) Source.<Integer> maybe().via(Flow.of(Integer.class).completionTimeout(Duration.ofSeconds(1)))
.runWith(Sink.<Integer> head(), materializer).toCompletableFuture().get(3, TimeUnit.SECONDS); .runWith(Sink.<Integer> head(), materializer).toCompletableFuture().get(3, TimeUnit.SECONDS);
org.junit.Assert.fail("A TimeoutException was expected"); org.junit.Assert.fail("A TimeoutException was expected");
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -1044,7 +1043,7 @@ public class FlowTest extends StreamTest {
public void mustBeAbleToUseIdleTimeout() throws Throwable { public void mustBeAbleToUseIdleTimeout() throws Throwable {
try { try {
try { try {
Source.<Integer> maybe().via(Flow.of(Integer.class).idleTimeout(Duration.create(1, "second"))) Source.<Integer> maybe().via(Flow.of(Integer.class).idleTimeout(Duration.ofSeconds(1)))
.runWith(Sink.<Integer> head(), materializer).toCompletableFuture().get(3, TimeUnit.SECONDS); .runWith(Sink.<Integer> head(), materializer).toCompletableFuture().get(3, TimeUnit.SECONDS);
org.junit.Assert.fail("A TimeoutException was expected"); org.junit.Assert.fail("A TimeoutException was expected");
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -1060,9 +1059,9 @@ public class FlowTest extends StreamTest {
Integer result = Integer result =
Source.<Integer>maybe() Source.<Integer>maybe()
.via(Flow.of(Integer.class) .via(Flow.of(Integer.class)
.keepAlive(Duration.create(1, "second"), (Creator<Integer>) () -> 0) .keepAlive(Duration.ofSeconds(1), (Creator<Integer>) () -> 0)
) )
.takeWithin(Duration.create(1500, "milliseconds")) .takeWithin(Duration.ofMillis(1500))
.runWith(Sink.<Integer>head(), materializer) .runWith(Sink.<Integer>head(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS); .toCompletableFuture().get(3, TimeUnit.SECONDS);

View file

@ -440,11 +440,11 @@ public class SourceTest extends StreamTest {
probe.getRef().tell(elem, ActorRef.noSender()); probe.getRef().tell(elem, ActorRef.noSender());
} }
})).run(materializer); })).run(materializer);
probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS)); probe.expectNoMessage(Duration.ofMillis(600));
probe.expectMsgEquals("tick"); probe.expectMsgEquals("tick");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); probe.expectNoMessage(Duration.ofMillis(200));
probe.expectMsgEquals("tick"); probe.expectMsgEquals("tick");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); probe.expectNoMessage(Duration.ofMillis(200));
} }
@Test @Test
@ -642,9 +642,9 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals(0); probe.expectMsgEquals(0);
probe.expectMsgEquals(1); probe.expectMsgEquals(1);
FiniteDuration duration = FiniteDuration.apply(200, TimeUnit.MILLISECONDS); Duration duration = Duration.ofMillis(200);
probe.expectNoMsg(duration); probe.expectNoMessage(duration);
future.toCompletableFuture().get(3, TimeUnit.SECONDS); future.toCompletableFuture().get(3, TimeUnit.SECONDS);
} }

View file

@ -615,6 +615,7 @@ class TestKit(system: ActorSystem) {
* one of the given classes. Wait time is bounded by the given duration, * one of the given classes. Wait time is bounded by the given duration,
* with an AssertionFailure being thrown in case of timeout. * with an AssertionFailure being thrown in case of timeout.
*/ */
@varargs
def expectMsgAnyClassOf[T](max: java.time.Duration, objs: Class[_]*): T = def expectMsgAnyClassOf[T](max: java.time.Duration, objs: Class[_]*): T =
expectMsgAnyClassOf(max.asScala, objs: _*) expectMsgAnyClassOf(max.asScala, objs: _*)
@ -732,8 +733,18 @@ class TestKit(system: ActorSystem) {
* *
* This method does NOT automatically scale its Duration parameter! * This method does NOT automatically scale its Duration parameter!
*/ */
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.13")
def receiveOne(max: Duration): AnyRef = tp.receiveOne(max) def receiveOne(max: Duration): AnyRef = tp.receiveOne(max)
/**
* Receive one message from the internal queue of the TestActor. If the given
* duration is zero, the queue is polled (non-blocking).
*
* This method does NOT automatically scale its Duration parameter!
*/
def receiveOne(max: java.time.Duration): AnyRef = tp.receiveOne(max.asScala)
/** /**
* Receive a series of messages until one does not match the given partial * Receive a series of messages until one does not match the given partial
* function or the idle timeout is met (disabled by default) or the overall * function or the idle timeout is met (disabled by default) or the overall
@ -746,6 +757,8 @@ class TestKit(system: ActorSystem) {
* certain characteristics are generated at a certain rate: * certain characteristics are generated at a certain rate:
* *
*/ */
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.13")
def receiveWhile[T](max: Duration, idle: Duration, messages: Int, f: JFunction[AnyRef, T]): JList[T] = { def receiveWhile[T](max: Duration, idle: Duration, messages: Int, f: JFunction[AnyRef, T]): JList[T] = {
tp.receiveWhile(max, idle, messages)(new CachingPartialFunction[AnyRef, T] { tp.receiveWhile(max, idle, messages)(new CachingPartialFunction[AnyRef, T] {
@throws(classOf[Exception]) @throws(classOf[Exception])
@ -753,6 +766,27 @@ class TestKit(system: ActorSystem) {
}).asJava }).asJava
} }
/**
* Receive a series of messages until one does not match the given partial
* function or the idle timeout is met (disabled by default) or the overall
* maximum duration is elapsed or expected messages count is reached.
* Returns the sequence of messages.
*
* Note that it is not an error to hit the `max` duration in this case.
*
* One possible use of this method is for testing whether messages of
* certain characteristics are generated at a certain rate:
*
*/
def receiveWhile[T](max: java.time.Duration, idle: java.time.Duration, messages: Int, f: JFunction[AnyRef, T]): JList[T] = {
tp.receiveWhile(max.asScala, idle.asScala, messages)(new CachingPartialFunction[AnyRef, T] {
@throws(classOf[Exception])
override def `match`(x: AnyRef): T = f.apply(x)
}).asJava
}
@Deprecated
@deprecated("Use the overloaded one which accepts java.time.Duration instead.", since = "2.5.13")
def receiveWhile[T](max: Duration, f: JFunction[AnyRef, T]): JList[T] = { def receiveWhile[T](max: Duration, f: JFunction[AnyRef, T]): JList[T] = {
tp.receiveWhile(max = max)(new CachingPartialFunction[AnyRef, T] { tp.receiveWhile(max = max)(new CachingPartialFunction[AnyRef, T] {
@throws(classOf[Exception]) @throws(classOf[Exception])
@ -760,6 +794,13 @@ class TestKit(system: ActorSystem) {
}).asJava }).asJava
} }
def receiveWhile[T](max: java.time.Duration, f: JFunction[AnyRef, T]): JList[T] = {
tp.receiveWhile(max = max.asScala)(new CachingPartialFunction[AnyRef, T] {
@throws(classOf[Exception])
override def `match`(x: AnyRef): T = f.apply(x)
}).asJava
}
/** /**
* Spawns an actor as a child of this test actor, and returns the child's ActorRef. * Spawns an actor as a child of this test actor, and returns the child's ActorRef.
*/ */