Use type inference for collections in FlowTest.java (#30110)

This commit is contained in:
Andrei Arlou 2021-03-29 18:34:01 +03:00 committed by GitHub
parent 99af826546
commit f7c559e057
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -308,7 +308,7 @@ public class FlowTest extends StreamTest {
.mergeSubstreams();
final CompletionStage<List<List<String>>> future =
Source.from(input).via(flow).limit(10).runWith(Sink.<List<String>>seq(), system);
Source.from(input).via(flow).limit(10).runWith(Sink.seq(), system);
final Object[] result = future.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray();
Arrays.sort(
result,
@ -343,7 +343,7 @@ public class FlowTest extends StreamTest {
.concatSubstreams();
final CompletionStage<List<List<String>>> future =
Source.from(input).via(flow).limit(10).runWith(Sink.<List<String>>seq(), system);
Source.from(input).via(flow).limit(10).runWith(Sink.seq(), system);
final List<List<String>> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(
@ -367,7 +367,7 @@ public class FlowTest extends StreamTest {
.concatSubstreams();
final CompletionStage<List<List<String>>> future =
Source.from(input).via(flow).limit(10).runWith(Sink.<List<String>>seq(), system);
Source.from(input).via(flow).limit(10).runWith(Sink.seq(), system);
final List<List<String>> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(
@ -433,35 +433,34 @@ public class FlowTest extends StreamTest {
new Function<GraphDSL.Builder<NotUsed>, SourceShape<String>>() {
@Override
public SourceShape<String> apply(Builder<NotUsed> b) throws Exception {
final UniformFanInShape<String, String> merge = b.add(Merge.<String>create(2));
final UniformFanInShape<String, String> merge = b.add(Merge.create(2));
b.from(b.add(in1)).via(b.add(f1)).toInlet(merge.in(0));
b.from(b.add(in2)).via(b.add(f2)).toInlet(merge.in(1));
return new SourceShape<String>(merge.out());
return new SourceShape<>(merge.out());
}
}));
// collecting
final Publisher<String> pub = source.runWith(publisher, system);
final CompletionStage<List<String>> all =
Source.fromPublisher(pub).limit(100).runWith(Sink.<String>seq(), system);
Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), system);
final List<String> result = all.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(
new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")),
new HashSet<String>(result));
new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<>(result));
}
@Test
public void mustBeAbleToUsefromSourceCompletionStage() throws Exception {
final Flow<String, String, NotUsed> f1 =
Flow.of(String.class).via(FlowTest.this.<String>op()).named("f1");
Flow.of(String.class).via(FlowTest.this.op()).named("f1");
final Flow<String, String, NotUsed> f2 =
Flow.of(String.class).via(FlowTest.this.<String>op()).named("f2");
Flow.of(String.class).via(FlowTest.this.op()).named("f2");
@SuppressWarnings("unused")
final Flow<String, String, NotUsed> f3 =
Flow.of(String.class).via(FlowTest.this.<String>op()).named("f3");
Flow.of(String.class).via(FlowTest.this.op()).named("f3");
final Source<String, NotUsed> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String, NotUsed> in2 = Source.from(Arrays.asList("d", "e", "f"));
@ -474,10 +473,10 @@ public class FlowTest extends StreamTest {
new Function<GraphDSL.Builder<NotUsed>, SourceShape<String>>() {
@Override
public SourceShape<String> apply(Builder<NotUsed> b) throws Exception {
final UniformFanInShape<String, String> merge = b.add(Merge.<String>create(2));
final UniformFanInShape<String, String> merge = b.add(Merge.create(2));
b.from(b.add(in1)).via(b.add(f1)).toInlet(merge.in(0));
b.from(b.add(in2)).via(b.add(f2)).toInlet(merge.in(1));
return new SourceShape<String>(merge.out());
return new SourceShape<>(merge.out());
}
}));
@ -497,12 +496,11 @@ public class FlowTest extends StreamTest {
// collecting
final Publisher<String> pub = source.runWith(publisher, system);
final CompletionStage<List<String>> all =
Source.fromPublisher(pub).limit(100).runWith(Sink.<String>seq(), system);
Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), system);
final List<String> result = all.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(
new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")),
new HashSet<String>(result));
new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<>(result));
}
@Test
@ -518,7 +516,7 @@ public class FlowTest extends StreamTest {
final Outlet<String> in1 = b.add(Source.from(input1)).out();
final Outlet<Integer> in2 = b.add(Source.from(input2)).out();
final FanInShape2<String, Integer, Pair<String, Integer>> zip =
b.add(Zip.<String, Integer>create());
b.add(Zip.create());
final SinkShape<Pair<String, Integer>> out =
b.add(
Sink.foreach(
@ -539,10 +537,7 @@ public class FlowTest extends StreamTest {
List<Object> output = probe.receiveN(3);
List<Pair<String, Integer>> expected =
Arrays.asList(
new Pair<String, Integer>("A", 1),
new Pair<String, Integer>("B", 2),
new Pair<String, Integer>("C", 3));
Arrays.asList(new Pair<>("A", 1), new Pair<>("B", 2), new Pair<>("C", 3));
assertEquals(expected, output);
}
@ -569,10 +564,7 @@ public class FlowTest extends StreamTest {
List<Object> output = probe.receiveN(4);
List<Pair<String, Integer>> expected =
Arrays.asList(
new Pair<String, Integer>("A", 1),
new Pair<String, Integer>("B", 2),
new Pair<String, Integer>("C", 3),
new Pair<String, Integer>("MISSING", 4));
new Pair<>("A", 1), new Pair<>("B", 2), new Pair<>("C", 3), new Pair<>("MISSING", 4));
assertEquals(expected, output);
}
@ -627,15 +619,13 @@ public class FlowTest extends StreamTest {
final Flow<Integer, Pair<List<Integer>, Source<Integer, NotUsed>>, NotUsed> flow =
Flow.of(Integer.class).prefixAndTail(3);
CompletionStage<Pair<List<Integer>, Source<Integer, NotUsed>>> future =
Source.from(input)
.via(flow)
.runWith(Sink.<Pair<List<Integer>, Source<Integer, NotUsed>>>head(), system);
Source.from(input).via(flow).runWith(Sink.head(), system);
Pair<List<Integer>, Source<Integer, NotUsed>> result =
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 2, 3), result.first());
CompletionStage<List<Integer>> tailFuture =
result.second().limit(4).runWith(Sink.<Integer>seq(), system);
result.second().limit(4).runWith(Sink.seq(), system);
List<Integer> tailResult = tailFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(4, 5, 6), tailResult);
}
@ -646,16 +636,16 @@ public class FlowTest extends StreamTest {
final Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
final Iterable<Integer> input2 = Arrays.asList(4, 5);
final List<Source<Integer, NotUsed>> mainInputs = new ArrayList<Source<Integer, NotUsed>>();
final List<Source<Integer, NotUsed>> mainInputs = new ArrayList<>();
mainInputs.add(Source.from(input1));
mainInputs.add(Source.from(input2));
final Flow<Source<Integer, NotUsed>, List<Integer>, NotUsed> flow =
Flow.<Source<Integer, NotUsed>>create()
.flatMapConcat(ConstantFun.<Source<Integer, NotUsed>>javaIdentityFunction())
.flatMapConcat(ConstantFun.javaIdentityFunction())
.grouped(6);
CompletionStage<List<Integer>> future =
Source.from(mainInputs).via(flow).runWith(Sink.<List<Integer>>head(), system);
Source.from(mainInputs).via(flow).runWith(Sink.head(), system);
List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
@ -670,7 +660,7 @@ public class FlowTest extends StreamTest {
final Iterable<Integer> input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
final Iterable<Integer> input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39);
final List<Source<Integer, NotUsed>> mainInputs = new ArrayList<Source<Integer, NotUsed>>();
final List<Source<Integer, NotUsed>> mainInputs = new ArrayList<>();
mainInputs.add(Source.from(input1));
mainInputs.add(Source.from(input2));
mainInputs.add(Source.from(input3));
@ -678,17 +668,17 @@ public class FlowTest extends StreamTest {
final Flow<Source<Integer, NotUsed>, List<Integer>, NotUsed> flow =
Flow.<Source<Integer, NotUsed>>create()
.flatMapMerge(3, ConstantFun.<Source<Integer, NotUsed>>javaIdentityFunction())
.flatMapMerge(3, ConstantFun.javaIdentityFunction())
.grouped(60);
CompletionStage<List<Integer>> future =
Source.from(mainInputs).via(flow).runWith(Sink.<List<Integer>>head(), system);
Source.from(mainInputs).via(flow).runWith(Sink.head(), system);
List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
final Set<Integer> set = new HashSet<Integer>();
final Set<Integer> set = new HashSet<>();
for (Integer i : result) {
set.add(i);
}
final Set<Integer> expected = new HashSet<Integer>();
final Set<Integer> expected = new HashSet<>();
for (int i = 0; i < 40; ++i) {
expected.add(i);
}
@ -703,7 +693,7 @@ public class FlowTest extends StreamTest {
final Flow<String, List<String>, NotUsed> flow =
Flow.of(String.class).buffer(2, OverflowStrategy.backpressure()).grouped(4);
final CompletionStage<List<String>> future =
Source.from(input).via(flow).runWith(Sink.<List<String>>head(), system);
Source.from(input).via(flow).runWith(Sink.head(), system);
List<String> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(input, result);
@ -814,7 +804,7 @@ public class FlowTest extends StreamTest {
final List<String> input = Arrays.asList("A", "B", "C");
final Flow<String, String, NotUsed> flow =
Flow.of(String.class).expand(in -> Stream.iterate(in, i -> i).iterator());
final Sink<String, CompletionStage<String>> sink = Sink.<String>head();
final Sink<String, CompletionStage<String>> sink = Sink.head();
CompletionStage<String> future = Source.from(input).via(flow).runWith(sink, system);
String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("A", result);
@ -1187,23 +1177,20 @@ public class FlowTest extends StreamTest {
@Override
public SinkShape<String> apply(Builder<NotUsed> b) throws Exception {
final UniformFanOutShape<String, String> broadcast =
b.add(Broadcast.<String>create(2, true));
final SinkShape<String> out1 = b.add(Sink.<String>cancelled());
final SinkShape<String> out2 = b.add(Sink.<String>ignore());
b.add(Broadcast.create(2, true));
final SinkShape<String> out1 = b.add(Sink.cancelled());
final SinkShape<String> out2 = b.add(Sink.ignore());
b.from(broadcast.out(0)).to(out1);
b.from(broadcast.out(1)).to(out2);
return new SinkShape<String>(broadcast.in());
return new SinkShape<>(broadcast.in());
}
}));
final TestKit probe = new TestKit(system);
Source<String, ActorRef> source =
Source.actorRef(
msg -> Optional.<CompletionStrategy>empty(),
msg -> Optional.<Throwable>empty(),
1,
OverflowStrategy.dropNew());
final ActorRef actor = source.toMat(sink, Keep.<ActorRef, NotUsed>left()).run(system);
msg -> Optional.empty(), msg -> Optional.empty(), 1, OverflowStrategy.dropNew());
final ActorRef actor = source.toMat(sink, Keep.left()).run(system);
probe.watch(actor);
probe.expectTerminated(actor);
}
@ -1253,9 +1240,9 @@ public class FlowTest extends StreamTest {
},
system);
probe.expectMsgEquals(new Pair<String, String>("A", "D"));
probe.expectMsgEquals(new Pair<String, String>("B", "E"));
probe.expectMsgEquals(new Pair<String, String>("C", "F"));
probe.expectMsgEquals(new Pair<>("A", "D"));
probe.expectMsgEquals(new Pair<>("B", "E"));
probe.expectMsgEquals(new Pair<>("C", "F"));
}
@Test