From c354aa0b6a629f38ac3af09e047a7d54a5234f9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Mon, 19 Mar 2018 22:14:33 +0800 Subject: [PATCH] =doc using java.time.Duration for stream's javadsl --- .../test/java/jdocs/stream/FlowDocTest.java | 37 ++++++++++--------- .../java/jdocs/stream/GraphStageDocTest.java | 12 ++---- .../test/java/jdocs/stream/HubDocTest.java | 31 ++++++---------- .../java/jdocs/stream/KillSwitchDocTest.java | 12 +++--- .../java/jdocs/stream/QuickStartDocTest.java | 5 +-- .../java/jdocs/stream/RestartDocTest.java | 12 +++--- .../stream/StreamBuffersRateDocTest.java | 23 +++++------- .../jdocs/stream/StreamTestKitDocTest.java | 18 ++++----- .../cookbook/RecipeAdhocSourceTest.java | 11 +++--- .../javadsl/cookbook/RecipeKeepAlive.java | 6 +-- .../java/akka/stream/javadsl/FlowTest.java | 4 +- .../akka/stream/javadsl/FlowThrottleTest.java | 18 ++++----- .../java/akka/stream/javadsl/SourceTest.java | 30 +++++++-------- 13 files changed, 100 insertions(+), 119 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/FlowDocTest.java b/akka-docs/src/test/java/jdocs/stream/FlowDocTest.java index 39acfcef5d..532768c066 100644 --- a/akka-docs/src/test/java/jdocs/stream/FlowDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/FlowDocTest.java @@ -4,30 +4,31 @@ package jdocs.stream; -import static org.junit.Assert.assertEquals; - -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - import akka.NotUsed; import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Cancellable; +import akka.dispatch.Futures; import akka.japi.Pair; -import jdocs.AbstractJavaTest; +import akka.stream.*; +import akka.stream.javadsl.*; import akka.testkit.javadsl.TestKit; +import jdocs.AbstractJavaTest; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; -import akka.actor.ActorSystem; -import akka.actor.ActorRef; -import akka.actor.Cancellable; -import akka.dispatch.Futures; -import akka.stream.*; -import akka.stream.javadsl.*; +import java.time.Duration; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; public class FlowDocTest extends AbstractJavaTest { @@ -131,7 +132,7 @@ public class FlowDocTest extends AbstractJavaTest { //#compound-source-is-not-keyed-runWith final Object tick = new Object(); - final FiniteDuration oneSecond = Duration.create(1, TimeUnit.SECONDS); + final Duration oneSecond = Duration.ofSeconds(1); //akka.actor.Cancellable final Source timer = Source.tick(oneSecond, oneSecond, tick); @@ -208,7 +209,7 @@ public class FlowDocTest extends AbstractJavaTest { @Test public void transformingMaterialized() throws Exception { - FiniteDuration oneSecond = FiniteDuration.apply(1, TimeUnit.SECONDS); + Duration oneSecond = Duration.ofSeconds(1); Flow throttler = Flow.fromGraph(GraphDSL.create( Source.tick(oneSecond, oneSecond, ""), diff --git a/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java b/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java index 25ada3e940..d2532ef5d4 100644 --- a/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/GraphStageDocTest.java @@ -4,14 +4,14 @@ package jdocs.stream; +//#imports import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; -//#imports -import akka.dispatch.Futures; import akka.japi.Option; import akka.japi.Pair; import akka.japi.Predicate; +import akka.japi.Function; import akka.japi.function.Procedure; import akka.stream.*; import akka.stream.javadsl.*; @@ -19,21 +19,15 @@ import akka.stream.stage.*; //#imports import akka.stream.testkit.TestPublisher; import akka.stream.testkit.TestSubscriber; -import akka.japi.Function; import jdocs.AbstractJavaTest; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.reactivestreams.Subscription; -import scala.compat.java8.FutureConverters; import scala.concurrent.ExecutionContext; -import scala.concurrent.Promise; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; import java.util.*; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -568,7 +562,7 @@ public class GraphStageDocTest extends AbstractJavaTest { CompletionStage result = Source.from(Arrays.asList(1, 2, 3)) .via(new TimedGate<>(2)) - .takeWithin(Duration.create(250, "millis")) + .takeWithin(java.time.Duration.ofMillis(250)) .runFold(0, (n, sum) -> n + sum, mat); assertEquals(new Integer(1), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); diff --git a/akka-docs/src/test/java/jdocs/stream/HubDocTest.java b/akka-docs/src/test/java/jdocs/stream/HubDocTest.java index 7f20e3fed1..cef8801d3c 100644 --- a/akka-docs/src/test/java/jdocs/stream/HubDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/HubDocTest.java @@ -9,27 +9,18 @@ import akka.NotUsed; import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.japi.Pair; -import akka.stream.ActorMaterializer; -import akka.stream.KillSwitches; -import akka.stream.Materializer; -import akka.stream.ThrottleMode; -import akka.stream.UniqueKillSwitch; +import akka.stream.*; import akka.stream.javadsl.*; import akka.stream.javadsl.PartitionHub.ConsumerInfo; - -import jdocs.AbstractJavaTest; import akka.testkit.javadsl.TestKit; +import jdocs.AbstractJavaTest; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Supplier; import java.util.function.ToLongBiFunction; public class HubDocTest extends AbstractJavaTest { @@ -80,8 +71,8 @@ public class HubDocTest extends AbstractJavaTest { //#broadcast-hub // A simple producer that publishes a new "message" every second Source producer = Source.tick( - FiniteDuration.create(1, TimeUnit.SECONDS), - FiniteDuration.create(1, TimeUnit.SECONDS), + Duration.ofSeconds(1), + Duration.ofSeconds(1), "New message" ); @@ -132,7 +123,7 @@ public class HubDocTest extends AbstractJavaTest { Flow busFlow = Flow.fromSinkAndSource(sink, source) .joinMat(KillSwitches.singleBidi(), Keep.right()) - .backpressureTimeout(FiniteDuration.create(1, TimeUnit.SECONDS)); + .backpressureTimeout(Duration.ofSeconds(1)); //#pub-sub-3 //#pub-sub-4 @@ -155,8 +146,8 @@ public class HubDocTest extends AbstractJavaTest { //#partition-hub // A simple producer that publishes a new "message-n" every second Source producer = Source.tick( - FiniteDuration.create(1, TimeUnit.SECONDS), - FiniteDuration.create(1, TimeUnit.SECONDS), + Duration.ofSeconds(1), + Duration.ofSeconds(1), "message" ).zipWith(Source.range(0, 100), (a, b) -> a + "-" + b); @@ -206,8 +197,8 @@ public class HubDocTest extends AbstractJavaTest { //#partition-hub-stateful // A simple producer that publishes a new "message-n" every second Source producer = Source.tick( - FiniteDuration.create(1, TimeUnit.SECONDS), - FiniteDuration.create(1, TimeUnit.SECONDS), + Duration.ofSeconds(1), + Duration.ofSeconds(1), "message" ).zipWith(Source.range(0, 100), (a, b) -> a + "-" + b); @@ -271,7 +262,7 @@ public class HubDocTest extends AbstractJavaTest { Source fromProducer = runnableGraph.run(materializer); fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer); - fromProducer.throttle(10, Duration.create(100, TimeUnit.MILLISECONDS), 10, ThrottleMode.shaping()) + fromProducer.throttle(10, Duration.ofMillis(100), 10, ThrottleMode.shaping()) .runForeach(msg -> System.out.println("consumer2: " + msg), materializer); //#partition-hub-fastest diff --git a/akka-docs/src/test/java/jdocs/stream/KillSwitchDocTest.java b/akka-docs/src/test/java/jdocs/stream/KillSwitchDocTest.java index 9fc174a076..0bd98e39a0 100644 --- a/akka-docs/src/test/java/jdocs/stream/KillSwitchDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/KillSwitchDocTest.java @@ -17,8 +17,8 @@ import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.CompletionStage; @@ -52,7 +52,7 @@ class KillSwitchDocTest extends AbstractJavaTest { //#unique-shutdown final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure()); final Sink> lastSnk = Sink.last(); final Pair> stream = countingSrc @@ -75,7 +75,7 @@ class KillSwitchDocTest extends AbstractJavaTest { //#unique-abort final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure()); final Sink> lastSnk = Sink.last(); final Pair> stream = countingSrc @@ -98,7 +98,7 @@ class KillSwitchDocTest extends AbstractJavaTest { //#shared-shutdown final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + .delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure()); final Sink> lastSnk = Sink.last(); final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch"); @@ -106,7 +106,7 @@ class KillSwitchDocTest extends AbstractJavaTest { .viaMat(killSwitch.flow(), Keep.right()) .toMat(lastSnk, Keep.right()).run(mat); final CompletionStage completionStageDelayed = countingSrc - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()) + .delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure()) .viaMat(killSwitch.flow(), Keep.right()) .toMat(lastSnk, Keep.right()).run(mat); @@ -127,7 +127,7 @@ class KillSwitchDocTest extends AbstractJavaTest { //#shared-abort final Source countingSrc = Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) - .delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); + .delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure()); final Sink> lastSnk = Sink.last(); final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch"); diff --git a/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java b/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java index 9fc20f2cb5..5b310f50ba 100644 --- a/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java @@ -17,12 +17,11 @@ import akka.util.ByteString; import java.nio.file.Paths; import java.math.BigInteger; +import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import jdocs.AbstractJavaTest; -import scala.concurrent.duration.Duration; //#other-imports import org.junit.*; @@ -66,7 +65,7 @@ public class QuickStartDocTest extends AbstractJavaTest { //#add-streams factorials .zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num)) - .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping()) + .throttle(1, Duration.ofSeconds(1), 1, ThrottleMode.shaping()) //#add-streams .take(2) //#add-streams diff --git a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java index 33c86d7865..63062d0ed7 100644 --- a/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java @@ -9,12 +9,14 @@ import akka.actor.ActorSystem; import akka.stream.KillSwitch; import akka.stream.KillSwitches; import akka.stream.Materializer; -import akka.stream.javadsl.*; -import scala.concurrent.duration.Duration; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.RestartSource; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; public class RestartDocTest { @@ -54,8 +56,8 @@ public class RestartDocTest { public void recoverWithBackoffSource() { //#restart-with-backoff-source Source eventStream = RestartSource.withBackoff( - Duration.apply(3, TimeUnit.SECONDS), // min backoff - Duration.apply(30, TimeUnit.SECONDS), // max backoff + Duration.ofSeconds(3), // min backoff + Duration.ofSeconds(30), // max backoff 0.2, // adds 20% "noise" to vary the intervals slightly 20, // limits the amount of restarts to 20 () -> diff --git a/akka-docs/src/test/java/jdocs/stream/StreamBuffersRateDocTest.java b/akka-docs/src/test/java/jdocs/stream/StreamBuffersRateDocTest.java index 505005075d..1f49572e7f 100644 --- a/akka-docs/src/test/java/jdocs/stream/StreamBuffersRateDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/StreamBuffersRateDocTest.java @@ -4,21 +4,19 @@ package jdocs.stream; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - import akka.NotUsed; -import jdocs.AbstractJavaTest; -import akka.testkit.javadsl.TestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import scala.concurrent.duration.FiniteDuration; import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.stream.*; import akka.stream.javadsl.*; +import akka.testkit.javadsl.TestKit; +import jdocs.AbstractJavaTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.time.Duration; +import java.util.Arrays; public class StreamBuffersRateDocTest extends AbstractJavaTest { @@ -77,12 +75,11 @@ public class StreamBuffersRateDocTest extends AbstractJavaTest { @Test public void demonstrateBufferAbstractionLeak() { //#buffering-abstraction-leak - final FiniteDuration oneSecond = - FiniteDuration.create(1, TimeUnit.SECONDS); + final Duration oneSecond = Duration.ofSeconds(1); final Source msgSource = Source.tick(oneSecond, oneSecond, "message!"); final Source tickSource = - Source.tick(oneSecond.mul(3), oneSecond.mul(3), "tick"); + Source.tick(oneSecond.multipliedBy(3), oneSecond.multipliedBy(3), "tick"); final Flow conflate = Flow.of(String.class).conflateWithSeed( first -> 1, (count, elem) -> count + 1); diff --git a/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java b/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java index d2c3bd02f0..ebcfd5d5ec 100644 --- a/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/StreamTestKitDocTest.java @@ -4,6 +4,7 @@ package jdocs.stream; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -26,7 +27,6 @@ import akka.stream.javadsl.*; import akka.stream.testkit.*; import akka.stream.testkit.javadsl.*; import akka.testkit.TestProbe; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -101,7 +101,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { .grouped(2) .runWith(Sink.head(), mat); akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref()); - probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), + probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)) ); //#pipeto-testprobe @@ -113,18 +113,18 @@ public class StreamTestKitDocTest extends AbstractJavaTest { public void sinkActorRef() throws Exception { //#sink-actorref final Source sourceUnderTest = Source.tick( - FiniteDuration.create(0, TimeUnit.MILLISECONDS), - FiniteDuration.create(200, TimeUnit.MILLISECONDS), + Duration.ZERO, + Duration.ofMillis(200), Tick.TOCK); final TestProbe probe = new TestProbe(system); final Cancellable cancellable = sourceUnderTest .to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat); - probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK); - probe.expectNoMsg(Duration.create(100, TimeUnit.MILLISECONDS)); - probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK); + probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.TOCK); + probe.expectNoMsg(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.TOCK); cancellable.cancel(); - probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.COMPLETED); + probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.COMPLETED); //#sink-actorref } @@ -207,7 +207,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { //#test-source-and-sink final Flow flowUnderTest = Flow.of(Integer.class) .mapAsyncUnordered(2, sleep -> akka.pattern.PatternsCS.after( - Duration.create(10, TimeUnit.MILLISECONDS), + FiniteDuration.create(10, TimeUnit.MILLISECONDS), system.scheduler(), system.dispatcher(), CompletableFuture.completedFuture(sleep) diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java index d63d2f768c..f3eb7c86ee 100644 --- a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java @@ -17,12 +17,13 @@ import akka.stream.testkit.javadsl.TestSink; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Test; import org.junit.Ignore; +import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Promise; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; + +import java.time.Duration; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -32,7 +33,7 @@ import static org.junit.Assert.assertEquals; public class RecipeAdhocSourceTest extends RecipeTest { static ActorSystem system; static Materializer mat; - FiniteDuration duration200mills = Duration.create(200, "milliseconds"); + Duration duration200mills = Duration.ofMillis(200); @BeforeClass public static void setup() { @@ -48,7 +49,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { } //#adhoc-source - public Source adhocSource(Source source, FiniteDuration timeout, int maxRetries) { + public Source adhocSource(Source source, Duration timeout, int maxRetries) { return Source.lazily( () -> source.backpressureTimeout(timeout).recoverWithRetries( maxRetries, @@ -204,7 +205,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { Thread.sleep(500); assertEquals(TimeoutException.class, probe.expectError().getClass()); probe.request(1); //send demand - probe.expectNoMessage(Duration.create(200, "milliseconds")); //but no more restart + probe.expectNoMessage(FiniteDuration.create(200, "milliseconds")); //but no more restart } }; } diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java index 39e40609ab..aad0b4958d 100644 --- a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java @@ -15,7 +15,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import java.util.concurrent.TimeUnit; +import java.time.Duration; public class RecipeKeepAlive extends RecipeTest { static ActorSystem system; @@ -47,8 +47,8 @@ public class RecipeKeepAlive extends RecipeTest { //#inject-keepalive Flow keepAliveInject = Flow.of(ByteString.class).keepAlive( - scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS), - () -> keepAliveMessage); + Duration.ofSeconds(1), + () -> keepAliveMessage); //#inject-keepalive //@formatter:on diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 1e2bc55b36..81bfb5787f 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -65,7 +65,7 @@ public class FlowTest extends StreamTest { final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5); final Source ints = Source.from(input); final Flow flow1 = Flow.of(Integer.class).drop(2).take(3 - ).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS + ).takeWithin(java.time.Duration.ofSeconds(10 )).map(new Function() { public String apply(Integer elem) { return lookup[elem]; @@ -80,7 +80,7 @@ public class FlowTest extends StreamTest { public java.util.List apply(java.util.List elem) { return elem; } - }).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS) + }).groupedWithin(100, java.time.Duration.ofMillis(50) ).mapConcat(new Function, java.lang.Iterable>() { public java.util.List apply(java.util.List elem) { return elem; diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowThrottleTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowThrottleTest.java index 38a0dcd80f..06fd8cd476 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowThrottleTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowThrottleTest.java @@ -4,22 +4,18 @@ package akka.stream.javadsl; -import akka.Done; import akka.NotUsed; -import akka.actor.ActorRef; -import akka.japi.Pair; -import akka.japi.function.Function; -import akka.stream.*; +import akka.stream.StreamTest; +import akka.stream.ThrottleMode; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; -import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; -import akka.testkit.AkkaSpec; -import akka.testkit.AkkaJUnitActorSystemResource; import static org.junit.Assert.assertEquals; @@ -36,7 +32,7 @@ public class FlowThrottleTest extends StreamTest { public void mustWorksForTwoStreams() throws Exception { final Flow sharedThrottle = Flow.of(Integer.class) - .throttle(1, FiniteDuration.create(1, TimeUnit.DAYS), 1, ThrottleMode.enforcing()); + .throttle(1,java.time.Duration.ofDays(1), 1, ThrottleMode.enforcing()); CompletionStage> result1 = Source.single(1).via(sharedThrottle).via(sharedThrottle).runWith(Sink.seq(), materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 5f757c717c..e0ba11e044 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -20,11 +20,11 @@ import akka.stream.testkit.TestPublisher; import akka.testkit.javadsl.TestKit; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.util.Try; import akka.testkit.AkkaJUnitActorSystemResource; +import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -71,12 +71,12 @@ public class SourceTest extends StreamTest { ints .drop(2) .take(3) - .takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS)) + .takeWithin(Duration.ofSeconds(10)) .map(elem -> lookup[elem]) .filter(elem -> !elem.equals("c")) .grouped(2) .mapConcat(elem -> elem) - .groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS)) + .groupedWithin(100, Duration.ofMillis(50)) .mapConcat(elem -> elem) .runFold("", (acc, elem) -> acc + elem, materializer) .thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender())); @@ -432,8 +432,8 @@ public class SourceTest extends StreamTest { @Test public void mustProduceTicks() throws Exception { final TestKit probe = new TestKit(system); - Source tickSource = Source.tick(FiniteDuration.create(1, TimeUnit.SECONDS), - FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick"); + Source tickSource = Source.tick(Duration.ofSeconds(1), + Duration.ofMillis(500), "tick"); @SuppressWarnings("unused") Cancellable cancellable = tickSource.to(Sink.foreach(new Procedure() { public void apply(String elem) { @@ -450,8 +450,8 @@ public class SourceTest extends StreamTest { @Test @SuppressWarnings("unused") public void mustCompileMethodsWithJavaDuration() { - Source tickSource = Source.tick(java.time.Duration.ofSeconds(1), - java.time.Duration.ofMillis(500), NotUsed.getInstance()); + Source tickSource = Source.tick(Duration.ofSeconds(1), + Duration.ofMillis(500), NotUsed.getInstance()); } @Test @@ -642,7 +642,7 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals(0); probe.expectMsgEquals(1); - FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS); + FiniteDuration duration = FiniteDuration.apply(200, TimeUnit.MILLISECONDS); probe.expectNoMsg(duration); future.toCompletableFuture().get(3, TimeUnit.SECONDS); @@ -819,7 +819,7 @@ public class SourceTest extends StreamTest { public void mustBeAbleToUseInitialTimeout() throws Throwable { try { try { - Source.maybe().initialTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer) + Source.maybe().initialTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer) .toCompletableFuture().get(3, TimeUnit.SECONDS); org.junit.Assert.fail("A TimeoutException was expected"); } catch (ExecutionException e) { @@ -834,7 +834,7 @@ public class SourceTest extends StreamTest { public void mustBeAbleToUseCompletionTimeout() throws Throwable { try { try { - Source.maybe().completionTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer) + Source.maybe().completionTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer) .toCompletableFuture().get(3, TimeUnit.SECONDS); org.junit.Assert.fail("A TimeoutException was expected"); } catch (ExecutionException e) { @@ -849,7 +849,7 @@ public class SourceTest extends StreamTest { public void mustBeAbleToUseIdleTimeout() throws Throwable { try { try { - Source.maybe().idleTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer) + Source.maybe().idleTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer) .toCompletableFuture().get(3, TimeUnit.SECONDS); org.junit.Assert.fail("A TimeoutException was expected"); } catch (ExecutionException e) { @@ -864,8 +864,8 @@ public class SourceTest extends StreamTest { public void mustBeAbleToUseIdleInject() throws Exception { Integer result = Source.maybe() - .keepAlive(Duration.create(1, "second"), () -> 0) - .takeWithin(Duration.create(1500, "milliseconds")) + .keepAlive(Duration.ofSeconds(1), () -> 0) + .takeWithin(Duration.ofMillis(1500)) .runWith(Sink.head(), materializer) .toCompletableFuture().get(3, TimeUnit.SECONDS); @@ -882,8 +882,8 @@ public class SourceTest extends StreamTest { public void mustBeAbleToUseThrottle() throws Exception { Integer result = Source.from(Arrays.asList(0, 1, 2)) - .throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping()) - .throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.enforcing()) + .throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.shaping()) + .throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.enforcing()) .runWith(Sink.head(), materializer) .toCompletableFuture().get(3, TimeUnit.SECONDS);