Merge pull request #24755 from hepin1989/javadocs

=doc using java.time.Duration for stream's javadsl
This commit is contained in:
Patrik Nordwall 2018-03-21 13:36:14 +01:00 committed by GitHub
commit b34c38e37e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 100 additions and 119 deletions

View file

@ -4,30 +4,31 @@
package jdocs.stream; package jdocs.stream;
import static org.junit.Assert.assertEquals;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed; import akka.NotUsed;
import akka.actor.AbstractActor; import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.dispatch.Futures;
import akka.japi.Pair; import akka.japi.Pair;
import jdocs.AbstractJavaTest; import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import jdocs.AbstractJavaTest;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.Duration; import java.time.Duration;
import scala.concurrent.duration.FiniteDuration; import java.util.Arrays;
import akka.actor.ActorSystem; import java.util.LinkedList;
import akka.actor.ActorRef; import java.util.List;
import akka.actor.Cancellable; import java.util.Optional;
import akka.dispatch.Futures; import java.util.concurrent.CompletableFuture;
import akka.stream.*; import java.util.concurrent.CompletionStage;
import akka.stream.javadsl.*; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class FlowDocTest extends AbstractJavaTest { public class FlowDocTest extends AbstractJavaTest {
@ -131,7 +132,7 @@ public class FlowDocTest extends AbstractJavaTest {
//#compound-source-is-not-keyed-runWith //#compound-source-is-not-keyed-runWith
final Object tick = new Object(); final Object tick = new Object();
final FiniteDuration oneSecond = Duration.create(1, TimeUnit.SECONDS); final Duration oneSecond = Duration.ofSeconds(1);
//akka.actor.Cancellable //akka.actor.Cancellable
final Source<Object, Cancellable> timer = final Source<Object, Cancellable> timer =
Source.tick(oneSecond, oneSecond, tick); Source.tick(oneSecond, oneSecond, tick);
@ -208,7 +209,7 @@ public class FlowDocTest extends AbstractJavaTest {
@Test @Test
public void transformingMaterialized() throws Exception { public void transformingMaterialized() throws Exception {
FiniteDuration oneSecond = FiniteDuration.apply(1, TimeUnit.SECONDS); Duration oneSecond = Duration.ofSeconds(1);
Flow<Integer, Integer, Cancellable> throttler = Flow<Integer, Integer, Cancellable> throttler =
Flow.fromGraph(GraphDSL.create( Flow.fromGraph(GraphDSL.create(
Source.tick(oneSecond, oneSecond, ""), Source.tick(oneSecond, oneSecond, ""),

View file

@ -4,14 +4,14 @@
package jdocs.stream; package jdocs.stream;
//#imports
import akka.Done; import akka.Done;
import akka.NotUsed; import akka.NotUsed;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
//#imports
import akka.dispatch.Futures;
import akka.japi.Option; import akka.japi.Option;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.Predicate; import akka.japi.Predicate;
import akka.japi.Function;
import akka.japi.function.Procedure; import akka.japi.function.Procedure;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
@ -19,21 +19,15 @@ import akka.stream.stage.*;
//#imports //#imports
import akka.stream.testkit.TestPublisher; import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber; import akka.stream.testkit.TestSubscriber;
import akka.japi.Function;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import scala.compat.java8.FutureConverters;
import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext;
import scala.concurrent.Promise;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.*; import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -568,7 +562,7 @@ public class GraphStageDocTest extends AbstractJavaTest {
CompletionStage<Integer> result = CompletionStage<Integer> result =
Source.from(Arrays.asList(1, 2, 3)) Source.from(Arrays.asList(1, 2, 3))
.via(new TimedGate<>(2)) .via(new TimedGate<>(2))
.takeWithin(Duration.create(250, "millis")) .takeWithin(java.time.Duration.ofMillis(250))
.runFold(0, (n, sum) -> n + sum, mat); .runFold(0, (n, sum) -> n + sum, mat);
assertEquals(new Integer(1), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); assertEquals(new Integer(1), result.toCompletableFuture().get(3, TimeUnit.SECONDS));

View file

@ -9,27 +9,18 @@ import akka.NotUsed;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Cancellable; import akka.actor.Cancellable;
import akka.japi.Pair; import akka.japi.Pair;
import akka.stream.ActorMaterializer; import akka.stream.*;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.ThrottleMode;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
import akka.stream.javadsl.PartitionHub.ConsumerInfo; import akka.stream.javadsl.PartitionHub.ConsumerInfo;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import jdocs.AbstractJavaTest;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.function.ToLongBiFunction; import java.util.function.ToLongBiFunction;
public class HubDocTest extends AbstractJavaTest { public class HubDocTest extends AbstractJavaTest {
@ -80,8 +71,8 @@ public class HubDocTest extends AbstractJavaTest {
//#broadcast-hub //#broadcast-hub
// A simple producer that publishes a new "message" every second // A simple producer that publishes a new "message" every second
Source<String, Cancellable> producer = Source.tick( Source<String, Cancellable> producer = Source.tick(
FiniteDuration.create(1, TimeUnit.SECONDS), Duration.ofSeconds(1),
FiniteDuration.create(1, TimeUnit.SECONDS), Duration.ofSeconds(1),
"New message" "New message"
); );
@ -132,7 +123,7 @@ public class HubDocTest extends AbstractJavaTest {
Flow<String, String, UniqueKillSwitch> busFlow = Flow<String, String, UniqueKillSwitch> busFlow =
Flow.fromSinkAndSource(sink, source) Flow.fromSinkAndSource(sink, source)
.joinMat(KillSwitches.singleBidi(), Keep.right()) .joinMat(KillSwitches.singleBidi(), Keep.right())
.backpressureTimeout(FiniteDuration.create(1, TimeUnit.SECONDS)); .backpressureTimeout(Duration.ofSeconds(1));
//#pub-sub-3 //#pub-sub-3
//#pub-sub-4 //#pub-sub-4
@ -155,8 +146,8 @@ public class HubDocTest extends AbstractJavaTest {
//#partition-hub //#partition-hub
// A simple producer that publishes a new "message-n" every second // A simple producer that publishes a new "message-n" every second
Source<String, Cancellable> producer = Source.tick( Source<String, Cancellable> producer = Source.tick(
FiniteDuration.create(1, TimeUnit.SECONDS), Duration.ofSeconds(1),
FiniteDuration.create(1, TimeUnit.SECONDS), Duration.ofSeconds(1),
"message" "message"
).zipWith(Source.range(0, 100), (a, b) -> a + "-" + b); ).zipWith(Source.range(0, 100), (a, b) -> a + "-" + b);
@ -206,8 +197,8 @@ public class HubDocTest extends AbstractJavaTest {
//#partition-hub-stateful //#partition-hub-stateful
// A simple producer that publishes a new "message-n" every second // A simple producer that publishes a new "message-n" every second
Source<String, Cancellable> producer = Source.tick( Source<String, Cancellable> producer = Source.tick(
FiniteDuration.create(1, TimeUnit.SECONDS), Duration.ofSeconds(1),
FiniteDuration.create(1, TimeUnit.SECONDS), Duration.ofSeconds(1),
"message" "message"
).zipWith(Source.range(0, 100), (a, b) -> a + "-" + b); ).zipWith(Source.range(0, 100), (a, b) -> a + "-" + b);
@ -271,7 +262,7 @@ public class HubDocTest extends AbstractJavaTest {
Source<Integer, NotUsed> fromProducer = runnableGraph.run(materializer); Source<Integer, NotUsed> fromProducer = runnableGraph.run(materializer);
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer); fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
fromProducer.throttle(10, Duration.create(100, TimeUnit.MILLISECONDS), 10, ThrottleMode.shaping()) fromProducer.throttle(10, Duration.ofMillis(100), 10, ThrottleMode.shaping())
.runForeach(msg -> System.out.println("consumer2: " + msg), materializer); .runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
//#partition-hub-fastest //#partition-hub-fastest

View file

@ -17,8 +17,8 @@ import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
@ -52,7 +52,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
//#unique-shutdown //#unique-shutdown
final Source<Integer, NotUsed> countingSrc = final Source<Integer, NotUsed> countingSrc =
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last(); final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
@ -75,7 +75,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
//#unique-abort //#unique-abort
final Source<Integer, NotUsed> countingSrc = final Source<Integer, NotUsed> countingSrc =
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); .delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last(); final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
@ -98,7 +98,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
//#shared-shutdown //#shared-shutdown
final Source<Integer, NotUsed> countingSrc = final Source<Integer, NotUsed> countingSrc =
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); .delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last(); final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch"); final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");
@ -106,7 +106,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
.viaMat(killSwitch.flow(), Keep.right()) .viaMat(killSwitch.flow(), Keep.right())
.toMat(lastSnk, Keep.right()).run(mat); .toMat(lastSnk, Keep.right()).run(mat);
final CompletionStage<Integer> completionStageDelayed = countingSrc final CompletionStage<Integer> completionStageDelayed = countingSrc
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()) .delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure())
.viaMat(killSwitch.flow(), Keep.right()) .viaMat(killSwitch.flow(), Keep.right())
.toMat(lastSnk, Keep.right()).run(mat); .toMat(lastSnk, Keep.right()).run(mat);
@ -127,7 +127,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
//#shared-abort //#shared-abort
final Source<Integer, NotUsed> countingSrc = final Source<Integer, NotUsed> countingSrc =
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4))) Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure()); .delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last(); final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch"); final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");

View file

@ -17,12 +17,11 @@ import akka.util.ByteString;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.math.BigInteger; import java.math.BigInteger;
import java.time.Duration;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import jdocs.AbstractJavaTest; import jdocs.AbstractJavaTest;
import scala.concurrent.duration.Duration;
//#other-imports //#other-imports
import org.junit.*; import org.junit.*;
@ -66,7 +65,7 @@ public class QuickStartDocTest extends AbstractJavaTest {
//#add-streams //#add-streams
factorials factorials
.zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num)) .zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num))
.throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping()) .throttle(1, Duration.ofSeconds(1), 1, ThrottleMode.shaping())
//#add-streams //#add-streams
.take(2) .take(2)
//#add-streams //#add-streams

View file

@ -9,12 +9,14 @@ import akka.actor.ActorSystem;
import akka.stream.KillSwitch; import akka.stream.KillSwitch;
import akka.stream.KillSwitches; import akka.stream.KillSwitches;
import akka.stream.Materializer; import akka.stream.Materializer;
import akka.stream.javadsl.*; import akka.stream.javadsl.Keep;
import scala.concurrent.duration.Duration; import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class RestartDocTest { public class RestartDocTest {
@ -54,8 +56,8 @@ public class RestartDocTest {
public void recoverWithBackoffSource() { public void recoverWithBackoffSource() {
//#restart-with-backoff-source //#restart-with-backoff-source
Source<ServerSentEvent, NotUsed> eventStream = RestartSource.withBackoff( Source<ServerSentEvent, NotUsed> eventStream = RestartSource.withBackoff(
Duration.apply(3, TimeUnit.SECONDS), // min backoff Duration.ofSeconds(3), // min backoff
Duration.apply(30, TimeUnit.SECONDS), // max backoff Duration.ofSeconds(30), // max backoff
0.2, // adds 20% "noise" to vary the intervals slightly 0.2, // adds 20% "noise" to vary the intervals slightly
20, // limits the amount of restarts to 20 20, // limits the amount of restarts to 20
() -> () ->

View file

@ -4,21 +4,19 @@
package jdocs.stream; package jdocs.stream;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import akka.NotUsed; import akka.NotUsed;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Cancellable; import akka.actor.Cancellable;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
import akka.testkit.javadsl.TestKit;
import jdocs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
public class StreamBuffersRateDocTest extends AbstractJavaTest { public class StreamBuffersRateDocTest extends AbstractJavaTest {
@ -77,12 +75,11 @@ public class StreamBuffersRateDocTest extends AbstractJavaTest {
@Test @Test
public void demonstrateBufferAbstractionLeak() { public void demonstrateBufferAbstractionLeak() {
//#buffering-abstraction-leak //#buffering-abstraction-leak
final FiniteDuration oneSecond = final Duration oneSecond = Duration.ofSeconds(1);
FiniteDuration.create(1, TimeUnit.SECONDS);
final Source<String, Cancellable> msgSource = final Source<String, Cancellable> msgSource =
Source.tick(oneSecond, oneSecond, "message!"); Source.tick(oneSecond, oneSecond, "message!");
final Source<String, Cancellable> tickSource = final Source<String, Cancellable> tickSource =
Source.tick(oneSecond.mul(3), oneSecond.mul(3), "tick"); Source.tick(oneSecond.multipliedBy(3), oneSecond.multipliedBy(3), "tick");
final Flow<String, Integer, NotUsed> conflate = final Flow<String, Integer, NotUsed> conflate =
Flow.of(String.class).conflateWithSeed( Flow.of(String.class).conflateWithSeed(
first -> 1, (count, elem) -> count + 1); first -> 1, (count, elem) -> count + 1);

View file

@ -4,6 +4,7 @@
package jdocs.stream; package jdocs.stream;
import java.time.Duration;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -26,7 +27,6 @@ import akka.stream.javadsl.*;
import akka.stream.testkit.*; import akka.stream.testkit.*;
import akka.stream.testkit.javadsl.*; import akka.stream.testkit.javadsl.*;
import akka.testkit.TestProbe; import akka.testkit.TestProbe;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
@ -101,7 +101,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
.grouped(2) .grouped(2)
.runWith(Sink.head(), mat); .runWith(Sink.head(), mat);
akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref()); akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref());
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)) Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))
); );
//#pipeto-testprobe //#pipeto-testprobe
@ -113,18 +113,18 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
public void sinkActorRef() throws Exception { public void sinkActorRef() throws Exception {
//#sink-actorref //#sink-actorref
final Source<Tick, Cancellable> sourceUnderTest = Source.tick( final Source<Tick, Cancellable> sourceUnderTest = Source.tick(
FiniteDuration.create(0, TimeUnit.MILLISECONDS), Duration.ZERO,
FiniteDuration.create(200, TimeUnit.MILLISECONDS), Duration.ofMillis(200),
Tick.TOCK); Tick.TOCK);
final TestProbe probe = new TestProbe(system); final TestProbe probe = new TestProbe(system);
final Cancellable cancellable = sourceUnderTest final Cancellable cancellable = sourceUnderTest
.to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat); .to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat);
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK); probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.TOCK);
probe.expectNoMsg(Duration.create(100, TimeUnit.MILLISECONDS)); probe.expectNoMsg(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK); probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.TOCK);
cancellable.cancel(); cancellable.cancel();
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.COMPLETED); probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.COMPLETED);
//#sink-actorref //#sink-actorref
} }
@ -207,7 +207,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
//#test-source-and-sink //#test-source-and-sink
final Flow<Integer, Integer, NotUsed> flowUnderTest = Flow.of(Integer.class) final Flow<Integer, Integer, NotUsed> flowUnderTest = Flow.of(Integer.class)
.mapAsyncUnordered(2, sleep -> akka.pattern.PatternsCS.after( .mapAsyncUnordered(2, sleep -> akka.pattern.PatternsCS.after(
Duration.create(10, TimeUnit.MILLISECONDS), FiniteDuration.create(10, TimeUnit.MILLISECONDS),
system.scheduler(), system.scheduler(),
system.dispatcher(), system.dispatcher(),
CompletableFuture.completedFuture(sleep) CompletableFuture.completedFuture(sleep)

View file

@ -17,12 +17,13 @@ import akka.stream.testkit.javadsl.TestSink;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Promise; import scala.concurrent.Promise;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -32,7 +33,7 @@ import static org.junit.Assert.assertEquals;
public class RecipeAdhocSourceTest extends RecipeTest { public class RecipeAdhocSourceTest extends RecipeTest {
static ActorSystem system; static ActorSystem system;
static Materializer mat; static Materializer mat;
FiniteDuration duration200mills = Duration.create(200, "milliseconds"); Duration duration200mills = Duration.ofMillis(200);
@BeforeClass @BeforeClass
public static void setup() { public static void setup() {
@ -48,7 +49,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
} }
//#adhoc-source //#adhoc-source
public <T> Source<T, ?> adhocSource(Source<T, ?> source, FiniteDuration timeout, int maxRetries) { public <T> Source<T, ?> adhocSource(Source<T, ?> source, Duration timeout, int maxRetries) {
return Source.lazily( return Source.lazily(
() -> source.backpressureTimeout(timeout).recoverWithRetries( () -> source.backpressureTimeout(timeout).recoverWithRetries(
maxRetries, maxRetries,
@ -204,7 +205,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
Thread.sleep(500); Thread.sleep(500);
assertEquals(TimeoutException.class, probe.expectError().getClass()); assertEquals(TimeoutException.class, probe.expectError().getClass());
probe.request(1); //send demand probe.request(1); //send demand
probe.expectNoMessage(Duration.create(200, "milliseconds")); //but no more restart probe.expectNoMessage(FiniteDuration.create(200, "milliseconds")); //but no more restart
} }
}; };
} }

View file

@ -15,7 +15,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.TimeUnit; import java.time.Duration;
public class RecipeKeepAlive extends RecipeTest { public class RecipeKeepAlive extends RecipeTest {
static ActorSystem system; static ActorSystem system;
@ -47,8 +47,8 @@ public class RecipeKeepAlive extends RecipeTest {
//#inject-keepalive //#inject-keepalive
Flow<ByteString, ByteString, NotUsed> keepAliveInject = Flow<ByteString, ByteString, NotUsed> keepAliveInject =
Flow.of(ByteString.class).keepAlive( Flow.of(ByteString.class).keepAlive(
scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS), Duration.ofSeconds(1),
() -> keepAliveMessage); () -> keepAliveMessage);
//#inject-keepalive //#inject-keepalive
//@formatter:on //@formatter:on

View file

@ -65,7 +65,7 @@ public class FlowTest extends StreamTest {
final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5); final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5);
final Source<Integer, NotUsed> ints = Source.from(input); final Source<Integer, NotUsed> ints = Source.from(input);
final Flow<Integer, String, NotUsed> flow1 = Flow.of(Integer.class).drop(2).take(3 final Flow<Integer, String, NotUsed> flow1 = Flow.of(Integer.class).drop(2).take(3
).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS ).takeWithin(java.time.Duration.ofSeconds(10
)).map(new Function<Integer, String>() { )).map(new Function<Integer, String>() {
public String apply(Integer elem) { public String apply(Integer elem) {
return lookup[elem]; return lookup[elem];
@ -80,7 +80,7 @@ public class FlowTest extends StreamTest {
public java.util.List<String> apply(java.util.List<String> elem) { public java.util.List<String> apply(java.util.List<String> elem) {
return elem; return elem;
} }
}).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS) }).groupedWithin(100, java.time.Duration.ofMillis(50)
).mapConcat(new Function<java.util.List<String>, java.lang.Iterable<String>>() { ).mapConcat(new Function<java.util.List<String>, java.lang.Iterable<String>>() {
public java.util.List<String> apply(java.util.List<String> elem) { public java.util.List<String> apply(java.util.List<String> elem) {
return elem; return elem;

View file

@ -4,22 +4,18 @@
package akka.stream.javadsl; package akka.stream.javadsl;
import akka.Done;
import akka.NotUsed; import akka.NotUsed;
import akka.actor.ActorRef; import akka.stream.StreamTest;
import akka.japi.Pair; import akka.stream.ThrottleMode;
import akka.japi.function.Function; import akka.testkit.AkkaJUnitActorSystemResource;
import akka.stream.*; import akka.testkit.AkkaSpec;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.util.*; import java.util.Collections;
import java.util.concurrent.CompletableFuture; import java.util.List;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import akka.testkit.AkkaSpec;
import akka.testkit.AkkaJUnitActorSystemResource;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -36,7 +32,7 @@ public class FlowThrottleTest extends StreamTest {
public void mustWorksForTwoStreams() throws Exception { public void mustWorksForTwoStreams() throws Exception {
final Flow<Integer, Integer, NotUsed> sharedThrottle = final Flow<Integer, Integer, NotUsed> sharedThrottle =
Flow.of(Integer.class) Flow.of(Integer.class)
.throttle(1, FiniteDuration.create(1, TimeUnit.DAYS), 1, ThrottleMode.enforcing()); .throttle(1,java.time.Duration.ofDays(1), 1, ThrottleMode.enforcing());
CompletionStage<List<Integer>> result1 = CompletionStage<List<Integer>> result1 =
Source.single(1).via(sharedThrottle).via(sharedThrottle).runWith(Sink.seq(), materializer); Source.single(1).via(sharedThrottle).via(sharedThrottle).runWith(Sink.seq(), materializer);

View file

@ -20,11 +20,11 @@ import akka.stream.testkit.TestPublisher;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import scala.util.Try; import scala.util.Try;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
@ -71,12 +71,12 @@ public class SourceTest extends StreamTest {
ints ints
.drop(2) .drop(2)
.take(3) .take(3)
.takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS)) .takeWithin(Duration.ofSeconds(10))
.map(elem -> lookup[elem]) .map(elem -> lookup[elem])
.filter(elem -> !elem.equals("c")) .filter(elem -> !elem.equals("c"))
.grouped(2) .grouped(2)
.mapConcat(elem -> elem) .mapConcat(elem -> elem)
.groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS)) .groupedWithin(100, Duration.ofMillis(50))
.mapConcat(elem -> elem) .mapConcat(elem -> elem)
.runFold("", (acc, elem) -> acc + elem, materializer) .runFold("", (acc, elem) -> acc + elem, materializer)
.thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender())); .thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender()));
@ -432,8 +432,8 @@ public class SourceTest extends StreamTest {
@Test @Test
public void mustProduceTicks() throws Exception { public void mustProduceTicks() throws Exception {
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);
Source<String, Cancellable> tickSource = Source.tick(FiniteDuration.create(1, TimeUnit.SECONDS), Source<String, Cancellable> tickSource = Source.tick(Duration.ofSeconds(1),
FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick"); Duration.ofMillis(500), "tick");
@SuppressWarnings("unused") @SuppressWarnings("unused")
Cancellable cancellable = tickSource.to(Sink.foreach(new Procedure<String>() { Cancellable cancellable = tickSource.to(Sink.foreach(new Procedure<String>() {
public void apply(String elem) { public void apply(String elem) {
@ -450,8 +450,8 @@ public class SourceTest extends StreamTest {
@Test @Test
@SuppressWarnings("unused") @SuppressWarnings("unused")
public void mustCompileMethodsWithJavaDuration() { public void mustCompileMethodsWithJavaDuration() {
Source<NotUsed, Cancellable> tickSource = Source.tick(java.time.Duration.ofSeconds(1), Source<NotUsed, Cancellable> tickSource = Source.tick(Duration.ofSeconds(1),
java.time.Duration.ofMillis(500), NotUsed.getInstance()); Duration.ofMillis(500), NotUsed.getInstance());
} }
@Test @Test
@ -642,7 +642,7 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals(0); probe.expectMsgEquals(0);
probe.expectMsgEquals(1); probe.expectMsgEquals(1);
FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS); FiniteDuration duration = FiniteDuration.apply(200, TimeUnit.MILLISECONDS);
probe.expectNoMsg(duration); probe.expectNoMsg(duration);
future.toCompletableFuture().get(3, TimeUnit.SECONDS); future.toCompletableFuture().get(3, TimeUnit.SECONDS);
@ -819,7 +819,7 @@ public class SourceTest extends StreamTest {
public void mustBeAbleToUseInitialTimeout() throws Throwable { public void mustBeAbleToUseInitialTimeout() throws Throwable {
try { try {
try { try {
Source.maybe().initialTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer) Source.maybe().initialTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS); .toCompletableFuture().get(3, TimeUnit.SECONDS);
org.junit.Assert.fail("A TimeoutException was expected"); org.junit.Assert.fail("A TimeoutException was expected");
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -834,7 +834,7 @@ public class SourceTest extends StreamTest {
public void mustBeAbleToUseCompletionTimeout() throws Throwable { public void mustBeAbleToUseCompletionTimeout() throws Throwable {
try { try {
try { try {
Source.maybe().completionTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer) Source.maybe().completionTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS); .toCompletableFuture().get(3, TimeUnit.SECONDS);
org.junit.Assert.fail("A TimeoutException was expected"); org.junit.Assert.fail("A TimeoutException was expected");
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -849,7 +849,7 @@ public class SourceTest extends StreamTest {
public void mustBeAbleToUseIdleTimeout() throws Throwable { public void mustBeAbleToUseIdleTimeout() throws Throwable {
try { try {
try { try {
Source.maybe().idleTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer) Source.maybe().idleTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS); .toCompletableFuture().get(3, TimeUnit.SECONDS);
org.junit.Assert.fail("A TimeoutException was expected"); org.junit.Assert.fail("A TimeoutException was expected");
} catch (ExecutionException e) { } catch (ExecutionException e) {
@ -864,8 +864,8 @@ public class SourceTest extends StreamTest {
public void mustBeAbleToUseIdleInject() throws Exception { public void mustBeAbleToUseIdleInject() throws Exception {
Integer result = Integer result =
Source.<Integer>maybe() Source.<Integer>maybe()
.keepAlive(Duration.create(1, "second"), () -> 0) .keepAlive(Duration.ofSeconds(1), () -> 0)
.takeWithin(Duration.create(1500, "milliseconds")) .takeWithin(Duration.ofMillis(1500))
.runWith(Sink.head(), materializer) .runWith(Sink.head(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS); .toCompletableFuture().get(3, TimeUnit.SECONDS);
@ -882,8 +882,8 @@ public class SourceTest extends StreamTest {
public void mustBeAbleToUseThrottle() throws Exception { public void mustBeAbleToUseThrottle() throws Exception {
Integer result = Integer result =
Source.from(Arrays.asList(0, 1, 2)) Source.from(Arrays.asList(0, 1, 2))
.throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping()) .throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.shaping())
.throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.enforcing()) .throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.enforcing())
.runWith(Sink.head(), materializer) .runWith(Sink.head(), materializer)
.toCompletableFuture().get(3, TimeUnit.SECONDS); .toCompletableFuture().get(3, TimeUnit.SECONDS);