/**
 * Copyright (C) 2015-2016 Lightbend Inc.
 */
package docs.stream;

import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import akka.NotUsed;
import akka.stream.ClosedShape;
import akka.stream.SourceShape;
import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;

/**
 * Documentation examples for building stream graphs with the Java {@code GraphDSL}.
 *
 * <p>The regions delimited by {@code //#name} marker comments are named snippets; keep the
 * markers and the code between them intact when editing.
 */
public class GraphDSLDocTest extends AbstractJavaTest {

  static ActorSystem system;
  static Materializer mat;

  /** Creates the actor system and materializer shared by all tests in this class. */
  @BeforeClass
  public static void setup() {
    system = ActorSystem.create("GraphDSLDocTest");
    mat = ActorMaterializer.create(system);
  }

  /** Shuts the shared actor system down and clears the static references. */
  @AfterClass
  public static void tearDown() {
    JavaTestKit.shutdownActorSystem(system);
    system = null;
    mat = null;
  }

  /**
   * Builds a closed graph in which one source is broadcast into two branches that are merged
   * again, then checks the elements that reach the sink.
   *
   * <p>Every element passes through {@code f1} (+10) and then through either {@code f2} (+20)
   * or {@code f4} (+30), so the inputs 1..5 arrive as 31..35 and 41..45. The arrival order is
   * not asserted: the result is sorted before it is compared.
   *
   * @throws Exception if the materialized value does not complete within three seconds
   */
  @Test
  public void demonstrateBuildSimpleGraph() throws Exception {
    //#simple-graph-dsl
    final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
    final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
    final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
    final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
    final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
    final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);

