2018-10-29 17:19:37 +08:00
|
|
|
/*
|
2019-01-02 18:55:26 +08:00
|
|
|
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
|
2016-01-13 16:25:24 +01:00
|
|
|
*/
|
2018-03-13 23:45:55 +09:00
|
|
|
|
2017-03-16 09:30:00 +01:00
|
|
|
package jdocs.stream;
|
2016-01-13 16:25:24 +01:00
|
|
|
|
2018-03-19 22:14:33 +08:00
|
|
|
import java.time.Duration;
|
2016-01-13 16:25:24 +01:00
|
|
|
import java.util.Arrays;
|
|
|
|
|
import java.util.Collections;
|
|
|
|
|
import java.util.List;
|
2016-01-21 16:37:26 +01:00
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
|
|
import java.util.concurrent.CompletionStage;
|
|
|
|
|
import java.util.concurrent.ExecutionException;
|
2016-01-13 16:25:24 +01:00
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.NotUsed;
|
2017-03-16 09:30:00 +01:00
|
|
|
import jdocs.AbstractJavaTest;
|
2017-03-17 03:02:47 +08:00
|
|
|
import akka.testkit.javadsl.TestKit;
|
2016-01-13 16:25:24 +01:00
|
|
|
import org.junit.*;
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
|
|
|
|
|
|
|
import akka.actor.*;
|
|
|
|
|
import akka.japi.Pair;
|
|
|
|
|
import akka.stream.*;
|
|
|
|
|
import akka.stream.javadsl.*;
|
|
|
|
|
import akka.stream.testkit.*;
|
|
|
|
|
import akka.stream.testkit.javadsl.*;
|
|
|
|
|
|
2016-02-11 16:39:25 +01:00
|
|
|
public class StreamTestKitDocTest extends AbstractJavaTest {
|
2016-01-13 16:25:24 +01:00
|
|
|
|
|
|
|
|
static ActorSystem system;
|
2016-02-11 16:39:25 +01:00
|
|
|
static Materializer mat;
|
2016-01-13 16:25:24 +01:00
|
|
|
|
|
|
|
|
@BeforeClass
|
|
|
|
|
public static void setup() {
|
|
|
|
|
system = ActorSystem.create("StreamTestKitDocTest");
|
2016-02-11 16:39:25 +01:00
|
|
|
mat = ActorMaterializer.create(system);
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@AfterClass
|
|
|
|
|
public static void tearDown() {
|
2017-03-17 03:02:47 +08:00
|
|
|
TestKit.shutdownActorSystem(system);
|
2016-01-13 16:25:24 +01:00
|
|
|
system = null;
|
2016-02-11 16:39:25 +01:00
|
|
|
mat = null;
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void strictCollection() throws Exception {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #strict-collection
|
|
|
|
|
final Sink<Integer, CompletionStage<Integer>> sinkUnderTest =
|
|
|
|
|
Flow.of(Integer.class)
|
|
|
|
|
.map(i -> i * 2)
|
|
|
|
|
.toMat(Sink.fold(0, (agg, next) -> agg + next), Keep.right());
|
|
|
|
|
|
|
|
|
|
final CompletionStage<Integer> future =
|
|
|
|
|
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(sinkUnderTest, mat);
|
2016-08-31 18:20:05 +02:00
|
|
|
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
2019-01-12 04:00:53 +08:00
|
|
|
assert (result == 20);
|
|
|
|
|
// #strict-collection
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void groupedPartOfInfiniteStream() throws Exception {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #grouped-infinite
|
|
|
|
|
final Source<Integer, NotUsed> sourceUnderTest = Source.repeat(1).map(i -> i * 2);
|
2016-01-13 16:25:24 +01:00
|
|
|
|
2019-01-12 04:00:53 +08:00
|
|
|
final CompletionStage<List<Integer>> future = sourceUnderTest.take(10).runWith(Sink.seq(), mat);
|
2016-08-31 18:20:05 +02:00
|
|
|
final List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
2016-01-13 16:25:24 +01:00
|
|
|
assertEquals(result, Collections.nCopies(10, 2));
|
2019-01-12 04:00:53 +08:00
|
|
|
// #grouped-infinite
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void foldedStream() throws Exception {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #folded-stream
|
|
|
|
|
final Flow<Integer, Integer, NotUsed> flowUnderTest =
|
|
|
|
|
Flow.of(Integer.class).takeWhile(i -> i < 5);
|
|
|
|
|
|
|
|
|
|
final CompletionStage<Integer> future =
|
|
|
|
|
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
|
|
|
|
|
.via(flowUnderTest)
|
|
|
|
|
.runWith(Sink.fold(0, (agg, next) -> agg + next), mat);
|
2016-08-31 18:20:05 +02:00
|
|
|
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
2019-01-12 04:00:53 +08:00
|
|
|
assert (result == 10);
|
|
|
|
|
// #folded-stream
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void pipeToTestProbe() throws Exception {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #pipeto-testprobe
|
|
|
|
|
final Source<List<Integer>, NotUsed> sourceUnderTest =
|
|
|
|
|
Source.from(Arrays.asList(1, 2, 3, 4)).grouped(2);
|
2016-01-13 16:25:24 +01:00
|
|
|
|
2018-06-26 15:41:30 +02:00
|
|
|
final TestKit probe = new TestKit(system);
|
2019-01-12 04:00:53 +08:00
|
|
|
final CompletionStage<List<List<Integer>>> future =
|
|
|
|
|
sourceUnderTest.grouped(2).runWith(Sink.head(), mat);
|
2018-12-06 22:40:43 +08:00
|
|
|
akka.pattern.Patterns.pipe(future, system.dispatcher()).to(probe.getRef());
|
2018-06-26 15:41:30 +02:00
|
|
|
probe.expectMsg(Duration.ofSeconds(3), Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)));
|
2019-01-12 04:00:53 +08:00
|
|
|
// #pipeto-testprobe
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
2019-01-12 04:00:53 +08:00
|
|
|
public enum Tick {
|
|
|
|
|
TOCK,
|
|
|
|
|
COMPLETED
|
|
|
|
|
};
|
2016-01-13 16:25:24 +01:00
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void sinkActorRef() throws Exception {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #sink-actorref
|
|
|
|
|
final Source<Tick, Cancellable> sourceUnderTest =
|
|
|
|
|
Source.tick(Duration.ZERO, Duration.ofMillis(200), Tick.TOCK);
|
2016-01-13 16:25:24 +01:00
|
|
|
|
2018-06-26 15:41:30 +02:00
|
|
|
final TestKit probe = new TestKit(system);
|
2019-01-12 04:00:53 +08:00
|
|
|
final Cancellable cancellable =
|
|
|
|
|
sourceUnderTest.to(Sink.actorRef(probe.getRef(), Tick.COMPLETED)).run(mat);
|
2018-06-26 15:41:30 +02:00
|
|
|
probe.expectMsg(Duration.ofSeconds(3), Tick.TOCK);
|
|
|
|
|
probe.expectNoMessage(Duration.ofMillis(100));
|
|
|
|
|
probe.expectMsg(Duration.ofSeconds(3), Tick.TOCK);
|
2016-01-13 16:25:24 +01:00
|
|
|
cancellable.cancel();
|
2018-06-26 15:41:30 +02:00
|
|
|
probe.expectMsg(Duration.ofSeconds(3), Tick.COMPLETED);
|
2019-01-12 04:00:53 +08:00
|
|
|
// #sink-actorref
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void sourceActorRef() throws Exception {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #source-actorref
|
|
|
|
|
final Sink<Integer, CompletionStage<String>> sinkUnderTest =
|
|
|
|
|
Flow.of(Integer.class)
|
|
|
|
|
.map(i -> i.toString())
|
|
|
|
|
.toMat(Sink.fold("", (agg, next) -> agg + next), Keep.right());
|
2016-01-13 16:25:24 +01:00
|
|
|
|
2016-01-21 16:37:26 +01:00
|
|
|
final Pair<ActorRef, CompletionStage<String>> refAndCompletionStage =
|
2019-01-12 04:00:53 +08:00
|
|
|
Source.<Integer>actorRef(8, OverflowStrategy.fail())
|
|
|
|
|
.toMat(sinkUnderTest, Keep.both())
|
|
|
|
|
.run(mat);
|
2016-01-21 16:37:26 +01:00
|
|
|
final ActorRef ref = refAndCompletionStage.first();
|
|
|
|
|
final CompletionStage<String> future = refAndCompletionStage.second();
|
2016-01-13 16:25:24 +01:00
|
|
|
|
|
|
|
|
ref.tell(1, ActorRef.noSender());
|
|
|
|
|
ref.tell(2, ActorRef.noSender());
|
|
|
|
|
ref.tell(3, ActorRef.noSender());
|
|
|
|
|
ref.tell(new akka.actor.Status.Success("done"), ActorRef.noSender());
|
|
|
|
|
|
2016-01-21 16:37:26 +01:00
|
|
|
final String result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
2016-01-13 16:25:24 +01:00
|
|
|
assertEquals(result, "123");
|
2019-01-12 04:00:53 +08:00
|
|
|
// #source-actorref
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testSinkProbe() {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #test-sink-probe
|
|
|
|
|
final Source<Integer, NotUsed> sourceUnderTest =
|
|
|
|
|
Source.from(Arrays.asList(1, 2, 3, 4)).filter(elem -> elem % 2 == 0).map(elem -> elem * 2);
|
2016-01-13 16:25:24 +01:00
|
|
|
|
|
|
|
|
sourceUnderTest
|
2019-01-12 04:00:53 +08:00
|
|
|
.runWith(TestSink.probe(system), mat)
|
|
|
|
|
.request(2)
|
|
|
|
|
.expectNext(4, 8)
|
|
|
|
|
.expectComplete();
|
|
|
|
|
// #test-sink-probe
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testSourceProbe() {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #test-source-probe
|
2016-01-20 10:00:37 +02:00
|
|
|
final Sink<Integer, NotUsed> sinkUnderTest = Sink.cancelled();
|
2016-01-13 16:25:24 +01:00
|
|
|
|
|
|
|
|
TestSource.<Integer>probe(system)
|
2019-01-12 04:00:53 +08:00
|
|
|
.toMat(sinkUnderTest, Keep.left())
|
|
|
|
|
.run(mat)
|
|
|
|
|
.expectCancellation();
|
|
|
|
|
// #test-source-probe
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void injectingFailure() throws Exception {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #injecting-failure
|
2016-01-21 16:37:26 +01:00
|
|
|
final Sink<Integer, CompletionStage<Integer>> sinkUnderTest = Sink.head();
|
2016-01-13 16:25:24 +01:00
|
|
|
|
2016-01-21 16:37:26 +01:00
|
|
|
final Pair<TestPublisher.Probe<Integer>, CompletionStage<Integer>> probeAndCompletionStage =
|
2019-01-12 04:00:53 +08:00
|
|
|
TestSource.<Integer>probe(system).toMat(sinkUnderTest, Keep.both()).run(mat);
|
2016-01-21 16:37:26 +01:00
|
|
|
final TestPublisher.Probe<Integer> probe = probeAndCompletionStage.first();
|
|
|
|
|
final CompletionStage<Integer> future = probeAndCompletionStage.second();
|
2016-01-13 16:25:24 +01:00
|
|
|
probe.sendError(new Exception("boom"));
|
|
|
|
|
|
2016-01-21 16:37:26 +01:00
|
|
|
try {
|
2016-08-31 18:20:05 +02:00
|
|
|
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
2016-01-21 16:37:26 +01:00
|
|
|
assert false;
|
|
|
|
|
} catch (ExecutionException ee) {
|
|
|
|
|
final Throwable exception = ee.getCause();
|
|
|
|
|
assertEquals(exception.getMessage(), "boom");
|
|
|
|
|
}
|
2019-01-12 04:00:53 +08:00
|
|
|
// #injecting-failure
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testSourceAndTestSink() throws Exception {
|
2019-01-12 04:00:53 +08:00
|
|
|
// #test-source-and-sink
|
|
|
|
|
final Flow<Integer, Integer, NotUsed> flowUnderTest =
|
|
|
|
|
Flow.of(Integer.class)
|
|
|
|
|
.mapAsyncUnordered(
|
|
|
|
|
2,
|
|
|
|
|
sleep ->
|
|
|
|
|
akka.pattern.Patterns.after(
|
|
|
|
|
Duration.ofMillis(10),
|
|
|
|
|
system.scheduler(),
|
|
|
|
|
system.dispatcher(),
|
|
|
|
|
CompletableFuture.completedFuture(sleep)));
|
2016-01-13 16:25:24 +01:00
|
|
|
|
|
|
|
|
final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubAndSub =
|
2019-01-12 04:00:53 +08:00
|
|
|
TestSource.<Integer>probe(system)
|
|
|
|
|
.via(flowUnderTest)
|
|
|
|
|
.toMat(TestSink.<Integer>probe(system), Keep.both())
|
|
|
|
|
.run(mat);
|
2016-01-13 16:25:24 +01:00
|
|
|
final TestPublisher.Probe<Integer> pub = pubAndSub.first();
|
|
|
|
|
final TestSubscriber.Probe<Integer> sub = pubAndSub.second();
|
|
|
|
|
|
|
|
|
|
sub.request(3);
|
|
|
|
|
pub.sendNext(3);
|
|
|
|
|
pub.sendNext(2);
|
|
|
|
|
pub.sendNext(1);
|
|
|
|
|
sub.expectNextUnordered(1, 2, 3);
|
|
|
|
|
|
|
|
|
|
pub.sendError(new Exception("Power surge in the linear subroutine C-47!"));
|
|
|
|
|
final Throwable ex = sub.expectError();
|
2019-01-12 04:00:53 +08:00
|
|
|
assert (ex.getMessage().contains("C-47"));
|
|
|
|
|
// #test-source-and-sink
|
2016-01-13 16:25:24 +01:00
|
|
|
}
|
|
|
|
|
}
|