diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index f85c1be21e..8f16150472 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -23,9 +23,9 @@ import akka.testkit.AkkaSpec; import akka.stream.testkit.TestPublisher; import akka.testkit.javadsl.TestKit; import com.google.common.collect.Iterables; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; import scala.util.Try; import akka.testkit.AkkaJUnitActorSystemResource; @@ -194,7 +194,7 @@ public class SourceTest extends StreamTest { .mergeSubstreams(); final CompletionStage>> future = - source.grouped(10).runWith(Sink.>>head(), system); + source.grouped(10).runWith(Sink.head(), system); final Object[] result = future.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray(); Arrays.sort( result, @@ -229,7 +229,7 @@ public class SourceTest extends StreamTest { .concatSubstreams(); final CompletionStage>> future = - source.grouped(10).runWith(Sink.>>head(), system); + source.grouped(10).runWith(Sink.head(), system); final List> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); assertEquals( @@ -253,7 +253,7 @@ public class SourceTest extends StreamTest { .concatSubstreams(); final CompletionStage>> future = - source.grouped(10).runWith(Sink.>>head(), system); + source.grouped(10).runWith(Sink.head(), system); final List> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); assertEquals( @@ -338,7 +338,7 @@ public class SourceTest extends StreamTest { Source.from(input) .runWith( - Sink.onComplete( + Sink.onComplete( new Procedure>() { @Override public void apply(Try param) throws Exception { @@ -360,7 +360,7 @@ public class SourceTest extends StreamTest { in -> { throw new RuntimeException("simulated err"); }) - .runWith(Sink.head(), system) + .runWith(Sink.head(), system) .whenComplete( (s, ex) -> { if (ex == null) { @@ -377,7 +377,7 @@ public class SourceTest extends StreamTest { public void mustBeAbleToUseToFuture() throws Exception { final TestKit probe = new TestKit(system); final Iterable input = Arrays.asList("A", "B", "C"); - CompletionStage future = Source.from(input).runWith(Sink.head(), system); + CompletionStage future = Source.from(input).runWith(Sink.head(), system); String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals("A", result); } @@ -402,15 +402,13 @@ public class SourceTest extends StreamTest { final TestKit probe = new TestKit(system); final Iterable input = Arrays.asList(1, 2, 3, 4, 5, 6); CompletionStage, Source>> future = - Source.from(input) - .prefixAndTail(3) - .runWith(Sink., Source>>head(), system); + Source.from(input).prefixAndTail(3).runWith(Sink.head(), system); Pair, Source> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(Arrays.asList(1, 2, 3), result.first()); CompletionStage> tailFuture = - result.second().limit(4).runWith(Sink.seq(), system); + result.second().limit(4).runWith(Sink.seq(), system); List tailResult = tailFuture.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(Arrays.asList(4, 5, 6), tailResult); } @@ -421,16 +419,15 @@ public class SourceTest extends StreamTest { final Iterable input1 = Arrays.asList(1, 2, 3); final Iterable input2 = Arrays.asList(4, 5); - final List> mainInputs = new ArrayList>(); + final List> mainInputs = new ArrayList<>(); mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input2)); CompletionStage> future = Source.from(mainInputs) - .flatMapConcat( - ConstantFun.>javaIdentityFunction()) + .flatMapConcat(ConstantFun.javaIdentityFunction()) .grouped(6) - .runWith(Sink.>head(), system); + .runWith(Sink.head(), system); List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); @@ -445,7 +442,7 @@ public class SourceTest extends StreamTest { final Iterable input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29); final Iterable input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39); - final List> mainInputs = new ArrayList>(); + final List> mainInputs = new ArrayList<>(); mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input3)); @@ -453,16 +450,13 @@ public class SourceTest extends StreamTest { CompletionStage> future = Source.from(mainInputs) - .flatMapMerge(3, ConstantFun.>javaIdentityFunction()) + .flatMapMerge(3, ConstantFun.javaIdentityFunction()) .grouped(60) - .runWith(Sink.>head(), system); + .runWith(Sink.head(), system); List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); - final Set set = new HashSet(); - for (Integer i : result) { - set.add(i); - } - final Set expected = new HashSet(); + final Set set = new HashSet<>(result); + final Set expected = new HashSet<>(); for (int i = 0; i < 40; ++i) { expected.add(i); } @@ -478,7 +472,7 @@ public class SourceTest extends StreamTest { Source.from(input) .buffer(2, OverflowStrategy.backpressure()) .grouped(4) - .runWith(Sink.>head(), system); + .runWith(Sink.head(), system); List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(input, result); @@ -512,7 +506,7 @@ public class SourceTest extends StreamTest { CompletionStage future = Source.from(input) .expand(in -> Stream.iterate(in, i -> i).iterator()) - .runWith(Sink.head(), system); + .runWith(Sink.head(), system); String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals("A", result); } @@ -580,9 +574,8 @@ public class SourceTest extends StreamTest { @Test public void mustWorkFromFuture() throws Exception { final Iterable input = Arrays.asList("A", "B", "C"); - CompletionStage future1 = Source.from(input).runWith(Sink.head(), system); - CompletionStage future2 = - Source.completionStage(future1).runWith(Sink.head(), system); + CompletionStage future1 = Source.from(input).runWith(Sink.head(), system); + CompletionStage future2 = Source.completionStage(future1).runWith(Sink.head(), system); String result = future2.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals("A", result); } @@ -598,8 +591,7 @@ public class SourceTest extends StreamTest { @Test public void mustWorkFromRange() throws Exception { - CompletionStage> f = - Source.range(0, 10).grouped(20).runWith(Sink.>head(), system); + CompletionStage> f = Source.range(0, 10).grouped(20).runWith(Sink.head(), system); final List result = f.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(11, result.size()); Integer counter = 0; @@ -609,7 +601,7 @@ public class SourceTest extends StreamTest { @Test public void mustWorkFromRangeWithStep() throws Exception { CompletionStage> f = - Source.range(0, 10, 2).grouped(20).runWith(Sink.>head(), system); + Source.range(0, 10, 2).grouped(20).runWith(Sink.head(), system); final List result = f.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(6, result.size()); Integer counter = 0; @@ -622,9 +614,9 @@ public class SourceTest extends StreamTest { @Test public void mustRepeat() throws Exception { final CompletionStage> f = - Source.repeat(42).grouped(10000).runWith(Sink.>head(), system); + Source.repeat(42).grouped(10000).runWith(Sink.head(), system); final List result = f.toCompletableFuture().get(3, TimeUnit.SECONDS); - assertEquals(result.size(), 10000); + assertEquals(10000, result.size()); for (Integer i : result) assertEquals(i, (Integer) 42); } @@ -652,18 +644,15 @@ public class SourceTest extends StreamTest { source.offer("world"); source.complete(); assertEquals( - result.toCompletableFuture().get(3, TimeUnit.SECONDS), Arrays.asList("hello", "world")); + Arrays.asList("hello", "world"), result.toCompletableFuture().get(3, TimeUnit.SECONDS)); } @Test public void mustBeAbleToUseActorRefSource() throws Exception { final TestKit probe = new TestKit(system); final Source actorRefSource = - Source.actorRef( - msg -> Optional.empty(), - msg -> Optional.empty(), - 10, - OverflowStrategy.fail()); + Source.actorRef( + msg -> Optional.empty(), msg -> Optional.empty(), 10, OverflowStrategy.fail()); final ActorRef ref = actorRefSource .to( @@ -830,11 +819,7 @@ public class SourceTest extends StreamTest { final Source source2 = Source.from(Arrays.asList(2, 3)); final Source source = - Source.combine( - source1, - source2, - new ArrayList>(), - width -> Merge.create(width)); + Source.combine(source1, source2, new ArrayList<>(), width -> Merge.create(width)); final CompletionStage future = source.runWith( @@ -858,7 +843,7 @@ public class SourceTest extends StreamTest { Source.combineMat( source1, source2, - width -> Concat.create(width), + width -> Concat.create(width), Keep.left()); // Keep.left() (i.e. preserve queueSource's materialized value) SourceQueueWithComplete queue = @@ -940,12 +925,12 @@ public class SourceTest extends StreamTest { List output = probe.receiveN(6); List> expected = Arrays.asList( - new Pair("A", 1), - new Pair("B", 2), - new Pair("C", 3), - new Pair("D", 4), - new Pair("new kid on the block1", -1), - new Pair("second newbie", -1)); + new Pair<>("A", 1), + new Pair<>("B", 2), + new Pair<>("C", 3), + new Pair<>("D", 4), + new Pair<>("new kid on the block1", -1), + new Pair<>("second newbie", -1)); assertEquals(expected, output); } @@ -966,21 +951,26 @@ public class SourceTest extends StreamTest { // #cycle } - @Test(expected = IllegalArgumentException.class) - public void cycleSourceMustThrow() throws Throwable { - - try { - // #cycle-error - Iterator emptyIterator = Collections.emptyList().iterator(); - Source.cycle(() -> emptyIterator) - .runWith(Sink.head(), system) - // stream will be terminated with IllegalArgumentException - // #cycle-error - .toCompletableFuture() - .get(); - } catch (ExecutionException e) { - throw e.getCause(); - } + @Test + public void cycleSourceMustThrow() { + ExecutionException exception = + Assert.assertThrows( + "CompletableFuture.get() should throw ExecutionException", + ExecutionException.class, + () -> { + // #cycle-error + Iterator emptyIterator = Collections.emptyList().iterator(); + Source.cycle(() -> emptyIterator) + .runWith(Sink.head(), system) + // stream will be terminated with IllegalArgumentException + // #cycle-error + .toCompletableFuture() + .get(); + }); + assertEquals( + "The cause of ExecutionException should be IllegalArgumentException", + IllegalArgumentException.class, + exception.getCause().getClass()); } @Test @@ -1045,9 +1035,9 @@ public class SourceTest extends StreamTest { }, system); - probe.expectMsgEquals(new Pair("A", "D")); - probe.expectMsgEquals(new Pair("B", "E")); - probe.expectMsgEquals(new Pair("C", "F")); + probe.expectMsgEquals(new Pair<>("A", "D")); + probe.expectMsgEquals(new Pair<>("B", "E")); + probe.expectMsgEquals(new Pair<>("C", "F")); } @Test @@ -1070,57 +1060,57 @@ public class SourceTest extends StreamTest { } @Test - public void mustBeAbleToUseInitialTimeout() throws Throwable { - try { - try { - Source.maybe() - .initialTimeout(Duration.ofSeconds(1)) - .runWith(Sink.head(), system) - .toCompletableFuture() - .get(3, TimeUnit.SECONDS); - org.junit.Assert.fail("A TimeoutException was expected"); - } catch (ExecutionException e) { - throw e.getCause(); - } - } catch (TimeoutException e) { - // expected - } + public void mustBeAbleToUseInitialTimeout() { + ExecutionException exception = + Assert.assertThrows( + "CompletableFuture.get() should throw ExecutionException", + ExecutionException.class, + () -> + Source.maybe() + .initialTimeout(Duration.ofSeconds(1)) + .runWith(Sink.head(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS)); + assertEquals( + "The cause of ExecutionException should be TimeoutException", + TimeoutException.class, + exception.getCause().getClass()); } @Test - public void mustBeAbleToUseCompletionTimeout() throws Throwable { - try { - try { - Source.maybe() - .completionTimeout(Duration.ofSeconds(1)) - .runWith(Sink.head(), system) - .toCompletableFuture() - .get(3, TimeUnit.SECONDS); - org.junit.Assert.fail("A TimeoutException was expected"); - } catch (ExecutionException e) { - throw e.getCause(); - } - } catch (TimeoutException e) { - // expected - } + public void mustBeAbleToUseCompletionTimeout() { + ExecutionException exception = + Assert.assertThrows( + "CompletableFuture.get() should throw ExecutionException", + ExecutionException.class, + () -> + Source.maybe() + .completionTimeout(Duration.ofSeconds(1)) + .runWith(Sink.head(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS)); + assertEquals( + "The cause of ExecutionException should be TimeoutException", + TimeoutException.class, + exception.getCause().getClass()); } @Test - public void mustBeAbleToUseIdleTimeout() throws Throwable { - try { - try { - Source.maybe() - .idleTimeout(Duration.ofSeconds(1)) - .runWith(Sink.head(), system) - .toCompletableFuture() - .get(3, TimeUnit.SECONDS); - org.junit.Assert.fail("A TimeoutException was expected"); - } catch (ExecutionException e) { - throw e.getCause(); - } - } catch (TimeoutException e) { - // expected - } + public void mustBeAbleToUseIdleTimeout() { + ExecutionException exception = + Assert.assertThrows( + "CompletableFuture.get() should throw ExecutionException", + ExecutionException.class, + () -> + Source.maybe() + .idleTimeout(Duration.ofSeconds(1)) + .runWith(Sink.head(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS)); + assertEquals( + "The cause of ExecutionException should be TimeoutException", + TimeoutException.class, + exception.getCause().getClass()); } @Test @@ -1203,7 +1193,7 @@ public class SourceTest extends StreamTest { final Creator> input = () -> iterator; final Done completion = Source.fromIterator(input).map(it -> it * 10).run(system).toCompletableFuture().join(); - assertEquals(completion, Done.getInstance()); + assertEquals(Done.getInstance(), completion); } @Test @@ -1217,6 +1207,6 @@ public class SourceTest extends StreamTest { .run(materializer) .toCompletableFuture() .join(); - assertEquals(completion, Done.getInstance()); + assertEquals(Done.getInstance(), completion); } }