diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 0176b5980d..f4c8eb908a 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -308,7 +308,7 @@ public class FlowTest extends StreamTest { .mergeSubstreams(); final CompletionStage>> future = - Source.from(input).via(flow).limit(10).runWith(Sink.>seq(), system); + Source.from(input).via(flow).limit(10).runWith(Sink.seq(), system); final Object[] result = future.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray(); Arrays.sort( result, @@ -343,7 +343,7 @@ public class FlowTest extends StreamTest { .concatSubstreams(); final CompletionStage>> future = - Source.from(input).via(flow).limit(10).runWith(Sink.>seq(), system); + Source.from(input).via(flow).limit(10).runWith(Sink.seq(), system); final List> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); assertEquals( @@ -367,7 +367,7 @@ public class FlowTest extends StreamTest { .concatSubstreams(); final CompletionStage>> future = - Source.from(input).via(flow).limit(10).runWith(Sink.>seq(), system); + Source.from(input).via(flow).limit(10).runWith(Sink.seq(), system); final List> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); assertEquals( @@ -433,35 +433,34 @@ public class FlowTest extends StreamTest { new Function, SourceShape>() { @Override public SourceShape apply(Builder b) throws Exception { - final UniformFanInShape merge = b.add(Merge.create(2)); + final UniformFanInShape merge = b.add(Merge.create(2)); b.from(b.add(in1)).via(b.add(f1)).toInlet(merge.in(0)); b.from(b.add(in2)).via(b.add(f2)).toInlet(merge.in(1)); - return new SourceShape(merge.out()); + return new SourceShape<>(merge.out()); } })); // collecting final Publisher pub = source.runWith(publisher, system); final CompletionStage> all = - Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), system); + Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), system); final List result = all.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals( - new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), - new HashSet(result)); + new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<>(result)); } @Test public void mustBeAbleToUsefromSourceCompletionStage() throws Exception { final Flow f1 = - Flow.of(String.class).via(FlowTest.this.op()).named("f1"); + Flow.of(String.class).via(FlowTest.this.op()).named("f1"); final Flow f2 = - Flow.of(String.class).via(FlowTest.this.op()).named("f2"); + Flow.of(String.class).via(FlowTest.this.op()).named("f2"); @SuppressWarnings("unused") final Flow f3 = - Flow.of(String.class).via(FlowTest.this.op()).named("f3"); + Flow.of(String.class).via(FlowTest.this.op()).named("f3"); final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); @@ -474,10 +473,10 @@ public class FlowTest extends StreamTest { new Function, SourceShape>() { @Override public SourceShape apply(Builder b) throws Exception { - final UniformFanInShape merge = b.add(Merge.create(2)); + final UniformFanInShape merge = b.add(Merge.create(2)); b.from(b.add(in1)).via(b.add(f1)).toInlet(merge.in(0)); b.from(b.add(in2)).via(b.add(f2)).toInlet(merge.in(1)); - return new SourceShape(merge.out()); + return new SourceShape<>(merge.out()); } })); @@ -497,12 +496,11 @@ public class FlowTest extends StreamTest { // collecting final Publisher pub = source.runWith(publisher, system); final CompletionStage> all = - Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), system); + Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), system); final List result = all.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals( - new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), - new HashSet(result)); + new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<>(result)); } @Test @@ -518,7 +516,7 @@ public class FlowTest extends StreamTest { final Outlet in1 = b.add(Source.from(input1)).out(); final Outlet in2 = b.add(Source.from(input2)).out(); final FanInShape2> zip = - b.add(Zip.create()); + b.add(Zip.create()); final SinkShape> out = b.add( Sink.foreach( @@ -539,10 +537,7 @@ public class FlowTest extends StreamTest { List output = probe.receiveN(3); List> expected = - Arrays.asList( - new Pair("A", 1), - new Pair("B", 2), - new Pair("C", 3)); + Arrays.asList(new Pair<>("A", 1), new Pair<>("B", 2), new Pair<>("C", 3)); assertEquals(expected, output); } @@ -569,10 +564,7 @@ public class FlowTest extends StreamTest { List output = probe.receiveN(4); List> expected = Arrays.asList( - new Pair("A", 1), - new Pair("B", 2), - new Pair("C", 3), - new Pair("MISSING", 4)); + new Pair<>("A", 1), new Pair<>("B", 2), new Pair<>("C", 3), new Pair<>("MISSING", 4)); assertEquals(expected, output); } @@ -627,15 +619,13 @@ public class FlowTest extends StreamTest { final Flow, Source>, NotUsed> flow = Flow.of(Integer.class).prefixAndTail(3); CompletionStage, Source>> future = - Source.from(input) - .via(flow) - .runWith(Sink., Source>>head(), system); + Source.from(input).via(flow).runWith(Sink.head(), system); Pair, Source> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(Arrays.asList(1, 2, 3), result.first()); CompletionStage> tailFuture = - result.second().limit(4).runWith(Sink.seq(), system); + result.second().limit(4).runWith(Sink.seq(), system); List tailResult = tailFuture.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(Arrays.asList(4, 5, 6), tailResult); } @@ -646,16 +636,16 @@ public class FlowTest extends StreamTest { final Iterable input1 = Arrays.asList(1, 2, 3); final Iterable input2 = Arrays.asList(4, 5); - final List> mainInputs = new ArrayList>(); + final List> mainInputs = new ArrayList<>(); mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input2)); final Flow, List, NotUsed> flow = Flow.>create() - .flatMapConcat(ConstantFun.>javaIdentityFunction()) + .flatMapConcat(ConstantFun.javaIdentityFunction()) .grouped(6); CompletionStage> future = - Source.from(mainInputs).via(flow).runWith(Sink.>head(), system); + Source.from(mainInputs).via(flow).runWith(Sink.head(), system); List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); @@ -670,7 +660,7 @@ public class FlowTest extends StreamTest { final Iterable input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29); final Iterable input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39); - final List> mainInputs = new ArrayList>(); + final List> mainInputs = new ArrayList<>(); mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input3)); @@ -678,17 +668,17 @@ public class FlowTest extends StreamTest { final Flow, List, NotUsed> flow = Flow.>create() - .flatMapMerge(3, ConstantFun.>javaIdentityFunction()) + .flatMapMerge(3, ConstantFun.javaIdentityFunction()) .grouped(60); CompletionStage> future = - Source.from(mainInputs).via(flow).runWith(Sink.>head(), system); + Source.from(mainInputs).via(flow).runWith(Sink.head(), system); List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); - final Set set = new HashSet(); + final Set set = new HashSet<>(); for (Integer i : result) { set.add(i); } - final Set expected = new HashSet(); + final Set expected = new HashSet<>(); for (int i = 0; i < 40; ++i) { expected.add(i); } @@ -703,7 +693,7 @@ public class FlowTest extends StreamTest { final Flow, NotUsed> flow = Flow.of(String.class).buffer(2, OverflowStrategy.backpressure()).grouped(4); final CompletionStage> future = - Source.from(input).via(flow).runWith(Sink.>head(), system); + Source.from(input).via(flow).runWith(Sink.head(), system); List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(input, result); @@ -814,7 +804,7 @@ public class FlowTest extends StreamTest { final List input = Arrays.asList("A", "B", "C"); final Flow flow = Flow.of(String.class).expand(in -> Stream.iterate(in, i -> i).iterator()); - final Sink> sink = Sink.head(); + final Sink> sink = Sink.head(); CompletionStage future = Source.from(input).via(flow).runWith(sink, system); String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals("A", result); @@ -1187,23 +1177,20 @@ public class FlowTest extends StreamTest { @Override public SinkShape apply(Builder b) throws Exception { final UniformFanOutShape broadcast = - b.add(Broadcast.create(2, true)); - final SinkShape out1 = b.add(Sink.cancelled()); - final SinkShape out2 = b.add(Sink.ignore()); + b.add(Broadcast.create(2, true)); + final SinkShape out1 = b.add(Sink.cancelled()); + final SinkShape out2 = b.add(Sink.ignore()); b.from(broadcast.out(0)).to(out1); b.from(broadcast.out(1)).to(out2); - return new SinkShape(broadcast.in()); + return new SinkShape<>(broadcast.in()); } })); final TestKit probe = new TestKit(system); Source source = Source.actorRef( - msg -> Optional.empty(), - msg -> Optional.empty(), - 1, - OverflowStrategy.dropNew()); - final ActorRef actor = source.toMat(sink, Keep.left()).run(system); + msg -> Optional.empty(), msg -> Optional.empty(), 1, OverflowStrategy.dropNew()); + final ActorRef actor = source.toMat(sink, Keep.left()).run(system); probe.watch(actor); probe.expectTerminated(actor); } @@ -1253,9 +1240,9 @@ public class FlowTest extends StreamTest { }, system); - probe.expectMsgEquals(new Pair("A", "D")); - probe.expectMsgEquals(new Pair("B", "E")); - probe.expectMsgEquals(new Pair("C", "F")); + probe.expectMsgEquals(new Pair<>("A", "D")); + probe.expectMsgEquals(new Pair<>("B", "E")); + probe.expectMsgEquals(new Pair<>("C", "F")); } @Test