diff --git a/akka-docs/src/main/paradox/stream/stream-graphs.md b/akka-docs/src/main/paradox/stream/stream-graphs.md index d7576e4e90..cf680f39eb 100644 --- a/akka-docs/src/main/paradox/stream/stream-graphs.md +++ b/akka-docs/src/main/paradox/stream/stream-graphs.md @@ -51,7 +51,7 @@ Scala : @@snip [GraphDSLDocSpec.scala]($code$/scala/docs/stream/GraphDSLDocSpec.scala) { #simple-graph-dsl } Java -: @@snip [GraphDSLDocTest.java]($code$/java/jdocs/stream/GraphDSLDocTest.java) { #simple-graph-dsl } +: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #simple-graph-dsl } @@@ note @@ -86,7 +86,17 @@ Scala : @@snip [GraphDSLDocSpec.scala]($code$/scala/docs/stream/GraphDSLDocSpec.scala) { #graph-dsl-reusing-a-flow } Java -: @@snip [GraphDSLDocTest.java]($code$/java/jdocs/stream/GraphDSLDocTest.java) { #graph-dsl-reusing-a-flow } +: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #graph-dsl-reusing-a-flow } + +In some cases we may have a list of graph elements, for example if they are dynamically created. +If these graphs have similar signatures, we can construct a graph collecting all their materialized values as a collection: + +Scala +: @@snip [GraphOpsIntegrationSpec.scala]($akka$/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala) { #graph-from-list } + +Java +: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #graph-from-list } + ## Constructing and combining Partial Graphs @@ -344,7 +354,7 @@ Scala : @@snip [GraphDSLDocSpec.scala]($code$/scala/docs/stream/GraphDSLDocSpec.scala) { #graph-dsl-matvalue } Java -: @@snip [GraphDSLDocTest.java]($code$/java/jdocs/stream/GraphDSLDocTest.java) { #graph-dsl-matvalue } +: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #graph-dsl-matvalue } Be careful not to introduce a cycle where the materialized value actually contributes to the materialized value. @@ -354,7 +364,7 @@ Scala : @@snip [GraphDSLDocSpec.scala]($code$/scala/docs/stream/GraphDSLDocSpec.scala) { #graph-dsl-matvalue-cycle } Java -: @@snip [GraphDSLDocTest.java]($code$/java/jdocs/stream/GraphDSLDocTest.java) { #graph-dsl-matvalue-cycle } +: @@snip [GraphDSLTest.java]($akka$/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java) { #graph-dsl-matvalue-cycle } diff --git a/akka-docs/src/test/java/jdocs/stream/GraphDSLDocTest.java b/akka-docs/src/test/java/jdocs/stream/GraphDSLDocTest.java deleted file mode 100644 index 0ead4676ed..0000000000 --- a/akka-docs/src/test/java/jdocs/stream/GraphDSLDocTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Copyright (C) 2015-2018 Lightbend Inc. - */ - -package jdocs.stream; - -import static org.junit.Assert.*; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -import akka.NotUsed; -import akka.stream.ClosedShape; -import akka.stream.SourceShape; -import jdocs.AbstractJavaTest; -import akka.testkit.javadsl.TestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import akka.actor.ActorSystem; -import akka.japi.Pair; -import akka.stream.*; -import akka.stream.javadsl.*; - -public class GraphDSLDocTest extends AbstractJavaTest { - - static ActorSystem system; - static Materializer mat; - - @BeforeClass - public static void setup() { - system = ActorSystem.create("GraphDSLDocTest"); - mat = ActorMaterializer.create(system); - } - - @AfterClass - public static void tearDown() { - TestKit.shutdownActorSystem(system); - system = null; - mat = null; - } - - @Test - public void demonstrateBuildSimpleGraph() throws Exception { - //#simple-graph-dsl - final Source in = Source.from(Arrays.asList(1, 2, 3, 4, 5)); - final Sink, CompletionStage>> sink = Sink.head(); - final Flow f1 = Flow.of(Integer.class).map(elem -> elem + 10); - final Flow f2 = Flow.of(Integer.class).map(elem -> elem + 20); - final Flow f3 = Flow.of(Integer.class).map(elem -> elem.toString()); - final Flow f4 = Flow.of(Integer.class).map(elem -> elem + 30); - - final RunnableGraph>> result = - RunnableGraph.fromGraph( - GraphDSL // create() function binds sink, out which is sink's out port and builder DSL - .create( // we need to reference out's shape in the builder DSL below (in to() function) - sink, // previously created sink (Sink) - (builder, out) -> { // variables: builder (GraphDSL.Builder) and out (SinkShape) - final UniformFanOutShape bcast = builder.add(Broadcast.create(2)); - final UniformFanInShape merge = builder.add(Merge.create(2)); - - final Outlet source = builder.add(in).out(); - builder.from(source).via(builder.add(f1)) - .viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge) - .via(builder.add(f3.grouped(1000))).to(out); // to() expects a SinkShape - builder.from(bcast).via(builder.add(f4)).toFanIn(merge); - return ClosedShape.getInstance(); - })); - //#simple-graph-dsl - final List list = result.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS); - final String[] res = list.toArray(new String[] {}); - Arrays.sort(res, null); - assertArrayEquals(new String[] { "31", "32", "33", "34", "35", "41", "42", "43", "44", "45" }, res); - } - - @Test - @SuppressWarnings("unused") - public void demonstrateConnectErrors() { - try { - //#simple-graph - final RunnableGraph g = - RunnableGraph.fromGraph( - GraphDSL - .create((b) -> { - final SourceShape source1 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5))); - final SourceShape source2 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5))); - final FanInShape2> zip = b.add(Zip.create()); - b.from(source1).toInlet(zip.in0()); - b.from(source2).toInlet(zip.in1()); - return ClosedShape.getInstance(); - } - ) - ); - // unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the inlets [] and outlets [ZipWith2.out]" - //#simple-graph - org.junit.Assert.fail("expected IllegalArgumentException"); - } catch (IllegalStateException e) { - assertTrue(e != null && e.getMessage() != null && e.getMessage().contains("ZipWith2.out")); - } - } - - @Test - public void demonstrateReusingFlowInGraph() throws Exception { - //#graph-dsl-reusing-a-flow - final Sink> topHeadSink = Sink.head(); - final Sink> bottomHeadSink = Sink.head(); - final Flow sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2); - - final RunnableGraph, CompletionStage>> g = - RunnableGraph., CompletionStage>>fromGraph( - GraphDSL.create( - topHeadSink, // import this sink into the graph - bottomHeadSink, // and this as well - Keep.both(), - (b, top, bottom) -> { - final UniformFanOutShape bcast = - b.add(Broadcast.create(2)); - - b.from(b.add(Source.single(1))).viaFanOut(bcast) - .via(b.add(sharedDoubler)).to(top); - b.from(bcast).via(b.add(sharedDoubler)).to(bottom); - return ClosedShape.getInstance(); - } - ) - ); - //#graph-dsl-reusing-a-flow - final Pair, CompletionStage> pair = g.run(mat); - assertEquals(Integer.valueOf(2), pair.first().toCompletableFuture().get(3, TimeUnit.SECONDS)); - assertEquals(Integer.valueOf(2), pair.second().toCompletableFuture().get(3, TimeUnit.SECONDS)); - } - - @Test - public void demonstrateMatValue() throws Exception { - //#graph-dsl-matvalue - final Sink> foldSink = Sink. fold(0, (a, b) -> { - return a + b; - }); - - final Flow, Integer, NotUsed> flatten = - Flow.>create().mapAsync(4, x -> x); - - final Flow> foldingFlow = Flow.fromGraph( - GraphDSL.create(foldSink, - (b, fold) -> { - return FlowShape.of( - fold.in(), - b.from(b.materializedValue()).via(b.add(flatten)).out()); - })); - //#graph-dsl-matvalue - - //#graph-dsl-matvalue-cycle - // This cannot produce any value: - final Source> cyclicSource = Source.fromGraph( - GraphDSL.create(foldSink, - (b, fold) -> { - // - Fold cannot complete until its upstream mapAsync completes - // - mapAsync cannot complete until the materialized Future produced by - // fold completes - // As a result this Source will never emit anything, and its materialited - // Future will never complete - b.from(b.materializedValue()).via(b.add(flatten)).to(fold); - return SourceShape.of(b.from(b.materializedValue()).via(b.add(flatten)).out()); - })); - - //#graph-dsl-matvalue-cycle - } -} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java new file mode 100644 index 0000000000..6943e5820e --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDslTest.java @@ -0,0 +1,201 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.*; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; +import scala.collection.Seq; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class GraphDslTest extends StreamTest { + public GraphDslTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("GraphDslTest", + AkkaSpec.testConf()); + + @Test + public void demonstrateBuildSimpleGraph() throws Exception { + //#simple-graph-dsl + final Source in = Source.from(Arrays.asList(1, 2, 3, 4, 5)); + final Sink, CompletionStage>> sink = Sink.head(); + final Flow f1 = Flow.of(Integer.class).map(elem -> elem + 10); + final Flow f2 = Flow.of(Integer.class).map(elem -> elem + 20); + final Flow f3 = Flow.of(Integer.class).map(elem -> elem.toString()); + final Flow f4 = Flow.of(Integer.class).map(elem -> elem + 30); + + final RunnableGraph>> result = + RunnableGraph.fromGraph( + GraphDSL // create() function binds sink, out which is sink's out port and builder DSL + .create( // we need to reference out's shape in the builder DSL below (in to() function) + sink, // previously created sink (Sink) + (builder, out) -> { // variables: builder (GraphDSL.Builder) and out (SinkShape) + final UniformFanOutShape bcast = builder.add(Broadcast.create(2)); + final UniformFanInShape merge = builder.add(Merge.create(2)); + + final Outlet source = builder.add(in).out(); + builder.from(source).via(builder.add(f1)) + .viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge) + .via(builder.add(f3.grouped(1000))).to(out); // to() expects a SinkShape + builder.from(bcast).via(builder.add(f4)).toFanIn(merge); + return ClosedShape.getInstance(); + })); + //#simple-graph-dsl + final List list = result.run(materializer).toCompletableFuture().get(3, TimeUnit.SECONDS); + final String[] res = list.toArray(new String[]{}); + Arrays.sort(res, null); + assertArrayEquals(new String[]{"31", "32", "33", "34", "35", "41", "42", "43", "44", "45"}, res); + } + + @Test + @SuppressWarnings("unused") + public void demonstrateConnectErrors() { + try { + //#simple-graph + final RunnableGraph g = + RunnableGraph.fromGraph( + GraphDSL + .create((b) -> { + final SourceShape source1 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5))); + final SourceShape source2 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5))); + final FanInShape2> zip = b.add(Zip.create()); + b.from(source1).toInlet(zip.in0()); + b.from(source2).toInlet(zip.in1()); + return ClosedShape.getInstance(); + } + ) + ); + // unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the inlets [] and outlets [ZipWith2.out]" + //#simple-graph + org.junit.Assert.fail("expected IllegalArgumentException"); + } catch (IllegalStateException e) { + assertTrue(e != null && e.getMessage() != null && e.getMessage().contains("ZipWith2.out")); + } + } + + @Test + public void demonstrateReusingFlowInGraph() throws Exception { + //#graph-dsl-reusing-a-flow + final Sink> topHeadSink = Sink.head(); + final Sink> bottomHeadSink = Sink.head(); + final Flow sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2); + + final RunnableGraph, CompletionStage>> g = + RunnableGraph., CompletionStage>>fromGraph( + GraphDSL.create( + topHeadSink, // import this sink into the graph + bottomHeadSink, // and this as well + Keep.both(), + (b, top, bottom) -> { + final UniformFanOutShape bcast = + b.add(Broadcast.create(2)); + + b.from(b.add(Source.single(1))).viaFanOut(bcast) + .via(b.add(sharedDoubler)).to(top); + b.from(bcast).via(b.add(sharedDoubler)).to(bottom); + return ClosedShape.getInstance(); + } + ) + ); + //#graph-dsl-reusing-a-flow + final Pair, CompletionStage> pair = g.run(materializer); + assertEquals(Integer.valueOf(2), pair.first().toCompletableFuture().get(3, TimeUnit.SECONDS)); + assertEquals(Integer.valueOf(2), pair.second().toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void demonstrateMatValue() throws Exception { + //#graph-dsl-matvalue + final Sink> foldSink = Sink.fold(0, (a, b) -> { + return a + b; + }); + + final Flow, Integer, NotUsed> flatten = + Flow.>create().mapAsync(4, x -> x); + + final Flow> foldingFlow = Flow.fromGraph( + GraphDSL.create(foldSink, + (b, fold) -> { + return FlowShape.of( + fold.in(), + b.from(b.materializedValue()).via(b.add(flatten)).out()); + })); + //#graph-dsl-matvalue + + //#graph-dsl-matvalue-cycle + // This cannot produce any value: + final Source> cyclicSource = Source.fromGraph( + GraphDSL.create(foldSink, + (b, fold) -> { + // - Fold cannot complete until its upstream mapAsync completes + // - mapAsync cannot complete until the materialized Future produced by + // fold completes + // As a result this Source will never emit anything, and its materialited + // Future will never complete + b.from(b.materializedValue()).via(b.add(flatten)).to(fold); + return SourceShape.of(b.from(b.materializedValue()).via(b.add(flatten)).out()); + })); + + //#graph-dsl-matvalue-cycle + } + + @Test + public void beAbleToConstructClosedGraphFromList() throws Exception { + //#graph-from-list + //create the source + final Source in = Source.from(Arrays.asList("ax", "bx", "cx")); + //generate the sinks from code + List prefixes = Arrays.asList("a", "b", "c"); + final List>> list = new ArrayList<>(); + for (String prefix : prefixes) { + final Sink> sink = + Flow.of(String.class).filter(str -> str.startsWith(prefix)).toMat(Sink.head(), Keep.right()); + list.add(sink); + } + + final RunnableGraph>> g = RunnableGraph.fromGraph( + GraphDSL.create( + list, + (GraphDSL.Builder>> builder, List> outs) -> { + final UniformFanOutShape bcast = builder.add(Broadcast.create(outs.size())); + + final Outlet source = builder.add(in).out(); + builder.from(source).viaFanOut(bcast); + + for (SinkShape sink : outs) { + builder.from(bcast).to(sink); + } + + return ClosedShape.getInstance(); + })); + List> result = g.run(materializer); + //#graph-from-list + + assertEquals(3, result.size()); + assertEquals("ax", result.get(0).toCompletableFuture().get(1, TimeUnit.SECONDS)); + assertEquals("bx", result.get(1).toCompletableFuture().get(1, TimeUnit.SECONDS)); + assertEquals("cx", result.get(2).toCompletableFuture().get(1, TimeUnit.SECONDS)); + + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index eb56435826..aff7e28e03 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -28,20 +28,21 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { // format: OFF val `scala -> java types` = - (classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) :: - (classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) :: - (classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) :: - (classOf[scala.collection.immutable.Seq[_]], classOf[java.util.List[_]]) :: - (classOf[scala.collection.immutable.Set[_]], classOf[java.util.Set[_]]) :: - (classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) :: - (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: - (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: - (classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) :: - (classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) :: - (classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) :: - (classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) :: - (classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) :: - (classOf[akka.stream.scaladsl.RunnableGraph[_]], classOf[akka.stream.javadsl.RunnableGraph[_]]) :: + (classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) :: + (classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) :: + (classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) :: + (classOf[scala.collection.immutable.Seq[_]], classOf[java.util.List[_]]) :: + (classOf[scala.collection.immutable.Set[_]], classOf[java.util.Set[_]]) :: + (classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) :: + (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: + (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: + (classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) :: + (classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) :: + (classOf[scala.Function1[scala.Function1[_, _], _]], classOf[akka.japi.function.Function2[_, _, _]]) :: + (classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) :: + (classOf[akka.stream.scaladsl.Sink[_, _]], classOf[akka.stream.javadsl.Sink[_, _]]) :: + (classOf[akka.stream.scaladsl.Flow[_, _, _]], classOf[akka.stream.javadsl.Flow[_, _, _]]) :: + (classOf[akka.stream.scaladsl.RunnableGraph[_]], classOf[akka.stream.javadsl.RunnableGraph[_]]) :: ((2 to 22) map { i => (Class.forName(s"scala.Function$i"), Class.forName(s"akka.japi.function.Function$i")) }).toList // format: ON diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index 06eb103399..0493dd3bda 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -188,6 +188,56 @@ class GraphOpsIntegrationSpec extends StreamSpec { result.toSet should ===(Set(4, 5, 6, 13, 14, 15)) } + "be possible to use with generated components" in { + implicit val ex = materializer.system.dispatcher + + //#graph-from-list + val sinks = immutable.Seq("a", "b", "c").map(prefix ⇒ + Flow[String].filter(str ⇒ str.startsWith(prefix)).toMat(Sink.head[String])(Keep.right) + ) + + val g: RunnableGraph[Seq[Future[String]]] = RunnableGraph.fromGraph(GraphDSL.create(sinks) { implicit b ⇒ sinkList ⇒ + val broadcast = b.add(Broadcast[String](sinkList.size)) + + Source(List("ax", "bx", "cx")) ~> broadcast + sinkList.foreach(sink ⇒ broadcast ~> sink) + + ClosedShape + }) + + val matList: Seq[Future[String]] = g.run() + //#graph-from-list + + val result: Seq[String] = Await.result(Future.sequence(matList), 3.seconds) + + result.size shouldBe 3 + result.head shouldBe "ax" + result(1) shouldBe "bx" + result(2) shouldBe "cx" + } + + "be possible to use with generated components if list has no tail" in { + implicit val ex = materializer.system.dispatcher + + val sinks = immutable.Seq(Sink.seq[Int]) + + val g: RunnableGraph[Seq[Future[immutable.Seq[Int]]]] = RunnableGraph.fromGraph(GraphDSL.create(sinks) { implicit b ⇒ sinkList ⇒ + val broadcast = b.add(Broadcast[Int](sinkList.size)) + + Source(List(1, 2, 3)) ~> broadcast + sinkList.foreach(sink ⇒ broadcast ~> sink) + + ClosedShape + }) + + val matList: Seq[Future[immutable.Seq[Int]]] = g.run() + + val result: Seq[immutable.Seq[Int]] = Await.result(Future.sequence(matList), 3.seconds) + + result.size shouldBe 1 + result.foreach(_ shouldBe List(1, 2, 3)) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 3e6bb18cd1..ee9a818e5a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -4,11 +4,20 @@ package akka.stream.javadsl +import java.util + import akka.NotUsed import akka.stream._ import akka.japi.{ Pair, function } import akka.util.ConstantFun + import scala.annotation.unchecked.uncheckedVariance +import scala.collection.JavaConverters._ +import akka.stream.scaladsl.{ GenericGraph, GenericGraphWithChangedAttributes } +import akka.stream.Attributes +import akka.stream.impl.TraversalBuilder + +import scala.collection.parallel.immutable /** * Merge several streams, taking elements as they arrive from input streams @@ -424,7 +433,34 @@ object GraphDSL extends GraphCreate { */ def builder[M](): Builder[M] = new Builder()(new scaladsl.GraphDSL.Builder[M]) - final class Builder[+Mat]()(private implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self ⇒ + /** + * Creates a new [[Graph]] by importing the given graph list `graphs` and passing their [[Shape]]s + * along with the [[GraphDSL.Builder]] to the given create function. + */ + + def create[IS <: Shape, S <: Shape, M, G <: Graph[IS, M]]( + graphs: java.util.List[G], + buildBlock: function.Function2[GraphDSL.Builder[java.util.List[M]], java.util.List[IS], S]): Graph[S, java.util.List[M]] = { + require(!graphs.isEmpty, "The input list must have one or more Graph elements") + val gbuilder = builder[java.util.List[M]]() + val toList = (m1: M) ⇒ new util.ArrayList(util.Arrays.asList(m1)) + val combine = (s: java.util.List[M], m2: M) ⇒ { + val newList = new util.ArrayList(s) + newList.add(m2) + newList + } + val sListH = gbuilder.delegate.add(graphs.get(0), toList) + val sListT = graphs.subList(1, graphs.size()).asScala.map(g ⇒ gbuilder.delegate.add(g, combine)).asJava + val s = buildBlock(gbuilder, { + val newList = new util.ArrayList[IS] + newList.add(sListH) + newList.addAll(sListT) + newList + }) + new GenericGraph(s, gbuilder.delegate.result(s)) + } + + final class Builder[+Mat]()(private[stream] implicit val delegate: scaladsl.GraphDSL.Builder[Mat]) { self ⇒ import akka.stream.scaladsl.GraphDSL.Implicits._ /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index ebb09362e2..69aff32d6c 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -1257,6 +1257,22 @@ private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T] object GraphDSL extends GraphApply { + /** + * Creates a new [[Graph]] by importing the given graph list `graphs` and passing their [[Shape]]s + * along with the [[GraphDSL.Builder]] to the given create function. + */ + def create[S <: Shape, IS <: Shape, Mat](graphs: immutable.Seq[Graph[IS, Mat]])(buildBlock: GraphDSL.Builder[immutable.Seq[Mat]] ⇒ immutable.Seq[IS] ⇒ S): Graph[S, immutable.Seq[Mat]] = { + require(graphs.nonEmpty, "The input list must have one or more Graph elements") + val builder = new GraphDSL.Builder + val toList = (m1: Mat) ⇒ Seq(m1) + val combine = (s: Seq[Mat], m2: Mat) ⇒ s :+ m2 + val sListH = builder.add(graphs.head, toList) + val sListT = graphs.tail.map(g ⇒ builder.add(g, combine)) + val s = buildBlock(builder)(immutable.Seq(sListH) ++ sListT) + + new GenericGraph(s, builder.result(s)) + } + class Builder[+M] private[stream] () { private val unwiredIns = new mutable.HashSet[Inlet[_]]() private val unwiredOuts = new mutable.HashSet[Outlet[_]]()