=doc using java.time.Duration for stream's javadsl
This commit is contained in:
parent
3685ce619e
commit
c354aa0b6a
13 changed files with 100 additions and 119 deletions
|
|
@ -4,30 +4,31 @@
|
|||
|
||||
package jdocs.stream;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.dispatch.Futures;
|
||||
import akka.japi.Pair;
|
||||
import jdocs.AbstractJavaTest;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import jdocs.AbstractJavaTest;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.dispatch.Futures;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class FlowDocTest extends AbstractJavaTest {
|
||||
|
||||
|
|
@ -131,7 +132,7 @@ public class FlowDocTest extends AbstractJavaTest {
|
|||
//#compound-source-is-not-keyed-runWith
|
||||
final Object tick = new Object();
|
||||
|
||||
final FiniteDuration oneSecond = Duration.create(1, TimeUnit.SECONDS);
|
||||
final Duration oneSecond = Duration.ofSeconds(1);
|
||||
//akka.actor.Cancellable
|
||||
final Source<Object, Cancellable> timer =
|
||||
Source.tick(oneSecond, oneSecond, tick);
|
||||
|
|
@ -208,7 +209,7 @@ public class FlowDocTest extends AbstractJavaTest {
|
|||
@Test
|
||||
public void transformingMaterialized() throws Exception {
|
||||
|
||||
FiniteDuration oneSecond = FiniteDuration.apply(1, TimeUnit.SECONDS);
|
||||
Duration oneSecond = Duration.ofSeconds(1);
|
||||
Flow<Integer, Integer, Cancellable> throttler =
|
||||
Flow.fromGraph(GraphDSL.create(
|
||||
Source.tick(oneSecond, oneSecond, ""),
|
||||
|
|
|
|||
|
|
@ -4,14 +4,14 @@
|
|||
|
||||
package jdocs.stream;
|
||||
|
||||
//#imports
|
||||
import akka.Done;
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorSystem;
|
||||
//#imports
|
||||
import akka.dispatch.Futures;
|
||||
import akka.japi.Option;
|
||||
import akka.japi.Pair;
|
||||
import akka.japi.Predicate;
|
||||
import akka.japi.Function;
|
||||
import akka.japi.function.Procedure;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
|
|
@ -19,21 +19,15 @@ import akka.stream.stage.*;
|
|||
//#imports
|
||||
import akka.stream.testkit.TestPublisher;
|
||||
import akka.stream.testkit.TestSubscriber;
|
||||
import akka.japi.Function;
|
||||
import jdocs.AbstractJavaTest;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Subscription;
|
||||
import scala.compat.java8.FutureConverters;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Promise;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -568,7 +562,7 @@ public class GraphStageDocTest extends AbstractJavaTest {
|
|||
CompletionStage<Integer> result =
|
||||
Source.from(Arrays.asList(1, 2, 3))
|
||||
.via(new TimedGate<>(2))
|
||||
.takeWithin(Duration.create(250, "millis"))
|
||||
.takeWithin(java.time.Duration.ofMillis(250))
|
||||
.runFold(0, (n, sum) -> n + sum, mat);
|
||||
|
||||
assertEquals(new Integer(1), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
|
||||
|
|
|
|||
|
|
@ -9,27 +9,18 @@ import akka.NotUsed;
|
|||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.KillSwitches;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.ThrottleMode;
|
||||
import akka.stream.UniqueKillSwitch;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
import akka.stream.javadsl.PartitionHub.ConsumerInfo;
|
||||
|
||||
import jdocs.AbstractJavaTest;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import jdocs.AbstractJavaTest;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.ToLongBiFunction;
|
||||
|
||||
public class HubDocTest extends AbstractJavaTest {
|
||||
|
|
@ -80,8 +71,8 @@ public class HubDocTest extends AbstractJavaTest {
|
|||
//#broadcast-hub
|
||||
// A simple producer that publishes a new "message" every second
|
||||
Source<String, Cancellable> producer = Source.tick(
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
Duration.ofSeconds(1),
|
||||
Duration.ofSeconds(1),
|
||||
"New message"
|
||||
);
|
||||
|
||||
|
|
@ -132,7 +123,7 @@ public class HubDocTest extends AbstractJavaTest {
|
|||
Flow<String, String, UniqueKillSwitch> busFlow =
|
||||
Flow.fromSinkAndSource(sink, source)
|
||||
.joinMat(KillSwitches.singleBidi(), Keep.right())
|
||||
.backpressureTimeout(FiniteDuration.create(1, TimeUnit.SECONDS));
|
||||
.backpressureTimeout(Duration.ofSeconds(1));
|
||||
//#pub-sub-3
|
||||
|
||||
//#pub-sub-4
|
||||
|
|
@ -155,8 +146,8 @@ public class HubDocTest extends AbstractJavaTest {
|
|||
//#partition-hub
|
||||
// A simple producer that publishes a new "message-n" every second
|
||||
Source<String, Cancellable> producer = Source.tick(
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
Duration.ofSeconds(1),
|
||||
Duration.ofSeconds(1),
|
||||
"message"
|
||||
).zipWith(Source.range(0, 100), (a, b) -> a + "-" + b);
|
||||
|
||||
|
|
@ -206,8 +197,8 @@ public class HubDocTest extends AbstractJavaTest {
|
|||
//#partition-hub-stateful
|
||||
// A simple producer that publishes a new "message-n" every second
|
||||
Source<String, Cancellable> producer = Source.tick(
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
Duration.ofSeconds(1),
|
||||
Duration.ofSeconds(1),
|
||||
"message"
|
||||
).zipWith(Source.range(0, 100), (a, b) -> a + "-" + b);
|
||||
|
||||
|
|
@ -271,7 +262,7 @@ public class HubDocTest extends AbstractJavaTest {
|
|||
Source<Integer, NotUsed> fromProducer = runnableGraph.run(materializer);
|
||||
|
||||
fromProducer.runForeach(msg -> System.out.println("consumer1: " + msg), materializer);
|
||||
fromProducer.throttle(10, Duration.create(100, TimeUnit.MILLISECONDS), 10, ThrottleMode.shaping())
|
||||
fromProducer.throttle(10, Duration.ofMillis(100), 10, ThrottleMode.shaping())
|
||||
.runForeach(msg -> System.out.println("consumer2: " + msg), materializer);
|
||||
//#partition-hub-fastest
|
||||
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ import akka.testkit.javadsl.TestKit;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
|
@ -52,7 +52,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
//#unique-shutdown
|
||||
final Source<Integer, NotUsed> countingSrc =
|
||||
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
.delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
|
||||
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
|
||||
|
||||
final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
|
||||
|
|
@ -75,7 +75,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
//#unique-abort
|
||||
final Source<Integer, NotUsed> countingSrc =
|
||||
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
.delay(Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
|
||||
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
|
||||
|
||||
final Pair<UniqueKillSwitch, CompletionStage<Integer>> stream = countingSrc
|
||||
|
|
@ -98,7 +98,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
//#shared-shutdown
|
||||
final Source<Integer, NotUsed> countingSrc =
|
||||
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
.delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
|
||||
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
|
||||
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");
|
||||
|
||||
|
|
@ -106,7 +106,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
final CompletionStage<Integer> completionStageDelayed = countingSrc
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure())
|
||||
.delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure())
|
||||
.viaMat(killSwitch.flow(), Keep.right())
|
||||
.toMat(lastSnk, Keep.right()).run(mat);
|
||||
|
||||
|
|
@ -127,7 +127,7 @@ class KillSwitchDocTest extends AbstractJavaTest {
|
|||
//#shared-abort
|
||||
final Source<Integer, NotUsed> countingSrc =
|
||||
Source.from(new ArrayList<>(Arrays.asList(1, 2, 3, 4)))
|
||||
.delay(FiniteDuration.apply(1, TimeUnit.SECONDS), DelayOverflowStrategy.backpressure());
|
||||
.delay( Duration.ofSeconds(1), DelayOverflowStrategy.backpressure());
|
||||
final Sink<Integer, CompletionStage<Integer>> lastSnk = Sink.last();
|
||||
final SharedKillSwitch killSwitch = KillSwitches.shared("my-kill-switch");
|
||||
|
||||
|
|
|
|||
|
|
@ -17,12 +17,11 @@ import akka.util.ByteString;
|
|||
|
||||
import java.nio.file.Paths;
|
||||
import java.math.BigInteger;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import jdocs.AbstractJavaTest;
|
||||
import scala.concurrent.duration.Duration;
|
||||
//#other-imports
|
||||
|
||||
import org.junit.*;
|
||||
|
|
@ -66,7 +65,7 @@ public class QuickStartDocTest extends AbstractJavaTest {
|
|||
//#add-streams
|
||||
factorials
|
||||
.zipWith(Source.range(0, 99), (num, idx) -> String.format("%d! = %s", idx, num))
|
||||
.throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
|
||||
.throttle(1, Duration.ofSeconds(1), 1, ThrottleMode.shaping())
|
||||
//#add-streams
|
||||
.take(2)
|
||||
//#add-streams
|
||||
|
|
|
|||
|
|
@ -9,12 +9,14 @@ import akka.actor.ActorSystem;
|
|||
import akka.stream.KillSwitch;
|
||||
import akka.stream.KillSwitches;
|
||||
import akka.stream.Materializer;
|
||||
import akka.stream.javadsl.*;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import akka.stream.javadsl.Keep;
|
||||
import akka.stream.javadsl.RestartSource;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RestartDocTest {
|
||||
|
||||
|
|
@ -54,8 +56,8 @@ public class RestartDocTest {
|
|||
public void recoverWithBackoffSource() {
|
||||
//#restart-with-backoff-source
|
||||
Source<ServerSentEvent, NotUsed> eventStream = RestartSource.withBackoff(
|
||||
Duration.apply(3, TimeUnit.SECONDS), // min backoff
|
||||
Duration.apply(30, TimeUnit.SECONDS), // max backoff
|
||||
Duration.ofSeconds(3), // min backoff
|
||||
Duration.ofSeconds(30), // max backoff
|
||||
0.2, // adds 20% "noise" to vary the intervals slightly
|
||||
20, // limits the amount of restarts to 20
|
||||
() ->
|
||||
|
|
|
|||
|
|
@ -4,21 +4,19 @@
|
|||
|
||||
package jdocs.stream;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import akka.NotUsed;
|
||||
import jdocs.AbstractJavaTest;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Cancellable;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import jdocs.AbstractJavaTest;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class StreamBuffersRateDocTest extends AbstractJavaTest {
|
||||
|
||||
|
|
@ -77,12 +75,11 @@ public class StreamBuffersRateDocTest extends AbstractJavaTest {
|
|||
@Test
|
||||
public void demonstrateBufferAbstractionLeak() {
|
||||
//#buffering-abstraction-leak
|
||||
final FiniteDuration oneSecond =
|
||||
FiniteDuration.create(1, TimeUnit.SECONDS);
|
||||
final Duration oneSecond = Duration.ofSeconds(1);
|
||||
final Source<String, Cancellable> msgSource =
|
||||
Source.tick(oneSecond, oneSecond, "message!");
|
||||
final Source<String, Cancellable> tickSource =
|
||||
Source.tick(oneSecond.mul(3), oneSecond.mul(3), "tick");
|
||||
Source.tick(oneSecond.multipliedBy(3), oneSecond.multipliedBy(3), "tick");
|
||||
final Flow<String, Integer, NotUsed> conflate =
|
||||
Flow.of(String.class).conflateWithSeed(
|
||||
first -> 1, (count, elem) -> count + 1);
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package jdocs.stream;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
|
@ -26,7 +27,6 @@ import akka.stream.javadsl.*;
|
|||
import akka.stream.testkit.*;
|
||||
import akka.stream.testkit.javadsl.*;
|
||||
import akka.testkit.TestProbe;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
|
||||
|
|
@ -101,7 +101,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
.grouped(2)
|
||||
.runWith(Sink.head(), mat);
|
||||
akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref());
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS),
|
||||
probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS),
|
||||
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))
|
||||
);
|
||||
//#pipeto-testprobe
|
||||
|
|
@ -113,18 +113,18 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
public void sinkActorRef() throws Exception {
|
||||
//#sink-actorref
|
||||
final Source<Tick, Cancellable> sourceUnderTest = Source.tick(
|
||||
FiniteDuration.create(0, TimeUnit.MILLISECONDS),
|
||||
FiniteDuration.create(200, TimeUnit.MILLISECONDS),
|
||||
Duration.ZERO,
|
||||
Duration.ofMillis(200),
|
||||
Tick.TOCK);
|
||||
|
||||
final TestProbe probe = new TestProbe(system);
|
||||
final Cancellable cancellable = sourceUnderTest
|
||||
.to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat);
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectNoMsg(Duration.create(100, TimeUnit.MILLISECONDS));
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectNoMsg(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
|
||||
probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.TOCK);
|
||||
cancellable.cancel();
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.COMPLETED);
|
||||
probe.expectMsg(FiniteDuration.create(3, TimeUnit.SECONDS), Tick.COMPLETED);
|
||||
//#sink-actorref
|
||||
}
|
||||
|
||||
|
|
@ -207,7 +207,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
//#test-source-and-sink
|
||||
final Flow<Integer, Integer, NotUsed> flowUnderTest = Flow.of(Integer.class)
|
||||
.mapAsyncUnordered(2, sleep -> akka.pattern.PatternsCS.after(
|
||||
Duration.create(10, TimeUnit.MILLISECONDS),
|
||||
FiniteDuration.create(10, TimeUnit.MILLISECONDS),
|
||||
system.scheduler(),
|
||||
system.dispatcher(),
|
||||
CompletableFuture.completedFuture(sleep)
|
||||
|
|
|
|||
|
|
@ -17,12 +17,13 @@ import akka.stream.testkit.javadsl.TestSink;
|
|||
import akka.testkit.javadsl.TestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.Promise;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
|
@ -32,7 +33,7 @@ import static org.junit.Assert.assertEquals;
|
|||
public class RecipeAdhocSourceTest extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
FiniteDuration duration200mills = Duration.create(200, "milliseconds");
|
||||
Duration duration200mills = Duration.ofMillis(200);
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
|
|
@ -48,7 +49,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
|
|||
}
|
||||
|
||||
//#adhoc-source
|
||||
public <T> Source<T, ?> adhocSource(Source<T, ?> source, FiniteDuration timeout, int maxRetries) {
|
||||
public <T> Source<T, ?> adhocSource(Source<T, ?> source, Duration timeout, int maxRetries) {
|
||||
return Source.lazily(
|
||||
() -> source.backpressureTimeout(timeout).recoverWithRetries(
|
||||
maxRetries,
|
||||
|
|
@ -204,7 +205,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
|
|||
Thread.sleep(500);
|
||||
assertEquals(TimeoutException.class, probe.expectError().getClass());
|
||||
probe.request(1); //send demand
|
||||
probe.expectNoMessage(Duration.create(200, "milliseconds")); //but no more restart
|
||||
probe.expectNoMessage(FiniteDuration.create(200, "milliseconds")); //but no more restart
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import org.junit.AfterClass;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.time.Duration;
|
||||
|
||||
public class RecipeKeepAlive extends RecipeTest {
|
||||
static ActorSystem system;
|
||||
|
|
@ -47,8 +47,8 @@ public class RecipeKeepAlive extends RecipeTest {
|
|||
//#inject-keepalive
|
||||
Flow<ByteString, ByteString, NotUsed> keepAliveInject =
|
||||
Flow.of(ByteString.class).keepAlive(
|
||||
scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS),
|
||||
() -> keepAliveMessage);
|
||||
Duration.ofSeconds(1),
|
||||
() -> keepAliveMessage);
|
||||
//#inject-keepalive
|
||||
//@formatter:on
|
||||
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ public class FlowTest extends StreamTest {
|
|||
final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5);
|
||||
final Source<Integer, NotUsed> ints = Source.from(input);
|
||||
final Flow<Integer, String, NotUsed> flow1 = Flow.of(Integer.class).drop(2).take(3
|
||||
).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS
|
||||
).takeWithin(java.time.Duration.ofSeconds(10
|
||||
)).map(new Function<Integer, String>() {
|
||||
public String apply(Integer elem) {
|
||||
return lookup[elem];
|
||||
|
|
@ -80,7 +80,7 @@ public class FlowTest extends StreamTest {
|
|||
public java.util.List<String> apply(java.util.List<String> elem) {
|
||||
return elem;
|
||||
}
|
||||
}).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS)
|
||||
}).groupedWithin(100, java.time.Duration.ofMillis(50)
|
||||
).mapConcat(new Function<java.util.List<String>, java.lang.Iterable<String>>() {
|
||||
public java.util.List<String> apply(java.util.List<String> elem) {
|
||||
return elem;
|
||||
|
|
|
|||
|
|
@ -4,22 +4,18 @@
|
|||
|
||||
package akka.stream.javadsl;
|
||||
|
||||
import akka.Done;
|
||||
import akka.NotUsed;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.japi.Pair;
|
||||
import akka.japi.function.Function;
|
||||
import akka.stream.*;
|
||||
import akka.stream.StreamTest;
|
||||
import akka.stream.ThrottleMode;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
|
@ -36,7 +32,7 @@ public class FlowThrottleTest extends StreamTest {
|
|||
public void mustWorksForTwoStreams() throws Exception {
|
||||
final Flow<Integer, Integer, NotUsed> sharedThrottle =
|
||||
Flow.of(Integer.class)
|
||||
.throttle(1, FiniteDuration.create(1, TimeUnit.DAYS), 1, ThrottleMode.enforcing());
|
||||
.throttle(1,java.time.Duration.ofDays(1), 1, ThrottleMode.enforcing());
|
||||
|
||||
CompletionStage<List<Integer>> result1 =
|
||||
Source.single(1).via(sharedThrottle).via(sharedThrottle).runWith(Sink.seq(), materializer);
|
||||
|
|
|
|||
|
|
@ -20,11 +20,11 @@ import akka.stream.testkit.TestPublisher;
|
|||
import akka.testkit.javadsl.TestKit;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
import scala.util.Try;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
|
@ -71,12 +71,12 @@ public class SourceTest extends StreamTest {
|
|||
ints
|
||||
.drop(2)
|
||||
.take(3)
|
||||
.takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS))
|
||||
.takeWithin(Duration.ofSeconds(10))
|
||||
.map(elem -> lookup[elem])
|
||||
.filter(elem -> !elem.equals("c"))
|
||||
.grouped(2)
|
||||
.mapConcat(elem -> elem)
|
||||
.groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS))
|
||||
.groupedWithin(100, Duration.ofMillis(50))
|
||||
.mapConcat(elem -> elem)
|
||||
.runFold("", (acc, elem) -> acc + elem, materializer)
|
||||
.thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender()));
|
||||
|
|
@ -432,8 +432,8 @@ public class SourceTest extends StreamTest {
|
|||
@Test
|
||||
public void mustProduceTicks() throws Exception {
|
||||
final TestKit probe = new TestKit(system);
|
||||
Source<String, Cancellable> tickSource = Source.tick(FiniteDuration.create(1, TimeUnit.SECONDS),
|
||||
FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick");
|
||||
Source<String, Cancellable> tickSource = Source.tick(Duration.ofSeconds(1),
|
||||
Duration.ofMillis(500), "tick");
|
||||
@SuppressWarnings("unused")
|
||||
Cancellable cancellable = tickSource.to(Sink.foreach(new Procedure<String>() {
|
||||
public void apply(String elem) {
|
||||
|
|
@ -450,8 +450,8 @@ public class SourceTest extends StreamTest {
|
|||
@Test
|
||||
@SuppressWarnings("unused")
|
||||
public void mustCompileMethodsWithJavaDuration() {
|
||||
Source<NotUsed, Cancellable> tickSource = Source.tick(java.time.Duration.ofSeconds(1),
|
||||
java.time.Duration.ofMillis(500), NotUsed.getInstance());
|
||||
Source<NotUsed, Cancellable> tickSource = Source.tick(Duration.ofSeconds(1),
|
||||
Duration.ofMillis(500), NotUsed.getInstance());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -642,7 +642,7 @@ public class SourceTest extends StreamTest {
|
|||
probe.expectMsgEquals(0);
|
||||
probe.expectMsgEquals(1);
|
||||
|
||||
FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS);
|
||||
FiniteDuration duration = FiniteDuration.apply(200, TimeUnit.MILLISECONDS);
|
||||
|
||||
probe.expectNoMsg(duration);
|
||||
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
|
|
@ -819,7 +819,7 @@ public class SourceTest extends StreamTest {
|
|||
public void mustBeAbleToUseInitialTimeout() throws Throwable {
|
||||
try {
|
||||
try {
|
||||
Source.maybe().initialTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer)
|
||||
Source.maybe().initialTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer)
|
||||
.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
org.junit.Assert.fail("A TimeoutException was expected");
|
||||
} catch (ExecutionException e) {
|
||||
|
|
@ -834,7 +834,7 @@ public class SourceTest extends StreamTest {
|
|||
public void mustBeAbleToUseCompletionTimeout() throws Throwable {
|
||||
try {
|
||||
try {
|
||||
Source.maybe().completionTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer)
|
||||
Source.maybe().completionTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer)
|
||||
.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
org.junit.Assert.fail("A TimeoutException was expected");
|
||||
} catch (ExecutionException e) {
|
||||
|
|
@ -849,7 +849,7 @@ public class SourceTest extends StreamTest {
|
|||
public void mustBeAbleToUseIdleTimeout() throws Throwable {
|
||||
try {
|
||||
try {
|
||||
Source.maybe().idleTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer)
|
||||
Source.maybe().idleTimeout(Duration.ofSeconds(1)).runWith(Sink.head(), materializer)
|
||||
.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
org.junit.Assert.fail("A TimeoutException was expected");
|
||||
} catch (ExecutionException e) {
|
||||
|
|
@ -864,8 +864,8 @@ public class SourceTest extends StreamTest {
|
|||
public void mustBeAbleToUseIdleInject() throws Exception {
|
||||
Integer result =
|
||||
Source.<Integer>maybe()
|
||||
.keepAlive(Duration.create(1, "second"), () -> 0)
|
||||
.takeWithin(Duration.create(1500, "milliseconds"))
|
||||
.keepAlive(Duration.ofSeconds(1), () -> 0)
|
||||
.takeWithin(Duration.ofMillis(1500))
|
||||
.runWith(Sink.head(), materializer)
|
||||
.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
|
||||
|
|
@ -882,8 +882,8 @@ public class SourceTest extends StreamTest {
|
|||
public void mustBeAbleToUseThrottle() throws Exception {
|
||||
Integer result =
|
||||
Source.from(Arrays.asList(0, 1, 2))
|
||||
.throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping())
|
||||
.throttle(10, FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.enforcing())
|
||||
.throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.shaping())
|
||||
.throttle(10, Duration.ofSeconds(1), 10, ThrottleMode.enforcing())
|
||||
.runWith(Sink.head(), materializer)
|
||||
.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue