Replace try/catch on assertThrows in FlowTest.java (#30560)
This commit is contained in:
parent
29a595f82c
commit
6fca339521
1 changed files with 47 additions and 53 deletions
|
|
@ -20,6 +20,7 @@ import akka.stream.stage.*;
|
|||
import akka.testkit.AkkaSpec;
|
||||
import akka.stream.testkit.TestPublisher;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
|
@ -665,10 +666,7 @@ public class FlowTest extends StreamTest {
|
|||
Source.from(mainInputs).via(flow).runWith(Sink.head(), system);
|
||||
|
||||
List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
final Set<Integer> set = new HashSet<>();
|
||||
for (Integer i : result) {
|
||||
set.add(i);
|
||||
}
|
||||
final Set<Integer> set = new HashSet<>(result);
|
||||
final Set<Integer> expected = new HashSet<>();
|
||||
for (int i = 0; i < 40; ++i) {
|
||||
expected.add(i);
|
||||
|
|
@ -1256,57 +1254,54 @@ public class FlowTest extends StreamTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseInitialTimeout() throws Throwable {
|
||||
try {
|
||||
try {
|
||||
Source.<Integer>maybe()
|
||||
.via(Flow.of(Integer.class).initialTimeout(Duration.ofSeconds(1)))
|
||||
.runWith(Sink.<Integer>head(), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS);
|
||||
org.junit.Assert.fail("A TimeoutException was expected");
|
||||
} catch (ExecutionException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
// expected
|
||||
}
|
||||
public void mustBeAbleToUseInitialTimeout() {
|
||||
ExecutionException executionException =
|
||||
Assert.assertThrows(
|
||||
ExecutionException.class,
|
||||
() ->
|
||||
Source.<Integer>maybe()
|
||||
.via(Flow.of(Integer.class).initialTimeout(Duration.ofSeconds(1)))
|
||||
.runWith(Sink.head(), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS));
|
||||
assertEquals(
|
||||
"A TimeoutException was expected",
|
||||
TimeoutException.class,
|
||||
executionException.getCause().getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseCompletionTimeout() throws Throwable {
|
||||
try {
|
||||
try {
|
||||
Source.<Integer>maybe()
|
||||
.via(Flow.of(Integer.class).completionTimeout(Duration.ofSeconds(1)))
|
||||
.runWith(Sink.<Integer>head(), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS);
|
||||
org.junit.Assert.fail("A TimeoutException was expected");
|
||||
} catch (ExecutionException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
// expected
|
||||
}
|
||||
public void mustBeAbleToUseCompletionTimeout() {
|
||||
ExecutionException executionException =
|
||||
Assert.assertThrows(
|
||||
ExecutionException.class,
|
||||
() ->
|
||||
Source.<Integer>maybe()
|
||||
.via(Flow.of(Integer.class).completionTimeout(Duration.ofSeconds(1)))
|
||||
.runWith(Sink.head(), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS));
|
||||
assertEquals(
|
||||
"A TimeoutException was expected",
|
||||
TimeoutException.class,
|
||||
executionException.getCause().getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseIdleTimeout() throws Throwable {
|
||||
try {
|
||||
try {
|
||||
Source.<Integer>maybe()
|
||||
.via(Flow.of(Integer.class).idleTimeout(Duration.ofSeconds(1)))
|
||||
.runWith(Sink.<Integer>head(), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS);
|
||||
org.junit.Assert.fail("A TimeoutException was expected");
|
||||
} catch (ExecutionException e) {
|
||||
throw e.getCause();
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
// expected
|
||||
}
|
||||
public void mustBeAbleToUseIdleTimeout() {
|
||||
ExecutionException executionException =
|
||||
Assert.assertThrows(
|
||||
ExecutionException.class,
|
||||
() ->
|
||||
Source.<Integer>maybe()
|
||||
.via(Flow.of(Integer.class).idleTimeout(Duration.ofSeconds(1)))
|
||||
.runWith(Sink.head(), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS));
|
||||
assertEquals(
|
||||
"A TimeoutException was expected",
|
||||
TimeoutException.class,
|
||||
executionException.getCause().getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -1316,7 +1311,7 @@ public class FlowTest extends StreamTest {
|
|||
.via(
|
||||
Flow.of(Integer.class).keepAlive(Duration.ofSeconds(1), (Creator<Integer>) () -> 0))
|
||||
.takeWithin(Duration.ofMillis(1500))
|
||||
.runWith(Sink.<Integer>head(), system)
|
||||
.runWith(Sink.head(), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS);
|
||||
|
||||
|
|
@ -1362,13 +1357,12 @@ public class FlowTest extends StreamTest {
|
|||
|
||||
@Test
|
||||
public void mustBeAbleToUseLazyInit() throws Exception {
|
||||
final CompletionStage<Flow<Integer, Integer, NotUsed>> future =
|
||||
new CompletableFuture<Flow<Integer, Integer, NotUsed>>();
|
||||
final CompletionStage<Flow<Integer, Integer, NotUsed>> future = new CompletableFuture<>();
|
||||
future.toCompletableFuture().complete(Flow.fromFunction((id) -> id));
|
||||
Integer result =
|
||||
Source.range(1, 10)
|
||||
.via(Flow.lazyCompletionStageFlow(() -> future))
|
||||
.runWith(Sink.<Integer>head(), system)
|
||||
.runWith(Sink.head(), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue