diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 3335ceb072..5940f6636b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -20,6 +20,7 @@ import akka.stream.stage.*; import akka.testkit.AkkaSpec; import akka.stream.testkit.TestPublisher; import akka.testkit.javadsl.TestKit; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -665,10 +666,7 @@ public class FlowTest extends StreamTest { Source.from(mainInputs).via(flow).runWith(Sink.head(), system); List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); - final Set set = new HashSet<>(); - for (Integer i : result) { - set.add(i); - } + final Set set = new HashSet<>(result); final Set expected = new HashSet<>(); for (int i = 0; i < 40; ++i) { expected.add(i); @@ -1256,57 +1254,54 @@ public class FlowTest extends StreamTest { } @Test - public void mustBeAbleToUseInitialTimeout() throws Throwable { - try { - try { - Source.maybe() - .via(Flow.of(Integer.class).initialTimeout(Duration.ofSeconds(1))) - .runWith(Sink.head(), system) - .toCompletableFuture() - .get(3, TimeUnit.SECONDS); - org.junit.Assert.fail("A TimeoutException was expected"); - } catch (ExecutionException e) { - throw e.getCause(); - } - } catch (TimeoutException e) { - // expected - } + public void mustBeAbleToUseInitialTimeout() { + ExecutionException executionException = + Assert.assertThrows( + ExecutionException.class, + () -> + Source.maybe() + .via(Flow.of(Integer.class).initialTimeout(Duration.ofSeconds(1))) + .runWith(Sink.head(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS)); + assertEquals( + "A TimeoutException was expected", + TimeoutException.class, + executionException.getCause().getClass()); } @Test - public void mustBeAbleToUseCompletionTimeout() throws Throwable { - try { - try { - Source.maybe() - .via(Flow.of(Integer.class).completionTimeout(Duration.ofSeconds(1))) - .runWith(Sink.head(), system) - .toCompletableFuture() - .get(3, TimeUnit.SECONDS); - org.junit.Assert.fail("A TimeoutException was expected"); - } catch (ExecutionException e) { - throw e.getCause(); - } - } catch (TimeoutException e) { - // expected - } + public void mustBeAbleToUseCompletionTimeout() { + ExecutionException executionException = + Assert.assertThrows( + ExecutionException.class, + () -> + Source.maybe() + .via(Flow.of(Integer.class).completionTimeout(Duration.ofSeconds(1))) + .runWith(Sink.head(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS)); + assertEquals( + "A TimeoutException was expected", + TimeoutException.class, + executionException.getCause().getClass()); } @Test - public void mustBeAbleToUseIdleTimeout() throws Throwable { - try { - try { - Source.maybe() - .via(Flow.of(Integer.class).idleTimeout(Duration.ofSeconds(1))) - .runWith(Sink.head(), system) - .toCompletableFuture() - .get(3, TimeUnit.SECONDS); - org.junit.Assert.fail("A TimeoutException was expected"); - } catch (ExecutionException e) { - throw e.getCause(); - } - } catch (TimeoutException e) { - // expected - } + public void mustBeAbleToUseIdleTimeout() { + ExecutionException executionException = + Assert.assertThrows( + ExecutionException.class, + () -> + Source.maybe() + .via(Flow.of(Integer.class).idleTimeout(Duration.ofSeconds(1))) + .runWith(Sink.head(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS)); + assertEquals( + "A TimeoutException was expected", + TimeoutException.class, + executionException.getCause().getClass()); } @Test @@ -1316,7 +1311,7 @@ public class FlowTest extends StreamTest { .via( Flow.of(Integer.class).keepAlive(Duration.ofSeconds(1), (Creator) () -> 0)) .takeWithin(Duration.ofMillis(1500)) - .runWith(Sink.head(), system) + .runWith(Sink.head(), system) .toCompletableFuture() .get(3, TimeUnit.SECONDS); @@ -1362,13 +1357,12 @@ public class FlowTest extends StreamTest { @Test public void mustBeAbleToUseLazyInit() throws Exception { - final CompletionStage> future = - new CompletableFuture>(); + final CompletionStage> future = new CompletableFuture<>(); future.toCompletableFuture().complete(Flow.fromFunction((id) -> id)); Integer result = Source.range(1, 10) .via(Flow.lazyCompletionStageFlow(() -> future)) - .runWith(Sink.head(), system) + .runWith(Sink.head(), system) .toCompletableFuture() .get(3, TimeUnit.SECONDS);