Refactor GraphDslTest and TcpTest (#30160)
This commit is contained in:
parent
4f5e01376d
commit
b3edd0eeac
2 changed files with 70 additions and 66 deletions
|
|
@ -10,19 +10,19 @@ import akka.japi.Pair;
|
||||||
import akka.stream.*;
|
import akka.stream.*;
|
||||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||||
import akka.testkit.AkkaSpec;
|
import akka.testkit.AkkaSpec;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import scala.collection.Seq;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class GraphDslTest extends StreamTest {
|
public class GraphDslTest extends StreamTest {
|
||||||
|
|
@ -78,29 +78,33 @@ public class GraphDslTest extends StreamTest {
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public void demonstrateConnectErrors() {
|
public void demonstrateConnectErrors() {
|
||||||
try {
|
IllegalStateException exception =
|
||||||
// #simple-graph
|
Assert.assertThrows(
|
||||||
final RunnableGraph<NotUsed> g =
|
"expected IllegalStateException",
|
||||||
RunnableGraph.<NotUsed>fromGraph(
|
IllegalStateException.class,
|
||||||
GraphDSL.create(
|
() -> {
|
||||||
(b) -> {
|
// #simple-graph
|
||||||
final SourceShape<Integer> source1 =
|
final RunnableGraph<NotUsed> g =
|
||||||
b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
|
RunnableGraph.fromGraph(
|
||||||
final SourceShape<Integer> source2 =
|
GraphDSL.create(
|
||||||
b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
|
(b) -> {
|
||||||
final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip =
|
final SourceShape<Integer> source1 =
|
||||||
b.add(Zip.create());
|
b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
|
||||||
b.from(source1).toInlet(zip.in0());
|
final SourceShape<Integer> source2 =
|
||||||
b.from(source2).toInlet(zip.in1());
|
b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
|
||||||
return ClosedShape.getInstance();
|
final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip =
|
||||||
}));
|
b.add(Zip.create());
|
||||||
// unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the inlets []
|
b.from(source1).toInlet(zip.in0());
|
||||||
// and outlets [ZipWith2.out]"
|
b.from(source2).toInlet(zip.in1());
|
||||||
// #simple-graph
|
return ClosedShape.getInstance();
|
||||||
org.junit.Assert.fail("expected IllegalArgumentException");
|
}));
|
||||||
} catch (IllegalStateException e) {
|
// unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the
|
||||||
assertTrue(e != null && e.getMessage() != null && e.getMessage().contains("ZipWith2.out"));
|
// inlets []
|
||||||
}
|
// and outlets [ZipWith2.out]"
|
||||||
|
// #simple-graph
|
||||||
|
});
|
||||||
|
assertNotNull(exception.getMessage());
|
||||||
|
assertTrue(exception.getMessage().contains("ZipWith2.out"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -112,7 +116,7 @@ public class GraphDslTest extends StreamTest {
|
||||||
Flow.of(Integer.class).map(elem -> elem * 2);
|
Flow.of(Integer.class).map(elem -> elem * 2);
|
||||||
|
|
||||||
final RunnableGraph<Pair<CompletionStage<Integer>, CompletionStage<Integer>>> g =
|
final RunnableGraph<Pair<CompletionStage<Integer>, CompletionStage<Integer>>> g =
|
||||||
RunnableGraph.<Pair<CompletionStage<Integer>, CompletionStage<Integer>>>fromGraph(
|
RunnableGraph.fromGraph(
|
||||||
GraphDSL.create(
|
GraphDSL.create(
|
||||||
topHeadSink, // import this sink into the graph
|
topHeadSink, // import this sink into the graph
|
||||||
bottomHeadSink, // and this as well
|
bottomHeadSink, // and this as well
|
||||||
|
|
@ -137,7 +141,7 @@ public class GraphDslTest extends StreamTest {
|
||||||
public void demonstrateMatValue() throws Exception {
|
public void demonstrateMatValue() throws Exception {
|
||||||
// #graph-dsl-matvalue
|
// #graph-dsl-matvalue
|
||||||
final Sink<Integer, CompletionStage<Integer>> foldSink =
|
final Sink<Integer, CompletionStage<Integer>> foldSink =
|
||||||
Sink.<Integer, Integer>fold(
|
Sink.fold(
|
||||||
0,
|
0,
|
||||||
(a, b) -> {
|
(a, b) -> {
|
||||||
return a + b;
|
return a + b;
|
||||||
|
|
@ -222,10 +226,9 @@ public class GraphDslTest extends StreamTest {
|
||||||
public void canUseMapMaterializedValueOnGraphs() {
|
public void canUseMapMaterializedValueOnGraphs() {
|
||||||
Graph<SourceShape<Object>, NotUsed> srcGraph = Source.empty();
|
Graph<SourceShape<Object>, NotUsed> srcGraph = Source.empty();
|
||||||
Graph<SourceShape<Object>, Pair<NotUsed, NotUsed>> mappedMatValueSrcGraph =
|
Graph<SourceShape<Object>, Pair<NotUsed, NotUsed>> mappedMatValueSrcGraph =
|
||||||
Graph.mapMaterializedValue(
|
Graph.mapMaterializedValue(srcGraph, notUsed -> new Pair<>(notUsed, notUsed));
|
||||||
srcGraph, notUsed -> new Pair<NotUsed, NotUsed>(notUsed, notUsed));
|
|
||||||
Sink<Object, CompletionStage<Done>> snk = Sink.ignore();
|
Sink<Object, CompletionStage<Done>> snk = Sink.ignore();
|
||||||
Pair<NotUsed, NotUsed> pair = Source.fromGraph(mappedMatValueSrcGraph).to(snk).run(system);
|
Pair<NotUsed, NotUsed> pair = Source.fromGraph(mappedMatValueSrcGraph).to(snk).run(system);
|
||||||
assertEquals(pair, new Pair<NotUsed, NotUsed>(NotUsed.getInstance(), NotUsed.getInstance()));
|
assertEquals(pair, new Pair<>(NotUsed.getInstance(), NotUsed.getInstance()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,8 @@ import akka.testkit.javadsl.EventFilter;
|
||||||
import akka.testkit.javadsl.TestKit;
|
import akka.testkit.javadsl.TestKit;
|
||||||
import akka.util.ByteString;
|
import akka.util.ByteString;
|
||||||
import static akka.util.ByteString.emptyByteString;
|
import static akka.util.ByteString.emptyByteString;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
@ -69,7 +71,7 @@ public class TcpTest extends StreamTest {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
final List<ByteString> testInput = new ArrayList<ByteString>();
|
final List<ByteString> testInput = new ArrayList<>();
|
||||||
|
|
||||||
{
|
{
|
||||||
for (char c = 'a'; c <= 'z'; c++) {
|
for (char c = 'a'; c <= 'z'; c++) {
|
||||||
|
|
@ -127,22 +129,20 @@ public class TcpTest extends StreamTest {
|
||||||
.occurrences(1)
|
.occurrences(1)
|
||||||
.intercept(
|
.intercept(
|
||||||
() -> {
|
() -> {
|
||||||
try {
|
ExecutionException executionException =
|
||||||
binding
|
Assert.assertThrows(
|
||||||
.to(echoHandler)
|
"CompletableFuture.get() should throw ExecutionException",
|
||||||
.run(system)
|
ExecutionException.class,
|
||||||
.toCompletableFuture()
|
() ->
|
||||||
.get(5, TimeUnit.SECONDS);
|
binding
|
||||||
assertTrue("Expected BindFailedException, but nothing was reported", false);
|
.to(echoHandler)
|
||||||
} catch (ExecutionException e) {
|
.run(system)
|
||||||
if (e.getCause() instanceof BindFailedException) {
|
.toCompletableFuture()
|
||||||
} // all good
|
.get(5, TimeUnit.SECONDS));
|
||||||
else throw new AssertionError("failed", e);
|
assertTrue(
|
||||||
// expected
|
"The cause of ExecutionException should be instanceof BindFailedException",
|
||||||
b.unbind();
|
executionException.getCause() instanceof BindFailedException);
|
||||||
} catch (Exception e) {
|
b.unbind();
|
||||||
throw new AssertionError("failed", e);
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -150,26 +150,27 @@ public class TcpTest extends StreamTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void mustReportClientConnectFailure() throws Throwable {
|
public void mustReportClientConnectFailure() {
|
||||||
final InetSocketAddress serverAddress = SocketUtil.notBoundServerAddress();
|
final InetSocketAddress serverAddress = SocketUtil.notBoundServerAddress();
|
||||||
try {
|
ExecutionException executionException =
|
||||||
try {
|
Assert.assertThrows(
|
||||||
Source.from(testInput)
|
"CompletableFuture.get() should throw ExecutionException",
|
||||||
.viaMat(
|
ExecutionException.class,
|
||||||
Tcp.get(system)
|
() ->
|
||||||
.outgoingConnection(serverAddress.getHostString(), serverAddress.getPort()),
|
Source.from(testInput)
|
||||||
Keep.right())
|
.viaMat(
|
||||||
.to(Sink.<ByteString>ignore())
|
Tcp.get(system)
|
||||||
.run(system)
|
.outgoingConnection(
|
||||||
.toCompletableFuture()
|
serverAddress.getHostString(), serverAddress.getPort()),
|
||||||
.get(5, TimeUnit.SECONDS);
|
Keep.right())
|
||||||
assertTrue("Expected StreamTcpException, but nothing was reported", false);
|
.to(Sink.ignore())
|
||||||
} catch (ExecutionException e) {
|
.run(system)
|
||||||
throw e.getCause();
|
.toCompletableFuture()
|
||||||
}
|
.get(5, TimeUnit.SECONDS));
|
||||||
} catch (StreamTcpException e) {
|
assertEquals(
|
||||||
// expected
|
"The cause of ExecutionException should be StreamTcpException",
|
||||||
}
|
StreamTcpException.class,
|
||||||
|
executionException.getCause().getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
// compile only sample
|
// compile only sample
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue