parent
42a25609c1
commit
4821fb158e
4 changed files with 22 additions and 22 deletions
|
|
@ -57,7 +57,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
|
||||
final CompletionStage<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4))
|
||||
.runWith(sinkUnderTest, mat);
|
||||
final Integer result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assert(result == 20);
|
||||
//#strict-collection
|
||||
}
|
||||
|
|
@ -69,9 +69,9 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
.map(i -> i * 2);
|
||||
|
||||
final CompletionStage<List<Integer>> future = sourceUnderTest
|
||||
.grouped(10)
|
||||
.runWith(Sink.head(), mat);
|
||||
final List<Integer> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
.take(10)
|
||||
.runWith(Sink.seq(), mat);
|
||||
final List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assertEquals(result, Collections.nCopies(10, 2));
|
||||
//#grouped-infinite
|
||||
}
|
||||
|
|
@ -84,7 +84,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
|
||||
final CompletionStage<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
|
||||
.via(flowUnderTest).runWith(Sink.fold(0, (agg, next) -> agg + next), mat);
|
||||
final Integer result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assert(result == 10);
|
||||
//#folded-stream
|
||||
}
|
||||
|
|
@ -101,7 +101,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
.grouped(2)
|
||||
.runWith(Sink.head(), mat);
|
||||
akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref());
|
||||
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS),
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS),
|
||||
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))
|
||||
);
|
||||
//#pipeto-testprobe
|
||||
|
|
@ -120,11 +120,11 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
final TestProbe probe = new TestProbe(system);
|
||||
final Cancellable cancellable = sourceUnderTest
|
||||
.to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat);
|
||||
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectNoMsg(Duration.create(100, TimeUnit.MILLISECONDS));
|
||||
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK);
|
||||
cancellable.cancel();
|
||||
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.COMPLETED);
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.COMPLETED);
|
||||
//#sink-actorref
|
||||
}
|
||||
|
||||
|
|
@ -193,7 +193,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
probe.sendError(new Exception("boom"));
|
||||
|
||||
try {
|
||||
future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assert false;
|
||||
} catch (ExecutionException ee) {
|
||||
final Throwable exception = ee.getCause();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue