diff --git a/akka-docs/rst/java/code/docs/stream/FlowGraphDocTest.java b/akka-docs/rst/java/code/docs/stream/GraphDSLDocTest.java similarity index 94% rename from akka-docs/rst/java/code/docs/stream/FlowGraphDocTest.java rename to akka-docs/rst/java/code/docs/stream/GraphDSLDocTest.java index dd52232b04..ddb5149f14 100644 --- a/akka-docs/rst/java/code/docs/stream/FlowGraphDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/GraphDSLDocTest.java @@ -27,14 +27,14 @@ import akka.stream.*; import akka.stream.javadsl.*; import akka.testkit.JavaTestKit; -public class FlowGraphDocTest extends AbstractJavaTest { +public class GraphDSLDocTest extends AbstractJavaTest { static ActorSystem system; static Materializer mat; @BeforeClass public static void setup() { - system = ActorSystem.create("FlowGraphDocTest"); + system = ActorSystem.create("GraphDSLDocTest"); mat = ActorMaterializer.create(system); } @@ -47,7 +47,7 @@ public class FlowGraphDocTest extends AbstractJavaTest { @Test public void demonstrateBuildSimpleGraph() throws Exception { - //#simple-flow-graph + //#simple-graph-dsl final Source in = Source.from(Arrays.asList(1, 2, 3, 4, 5)); final Sink, CompletionStage>> sink = Sink.head(); final Sink, CompletionStage>> sink2 = Sink.head(); @@ -72,7 +72,7 @@ public class FlowGraphDocTest extends AbstractJavaTest { builder.from(bcast).via(builder.add(f4)).toFanIn(merge); return ClosedShape.getInstance(); })); - //#simple-flow-graph + //#simple-graph-dsl final List list = result.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS); final String[] res = list.toArray(new String[] {}); Arrays.sort(res, null); @@ -107,7 +107,7 @@ public class FlowGraphDocTest extends AbstractJavaTest { @Test public void demonstrateReusingFlowInGraph() throws Exception { - //#flow-graph-reusing-a-flow + //#graph-dsl-reusing-a-flow final Sink> topHeadSink = Sink.head(); final Sink> bottomHeadSink = Sink.head(); final Flow sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2); @@ -129,7 +129,7 @@ public class FlowGraphDocTest extends AbstractJavaTest { } ) ); - //#flow-graph-reusing-a-flow + //#graph-dsl-reusing-a-flow final Pair, CompletionStage> pair = g.run(mat); assertEquals(Integer.valueOf(2), pair.first().toCompletableFuture().get(3, TimeUnit.SECONDS)); assertEquals(Integer.valueOf(2), pair.second().toCompletableFuture().get(3, TimeUnit.SECONDS)); @@ -137,7 +137,7 @@ public class FlowGraphDocTest extends AbstractJavaTest { @Test public void demonstrateMatValue() throws Exception { - //#flow-graph-matvalue + //#graph-dsl-matvalue final Sink> foldSink = Sink. fold(0, (a, b) -> { return a + b; }); @@ -152,9 +152,9 @@ public class FlowGraphDocTest extends AbstractJavaTest { fold.in(), b.from(b.materializedValue()).via(b.add(flatten)).out()); })); - //#flow-graph-matvalue + //#graph-dsl-matvalue - //#flow-graph-matvalue-cycle + //#graph-dsl-matvalue-cycle // This cannot produce any value: final Source> cyclicSource = Source.fromGraph( GraphDSL.create(foldSink, @@ -168,6 +168,6 @@ public class FlowGraphDocTest extends AbstractJavaTest { return SourceShape.of(b.from(b.materializedValue()).via(b.add(flatten)).out()); })); - //#flow-graph-matvalue-cycle + //#graph-dsl-matvalue-cycle } } diff --git a/akka-docs/rst/java/code/docs/stream/StreamPartialFlowGraphDocTest.java b/akka-docs/rst/java/code/docs/stream/StreamPartialGraphDSLDocTest.java similarity index 86% rename from akka-docs/rst/java/code/docs/stream/StreamPartialFlowGraphDocTest.java rename to akka-docs/rst/java/code/docs/stream/StreamPartialGraphDSLDocTest.java index a484685ffd..92e10ec74f 100644 --- a/akka-docs/rst/java/code/docs/stream/StreamPartialFlowGraphDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/StreamPartialGraphDSLDocTest.java @@ -3,36 +3,35 @@ */ package docs.stream; -import static org.junit.Assert.assertEquals; - -import java.util.*; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - import akka.Done; import akka.NotUsed; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.stream.*; +import akka.stream.javadsl.*; +import akka.testkit.JavaTestKit; import docs.AbstractJavaTest; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import akka.actor.*; -import akka.japi.Pair; -import akka.stream.*; -import akka.stream.javadsl.*; -import akka.testkit.JavaTestKit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; -public class StreamPartialFlowGraphDocTest extends AbstractJavaTest { +import static org.junit.Assert.assertEquals; + +public class StreamPartialGraphDSLDocTest extends AbstractJavaTest { static ActorSystem system; static Materializer mat; @BeforeClass public static void setup() { - system = ActorSystem.create("StreamPartialFlowGraphDocTest"); + system = ActorSystem.create("StreamPartialGraphDSLDocTest"); mat = ActorMaterializer.create(system); } @@ -45,7 +44,7 @@ public class StreamPartialFlowGraphDocTest extends AbstractJavaTest { @Test public void demonstrateBuildWithOpenPorts() throws Exception { - //#simple-partial-flow-graph + //#simple-partial-graph-dsl final Graph, NotUsed> zip = ZipWith.create((Integer left, Integer right) -> Math.max(left, right)); @@ -65,7 +64,7 @@ public class StreamPartialFlowGraphDocTest extends AbstractJavaTest { final RunnableGraph> g = RunnableGraph.>fromGraph( GraphDSL.create(resultSink, (builder, sink) -> { - // import the partial flow graph explicitly + // import the partial graph explicitly final UniformFanInShape pm = builder.add(pickMaxOfThree); builder.from(builder.add(Source.single(1))).toInlet(pm.in(0)); @@ -76,11 +75,11 @@ public class StreamPartialFlowGraphDocTest extends AbstractJavaTest { })); final CompletionStage max = g.run(mat); - //#simple-partial-flow-graph + //#simple-partial-graph-dsl assertEquals(Integer.valueOf(3), max.toCompletableFuture().get(3, TimeUnit.SECONDS)); } - //#source-from-partial-flow-graph + //#source-from-partial-graph-dsl // first create an indefinite source of integer numbers class Ints implements Iterator { private int next = 0; @@ -93,11 +92,11 @@ public class StreamPartialFlowGraphDocTest extends AbstractJavaTest { return next++; } } - //#source-from-partial-flow-graph + //#source-from-partial-graph-dsl @Test - public void demonstrateBuildSourceFromPartialFlowGraphCreate() throws Exception { - //#source-from-partial-flow-graph + public void demonstrateBuildSourceFromPartialGraphDSLCreate() throws Exception { + //#source-from-partial-graph-dsl final Source ints = Source.fromIterator(() -> new Ints()); final Source, NotUsed> pairs = Source.fromGraph( @@ -114,13 +113,13 @@ public class StreamPartialFlowGraphDocTest extends AbstractJavaTest { final CompletionStage> firstPair = pairs.runWith(Sink.>head(), mat); - //#source-from-partial-flow-graph + //#source-from-partial-graph-dsl assertEquals(new Pair<>(0, 1), firstPair.toCompletableFuture().get(3, TimeUnit.SECONDS)); } @Test - public void demonstrateBuildFlowFromPartialFlowGraphCreate() throws Exception { - //#flow-from-partial-flow-graph + public void demonstrateBuildFlowFromPartialGraphDSLCreate() throws Exception { + //#flow-from-partial-graph-dsl final Flow, NotUsed> pairs = Flow.fromGraph(GraphDSL.create( b -> { final UniformFanOutShape bcast = b.add(Broadcast.create(2)); @@ -133,11 +132,11 @@ public class StreamPartialFlowGraphDocTest extends AbstractJavaTest { return FlowShape.of(bcast.in(), zip.out()); })); - //#flow-from-partial-flow-graph + //#flow-from-partial-graph-dsl final CompletionStage> matSink = - //#flow-from-partial-flow-graph + //#flow-from-partial-graph-dsl Source.single(1).via(pairs).runWith(Sink.>head(), mat); - //#flow-from-partial-flow-graph + //#flow-from-partial-graph-dsl assertEquals(new Pair<>(1, "1"), matSink.toCompletableFuture().get(3, TimeUnit.SECONDS)); } diff --git a/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java b/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java index 8a15a1bc36..ab265202fd 100644 --- a/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java @@ -275,10 +275,10 @@ public class TwitterStreamQuickstartDocTest extends AbstractJavaTest { } static abstract class HiddenDefinitions { - //#flow-graph-broadcast + //#graph-dsl-broadcast Sink writeAuthors; Sink writeHashtags; - //#flow-graph-broadcast + //#graph-dsl-broadcast } @Test @@ -286,7 +286,7 @@ public class TwitterStreamQuickstartDocTest extends AbstractJavaTest { final Sink> writeAuthors = Sink.ignore(); final Sink> writeHashtags = Sink.ignore(); - //#flow-graph-broadcast + //#graph-dsl-broadcast RunnableGraph.fromGraph(GraphDSL.create(b -> { final UniformFanOutShape bcast = b.add(Broadcast.create(2)); final FlowShape toAuthor = @@ -300,7 +300,7 @@ public class TwitterStreamQuickstartDocTest extends AbstractJavaTest { b.from(bcast).via(toTags).to(hashtags); return ClosedShape.getInstance(); })).run(mat); - //#flow-graph-broadcast + //#graph-dsl-broadcast } long slowComputation(Tweet t) { diff --git a/akka-docs/rst/java/stream/stream-graphs.rst b/akka-docs/rst/java/stream/stream-graphs.rst index 6ab59fe9fb..f7b5b3cab7 100644 --- a/akka-docs/rst/java/stream/stream-graphs.rst +++ b/akka-docs/rst/java/stream/stream-graphs.rst @@ -15,7 +15,7 @@ Some graph operations which are common enough and fit the linear style of Flows, streams, such that the second one is consumed after the first one has completed), may have shorthand methods defined on :class:`Flow` or :class:`Source` themselves, however you should keep in mind that those are also implemented as graph junctions. -.. _flow-graph-java: +.. _graph-dsl-java: Constructing Graphs ------------------- @@ -51,7 +51,7 @@ Such graph is simple to translate to the Graph DSL since each linear element cor and each circle corresponds to either a :class:`Junction` or a :class:`Source` or :class:`Sink` if it is beginning or ending a :class:`Flow`. -.. includecode:: ../code/docs/stream/FlowGraphDocTest.java#simple-flow-graph +.. includecode:: ../code/docs/stream/GraphDSLDocTest.java#simple-graph-dsl .. note:: Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a GraphDSL @@ -75,9 +75,9 @@ In the example below we prepare a graph that consists of two parallel streams, in which we re-use the same instance of :class:`Flow`, yet it will properly be materialized as two connections between the corresponding Sources and Sinks: -.. includecode:: ../code/docs/stream/FlowGraphDocTest.java#flow-graph-reusing-a-flow +.. includecode:: ../code/docs/stream/GraphDSLDocTest.java#graph-dsl-reusing-a-flow -.. _partial-flow-graph-java: +.. _partial-graph-dsl-java: Constructing and combining Partial Graphs ----------------------------------------- @@ -97,7 +97,7 @@ Let's imagine we want to provide users with a specialized element that given 3 i the greatest int value of each zipped triple. We'll want to expose 3 input ports (unconnected sources) and one output port (unconnected sink). -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocTest.java#simple-partial-flow-graph +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocTest.java#simple-partial-graph-dsl As you can see, first we construct the partial graph that describes how to compute the maximum of two input streams, then we reuse that twice while constructing the partial graph that extends this to three input streams, @@ -136,12 +136,12 @@ be attached before this Source can run”. Refer to the example below, in which we create a Source that zips together two numbers, to see this graph construction in action: -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocTest.java#source-from-partial-flow-graph +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocTest.java#source-from-partial-graph-dsl Similarly the same can be done for a ``Sink`` using ``SinkShape.of`` in which case the provided value must be an ``Inlet``. For defining a ``Flow`` we need to expose both an undefined source and sink: -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocTest.java#flow-from-partial-flow-graph +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocTest.java#flow-from-partial-graph-dsl Combining Sources and Sinks with simplified API ----------------------------------------------- @@ -150,11 +150,11 @@ There is simplified API you can use to combine sources and sinks with junctions ``Merge`` and ``Concat`` without the need for using the Graph DSL. The combine method takes care of constructing the necessary graph underneath. In following example we combine two sources into one (fan-in): -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocTest.java#source-combine +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocTest.java#source-combine The same can be done for a ``Sink`` but in this case it will be fan-out: -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocTest.java#sink-combine +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocTest.java#sink-combine .. _bidi-flow-java: @@ -219,12 +219,12 @@ can be used in the graph as an ordinary source or outlet, and which will eventua If the materialized value is needed at more than one place, it is possible to call ``materializedValue`` any number of times to acquire the necessary number of outlets. -.. includecode:: ../code/docs/stream/FlowGraphDocTest.java#flow-graph-matvalue +.. includecode:: ../code/docs/stream/GraphDSLDocTest.java#graph-dsl-matvalue Be careful not to introduce a cycle where the materialized value actually contributes to the materialized value. The following example demonstrates a case where the materialized ``CompletionStage`` of a fold is fed back to the fold itself. -.. includecode:: ../code/docs/stream/FlowGraphDocTest.java#flow-graph-matvalue-cycle +.. includecode:: ../code/docs/stream/GraphDSLDocTest.java#graph-dsl-matvalue-cycle .. _graph-cycles-java: diff --git a/akka-docs/rst/java/stream/stream-quickstart.rst b/akka-docs/rst/java/stream/stream-quickstart.rst index 8794a124ac..fef64e9c55 100644 --- a/akka-docs/rst/java/stream/stream-quickstart.rst +++ b/akka-docs/rst/java/stream/stream-quickstart.rst @@ -243,7 +243,7 @@ at the expense of not reading as familiarly as collection transformations. Graphs are constructed using :class:`GraphDSL` like this: -.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#flow-graph-broadcast +.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#graph-dsl-broadcast As you can see, we use graph builder ``b`` to construct the graph using ``UniformFanOutShape`` and ``Flow`` s. @@ -257,7 +257,7 @@ Both :class:`Graph` and :class:`RunnableGraph` are *immutable, thread-safe, and A graph can also have one of several other shapes, with one or more unconnected ports. Having unconnected ports expresses a graph that is a *partial graph*. Concepts around composing and nesting graphs in large structures are explained in detail in :ref:`composition-java`. It is also possible to wrap complex computation graphs -as Flows, Sinks or Sources, which will be explained in detail in :ref:`partial-flow-graph-java`. +as Flows, Sinks or Sources, which will be explained in detail in :ref:`partial-graph-dsl-java`. Back-pressure in action diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala index faaaa4f51d..9081c7d4c2 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala @@ -132,7 +132,7 @@ class HttpServerExampleSpec extends WordSpec with Matchers "connection-stream-failure-handling" in compileOnlySpec { import akka.actor.ActorSystem import akka.http.scaladsl.Http - import akka.http.scaladsl.model.{ContentTypes, HttpEntity} + import akka.http.scaladsl.model.{ ContentTypes, HttpEntity } import akka.stream.ActorMaterializer implicit val system = ActorSystem() diff --git a/akka-docs/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/GraphDSLDocSpec.scala similarity index 90% rename from akka-docs/rst/scala/code/docs/stream/FlowGraphDocSpec.scala rename to akka-docs/rst/scala/code/docs/stream/GraphDSLDocSpec.scala index 0b8398aa4d..6aa668352b 100644 --- a/akka-docs/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/GraphDSLDocSpec.scala @@ -9,11 +9,10 @@ import akka.stream.scaladsl._ import akka.testkit.AkkaSpec import scala.collection.immutable -import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ -import akka.stream.Attributes +import scala.concurrent.{ Await, Future } -class FlowGraphDocSpec extends AkkaSpec { +class GraphDSLDocSpec extends AkkaSpec { implicit val ec = system.dispatcher @@ -21,7 +20,7 @@ class FlowGraphDocSpec extends AkkaSpec { "build simple graph" in { //format: OFF - //#simple-flow-graph + //#simple-graph-dsl val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => import GraphDSL.Implicits._ val in = Source(1 to 10) @@ -36,7 +35,7 @@ class FlowGraphDocSpec extends AkkaSpec { bcast ~> f4 ~> merge ClosedShape }) - //#simple-flow-graph + //#simple-graph-dsl //format: ON //#simple-graph-run @@ -64,17 +63,17 @@ class FlowGraphDocSpec extends AkkaSpec { } "reusing a flow in a graph" in { - //#flow-graph-reusing-a-flow + //#graph-dsl-reusing-a-flow val topHeadSink = Sink.head[Int] val bottomHeadSink = Sink.head[Int] val sharedDoubler = Flow[Int].map(_ * 2) - //#flow-graph-reusing-a-flow + //#graph-dsl-reusing-a-flow // format: OFF val g = - //#flow-graph-reusing-a-flow + //#graph-dsl-reusing-a-flow RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder => (topHS, bottomHS) => import GraphDSL.Implicits._ @@ -85,7 +84,7 @@ class FlowGraphDocSpec extends AkkaSpec { broadcast.out(1) ~> sharedDoubler ~> bottomHS.in ClosedShape }) - //#flow-graph-reusing-a-flow + //#graph-dsl-reusing-a-flow // format: ON val (topFuture, bottomFuture) = g.run() Await.result(topFuture, 300.millis) shouldEqual 2 @@ -94,7 +93,7 @@ class FlowGraphDocSpec extends AkkaSpec { "building a reusable component" in { - //#flow-graph-components-shape + //#graph-dsl-components-shape // A shape represents the input and output ports of a reusable // processing module case class PriorityWorkerPoolShape[In, Out]( @@ -126,9 +125,9 @@ class FlowGraphDocSpec extends AkkaSpec { PriorityWorkerPoolShape[In, Out](inlets(0).as[In], inlets(1).as[In], outlets(0).as[Out]) } } - //#flow-graph-components-shape + //#graph-dsl-components-shape - //#flow-graph-components-create + //#graph-dsl-components-create object PriorityWorkerPool { def apply[In, Out]( worker: Flow[In, Out, Any], @@ -161,11 +160,11 @@ class FlowGraphDocSpec extends AkkaSpec { } } - //#flow-graph-components-create + //#graph-dsl-components-create def println(s: Any): Unit = () - //#flow-graph-components-use + //#graph-dsl-components-use val worker1 = Flow[String].map("step 1 " + _) val worker2 = Flow[String].map("step 2 " + _) @@ -184,11 +183,10 @@ class FlowGraphDocSpec extends AkkaSpec { priorityPool2.resultsOut ~> Sink.foreach(println) ClosedShape }).run() - //#flow-graph-components-use + //#graph-dsl-components-use - //#flow-graph-components-shape2 - import FanInShape.Name - import FanInShape.Init + //#graph-dsl-components-shape2 + import FanInShape.{ Init, Name } class PriorityWorkerPoolShape2[In, Out](_init: Init[Out] = Name("PriorityWorkerPool")) extends FanInShape[Out](_init) { @@ -198,23 +196,23 @@ class FlowGraphDocSpec extends AkkaSpec { val priorityJobsIn = newInlet[In]("priorityJobsIn") // Outlet[Out] with name "out" is automatically created } - //#flow-graph-components-shape2 + //#graph-dsl-components-shape2 } "access to materialized value" in { - //#flow-graph-matvalue + //#graph-dsl-matvalue import GraphDSL.Implicits._ val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder ⇒ fold ⇒ FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet) }) - //#flow-graph-matvalue + //#graph-dsl-matvalue Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55) - //#flow-graph-matvalue-cycle + //#graph-dsl-matvalue-cycle import GraphDSL.Implicits._ // This cannot produce any value: val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { @@ -228,7 +226,7 @@ class FlowGraphDocSpec extends AkkaSpec { builder.materializedValue.mapAsync(4)(identity) ~> fold SourceShape(builder.materializedValue.mapAsync(4)(identity).outlet) }) - //#flow-graph-matvalue-cycle + //#graph-dsl-matvalue-cycle } } diff --git a/akka-docs/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/StreamPartialGraphDSLDocSpec.scala similarity index 88% rename from akka-docs/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala rename to akka-docs/rst/scala/code/docs/stream/StreamPartialGraphDSLDocSpec.scala index cff0bedff7..ba18bcaced 100644 --- a/akka-docs/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/StreamPartialGraphDSLDocSpec.scala @@ -11,14 +11,14 @@ import akka.testkit.AkkaSpec import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -class StreamPartialFlowGraphDocSpec extends AkkaSpec { +class StreamPartialGraphDSLDocSpec extends AkkaSpec { implicit val ec = system.dispatcher implicit val materializer = ActorMaterializer() "build with open ports" in { - //#simple-partial-flow-graph + //#simple-partial-graph-dsl val pickMaxOfThree = GraphDSL.create() { implicit b => import GraphDSL.Implicits._ @@ -47,11 +47,11 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { val max: Future[Int] = g.run() Await.result(max, 300.millis) should equal(3) - //#simple-partial-flow-graph + //#simple-partial-graph-dsl } - "build source from partial flow graph" in { - //#source-from-partial-flow-graph + "build source from partial graph" in { + //#source-from-partial-graph-dsl val pairs = Source.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ @@ -68,12 +68,12 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { }) val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head) - //#source-from-partial-flow-graph + //#source-from-partial-graph-dsl Await.result(firstPair, 300.millis) should equal(1 -> 2) } - "build flow from partial flow graph" in { - //#flow-from-partial-flow-graph + "build flow from partial graph" in { + //#flow-from-partial-graph-dsl val pairUpWithToString = Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ @@ -90,13 +90,13 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { FlowShape(broadcast.in, zip.out) }) - //#flow-from-partial-flow-graph + //#flow-from-partial-graph-dsl // format: OFF val (_, matSink: Future[(Int, String)]) = - //#flow-from-partial-flow-graph + //#flow-from-partial-graph-dsl pairUpWithToString.runWith(Source(List(1)), Sink.head) - //#flow-from-partial-flow-graph + //#flow-from-partial-graph-dsl // format: ON Await.result(matSink, 300.millis) should equal(1 -> "1") diff --git a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 818368a044..cd90d71466 100644 --- a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -104,10 +104,10 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { } trait HiddenDefinitions { - //#flow-graph-broadcast + //#graph-dsl-broadcast val writeAuthors: Sink[Author, Unit] = ??? val writeHashtags: Sink[Hashtag, Unit] = ??? - //#flow-graph-broadcast + //#graph-dsl-broadcast } "simple broadcast" in { @@ -115,7 +115,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val writeHashtags: Sink[Hashtag, Future[Done]] = Sink.ignore // format: OFF - //#flow-graph-broadcast + //#graph-dsl-broadcast val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ @@ -126,7 +126,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { ClosedShape }) g.run() - //#flow-graph-broadcast + //#graph-dsl-broadcast // format: ON } diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeCollectingMetrics.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeCollectingMetrics.scala index 37d98f339d..84c7b7a552 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeCollectingMetrics.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeCollectingMetrics.scala @@ -24,7 +24,7 @@ class RecipeCollectingMetrics extends RecipeSpec { // val currentLoad = loadUpdates.transform(() => new HoldWithWait) // // val graph = GraphDSL { implicit builder => - // import FlowGraphImplicits._ + // import GraphDSLImplicits._ // val collector = ZipWith[Int, Tick, String]( // (load: Int, tick: Tick) => s"current load is $load") // diff --git a/akka-docs/rst/scala/stream/stream-graphs.rst b/akka-docs/rst/scala/stream/stream-graphs.rst index 37b6dcf8be..d9b9f6bf55 100644 --- a/akka-docs/rst/scala/stream/stream-graphs.rst +++ b/akka-docs/rst/scala/stream/stream-graphs.rst @@ -15,7 +15,7 @@ Some graph operations which are common enough and fit the linear style of Flows, streams, such that the second one is consumed after the first one has completed), may have shorthand methods defined on :class:`Flow` or :class:`Source` themselves, however you should keep in mind that those are also implemented as graph junctions. -.. _flow-graph-scala: +.. _graph-dsl-scala: Constructing Graphs ------------------- @@ -52,7 +52,7 @@ and each circle corresponds to either a :class:`Junction` or a :class:`Source` o or ending a :class:`Flow`. Junctions must always be created with defined type parameters, as otherwise the ``Nothing`` type will be inferred. -.. includecode:: ../code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph +.. includecode:: ../code/docs/stream/GraphDSLDocSpec.scala#simple-graph-dsl .. note:: Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a GraphDSL @@ -80,9 +80,9 @@ In the example below we prepare a graph that consists of two parallel streams, in which we re-use the same instance of :class:`Flow`, yet it will properly be materialized as two connections between the corresponding Sources and Sinks: -.. includecode:: ../code/docs/stream/FlowGraphDocSpec.scala#flow-graph-reusing-a-flow +.. includecode:: ../code/docs/stream/GraphDSLDocSpec.scala#graph-dsl-reusing-a-flow -.. _partial-flow-graph-scala: +.. _partial-graph-dsl-scala: Constructing and combining Partial Graphs ----------------------------------------- @@ -103,7 +103,7 @@ Let's imagine we want to provide users with a specialized element that given 3 i the greatest int value of each zipped triple. We'll want to expose 3 input ports (unconnected sources) and one output port (unconnected sink). -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocSpec.scala#simple-partial-flow-graph +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocSpec.scala#simple-partial-graph-dsl As you can see, first we construct the partial graph that contains all the zipping and comparing of stream elements. This partial graph will have three inputs and one output, wherefore we use the :class:`UniformFanInShape`. @@ -143,12 +143,12 @@ from the function passed in . The single outlet must be provided to the ``Source Refer to the example below, in which we create a Source that zips together two numbers, to see this graph construction in action: -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-from-partial-flow-graph +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocSpec.scala#source-from-partial-graph-dsl Similarly the same can be done for a ``Sink[T]``, using ``SinkShape.of`` in which case the provided value must be an ``Inlet[T]``. For defining a ``Flow[T]`` we need to expose both an inlet and an outlet: -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocSpec.scala#flow-from-partial-graph-dsl Combining Sources and Sinks with simplified API ----------------------------------------------- @@ -157,11 +157,11 @@ There is a simplified API you can use to combine sources and sinks with junction ``Merge[In]`` and ``Concat[A]`` without the need for using the Graph DSL. The combine method takes care of constructing the necessary graph underneath. In following example we combine two sources into one (fan-in): -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-combine +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocSpec.scala#source-combine The same can be done for a ``Sink[T]`` but in this case it will be fan-out: -.. includecode:: ../code/docs/stream/StreamPartialFlowGraphDocSpec.scala#sink-combine +.. includecode:: ../code/docs/stream/StreamPartialGraphDSLDocSpec.scala#sink-combine Building reusable Graph components ---------------------------------- @@ -178,7 +178,7 @@ where jobs of higher priority can be sent. Altogether, our junction will have two input ports of type ``I`` (for the normal and priority jobs) and an output port of type ``O``. To represent this interface, we need to define a custom :class:`Shape`. The following lines show how to do that. -.. includecode:: ../code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-shape +.. includecode:: ../code/docs/stream/GraphDSLDocSpec.scala#graph-dsl-components-shape .. _predefined-shapes: @@ -198,20 +198,20 @@ boilerplate: Since our shape has two input ports and one output port, we can just use the :class:`FanInShape` DSL to define our custom shape: -.. includecode:: ../code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-shape2 +.. includecode:: ../code/docs/stream/GraphDSLDocSpec.scala#graph-dsl-components-shape2 Now that we have a :class:`Shape` we can wire up a Graph that represents our worker pool. First, we will merge incoming normal and priority jobs using ``MergePreferred``, then we will send the jobs to a ``Balance`` junction which will fan-out to a configurable number of workers (flows), finally we merge all these results together and send them out through our only output port. This is expressed by the following code: -.. includecode:: ../code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-create +.. includecode:: ../code/docs/stream/GraphDSLDocSpec.scala#graph-dsl-components-create All we need to do now is to use our custom junction in a graph. The following code simulates some simple workers and jobs using plain strings and prints out the results. Actually we used *two* instances of our worker pool junction using ``add()`` twice. -.. includecode:: ../code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-use +.. includecode:: ../code/docs/stream/GraphDSLDocSpec.scala#graph-dsl-components-use .. _bidi-flow-scala: @@ -276,12 +276,12 @@ can be used in the graph as an ordinary source or outlet, and which will eventua If the materialized value is needed at more than one place, it is possible to call ``materializedValue`` any number of times to acquire the necessary number of outlets. -.. includecode:: ../code/docs/stream/FlowGraphDocSpec.scala#flow-graph-matvalue +.. includecode:: ../code/docs/stream/GraphDSLDocSpec.scala#graph-dsl-matvalue Be careful not to introduce a cycle where the materialized value actually contributes to the materialized value. The following example demonstrates a case where the materialized ``Future`` of a fold is fed back to the fold itself. -.. includecode:: ../code/docs/stream/FlowGraphDocSpec.scala#flow-graph-matvalue-cycle +.. includecode:: ../code/docs/stream/GraphDSLDocSpec.scala#graph-dsl-matvalue-cycle .. _graph-cycles-scala: diff --git a/akka-docs/rst/scala/stream/stream-quickstart.rst b/akka-docs/rst/scala/stream/stream-quickstart.rst index 3dfc2afadd..aa2171c539 100644 --- a/akka-docs/rst/scala/stream/stream-quickstart.rst +++ b/akka-docs/rst/scala/stream/stream-quickstart.rst @@ -238,7 +238,7 @@ at the expense of not reading as familiarly as collection transformations. Graphs are constructed using :class:`GraphDSL` like this: -.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast +.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#graph-dsl-broadcast As you can see, inside the :class:`GraphDSL` we use an implicit graph builder ``b`` to mutably construct the graph using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). The operator is provided implicitly diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/server/directives/CodingDirectivesTest.java b/akka-http-tests/src/test/java/akka/http/javadsl/server/directives/CodingDirectivesTest.java index 8fb9c33df6..c34ceb54f0 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/server/directives/CodingDirectivesTest.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/server/directives/CodingDirectivesTest.java @@ -25,7 +25,7 @@ public class CodingDirectivesTest extends JUnitRouteTest { @BeforeClass public static void setup() { - system = ActorSystem.create("FlowGraphDocTest"); + system = ActorSystem.create("GraphDSLDocTest"); } @AfterClass diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java similarity index 91% rename from akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java rename to akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java index 4373ccf093..41f5a71567 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/GraphDSLTest.java @@ -27,13 +27,13 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; -public class FlowGraphTest extends StreamTest { - public FlowGraphTest() { +public class GraphDSLTest extends StreamTest { + public GraphDSLTest() { super(actorSystemResource); } @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowGraphTest", + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("GraphDSLTest", AkkaSpec.testConf()); @SuppressWarnings("serial") @@ -59,12 +59,12 @@ public class FlowGraphTest extends StreamTest { @Test public void mustBeAbleToUseMerge() throws Exception { final Flow f1 = - Flow.of(String.class).transform(FlowGraphTest.this. op()).named("f1"); + Flow.of(String.class).transform(GraphDSLTest.this. op()).named("f1"); final Flow f2 = - Flow.of(String.class).transform(FlowGraphTest.this. op()).named("f2"); + Flow.of(String.class).transform(GraphDSLTest.this. op()).named("f2"); @SuppressWarnings("unused") final Flow f3 = - Flow.of(String.class).transform(FlowGraphTest.this. op()).named("f3"); + Flow.of(String.class).transform(GraphDSLTest.this. op()).named("f3"); final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); @@ -138,8 +138,8 @@ public class FlowGraphTest extends StreamTest { final SourceShape> in = b.add(Source.from(input)); final FanOutShape2, String, Integer> unzip = b.add(Unzip.create()); - final SinkShape out1 = b.add(FlowGraphTest.createSink(probe1)); - final SinkShape out2 = b.add(FlowGraphTest.createSink(probe2)); + final SinkShape out1 = b.add(GraphDSLTest.createSink(probe1)); + final SinkShape out2 = b.add(GraphDSLTest.createSink(probe2)); b.from(in).toInlet(unzip.in()); b.from(unzip.out0()).to(out1); @@ -178,8 +178,8 @@ public class FlowGraphTest extends StreamTest { }) ); - final SinkShape out1 = b.add(FlowGraphTest.createSink(probe1)); - final SinkShape out2 = b.add(FlowGraphTest.createSink(probe2)); + final SinkShape out1 = b.add(GraphDSLTest.createSink(probe1)); + final SinkShape out2 = b.add(GraphDSLTest.createSink(probe2)); b.from(b.add(in)).toInlet(unzip.in()); b.from(unzip.out0()).to(out1); @@ -221,10 +221,10 @@ public class FlowGraphTest extends StreamTest { }) ); - final SinkShape out1 = b.add(FlowGraphTest.createSink(probe1)); - final SinkShape out2 = b.add(FlowGraphTest.createSink(probe2)); - final SinkShape out3 = b.add(FlowGraphTest.createSink(probe3)); - final SinkShape out4 = b.add(FlowGraphTest.createSink(probe4)); + final SinkShape out1 = b.add(GraphDSLTest.createSink(probe1)); + final SinkShape out2 = b.add(GraphDSLTest.createSink(probe2)); + final SinkShape out3 = b.add(GraphDSLTest.createSink(probe3)); + final SinkShape out4 = b.add(GraphDSLTest.createSink(probe4)); b.from(b.add(in)).toInlet(unzip.in()); b.from(unzip.out0()).to(out1); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala index 6de24d32a4..ee3e41a9a0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBackedFlowSpec.scala @@ -58,7 +58,7 @@ class GraphFlowSpec extends AkkaSpec { } - "FlowGraphs" when { + "GraphDSLs" when { "turned into flows" should { "work with a Source and Sink" in { val probe = TestSubscriber.manualProbe[Int]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala similarity index 99% rename from akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala index a2aa2d7318..496d4cbbcc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala @@ -8,13 +8,13 @@ import akka.stream.testkit._ import akka.stream.stage._ import akka.testkit.AkkaSpec -object FlowGraphCompileSpec { +object GraphDSLCompileSpec { class Fruit class Apple extends Fruit } -class FlowGraphCompileSpec extends AkkaSpec { - import FlowGraphCompileSpec._ +class GraphDSLCompileSpec extends AkkaSpec { + import GraphDSLCompileSpec._ implicit val materializer = ActorMaterializer() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index 3589a2a1b0..d5cc067fa8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -52,7 +52,7 @@ class GraphOpsIntegrationSpec extends AkkaSpec { implicit val materializer = ActorMaterializer(settings) - "FlowGraphs" must { + "GraphDSLs" must { "support broadcast - merge layouts" in { val resultFuture = RunnableGraph.fromGraph(GraphDSL.create(Sink.head[Seq[Int]]) { implicit b ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartialSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartialSpec.scala index 276081b61c..4e82b11829 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartialSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphPartialSpec.scala @@ -13,7 +13,7 @@ class GraphPartialSpec extends AkkaSpec { implicit val materializer = ActorMaterializer(settings) - "FlowFlowGraph.partial" must { + "GraphDSL.partial" must { import GraphDSL.Implicits._ "be able to build and reuse simple partial graphs" in { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index f531a7b134..a8d11b31b9 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -302,8 +302,6 @@ object Concat { } -// flow graph // - object GraphDSL extends GraphCreate { /**