    final RunnableGraph<CompletionStage<List<String>>> result =
      RunnableGraph.fromGraph(
        GraphDSL     // create() function binds sink, out which is sink's out port and builder DSL
          .create(   // we need to reference out's shape in the builder DSL below (in to() function)
            sink,                // previously created sink (Sink)
            (builder, out) -> {  // variables: builder (GraphDSL.Builder) and out (SinkShape)
              final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
              final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));

              final Outlet<Integer> source = builder.add(in).out();
              builder.from(source).via(builder.add(f1))
                .viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
                .via(builder.add(f3.grouped(1000))).to(out);  // to() expects a SinkShape
              builder.from(bcast).via(builder.add(f4)).toFanIn(merge);
              return ClosedShape.getInstance();
            }));
    //#simple-graph-dsl
    final List<String> list = result.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
    final String[] res = list.toArray(new String[] {});
    // A null comparator sorts by natural (lexicographic) String order.
    Arrays.sort(res, null);
    assertArrayEquals(new String[] { "31", "32", "33", "34", "35", "41", "42", "43", "44", "45" }, res);
  }

  /**
   * Checks that constructing a graph with an unconnected port is rejected.
   *
   * <p>Both inputs of the zip stage are wired but its output is left dangling, so building the
   * graph is expected to throw an {@link IllegalArgumentException} whose message contains
   * "must correspond to".
   */
  @Test
  @SuppressWarnings("unused")
  public void demonstrateConnectErrors() {
    try {
      //#simple-graph
      final RunnableGraph<NotUsed> g =
        RunnableGraph.<NotUsed>fromGraph(
          GraphDSL
            .create((b) -> {
              final SourceShape<Integer> source1 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
              final SourceShape<Integer> source2 = b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5)));
              final FanInShape2<Integer, Integer, Pair<Integer, Integer>> zip = b.add(Zip.create());
              b.from(source1).toInlet(zip.in0());
              b.from(source2).toInlet(zip.in1());
              return ClosedShape.getInstance();
            }
          )
        );
      // unconnected zip.out (!) => "The inlets [] and outlets [] must correspond to the inlets [] and outlets [ZipWith2.out]"
      //#simple-graph
      fail("expected IllegalArgumentException");
    } catch (IllegalArgumentException e) {
      // A caught exception is never null, so only the message needs a null guard.
      assertTrue(e.getMessage() != null && e.getMessage().contains("must correspond to"));
    }
  }

  /**
   * Adds the same flow definition to one graph twice and checks that both uses work.
   *
   * <p>A single element (1) is broadcast to two branches, each of which goes through
   * {@code sharedDoubler} into its own head sink; both materialized values must complete
   * with 2.
   *
   * @throws Exception if either materialized value does not complete within three seconds
   */
  @Test
  public void demonstrateReusingFlowInGraph() throws Exception {
    //#graph-dsl-reusing-a-flow
    final Sink<Integer, CompletionStage<Integer>> topHeadSink = Sink.head();
    final Sink<Integer, CompletionStage<Integer>> bottomHeadSink = Sink.head();
    final Flow<Integer, Integer, NotUsed> sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2);

    final RunnableGraph<Pair<CompletionStage<Integer>, CompletionStage<Integer>>> g =
      RunnableGraph.<Pair<CompletionStage<Integer>, CompletionStage<Integer>>>fromGraph(
        GraphDSL.create(
          topHeadSink, // import this sink into the graph
          bottomHeadSink, // and this as well
          Keep.both(),
          (b, top, bottom) -> {
            final UniformFanOutShape<Integer, Integer> bcast =
              b.add(Broadcast.create(2));

            b.from(b.add(Source.single(1))).viaFanOut(bcast)
              .via(b.add(sharedDoubler)).to(top);
            b.from(bcast).via(b.add(sharedDoubler)).to(bottom);
            return ClosedShape.getInstance();
          }
        )
      );
    //#graph-dsl-reusing-a-flow
    final Pair<CompletionStage<Integer>, CompletionStage<Integer>> pair = g.run(mat);
    assertEquals(Integer.valueOf(2), pair.first().toCompletableFuture().get(3, TimeUnit.SECONDS));
    assertEquals(Integer.valueOf(2), pair.second().toCompletableFuture().get(3, TimeUnit.SECONDS));
  }

  /**
   * Shows how a graph can feed its own materialized value back into the stream.
   *
   * <p>This method only constructs {@code foldingFlow} and {@code cyclicSource}; neither is run
   * and nothing is asserted. The second example is deliberately a graph that can never emit.
   *
   * @throws Exception declared for consistency with the other tests; nothing here blocks
   */
  @Test
  public void demonstrateMatValue() throws Exception {
    //#graph-dsl-matvalue
    final Sink<Integer, CompletionStage<Integer>> foldSink = Sink.<Integer, Integer> fold(0, (a, b) -> {
      return a + b;
    });

    final Flow<CompletionStage<Integer>, Integer, NotUsed> flatten =
      Flow.<CompletionStage<Integer>>create().mapAsync(4, x -> x);

    final Flow<Integer, Integer, CompletionStage<Integer>> foldingFlow = Flow.fromGraph(
      GraphDSL.create(foldSink,
        (b, fold) -> {
          return FlowShape.of(
            fold.in(),
            b.from(b.materializedValue()).via(b.add(flatten)).out());
        }));
    //#graph-dsl-matvalue

    //#graph-dsl-matvalue-cycle
    // This cannot produce any value:
    final Source<Integer, CompletionStage<Integer>> cyclicSource = Source.fromGraph(
      GraphDSL.create(foldSink,
        (b, fold) -> {
          // - Fold cannot complete until its upstream mapAsync completes
          // - mapAsync cannot complete until the materialized CompletionStage produced by
          //   fold completes
          // As a result this Source will never emit anything, and its materialized
          // CompletionStage will never complete
          b.from(b.materializedValue()).via(b.add(flatten)).to(fold);
          return SourceShape.of(b.from(b.materializedValue()).via(b.add(flatten)).out());
        }));

    //#graph-dsl-matvalue-cycle
  }
}