Refactor SourceTest.java: use diamong operator, use assertThrows (#30152)

This commit is contained in:
Andrei Arlou 2021-03-30 18:37:19 +03:00 committed by GitHub
parent 03375a7eff
commit 846359919f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -23,9 +23,9 @@ import akka.testkit.AkkaSpec;
import akka.stream.testkit.TestPublisher; import akka.stream.testkit.TestPublisher;
import akka.testkit.javadsl.TestKit; import akka.testkit.javadsl.TestKit;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Try; import scala.util.Try;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
@ -194,7 +194,7 @@ public class SourceTest extends StreamTest {
.mergeSubstreams(); .mergeSubstreams();
final CompletionStage<List<List<String>>> future = final CompletionStage<List<List<String>>> future =
source.grouped(10).runWith(Sink.<List<List<String>>>head(), system); source.grouped(10).runWith(Sink.head(), system);
final Object[] result = future.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray(); final Object[] result = future.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray();
Arrays.sort( Arrays.sort(
result, result,
@ -229,7 +229,7 @@ public class SourceTest extends StreamTest {
.concatSubstreams(); .concatSubstreams();
final CompletionStage<List<List<String>>> future = final CompletionStage<List<List<String>>> future =
source.grouped(10).runWith(Sink.<List<List<String>>>head(), system); source.grouped(10).runWith(Sink.head(), system);
final List<List<String>> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); final List<List<String>> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals( assertEquals(
@ -253,7 +253,7 @@ public class SourceTest extends StreamTest {
.concatSubstreams(); .concatSubstreams();
final CompletionStage<List<List<String>>> future = final CompletionStage<List<List<String>>> future =
source.grouped(10).runWith(Sink.<List<List<String>>>head(), system); source.grouped(10).runWith(Sink.head(), system);
final List<List<String>> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); final List<List<String>> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals( assertEquals(
@ -338,7 +338,7 @@ public class SourceTest extends StreamTest {
Source.from(input) Source.from(input)
.runWith( .runWith(
Sink.<String>onComplete( Sink.onComplete(
new Procedure<Try<Done>>() { new Procedure<Try<Done>>() {
@Override @Override
public void apply(Try<Done> param) throws Exception { public void apply(Try<Done> param) throws Exception {
@ -360,7 +360,7 @@ public class SourceTest extends StreamTest {
in -> { in -> {
throw new RuntimeException("simulated err"); throw new RuntimeException("simulated err");
}) })
.runWith(Sink.<String>head(), system) .runWith(Sink.head(), system)
.whenComplete( .whenComplete(
(s, ex) -> { (s, ex) -> {
if (ex == null) { if (ex == null) {
@ -377,7 +377,7 @@ public class SourceTest extends StreamTest {
public void mustBeAbleToUseToFuture() throws Exception { public void mustBeAbleToUseToFuture() throws Exception {
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);
final Iterable<String> input = Arrays.asList("A", "B", "C"); final Iterable<String> input = Arrays.asList("A", "B", "C");
CompletionStage<String> future = Source.from(input).runWith(Sink.<String>head(), system); CompletionStage<String> future = Source.from(input).runWith(Sink.head(), system);
String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("A", result); assertEquals("A", result);
} }
@ -402,15 +402,13 @@ public class SourceTest extends StreamTest {
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);
final Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6); final Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
CompletionStage<Pair<List<Integer>, Source<Integer, NotUsed>>> future = CompletionStage<Pair<List<Integer>, Source<Integer, NotUsed>>> future =
Source.from(input) Source.from(input).prefixAndTail(3).runWith(Sink.head(), system);
.prefixAndTail(3)
.runWith(Sink.<Pair<List<Integer>, Source<Integer, NotUsed>>>head(), system);
Pair<List<Integer>, Source<Integer, NotUsed>> result = Pair<List<Integer>, Source<Integer, NotUsed>> result =
future.toCompletableFuture().get(3, TimeUnit.SECONDS); future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(1, 2, 3), result.first()); assertEquals(Arrays.asList(1, 2, 3), result.first());
CompletionStage<List<Integer>> tailFuture = CompletionStage<List<Integer>> tailFuture =
result.second().limit(4).runWith(Sink.<Integer>seq(), system); result.second().limit(4).runWith(Sink.seq(), system);
List<Integer> tailResult = tailFuture.toCompletableFuture().get(3, TimeUnit.SECONDS); List<Integer> tailResult = tailFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList(4, 5, 6), tailResult); assertEquals(Arrays.asList(4, 5, 6), tailResult);
} }
@ -421,16 +419,15 @@ public class SourceTest extends StreamTest {
final Iterable<Integer> input1 = Arrays.asList(1, 2, 3); final Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
final Iterable<Integer> input2 = Arrays.asList(4, 5); final Iterable<Integer> input2 = Arrays.asList(4, 5);
final List<Source<Integer, NotUsed>> mainInputs = new ArrayList<Source<Integer, NotUsed>>(); final List<Source<Integer, NotUsed>> mainInputs = new ArrayList<>();
mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input1));
mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input2));
CompletionStage<List<Integer>> future = CompletionStage<List<Integer>> future =
Source.from(mainInputs) Source.from(mainInputs)
.<Integer, NotUsed>flatMapConcat( .flatMapConcat(ConstantFun.javaIdentityFunction())
ConstantFun.<Source<Integer, NotUsed>>javaIdentityFunction())
.grouped(6) .grouped(6)
.runWith(Sink.<List<Integer>>head(), system); .runWith(Sink.head(), system);
List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
@ -445,7 +442,7 @@ public class SourceTest extends StreamTest {
final Iterable<Integer> input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29); final Iterable<Integer> input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
final Iterable<Integer> input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39); final Iterable<Integer> input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39);
final List<Source<Integer, NotUsed>> mainInputs = new ArrayList<Source<Integer, NotUsed>>(); final List<Source<Integer, NotUsed>> mainInputs = new ArrayList<>();
mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input1));
mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input2));
mainInputs.add(Source.from(input3)); mainInputs.add(Source.from(input3));
@ -453,16 +450,13 @@ public class SourceTest extends StreamTest {
CompletionStage<List<Integer>> future = CompletionStage<List<Integer>> future =
Source.from(mainInputs) Source.from(mainInputs)
.flatMapMerge(3, ConstantFun.<Source<Integer, NotUsed>>javaIdentityFunction()) .flatMapMerge(3, ConstantFun.javaIdentityFunction())
.grouped(60) .grouped(60)
.runWith(Sink.<List<Integer>>head(), system); .runWith(Sink.head(), system);
List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
final Set<Integer> set = new HashSet<Integer>(); final Set<Integer> set = new HashSet<>(result);
for (Integer i : result) { final Set<Integer> expected = new HashSet<>();
set.add(i);
}
final Set<Integer> expected = new HashSet<Integer>();
for (int i = 0; i < 40; ++i) { for (int i = 0; i < 40; ++i) {
expected.add(i); expected.add(i);
} }
@ -478,7 +472,7 @@ public class SourceTest extends StreamTest {
Source.from(input) Source.from(input)
.buffer(2, OverflowStrategy.backpressure()) .buffer(2, OverflowStrategy.backpressure())
.grouped(4) .grouped(4)
.runWith(Sink.<List<String>>head(), system); .runWith(Sink.head(), system);
List<String> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); List<String> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(input, result); assertEquals(input, result);
@ -512,7 +506,7 @@ public class SourceTest extends StreamTest {
CompletionStage<String> future = CompletionStage<String> future =
Source.from(input) Source.from(input)
.expand(in -> Stream.iterate(in, i -> i).iterator()) .expand(in -> Stream.iterate(in, i -> i).iterator())
.runWith(Sink.<String>head(), system); .runWith(Sink.head(), system);
String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("A", result); assertEquals("A", result);
} }
@ -580,9 +574,8 @@ public class SourceTest extends StreamTest {
@Test @Test
public void mustWorkFromFuture() throws Exception { public void mustWorkFromFuture() throws Exception {
final Iterable<String> input = Arrays.asList("A", "B", "C"); final Iterable<String> input = Arrays.asList("A", "B", "C");
CompletionStage<String> future1 = Source.from(input).runWith(Sink.<String>head(), system); CompletionStage<String> future1 = Source.from(input).runWith(Sink.head(), system);
CompletionStage<String> future2 = CompletionStage<String> future2 = Source.completionStage(future1).runWith(Sink.head(), system);
Source.completionStage(future1).runWith(Sink.<String>head(), system);
String result = future2.toCompletableFuture().get(3, TimeUnit.SECONDS); String result = future2.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("A", result); assertEquals("A", result);
} }
@ -598,8 +591,7 @@ public class SourceTest extends StreamTest {
@Test @Test
public void mustWorkFromRange() throws Exception { public void mustWorkFromRange() throws Exception {
CompletionStage<List<Integer>> f = CompletionStage<List<Integer>> f = Source.range(0, 10).grouped(20).runWith(Sink.head(), system);
Source.range(0, 10).grouped(20).runWith(Sink.<List<Integer>>head(), system);
final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS); final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(11, result.size()); assertEquals(11, result.size());
Integer counter = 0; Integer counter = 0;
@ -609,7 +601,7 @@ public class SourceTest extends StreamTest {
@Test @Test
public void mustWorkFromRangeWithStep() throws Exception { public void mustWorkFromRangeWithStep() throws Exception {
CompletionStage<List<Integer>> f = CompletionStage<List<Integer>> f =
Source.range(0, 10, 2).grouped(20).runWith(Sink.<List<Integer>>head(), system); Source.range(0, 10, 2).grouped(20).runWith(Sink.head(), system);
final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS); final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(6, result.size()); assertEquals(6, result.size());
Integer counter = 0; Integer counter = 0;
@ -622,9 +614,9 @@ public class SourceTest extends StreamTest {
@Test @Test
public void mustRepeat() throws Exception { public void mustRepeat() throws Exception {
final CompletionStage<List<Integer>> f = final CompletionStage<List<Integer>> f =
Source.repeat(42).grouped(10000).runWith(Sink.<List<Integer>>head(), system); Source.repeat(42).grouped(10000).runWith(Sink.head(), system);
final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS); final List<Integer> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(result.size(), 10000); assertEquals(10000, result.size());
for (Integer i : result) assertEquals(i, (Integer) 42); for (Integer i : result) assertEquals(i, (Integer) 42);
} }
@ -652,18 +644,15 @@ public class SourceTest extends StreamTest {
source.offer("world"); source.offer("world");
source.complete(); source.complete();
assertEquals( assertEquals(
result.toCompletableFuture().get(3, TimeUnit.SECONDS), Arrays.asList("hello", "world")); Arrays.asList("hello", "world"), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
} }
@Test @Test
public void mustBeAbleToUseActorRefSource() throws Exception { public void mustBeAbleToUseActorRefSource() throws Exception {
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);
final Source<Integer, ActorRef> actorRefSource = final Source<Integer, ActorRef> actorRefSource =
Source.<Integer>actorRef( Source.actorRef(
msg -> Optional.<CompletionStrategy>empty(), msg -> Optional.empty(), msg -> Optional.empty(), 10, OverflowStrategy.fail());
msg -> Optional.<Throwable>empty(),
10,
OverflowStrategy.fail());
final ActorRef ref = final ActorRef ref =
actorRefSource actorRefSource
.to( .to(
@ -830,11 +819,7 @@ public class SourceTest extends StreamTest {
final Source<Integer, NotUsed> source2 = Source.from(Arrays.asList(2, 3)); final Source<Integer, NotUsed> source2 = Source.from(Arrays.asList(2, 3));
final Source<Integer, NotUsed> source = final Source<Integer, NotUsed> source =
Source.combine( Source.combine(source1, source2, new ArrayList<>(), width -> Merge.create(width));
source1,
source2,
new ArrayList<Source<Integer, ?>>(),
width -> Merge.<Integer>create(width));
final CompletionStage<Done> future = final CompletionStage<Done> future =
source.runWith( source.runWith(
@ -858,7 +843,7 @@ public class SourceTest extends StreamTest {
Source.combineMat( Source.combineMat(
source1, source1,
source2, source2,
width -> Concat.<Integer>create(width), width -> Concat.create(width),
Keep.left()); // Keep.left() (i.e. preserve queueSource's materialized value) Keep.left()); // Keep.left() (i.e. preserve queueSource's materialized value)
SourceQueueWithComplete<Integer> queue = SourceQueueWithComplete<Integer> queue =
@ -940,12 +925,12 @@ public class SourceTest extends StreamTest {
List<Object> output = probe.receiveN(6); List<Object> output = probe.receiveN(6);
List<Pair<String, Integer>> expected = List<Pair<String, Integer>> expected =
Arrays.asList( Arrays.asList(
new Pair<String, Integer>("A", 1), new Pair<>("A", 1),
new Pair<String, Integer>("B", 2), new Pair<>("B", 2),
new Pair<String, Integer>("C", 3), new Pair<>("C", 3),
new Pair<String, Integer>("D", 4), new Pair<>("D", 4),
new Pair<String, Integer>("new kid on the block1", -1), new Pair<>("new kid on the block1", -1),
new Pair<String, Integer>("second newbie", -1)); new Pair<>("second newbie", -1));
assertEquals(expected, output); assertEquals(expected, output);
} }
@ -966,21 +951,26 @@ public class SourceTest extends StreamTest {
// #cycle // #cycle
} }
@Test(expected = IllegalArgumentException.class) @Test
public void cycleSourceMustThrow() throws Throwable { public void cycleSourceMustThrow() {
ExecutionException exception =
try { Assert.assertThrows(
// #cycle-error "CompletableFuture.get() should throw ExecutionException",
Iterator<Integer> emptyIterator = Collections.<Integer>emptyList().iterator(); ExecutionException.class,
Source.cycle(() -> emptyIterator) () -> {
.runWith(Sink.head(), system) // #cycle-error
// stream will be terminated with IllegalArgumentException Iterator<Integer> emptyIterator = Collections.<Integer>emptyList().iterator();
// #cycle-error Source.cycle(() -> emptyIterator)
.toCompletableFuture() .runWith(Sink.head(), system)
.get(); // stream will be terminated with IllegalArgumentException
} catch (ExecutionException e) { // #cycle-error
throw e.getCause(); .toCompletableFuture()
} .get();
});
assertEquals(
"The cause of ExecutionException should be IllegalArgumentException",
IllegalArgumentException.class,
exception.getCause().getClass());
} }
@Test @Test
@ -1045,9 +1035,9 @@ public class SourceTest extends StreamTest {
}, },
system); system);
probe.expectMsgEquals(new Pair<String, String>("A", "D")); probe.expectMsgEquals(new Pair<>("A", "D"));
probe.expectMsgEquals(new Pair<String, String>("B", "E")); probe.expectMsgEquals(new Pair<>("B", "E"));
probe.expectMsgEquals(new Pair<String, String>("C", "F")); probe.expectMsgEquals(new Pair<>("C", "F"));
} }
@Test @Test
@ -1070,57 +1060,57 @@ public class SourceTest extends StreamTest {
} }
@Test @Test
public void mustBeAbleToUseInitialTimeout() throws Throwable { public void mustBeAbleToUseInitialTimeout() {
try { ExecutionException exception =
try { Assert.assertThrows(
Source.maybe() "CompletableFuture.get() should throw ExecutionException",
.initialTimeout(Duration.ofSeconds(1)) ExecutionException.class,
.runWith(Sink.head(), system) () ->
.toCompletableFuture() Source.maybe()
.get(3, TimeUnit.SECONDS); .initialTimeout(Duration.ofSeconds(1))
org.junit.Assert.fail("A TimeoutException was expected"); .runWith(Sink.head(), system)
} catch (ExecutionException e) { .toCompletableFuture()
throw e.getCause(); .get(3, TimeUnit.SECONDS));
} assertEquals(
} catch (TimeoutException e) { "The cause of ExecutionException should be TimeoutException",
// expected TimeoutException.class,
} exception.getCause().getClass());
} }
@Test @Test
public void mustBeAbleToUseCompletionTimeout() throws Throwable { public void mustBeAbleToUseCompletionTimeout() {
try { ExecutionException exception =
try { Assert.assertThrows(
Source.maybe() "CompletableFuture.get() should throw ExecutionException",
.completionTimeout(Duration.ofSeconds(1)) ExecutionException.class,
.runWith(Sink.head(), system) () ->
.toCompletableFuture() Source.maybe()
.get(3, TimeUnit.SECONDS); .completionTimeout(Duration.ofSeconds(1))
org.junit.Assert.fail("A TimeoutException was expected"); .runWith(Sink.head(), system)
} catch (ExecutionException e) { .toCompletableFuture()
throw e.getCause(); .get(3, TimeUnit.SECONDS));
} assertEquals(
} catch (TimeoutException e) { "The cause of ExecutionException should be TimeoutException",
// expected TimeoutException.class,
} exception.getCause().getClass());
} }
@Test @Test
public void mustBeAbleToUseIdleTimeout() throws Throwable { public void mustBeAbleToUseIdleTimeout() {
try { ExecutionException exception =
try { Assert.assertThrows(
Source.maybe() "CompletableFuture.get() should throw ExecutionException",
.idleTimeout(Duration.ofSeconds(1)) ExecutionException.class,
.runWith(Sink.head(), system) () ->
.toCompletableFuture() Source.maybe()
.get(3, TimeUnit.SECONDS); .idleTimeout(Duration.ofSeconds(1))
org.junit.Assert.fail("A TimeoutException was expected"); .runWith(Sink.head(), system)
} catch (ExecutionException e) { .toCompletableFuture()
throw e.getCause(); .get(3, TimeUnit.SECONDS));
} assertEquals(
} catch (TimeoutException e) { "The cause of ExecutionException should be TimeoutException",
// expected TimeoutException.class,
} exception.getCause().getClass());
} }
@Test @Test
@ -1203,7 +1193,7 @@ public class SourceTest extends StreamTest {
final Creator<Iterator<Integer>> input = () -> iterator; final Creator<Iterator<Integer>> input = () -> iterator;
final Done completion = final Done completion =
Source.fromIterator(input).map(it -> it * 10).run(system).toCompletableFuture().join(); Source.fromIterator(input).map(it -> it * 10).run(system).toCompletableFuture().join();
assertEquals(completion, Done.getInstance()); assertEquals(Done.getInstance(), completion);
} }
@Test @Test
@ -1217,6 +1207,6 @@ public class SourceTest extends StreamTest {
.run(materializer) .run(materializer)
.toCompletableFuture() .toCompletableFuture()
.join(); .join();
assertEquals(completion, Done.getInstance()); assertEquals(Done.getInstance(), completion);
} }
} }