GraphDSL create with Seq[Graph] added #24067
This commit is contained in:
parent
86e44167f3
commit
67a81eb01b
7 changed files with 333 additions and 189 deletions
|
|
@ -51,7 +51,7 @@ Scala
|
|||
: @@snip [GraphDSLDocSpec.scala]($code$/scala/docs/stream/GraphDSLDocSpec.scala) { #simple-graph-dsl }
|
||||
|
||||
Java
|
||||
: @@snip [GraphDSLDocTest.java]($code$/java/jdocs/stream/GraphDSLDocTest.java) { #simple-graph-dsl }
|
||||
: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #simple-graph-dsl }
|
||||
|
||||
@@@ note
|
||||
|
||||
|
|
@ -86,7 +86,17 @@ Scala
|
|||
: @@snip [GraphDSLDocSpec.scala]($code$/scala/docs/stream/GraphDSLDocSpec.scala) { #graph-dsl-reusing-a-flow }
|
||||
|
||||
Java
|
||||
: @@snip [GraphDSLDocTest.java]($code$/java/jdocs/stream/GraphDSLDocTest.java) { #graph-dsl-reusing-a-flow }
|
||||
: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #graph-dsl-reusing-a-flow }
|
||||
|
||||
In some cases we may have a list of graph elements, for example if they are dynamically created.
|
||||
If these graphs have similar signatures, we can construct a graph collecting all their materialized values as a collection:
|
||||
|
||||
Scala
|
||||
: @@snip [GraphOpsIntegrationSpec.scala]($akka$/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala) { #graph-from-list }
|
||||
|
||||
Java
|
||||
: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #graph-from-list }
|
||||
|
||||
|
||||
<a id="partial-graph-dsl"></a>
|
||||
## Constructing and combining Partial Graphs
|
||||
|
|
@ -344,7 +354,7 @@ Scala
|
|||
: @@snip [GraphDSLDocSpec.scala]($code$/scala/docs/stream/GraphDSLDocSpec.scala) { #graph-dsl-matvalue }
|
||||
|
||||
Java
|
||||
: @@snip [GraphDSLDocTest.java]($code$/java/jdocs/stream/GraphDSLDocTest.java) { #graph-dsl-matvalue }
|
||||
: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #graph-dsl-matvalue }
|
||||
|
||||
|
||||
Be careful not to introduce a cycle where the materialized value actually contributes to the materialized value.
|
||||
|
|
@ -354,7 +364,7 @@ Scala
|
|||
: @@snip [GraphDSLDocSpec.scala]($code$/scala/docs/stream/GraphDSLDocSpec.scala) { #graph-dsl-matvalue-cycle }
|
||||
|
||||
Java
|
||||
: @@snip [GraphDSLDocTest.java]($code$/java/jdocs/stream/GraphDSLDocTest.java) { #graph-dsl-matvalue-cycle }
|
||||
: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #graph-dsl-matvalue-cycle }
|
||||
|
||||
|
||||
<a id="graph-cycles"></a>
|
||||
|
|
|
|||
|
|
@ -1,170 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.stream;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.stream.ClosedShape;
|
||||
import akka.stream.SourceShape;
|
||||
import jdocs.AbstractJavaTest;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.*;
|
||||
|
||||
public class GraphDSLDocTest extends AbstractJavaTest {
|
||||
|
||||
static ActorSystem system;
|
||||
static Materializer mat;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
system = ActorSystem.create("GraphDSLDocTest");
|
||||
mat = ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
TestKit.shutdownActorSystem(system);
|
||||
system = null;
|
||||
mat = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateBuildSimpleGraph() throws Exception {
|
||||
//#simple-graph-dsl
|
||||
final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
|
||||
final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
|
||||
final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
|
||||
final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
|
||||
final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
|
||||
final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);
|
||||
|
||||
final RunnableGraph<CompletionStage<List<String>>> result =
|
||||
RunnableGraph.fromGraph(
|
||||
GraphDSL // create() function binds sink, out which is sink's out port and builder DSL
|
||||
.create( // we need to reference out's shape in the builder DSL below (in to() function)
|
||||
sink, // previously created sink (Sink)
|
||||
(builder, out) -> { // variables: builder (GraphDSL.Builder) and out (SinkShape)
|
||||
final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
|
||||
final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));
|
||||
|
||||
final Outlet<Integer> source = builder.add(in).out();
|
||||
builder.from(source).via(builder.add(f1))
|
||||
.viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
|
||||
.via(builder.add(f3.grouped(1000))).to(out); // to() expects a SinkShape
|
||||
builder.from(bcast).via(builder.add(f4)).toFanIn(merge);
|
||||
return ClosedShape.getInstance();
|
||||
}));
|
||||
//#simple-graph-dsl
|
||||
final List<String> list = result.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
final String[] res = list.toArray(new String[] {});
|
||||
Arrays.sort(res, null);
|
||||
assertArrayEquals(new String[] { "31", "32", "33", "34", "35", "41", "42", "43", "44", "45" }, res);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unused")
|
||||
public void demonstrateConnectErrors() {
|
||||
try {
|
||||
//#simple-graph
|
||||
final RunnableGraph<NotUsed> g =
|
||||
RunnableGraph.<NotUsed>fromGraph(
|
||||
GraphDSL
|
||||
.create((b) -> {
|
||||
final SourceShape<Integer> source1 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
|
||||
final SourceShape<Integer> source2 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
|
||||
final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip = b.add(Zip.create());
|
||||
b.from(source1).toInlet(zip.in0());
|
||||
b.from(source2).toInlet(zip.in1());
|
||||
return ClosedShape.getInstance();
|
||||
}
|
||||
)
|
||||
);
|
||||
// unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the inlets [] and outlets [ZipWith2.out]"
|
||||
//#simple-graph
|
||||
org.junit.Assert.fail("expected IllegalArgumentException");
|
||||
} catch (IllegalStateException e) {
|
||||
assertTrue(e != null && e.getMessage() != null && e.getMessage().contains("ZipWith2.out"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateReusingFlowInGraph() throws Exception {
|
||||
//#graph-dsl-reusing-a-flow
|
||||
final Sink<Integer, CompletionStage<Integer>> topHeadSink = Sink.head();
|
||||
final Sink<Integer, CompletionStage<Integer>> bottomHeadSink = Sink.head();
|
||||
final Flow<Integer, Integer, NotUsed> sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2);
|
||||
|
||||
final RunnableGraph<Pair<CompletionStage<Integer>, CompletionStage<Integer>>> g =
|
||||
RunnableGraph.<Pair<CompletionStage<Integer>, CompletionStage<Integer>>>fromGraph(
|
||||
GraphDSL.create(
|
||||
topHeadSink, // import this sink into the graph
|
||||
bottomHeadSink, // and this as well
|
||||
Keep.both(),
|
||||
(b, top, bottom) -> {
|
||||
final UniformFanOutShape<Integer, Integer> bcast =
|
||||
b.add(Broadcast.create(2));
|
||||
|
||||
b.from(b.add(Source.single(1))).viaFanOut(bcast)
|
||||
.via(b.add(sharedDoubler)).to(top);
|
||||
b.from(bcast).via(b.add(sharedDoubler)).to(bottom);
|
||||
return ClosedShape.getInstance();
|
||||
}
|
||||
)
|
||||
);
|
||||
//#graph-dsl-reusing-a-flow
|
||||
final Pair<CompletionStage<Integer>, CompletionStage<Integer>> pair = g.run(mat);
|
||||
assertEquals(Integer.valueOf(2), pair.first().toCompletableFuture().get(3, TimeUnit.SECONDS));
|
||||
assertEquals(Integer.valueOf(2), pair.second().toCompletableFuture().get(3, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateMatValue() throws Exception {
|
||||
//#graph-dsl-matvalue
|
||||
final Sink<Integer, CompletionStage<Integer>> foldSink = Sink.<Integer, Integer> fold(0, (a, b) -> {
|
||||
return a + b;
|
||||
});
|
||||
|
||||
final Flow<CompletionStage<Integer>, Integer, NotUsed> flatten =
|
||||
Flow.<CompletionStage<Integer>>create().mapAsync(4, x -> x);
|
||||
|
||||
final Flow<Integer, Integer, CompletionStage<Integer>> foldingFlow = Flow.fromGraph(
|
||||
GraphDSL.create(foldSink,
|
||||
(b, fold) -> {
|
||||
return FlowShape.of(
|
||||
fold.in(),
|
||||
b.from(b.materializedValue()).via(b.add(flatten)).out());
|
||||
}));
|
||||
//#graph-dsl-matvalue
|
||||
|
||||
//#graph-dsl-matvalue-cycle
|
||||
// This cannot produce any value:
|
||||
final Source<Integer, CompletionStage<Integer>> cyclicSource = Source.fromGraph(
|
||||
GraphDSL.create(foldSink,
|
||||
(b, fold) -> {
|
||||
// - Fold cannot complete until its upstream mapAsync completes
|
||||
// - mapAsync cannot complete until the materialized Future produced by
|
||||
// fold completes
|
||||
// As a result this Source will never emit anything, and its materialited
|
||||
// Future will never complete
|
||||
b.from(b.materializedValue()).via(b.add(flatten)).to(fold);
|
||||
return SourceShape.of(b.from(b.materializedValue()).via(b.add(flatten)).out());
|
||||
}));
|
||||
|
||||
//#graph-dsl-matvalue-cycle
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,201 @@
|
|||
/*
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.javadsl;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.japi.Pair;
|
||||
import akka.stream.*;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import scala.collection.Seq;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class GraphDslTest extends StreamTest {
|
||||
public GraphDslTest() {
|
||||
super(actorSystemResource);
|
||||
}
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("GraphDslTest",
|
||||
AkkaSpec.testConf());
|
||||
|
||||
@Test
|
||||
public void demonstrateBuildSimpleGraph() throws Exception {
|
||||
//#simple-graph-dsl
|
||||
final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
|
||||
final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
|
||||
final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
|
||||
final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
|
||||
final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
|
||||
final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);
|
||||
|
||||
final RunnableGraph<CompletionStage<List<String>>> result =
|
||||
RunnableGraph.fromGraph(
|
||||
GraphDSL // create() function binds sink, out which is sink's out port and builder DSL
|
||||
.create( // we need to reference out's shape in the builder DSL below (in to() function)
|
||||
sink, // previously created sink (Sink)
|
||||
(builder, out) -> { // variables: builder (GraphDSL.Builder) and out (SinkShape)
|
||||
final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
|
||||
final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));
|
||||
|
||||
final Outlet<Integer> source = builder.add(in).out();
|
||||
builder.from(source).via(builder.add(f1))
|
||||
.viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
|
||||
.via(builder.add(f3.grouped(1000))).to(out); // to() expects a SinkShape
|
||||
builder.from(bcast).via(builder.add(f4)).toFanIn(merge);
|
||||
return ClosedShape.getInstance();
|
||||
}));
|
||||
//#simple-graph-dsl
|
||||
final List<String> list = result.run(materializer).toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
final String[] res = list.toArray(new String[]{});
|
||||
Arrays.sort(res, null);
|
||||
assertArrayEquals(new String[]{"31", "32", "33", "34", "35", "41", "42", "43", "44", "45"}, res);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unused")
|
||||
public void demonstrateConnectErrors() {
|
||||
try {
|
||||
//#simple-graph
|
||||
final RunnableGraph<NotUsed> g =
|
||||
RunnableGraph.<NotUsed>fromGraph(
|
||||
GraphDSL
|
||||
.create((b) -> {
|
||||
final SourceShape<Integer> source1 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
|
||||
final SourceShape<Integer> source2 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
|
||||
final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip = b.add(Zip.create());
|
||||
b.from(source1).toInlet(zip.in0());
|
||||
b.from(source2).toInlet(zip.in1());
|
||||
return ClosedShape.getInstance();
|
||||
}
|
||||
)
|
||||
);
|
||||
// unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the inlets [] and outlets [ZipWith2.out]"
|
||||
//#simple-graph
|
||||
org.junit.Assert.fail("expected IllegalArgumentException");
|
||||
} catch (IllegalStateException e) {
|
||||
assertTrue(e != null && e.getMessage() != null && e.getMessage().contains("ZipWith2.out"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateReusingFlowInGraph() throws Exception {
|
||||
//#graph-dsl-reusing-a-flow
|
||||
final Sink<Integer, CompletionStage<Integer>> topHeadSink = Sink.head();
|
||||
final Sink<Integer, CompletionStage<Integer>> bottomHeadSink = Sink.head();
|
||||
final Flow<Integer, Integer, NotUsed> sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2);
|
||||
|
||||
final RunnableGraph<Pair<CompletionStage<Integer>, CompletionStage<Integer>>> g =
|
||||
RunnableGraph.<Pair<CompletionStage<Integer>, CompletionStage<Integer>>>fromGraph(
|
||||
GraphDSL.create(
|
||||
topHeadSink, // import this sink into the graph
|
||||
bottomHeadSink, // and this as well
|
||||
Keep.both(),
|
||||
(b, top, bottom) -> {
|
||||
final UniformFanOutShape<Integer, Integer> bcast =
|
||||
b.add(Broadcast.create(2));
|
||||
|
||||
b.from(b.add(Source.single(1))).viaFanOut(bcast)
|
||||
.via(b.add(sharedDoubler)).to(top);
|
||||
b.from(bcast).via(b.add(sharedDoubler)).to(bottom);
|
||||
return ClosedShape.getInstance();
|
||||
}
|
||||
)
|
||||
);
|
||||
//#graph-dsl-reusing-a-flow
|
||||
final Pair<CompletionStage<Integer>, CompletionStage<Integer>> pair = g.run(materializer);
|
||||
assertEquals(Integer.valueOf(2), pair.first().toCompletableFuture().get(3, TimeUnit.SECONDS));
|
||||
assertEquals(Integer.valueOf(2), pair.second().toCompletableFuture().get(3, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void demonstrateMatValue() throws Exception {
|
||||
//#graph-dsl-matvalue
|
||||
final Sink<Integer, CompletionStage<Integer>> foldSink = Sink.<Integer, Integer>fold(0, (a, b) -> {
|
||||
return a + b;
|
||||
});
|
||||
|
||||
final Flow<CompletionStage<Integer>, Integer, NotUsed> flatten =
|
||||
Flow.<CompletionStage<Integer>>create().mapAsync(4, x -> x);
|
||||
|
||||
final Flow<Integer, Integer, CompletionStage<Integer>> foldingFlow = Flow.fromGraph(
|
||||
GraphDSL.create(foldSink,
|
||||
(b, fold) -> {
|
||||
return FlowShape.of(
|
||||
fold.in(),
|
||||
b.from(b.materializedValue()).via(b.add(flatten)).out());
|
||||
}));
|
||||
//#graph-dsl-matvalue
|
||||
|
||||
//#graph-dsl-matvalue-cycle
|
||||
// This cannot produce any value:
|
||||
final Source<Integer, CompletionStage<Integer>> cyclicSource = Source.fromGraph(
|
||||
GraphDSL.create(foldSink,
|
||||
(b, fold) -> {
|
||||
// - Fold cannot complete until its upstream mapAsync completes
|
||||
// - mapAsync cannot complete until the materialized Future produced by
|
||||
// fold completes
|
||||
// As a result this Source will never emit anything, and its materialited
|
||||
// Future will never complete
|
||||
b.from(b.materializedValue()).via(b.add(flatten)).to(fold);
|
||||
return SourceShape.of(b.from(b.materializedValue()).via(b.add(flatten)).out());
|
||||
}));
|
||||
|
||||
//#graph-dsl-matvalue-cycle
|
||||
}
|
||||
|
||||
@Test
|
||||
public void beAbleToConstructClosedGraphFromList() throws Exception {
|
||||
//#graph-from-list
|
||||
//create the source
|
||||
final Source<String, NotUsed> in = Source.from(Arrays.asList("ax", "bx", "cx"));
|
||||
//generate the sinks from code
|
||||
List<String> prefixes = Arrays.asList("a", "b", "c");
|
||||
final List<Sink<String, CompletionStage<String>>> list = new ArrayList<>();
|
||||
for (String prefix : prefixes) {
|
||||
final Sink<String, CompletionStage<String>> sink =
|
||||
Flow.of(String.class).filter(str -> str.startsWith(prefix)).toMat(Sink.head(), Keep.right());
|
||||
list.add(sink);
|
||||
}
|
||||
|
||||
final RunnableGraph<List<CompletionStage<String>>> g = RunnableGraph.fromGraph(
|
||||
GraphDSL.create(
|
||||
list,
|
||||
(GraphDSL.Builder<List<CompletionStage<String>>> builder, List<SinkShape<String>> outs) -> {
|
||||
final UniformFanOutShape<String, String> bcast = builder.add(Broadcast.create(outs.size()));
|
||||
|
||||
final Outlet<String> source = builder.add(in).out();
|
||||
builder.from(source).viaFanOut(bcast);
|
||||
|
||||
for (SinkShape<String> sink : outs) {
|
||||
builder.from(bcast).to(sink);
|
||||
}
|
||||
|
||||
return ClosedShape.getInstance();
|
||||
}));
|
||||
List<CompletionStage<String>> result = g.run(materializer);
|
||||
//#graph-from-list
|
||||
|
||||
assertEquals(3, result.size());
|
||||
assertEquals("ax", result.get(0).toCompletableFuture().get(1, TimeUnit.SECONDS));
|
||||
assertEquals("bx", result.get(1).toCompletableFuture().get(1, TimeUnit.SECONDS));
|
||||
assertEquals("cx", result.get(2).toCompletableFuture().get(1, TimeUnit.SECONDS));
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -28,20 +28,21 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
|
|||
|
||||
// format: OFF
|
||||
val `scala -> java types` =
|
||||
(classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) ::
|
||||
(classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) ::
|
||||
(classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) ::
|
||||
(classOf[scala.collection.immutable.Seq[_]], classOf[java.util.List[_]]) ::
|
||||
(classOf[scala.collection.immutable.Set[_]], classOf[java.util.Set[_]]) ::
|
||||
(classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) ::
|
||||
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::
|
||||
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
|
||||
(classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) ::
|
||||
(classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.RunnableGraph[_]], classOf[akka.stream.javadsl.RunnableGraph[_]]) ::
|
||||
(classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) ::
|
||||
(classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) ::
|
||||
(classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) ::
|
||||
(classOf[scala.collection.immutable.Seq[_]], classOf[java.util.List[_]]) ::
|
||||
(classOf[scala.collection.immutable.Set[_]], classOf[java.util.Set[_]]) ::
|
||||
(classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) ::
|
||||
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::
|
||||
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
|
||||
(classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) ::
|
||||
(classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) ::
|
||||
(classOf[scala.Function1[scala.Function1[_, _], _]], classOf[akka.japi.function.Function2[_, _, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) ::
|
||||
(classOf[akka.stream.scaladsl.RunnableGraph[_]], classOf[akka.stream.javadsl.RunnableGraph[_]]) ::
|
||||
((2 to 22) map { i => (Class.forName(s"scala.Function$i"), Class.forName(s"akka.japi.function.Function$i")) }).toList
|
||||
// format: ON
|
||||
|
||||
|
|
|
|||
|
|
@ -188,6 +188,56 @@ class GraphOpsIntegrationSpec extends StreamSpec {
|
|||
result.toSet should ===(Set(4, 5, 6, 13, 14, 15))
|
||||
}
|
||||
|
||||
"be possible to use with generated components" in {
|
||||
implicit val ex = materializer.system.dispatcher
|
||||
|
||||
//#graph-from-list
|
||||
val sinks = immutable.Seq("a", "b", "c").map(prefix ⇒
|
||||
Flow[String].filter(str ⇒ str.startsWith(prefix)).toMat(Sink.head[String])(Keep.right)
|
||||
)
|
||||
|
||||
val g: RunnableGraph[Seq[Future[String]]] = RunnableGraph.fromGraph(GraphDSL.create(sinks) { implicit b ⇒ sinkList ⇒
|
||||
val broadcast = b.add(Broadcast[String](sinkList.size))
|
||||
|
||||
Source(List("ax", "bx", "cx")) ~> broadcast
|
||||
sinkList.foreach(sink ⇒ broadcast ~> sink)
|
||||
|
||||
ClosedShape
|
||||
})
|
||||
|
||||
val matList: Seq[Future[String]] = g.run()
|
||||
//#graph-from-list
|
||||
|
||||
val result: Seq[String] = Await.result(Future.sequence(matList), 3.seconds)
|
||||
|
||||
result.size shouldBe 3
|
||||
result.head shouldBe "ax"
|
||||
result(1) shouldBe "bx"
|
||||
result(2) shouldBe "cx"
|
||||
}
|
||||
|
||||
"be possible to use with generated components if list has no tail" in {
|
||||
implicit val ex = materializer.system.dispatcher
|
||||
|
||||
val sinks = immutable.Seq(Sink.seq[Int])
|
||||
|
||||
val g: RunnableGraph[Seq[Future[immutable.Seq[Int]]]] = RunnableGraph.fromGraph(GraphDSL.create(sinks) { implicit b ⇒ sinkList ⇒
|
||||
val broadcast = b.add(Broadcast[Int](sinkList.size))
|
||||
|
||||
Source(List(1, 2, 3)) ~> broadcast
|
||||
sinkList.foreach(sink ⇒ broadcast ~> sink)
|
||||
|
||||
ClosedShape
|
||||
})
|
||||
|
||||
val matList: Seq[Future[immutable.Seq[Int]]] = g.run()
|
||||
|
||||
val result: Seq[immutable.Seq[Int]] = Await.result(Future.sequence(matList), 3.seconds)
|
||||
|
||||
result.size shouldBe 1
|
||||
result.foreach(_ shouldBe List(1, 2, 3))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,11 +4,20 @@
|
|||
|
||||
package akka.stream.javadsl
|
||||
|
||||
import java.util
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream._
|
||||
import akka.japi.{ Pair, function }
|
||||
import akka.util.ConstantFun
|
||||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes }
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.impl.TraversalBuilder
|
||||
|
||||
import scala.collection.parallel.immutable
|
||||
|
||||
/**
|
||||
* Merge several streams, taking elements as they arrive from input streams
|
||||
|
|
@ -424,7 +433,34 @@ object GraphDSL extends GraphCreate {
|
|||
*/
|
||||
def builder[M](): Builder[M] = new Builder()(new scaladsl.GraphDSL.Builder[M])
|
||||
|
||||
final class Builder[+Mat]()(private implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self ⇒
|
||||
/**
|
||||
* Creates a new [[Graph]] by importing the given graph list `graphs` and passing their [[Shape]]s
|
||||
* along with the [[GraphDSL.Builder]] to the given create function.
|
||||
*/
|
||||
|
||||
def create[IS <: Shape, S <: Shape, M, G <: Graph[IS, M]](
|
||||
graphs: java.util.List[G],
|
||||
buildBlock: function.Function2[GraphDSL.Builder[java.util.List[M]], java.util.List[IS], S]): Graph[S, java.util.List[M]] = {
|
||||
require(!graphs.isEmpty, "The input list must have one or more Graph elements")
|
||||
val gbuilder = builder[java.util.List[M]]()
|
||||
val toList = (m1: M) ⇒ new util.ArrayList(util.Arrays.asList(m1))
|
||||
val combine = (s: java.util.List[M], m2: M) ⇒ {
|
||||
val newList = new util.ArrayList(s)
|
||||
newList.add(m2)
|
||||
newList
|
||||
}
|
||||
val sListH = gbuilder.delegate.add(graphs.get(0), toList)
|
||||
val sListT = graphs.subList(1, graphs.size()).asScala.map(g ⇒ gbuilder.delegate.add(g, combine)).asJava
|
||||
val s = buildBlock(gbuilder, {
|
||||
val newList = new util.ArrayList[IS]
|
||||
newList.add(sListH)
|
||||
newList.addAll(sListT)
|
||||
newList
|
||||
})
|
||||
new GenericGraph(s, gbuilder.delegate.result(s))
|
||||
}
|
||||
|
||||
final class Builder[+Mat]()(private[stream] implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self ⇒
|
||||
import akka.stream.scaladsl.GraphDSL.Implicits._
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1257,6 +1257,22 @@ private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]
|
|||
|
||||
object GraphDSL extends GraphApply {
|
||||
|
||||
/**
|
||||
* Creates a new [[Graph]] by importing the given graph list `graphs` and passing their [[Shape]]s
|
||||
* along with the [[GraphDSL.Builder]] to the given create function.
|
||||
*/
|
||||
def create[S <: Shape, IS <: Shape, Mat](graphs: immutable.Seq[Graph[IS, Mat]])(buildBlock: GraphDSL.Builder[immutable.Seq[Mat]] ⇒ immutable.Seq[IS] ⇒ S): Graph[S, immutable.Seq[Mat]] = {
|
||||
require(graphs.nonEmpty, "The input list must have one or more Graph elements")
|
||||
val builder = new GraphDSL.Builder
|
||||
val toList = (m1: Mat) ⇒ Seq(m1)
|
||||
val combine = (s: Seq[Mat], m2: Mat) ⇒ s :+ m2
|
||||
val sListH = builder.add(graphs.head, toList)
|
||||
val sListT = graphs.tail.map(g ⇒ builder.add(g, combine))
|
||||
val s = buildBlock(builder)(immutable.Seq(sListH) ++ sListT)
|
||||
|
||||
new GenericGraph(s, builder.result(s))
|
||||
}
|
||||
|
||||
class Builder[+M] private[stream] () {
|
||||
private val unwiredIns = new mutable.HashSet[Inlet[_]]()
|
||||
private val unwiredOuts = new mutable.HashSet[Outlet[_]]()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue