!str #19037 rename FlowGraph to GraphDSL

This commit is contained in:
Roland Kuhn 2015-11-30 15:45:37 +01:00
parent 5895834d98
commit f00da4daac
92 changed files with 535 additions and 542 deletions

View file

@ -51,14 +51,14 @@ class MigrationsScala extends AkkaSpec {
//#bidiflow-wrap
//#graph-create
// Replaces FlowGraph.closed()
FlowGraph.create() { builder =>
// Replaces GraphDSL.closed()
GraphDSL.create() { builder =>
//...
ClosedShape
}
// Replaces FlowGraph.partial()
FlowGraph.create() { builder =>
// Replaces GraphDSL.partial()
GraphDSL.create() { builder =>
//...
FlowShape(inlet, outlet)
}
@ -66,25 +66,25 @@ class MigrationsScala extends AkkaSpec {
//#graph-create-2
Source.fromGraph(
FlowGraph.create() { builder =>
GraphDSL.create() { builder =>
//...
SourceShape(outlet)
})
Sink.fromGraph(
FlowGraph.create() { builder =>
GraphDSL.create() { builder =>
//...
SinkShape(inlet)
})
Flow.fromGraph(
FlowGraph.create() { builder =>
GraphDSL.create() { builder =>
//...
FlowShape(inlet, outlet)
})
BidiFlow.fromGraph(
FlowGraph.create() { builder =>
GraphDSL.create() { builder =>
//...
BidiShape(inlet1, outlet1, inlet2, outlet2)
})
@ -92,8 +92,8 @@ class MigrationsScala extends AkkaSpec {
//#graph-edges
RunnableGraph.fromGraph(
FlowGraph.create() { implicit builder =>
import FlowGraph.Implicits._
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
outlet ~> inlet
outlet ~> flow ~> inlet
//...

View file

@ -44,7 +44,7 @@ object BidiFlowDocSpec {
}
//#codec-impl
val codecVerbose = BidiFlow.fromGraph(FlowGraph.create() { b =>
val codecVerbose = BidiFlow.fromGraph(GraphDSL.create() { b =>
// construct and add the top flow, going outbound
val outbound = b.add(Flow[Message].map(toBytes))
// construct and add the bottom flow, going inbound
@ -58,7 +58,7 @@ object BidiFlowDocSpec {
//#codec
//#framing
val framing = BidiFlow.fromGraph(FlowGraph.create() { b =>
val framing = BidiFlow.fromGraph(GraphDSL.create() { b =>
implicit val order = ByteOrder.LITTLE_ENDIAN
def addLengthHeader(bytes: ByteString) = {
@ -116,12 +116,12 @@ object BidiFlowDocSpec {
})
//#framing
val chopUp = BidiFlow.fromGraph(FlowGraph.create() { b =>
val chopUp = BidiFlow.fromGraph(GraphDSL.create() { b =>
val f = Flow[ByteString].mapConcat(_.map(ByteString(_)))
BidiShape.fromFlows(b.add(f), b.add(f))
})
val accumulate = BidiFlow.fromGraph(FlowGraph.create() { b =>
val accumulate = BidiFlow.fromGraph(GraphDSL.create() { b =>
val f = Flow[ByteString].grouped(1000).map(_.fold(ByteString.empty)(_ ++ _))
BidiShape.fromFlows(b.add(f), b.add(f))
})

View file

@ -76,8 +76,8 @@ class CompositionDocSpec extends AkkaSpec {
"complex graph" in {
// format: OFF
//#complex-graph
import FlowGraph.Implicits._
RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val A: Outlet[Int] = builder.add(Source.single(0)).outlet
val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
@ -96,8 +96,8 @@ class CompositionDocSpec extends AkkaSpec {
//#complex-graph
//#complex-graph-alt
import FlowGraph.Implicits._
RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val B = builder.add(Broadcast[Int](2))
val C = builder.add(Merge[Int](2))
val E = builder.add(Balance[Int](2))
@ -117,8 +117,8 @@ class CompositionDocSpec extends AkkaSpec {
"partial graph" in {
// format: OFF
//#partial-graph
import FlowGraph.Implicits._
val partial = FlowGraph.create() { implicit builder =>
import GraphDSL.Implicits._
val partial = GraphDSL.create() { implicit builder =>
val B = builder.add(Broadcast[Int](2))
val C = builder.add(Merge[Int](2))
val E = builder.add(Balance[Int](2))
@ -143,7 +143,7 @@ class CompositionDocSpec extends AkkaSpec {
val flow = Flow.fromGraph(partial)
// Simple way to create a graph backed Source
val source = Source.fromGraph( FlowGraph.create() { implicit builder =>
val source = Source.fromGraph( GraphDSL.create() { implicit builder =>
val merge = builder.add(Merge[Int](2))
Source.single(0) ~> merge
Source(List(2, 3, 4)) ~> merge
@ -167,7 +167,7 @@ class CompositionDocSpec extends AkkaSpec {
"closed graph" in {
//#embed-closed
val closed1 = Source.single(0).to(Sink.foreach(println))
val closed2 = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
val closed2 = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val embeddedClosed: ClosedShape = builder.add(closed1)
//
embeddedClosed

View file

@ -148,9 +148,9 @@ class FlowDocSpec extends AkkaSpec {
"various ways of transforming materialized values" in {
import scala.concurrent.duration._
val throttler = Flow.fromGraph(FlowGraph.create(Source.tick(1.second, 1.second, "test")) { implicit builder =>
val throttler = Flow.fromGraph(GraphDSL.create(Source.tick(1.second, 1.second, "test")) { implicit builder =>
tickSource =>
import FlowGraph.Implicits._
import GraphDSL.Implicits._
val zip = builder.add(ZipWith[String, Int, Int](Keep.right))
tickSource ~> zip.in0
FlowShape(zip.in1, zip.out)
@ -211,9 +211,9 @@ class FlowDocSpec extends AkkaSpec {
// The result of r11 can be also achieved by using the Graph API
val r12: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
RunnableGraph.fromGraph(FlowGraph.create(source, flow, sink)((_, _, _)) { implicit builder =>
RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)((_, _, _)) { implicit builder =>
(src, f, dst) =>
import FlowGraph.Implicits._
import GraphDSL.Implicits._
src ~> f ~> dst
ClosedShape
})

View file

@ -21,8 +21,8 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph" in {
//format: OFF
//#simple-flow-graph
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder: FlowGraph.Builder[Unit] =>
import FlowGraph.Implicits._
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
@ -46,8 +46,8 @@ class FlowGraphDocSpec extends AkkaSpec {
"flow connection errors" in {
intercept[IllegalArgumentException] {
//#simple-graph
RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source1 = Source(1 to 10)
val source2 = Source(1 to 10)
@ -74,9 +74,9 @@ class FlowGraphDocSpec extends AkkaSpec {
// format: OFF
val g =
//#flow-graph-reusing-a-flow
RunnableGraph.fromGraph(FlowGraph.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
(topHS, bottomHS) =>
import FlowGraph.Implicits._
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.single(1) ~> broadcast.in
@ -133,8 +133,8 @@ class FlowGraphDocSpec extends AkkaSpec {
worker: Flow[In, Out, Any],
workerCount: Int): Graph[PriorityWorkerPoolShape[In, Out], Unit] = {
FlowGraph.create() { implicit b
import FlowGraph.Implicits._
GraphDSL.create() { implicit b
import GraphDSL.Implicits._
val priorityMerge = b.add(MergePreferred[In](1))
val balance = b.add(Balance[In](workerCount))
@ -168,8 +168,8 @@ class FlowGraphDocSpec extends AkkaSpec {
val worker1 = Flow[String].map("step 1 " + _)
val worker2 = Flow[String].map("step 2 " + _)
RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val priorityPool1 = b.add(PriorityWorkerPool(worker1, 4))
val priorityPool2 = b.add(PriorityWorkerPool(worker2, 2))
@ -203,8 +203,8 @@ class FlowGraphDocSpec extends AkkaSpec {
"access to materialized value" in {
//#flow-graph-matvalue
import FlowGraph.Implicits._
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(FlowGraph.create(Sink.fold[Int, Int](0)(_ + _)) {
import GraphDSL.Implicits._
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder
fold
FlowShape(fold.inlet, builder.materializedValue.mapAsync(4)(identity).outlet)
@ -214,9 +214,9 @@ class FlowGraphDocSpec extends AkkaSpec {
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55)
//#flow-graph-matvalue-cycle
import FlowGraph.Implicits._
import GraphDSL.Implicits._
// This cannot produce any value:
val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(FlowGraph.create(Sink.fold[Int, Int](0)(_ + _)) {
val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder =>
fold =>
// - Fold cannot complete until its upstream mapAsync completes

View file

@ -1,12 +1,12 @@
package docs.stream
import akka.stream.FlowShape
import akka.stream.scaladsl.{ FlowGraph, Merge, Balance, Source, Flow }
import akka.stream.scaladsl.{ GraphDSL, Merge, Balance, Source, Flow }
import akka.stream.testkit.AkkaSpec
class FlowParallelismDocSpec extends AkkaSpec {
import FlowGraph.Implicits._
import GraphDSL.Implicits._
case class ScoopOfBatter()
case class HalfCookedPancake()
@ -38,7 +38,7 @@ class FlowParallelismDocSpec extends AkkaSpec {
val fryingPan: Flow[ScoopOfBatter, Pancake, Unit] =
Flow[ScoopOfBatter].map { batter => Pancake() }
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] = Flow.fromGraph(FlowGraph.create() { implicit builder =>
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergePancakes = builder.add(Merge[Pancake](2))
@ -59,7 +59,7 @@ class FlowParallelismDocSpec extends AkkaSpec {
"Demonstrate parallelized pipelines" in {
//#parallel-pipeline
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] =
Flow.fromGraph(FlowGraph.create() { implicit builder =>
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergePancakes = builder.add(Merge[Pancake](2))
@ -77,7 +77,7 @@ class FlowParallelismDocSpec extends AkkaSpec {
"Demonstrate pipelined parallel processing" in {
//#pipelined-parallel
val pancakeChefs1: Flow[ScoopOfBatter, HalfCookedPancake, Unit] =
Flow.fromGraph(FlowGraph.create() { implicit builder =>
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergeHalfPancakes = builder.add(Merge[HalfCookedPancake](2))
@ -90,7 +90,7 @@ class FlowParallelismDocSpec extends AkkaSpec {
})
val pancakeChefs2: Flow[HalfCookedPancake, Pancake, Unit] =
Flow.fromGraph(FlowGraph.create() { implicit builder =>
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchHalfPancakes = builder.add(Balance[HalfCookedPancake](2))
val mergePancakes = builder.add(Merge[Pancake](2))

View file

@ -16,8 +16,8 @@ class GraphCyclesSpec extends AkkaSpec {
// format: OFF
//#deadlocked
// WARNING! The graph below deadlocks!
RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[Int](2))
@ -34,8 +34,8 @@ class GraphCyclesSpec extends AkkaSpec {
// format: OFF
//#unfair
// WARNING! The graph below stops consuming from "source" after a few steps
RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(MergePreferred[Int](1))
val bcast = b.add(Broadcast[Int](2))
@ -51,8 +51,8 @@ class GraphCyclesSpec extends AkkaSpec {
"include a dropping cycle" in {
// format: OFF
//#dropping
RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[Int](2))
@ -69,8 +69,8 @@ class GraphCyclesSpec extends AkkaSpec {
// format: OFF
//#zipping-dead
// WARNING! The graph below never processes any elements
RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip = b.add(ZipWith[Int, Int, Int]((left, right) => right))
val bcast = b.add(Broadcast[Int](2))
@ -87,8 +87,8 @@ class GraphCyclesSpec extends AkkaSpec {
"include a live zipping cycle" in {
// format: OFF
//#zipping-live
RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip = b.add(ZipWith((left: Int, right: Int) => left))
val bcast = b.add(Broadcast[Int](2))

View file

@ -66,7 +66,7 @@ class GraphStageDocSpec extends AkkaSpec {
//#custom-source-example
//#simple-source-usage
// A GraphStage is a proper Graph, just like what FlowGraph.create would return
// A GraphStage is a proper Graph, just like what GraphDSL.create would return
val sourceGraph: Graph[SourceShape[Int], Unit] = new NumbersSource
// Create a Source from the Graph to access the DSL

View file

@ -39,8 +39,8 @@ class StreamBuffersRateSpec extends AkkaSpec {
import scala.concurrent.duration._
case class Tick()
RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count))

View file

@ -19,8 +19,8 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
"build with open ports" in {
//#simple-partial-flow-graph
val pickMaxOfThree = FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
val pickMaxOfThree = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
@ -31,9 +31,9 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val resultSink = Sink.head[Int]
val g = RunnableGraph.fromGraph(FlowGraph.create(resultSink) { implicit b =>
val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
sink =>
import FlowGraph.Implicits._
import GraphDSL.Implicits._
// importing the partial graph will return its shape (inlets & outlets)
val pm3 = b.add(pickMaxOfThree)
@ -52,8 +52,8 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
"build source from partial flow graph" in {
//#source-from-partial-flow-graph
val pairs = Source.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
val pairs = Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val zip = b.add(Zip[Int, Int]())
@ -75,8 +75,8 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
"build flow from partial flow graph" in {
//#flow-from-partial-flow-graph
val pairUpWithToString =
Flow.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val broadcast = b.add(Broadcast[Int](2))

View file

@ -118,8 +118,8 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
// format: OFF
//#flow-graph-broadcast
val g = RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Tweet](2))
tweets ~> bcast.in

View file

@ -27,7 +27,7 @@ class RecipeCollectingMetrics extends RecipeSpec {
// //#periodic-metrics-collection
// val currentLoad = loadUpdates.transform(() => new HoldWithWait)
//
// val graph = FlowGraph { implicit builder =>
// val graph = GraphDSL { implicit builder =>
// import FlowGraphImplicits._
// val collector = ZipWith[Int, Tick, String](
// (load: Int, tick: Tick) => s"current load is $load")

View file

@ -24,9 +24,9 @@ class RecipeDroppyBroadcast extends RecipeSpec {
val mySink3 = Sink(sub3)
//#droppy-bcast
val graph = RunnableGraph.fromGraph(FlowGraph.create(mySink1, mySink2, mySink3)((_, _, _)) { implicit b =>
val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) { implicit b =>
(sink1, sink2, sink3) =>
import FlowGraph.Implicits._
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Int](3))
myElements ~> bcast

View file

@ -99,8 +99,8 @@ class RecipeGlobalRateLimit extends RecipeSpec {
val probe = TestSubscriber.manualProbe[String]()
RunnableGraph.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[String](2))
source1 ~> merge ~> Sink(probe)
source2 ~> merge

View file

@ -18,8 +18,8 @@ class RecipeManualTrigger extends RecipeSpec {
val sink = Sink(sub)
//#manually-triggered-stream
val graph = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
import FlowGraph.Implicits._
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val zip = builder.add(Zip[Message, Trigger]())
elements ~> zip.in0
triggerSource ~> zip.in1
@ -57,8 +57,8 @@ class RecipeManualTrigger extends RecipeSpec {
val sink = Sink(sub)
//#manually-triggered-stream-zipwith
val graph = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
import FlowGraph.Implicits._
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val zip = builder.add(ZipWith((msg: Message, trigger: Trigger) => msg))
elements ~> zip.in0

View file

@ -19,9 +19,9 @@ class RecipeWorkerPool extends RecipeSpec {
//#worker-pool
def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, Unit] = {
import FlowGraph.Implicits._
import GraphDSL.Implicits._
Flow.fromGraph(FlowGraph.create() { implicit b =>
Flow.fromGraph(GraphDSL.create() { implicit b =>
val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
val merge = b.add(Merge[Out](workerCount))

View file

@ -72,8 +72,8 @@ class StreamTcpDocSpec extends AkkaSpec {
connections runForeach { connection =>
val serverLogic = Flow.fromGraph(FlowGraph.create() { implicit b =>
import FlowGraph.Implicits._
val serverLogic = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// server logic, parses incoming commands
val commandParser = new PushStage[String, String] {

View file

@ -92,18 +92,20 @@ Should be replaced by
.. includecode:: code/docs/MigrationsScala.scala#bidiflow-wrap
FlowGraph builder methods have been renamed
FlowGraph class and builder methods have been renamed
===========================================
Due to incorrect overlap with the :class:`Flow` concept we renamed the :class:`FlowGraph` class to :class:`GraphDSL`.
There is now only one graph creation method called ``create`` which is analogous to the old ``partial`` method. For
closed graphs now it is explicitly required to return ``ClosedShape`` at the end of the builder block.
Update procedure
----------------
1. Replace all occurrences of ``FlowGraph.partial()`` or ``FlowGraph.closed()`` with ``FlowGraph.create()``
2. Add ``ClosedShape`` as a return value of the builder block if it was ``FlowGraph.closed()`` before
3. Wrap the closed graph with ``RunnableGraph.fromGraph`` if it was ``FlowGraph.closed()`` before
1. Search and replace all occurrences of ``FlowGraph`` with ``GraphDSL``.
2. Replace all occurrences of ``GraphDSL.partial()`` or ``GraphDSL.closed()`` with ``GraphDSL.create()``.
3. Add ``ClosedShape`` as a return value of the builder block if it was ``FlowGraph.closed()`` before.
4. Wrap the closed graph with ``RunnableGraph.fromGraph`` if it was ``FlowGraph.closed()`` before.
Example
^^^^^^^
@ -131,7 +133,7 @@ Methods that create Source, Sink, Flow from Graphs have been removed
Previously there were convenience methods available on ``Sink``, ``Source``, ``Flow`` an ``BidiFlow`` to create
these DSL elements from a graph builder directly. Now this requires two explicit steps to reduce the number of overloaded
methods (helps Java 8 type inference) and also reduces the ways how these elements can be created. There is only one
graph creation method to learn (``FlowGraph.create``) and then there is only one conversion method to use ``fromGraph()``.
graph creation method to learn (``GraphDSL.create``) and then there is only one conversion method to use ``fromGraph()``.
This means that the following methods have been removed:
- ``adapt()`` method on ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` (both DSLs)
@ -144,7 +146,7 @@ Update procedure
Everywhere where ``Source``, ``Sink``, ``Flow`` and ``BidiFlow`` is created from a graph using a builder have to
be replaced with two steps
1. Create a ``Graph`` with the correct ``Shape`` using ``FlowGraph.create`` (e.g.. for ``Source`` it means first
1. Create a ``Graph`` with the correct ``Shape`` using ``GraphDSL.create`` (e.g.. for ``Source`` it means first
creating a ``Graph`` with ``SourceShape``)
2. Create the required DSL element by calling ``fromGraph()`` on the required DSL element (e.g. ``Source.fromGraph``)
passing the graph created in the previous step
@ -187,7 +189,7 @@ Several Graph builder methods have been removed
The ``addEdge`` methods have been removed from the DSL to reduce the ways connections can be made and to reduce the
number of overloads. Now only the ``~>`` notation is available which requires the import of the implicits
``FlowGraph.Implicits._``.
``GraphDSL.Implicits._``.
Update procedure
----------------

View file

@ -126,7 +126,7 @@ As a first example, let's look at a more complex layout:
The diagram shows a :class:`RunnableGraph` (remember, if there are no unwired ports, the graph is closed, and therefore
can be materialized) that encapsulates a non-trivial stream processing network. It contains fan-in, fan-out stages,
directed and non-directed cycles. The ``runnable()`` method of the :class:`FlowGraph` object allows the creation of a
directed and non-directed cycles. The ``runnable()`` method of the :class:`GraphDSL` object allows the creation of a
general, closed, and runnable graph. For example the network on the diagram can be realized like this:
.. includecode:: code/docs/stream/CompositionDocSpec.scala#complex-graph
@ -141,7 +141,7 @@ explicitly, and it is not necessary to import our linear stages via ``add()``, s
Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but
the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL.
The way to do it is to use the ``create()`` factory method on :class:`FlowGraph`. If we remove the sources and sinks
The way to do it is to use the ``create()`` factory method on :class:`GraphDSL`. If we remove the sources and sinks
from the previous example, what remains is a partial graph:
|
@ -284,7 +284,7 @@ Attributes
----------
We have seen that we can use ``named()`` to introduce a nesting level in the fluid DSL (and also explicit nesting by using
``create()`` from :class:`FlowGraph`). Apart from having the effect of adding a nesting level, ``named()`` is actually
``create()`` from :class:`GraphDSL`). Apart from having the effect of adding a nesting level, ``named()`` is actually
a shorthand for calling ``withAttributes(Attributes.name("someName"))``. Attributes provide a way to fine-tune certain
aspects of the materialized running entity. For example buffer sizes can be controlled via attributes (see
:ref:`stream-buffers-scala`). When it comes to hierarchic composition, attributes are inherited by nested modules,

View file

@ -15,7 +15,7 @@ Custom processing with GraphStage
=================================
The :class:`GraphStage` abstraction can be used to create arbitrary graph processing stages with any number of input
or output ports. It is a counterpart of the ``FlowGraph.create()`` method which creates new stream processing
or output ports. It is a counterpart of the ``GraphDSL.create()`` method which creates new stream processing
stages by composing others. Where :class:`GraphStage` differs is that it creates a stage that is itself not divisible into
smaller ones, and allows state to be maintained inside it in a safe way.

View file

@ -31,8 +31,11 @@ Back-pressure
Non-Blocking
Means that a certain operation does not hinder the progress of the calling thread, even if it takes long time to
finish the requested operation.
Graph
A description of a stream processing topology, defining the pathways through which elements shall flow when the stream
is running.
Processing Stage
The common name for all building blocks that build up a Flow or FlowGraph.
The common name for all building blocks that build up a Graph.
Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like
:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``.
For the full list of built-in processing stages see :ref:`stages-overview`
@ -67,7 +70,7 @@ it will be represented by the ``RunnableGraph`` type, indicating that it is read
It is important to remember that even after constructing the ``RunnableGraph`` by connecting all the source, sink and
different processing stages, no data will flow through it until it is materialized. Materialization is the process of
allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve
allocating all resources needed to run the computation described by a Graph (in Akka Streams this will often involve
starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable,
thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have
one actor prepare the work, and then have it be materialized at some completely different place in the code.
@ -204,11 +207,11 @@ Stream Materialization
When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing,
but is not restricted to that - it could also mean opening files or socket connections etc. depending on what the stream needs.
but is not restricted to that—it could also mean opening files or socket connections etc.—depending on what the stream needs.
Materialization is triggered at so called "terminal operations". Most notably this includes the various forms of the ``run()``
and ``runWith()`` methods defined on flow elements as well as a small number of special syntactic sugars for running with
well-known sinks, such as ``runForeach(el => )`` (being an alias to ``runWith(Sink.foreach(el => ))``.
and ``runWith()`` methods defined on :class:`Source` and :class:`Flow` elements as well as a small number of special syntactic sugars for running with
well-known sinks, such as ``runForeach(el => ...)`` (being an alias to ``runWith(Sink.foreach(el => ...))``.
Materialization is currently performed synchronously on the materializing thread.
The actual stream processing is handled by actors started up during the streams materialization,
@ -216,7 +219,7 @@ which will be running on the thread pools they have been configured to run on -
:class:`MaterializationSettings` while constructing the :class:`ActorMaterializer`.
.. note::
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside composite Graphs is legal,
yet will materialize that stage multiple times.
.. _flow-combine-mat-scala:

View file

@ -17,10 +17,10 @@ streams, such that the second one is consumed after the first one has completed)
.. _flow-graph-scala:
Constructing Flow Graphs
------------------------
Constructing Graphs
-------------------
Flow graphs are built from simple Flows which serve as the linear connections within the graphs as well as junctions
Graphs are built from simple Flows which serve as the linear connections within the graphs as well as junctions
which serve as fan-in and fan-out points for Flows. Thanks to the junctions having meaningful types based on their behaviour
and making them explicit elements these elements should be rather straightforward to use.
@ -41,7 +41,7 @@ Akka Streams currently provide these junctions (for a detailed list see :ref:`st
- ``Zip[A,B]`` *(2 inputs, 1 output)* is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream
- ``Concat[A]`` *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one)
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
One of the goals of the GraphDSL DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating
the below hand drawn graph into Akka Streams:
@ -55,18 +55,18 @@ will be inferred.
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph
.. note::
Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a FlowGraph
Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a GraphDSL
refers to the same location in the resulting graph).
Notice the ``import FlowGraph.Implicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to")
Notice the ``import GraphDSL.Implicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to")
and its inverted counterpart ``<~`` (for noting down flows in the opposite direction where appropriate).
By looking at the snippets above, it should be apparent that the :class:`FlowGraph.Builder` object is *mutable*.
By looking at the snippets above, it should be apparent that the :class:`GraphDSL.Builder` object is *mutable*.
It is used (implicitly) by the ``~>`` operator, also making it a mutable operation as well.
The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles.
Once the FlowGraph has been constructed though, the :class:`FlowGraph` instance *is immutable, thread-safe, and freely shareable*.
The same is true of all flow pieces—sources, sinks, and flows—once they are constructed.
This means that you can safely re-use one given Flow in multiple places in a processing graph.
Once the GraphDSL has been constructed though, the :class:`GraphDSL` instance *is immutable, thread-safe, and freely shareable*.
The same is true of all graph pieces—sources, sinks, and flows—once they are constructed.
This means that you can safely re-use one given Flow or junction in multiple places in a processing graph.
We have seen examples of such re-use already above: the merge and broadcast junctions were imported
into the graph using ``builder.add(...)``, an operation that will make a copy of the blueprint that
@ -84,18 +84,18 @@ materialized as two connections between the corresponding Sources and Sinks:
.. _partial-flow-graph-scala:
Constructing and combining Partial Flow Graphs
----------------------------------------------
Constructing and combining Partial Graphs
-----------------------------------------
Sometimes it is not possible (or needed) to construct the entire computation graph in one place, but instead construct
all of its different phases in different places and in the end connect them all into a complete graph and run it.
This can be achieved by returning a different ``Shape`` than ``ClosedShape``, for example ``FlowShape(in, out)``, from the
function given to ``FlowGraph.create``. See :ref:`predefined_shapes`) for a list of such predefined shapes.
function given to ``GraphDSL.create``. See :ref:`predefined_shapes`) for a list of such predefined shapes.
Making a ``Graph`` a :class:`RunnableGraph` requires all ports to be connected, and if they are not
it will throw an exception at construction time, which helps to avoid simple
wiring errors while working with graphs. A partial flow graph however allows
wiring errors while working with graphs. A partial graph however allows
you to return the set of yet to be connected ports from the code block that
performs the internal wiring.
@ -112,10 +112,10 @@ the undefined elements are rewired to real sources and sinks. The graph can then
.. warning::
Please note that :class:`FlowGraph` is not able to provide compile time type-safety about whether or not all
Please note that :class:`GraphDSL` is not able to provide compile time type-safety about whether or not all
elements have been properly connected—this validation is performed as a runtime check during the graph's instantiation.
A partial flow graph also verifies that all ports are either connected or part of the returned :class:`Shape`.
A partial graph also verifies that all ports are either connected or part of the returned :class:`Shape`.
.. _constructing-sources-sinks-flows-from-partial-graphs-scala:
@ -136,7 +136,7 @@ Being able to hide complex graphs inside of simple elements such as Sink / Sourc
complex element and from there on treat it as simple compound stage for linear computations.
In order to create a Source from a graph the method ``Source.fromGraph`` is used, to use it we must have a
``Graph[SourceShape, T]``. This is constructed using ``FlowGraph.create`` and returning a ``SourceShape``
``Graph[SourceShape, T]``. This is constructed using ``GraphDSL.create`` and returning a ``SourceShape``
from the function passed in . The single outlet must be provided to the ``SourceShape.of`` method and will become
“the sink that must be attached before this Source can run”.
@ -283,7 +283,7 @@ The following example demonstrates a case where the materialized ``Future`` of a
Graph cycles, liveness and deadlocks
------------------------------------
Cycles in bounded flow graphs need special considerations to avoid potential deadlocks and other liveness issues.
Cycles in bounded stream topologies need special considerations to avoid potential deadlocks and other liveness issues.
This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing
graphs.

View file

@ -86,10 +86,10 @@ it makes sense to make the Server initiate the conversation by emitting a "hello
.. includecode:: code/docs/stream/io/StreamTcpDocSpec.scala#welcome-banner-chat-server
The way we constructed a :class:`Flow` using a :class:`PartialFlowGraph` is explained in detail in
The way we constructed a :class:`Flow` using the :class:`GraphDSL` is explained in detail in
:ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`, however the basic concepts is rather simple
we can encapsulate arbitrarily complex logic within a :class:`Flow` as long as it exposes the same interface, which means
exposing exactly one :class:`UndefinedSink` and exactly one :class:`UndefinedSource` which will be connected to the TCP
exposing exactly one :class:`Outlet` and exactly one :class:`Inlet` which will be connected to the TCP
pipeline. In this example we use a :class:`Concat` graph processing stage to inject the initial message, and then
continue with handling all incoming data using the echo handler. You should use this pattern of encapsulating complex
logic in Flows and attaching those to :class:`StreamIO` in order to implement your custom and possibly sophisticated TCP servers.

View file

@ -103,15 +103,15 @@ Akka Streams intentionally separate the linear stream structures (Flows) from th
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations.
Graphs are constructed using :class:`FlowGraph` like this:
Graphs are constructed using :class:`GraphDSL` like this:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast
As you can see, inside the :class:`FlowGraph` we use an implicit graph builder ``b`` to mutably construct the graph
As you can see, inside the :class:`GraphDSL` we use an implicit graph builder ``b`` to mutably construct the graph
using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). The operator is provided implicitly
by importing ``FlowGraph.Implicits._``.
by importing ``GraphDSL.Implicits._``.
``FlowGraph.create`` returns a :class:`Graph`, in this example a :class:`Graph[ClosedShape, Unit]` where
``GraphDSL.create`` returns a :class:`Graph`, in this example a :class:`Graph[ClosedShape, Unit]` where
:class:`ClosedShape` means that it is *a fully connected graph* or "closed" - there are no unconnected inputs or outputs.
Since it is closed it is possible to transform the graph into a :class:`RunnableGraph` using ``RunnableGraph.fromGraph``.
The runnable graph can then be ``run()`` to materialize a stream out of it.

View file

@ -75,7 +75,7 @@ Here is an example of a code that demonstrate some of the issues caused by inter
.. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#buffering-abstraction-leak
Running the above example one would expect the number *3* to be printed in every 3 seconds (the ``conflate`` step here
Running the above example one would expect the number *3* to be printed in every 3 seconds (the ``cUndefinedSourceonflate`` step here
is configured so that it counts the number of elements received before the downstream ``ZipWith`` consumes them). What
is being printed is different though, we will see the number *1*. The reason for this is the internal buffer which is
by default 16 elements large, and prefetches elements before the ``ZipWith`` starts consuming them. It is possible
@ -141,15 +141,15 @@ Rate transformation
Understanding conflate
----------------------
When a fast producer can not be informed to slow down by backpressure or some other signal, conflate might be useful to combine elements from a producer until a demand signal comes from a consumer.
When a fast producer can not be informed to slow down by backpressure or some other signal, ``conflate`` might be useful to combine elements from a producer until a demand signal comes from a consumer.
Below is an example snippet that summarizes fast stream of elements to a standart deviation, mean and count of elements that have arrived while the stats have been calculated.
.. includecode:: code/docs/stream/RateTransformationDocSpec.scala#conflate-summarize
This example demonstrates that such flow's rate is decoupled. Element rate at the start of the flow can be much higher that the element rate at the end of the flow.
This example demonstrates that such flow's rate is decoupled. The element rate at the start of the flow can be much higher that the element rate at the end of the flow.
Another possible use of conflate is to not consider all elements for summary when producer starts getting too fast. Example below demonstrates how conflate can be used to implement random drop of elements when consumer is not able to keep up with the producer.
Another possible use of ``conflate`` is to not consider all elements for summary when producer starts getting too fast. Example below demonstrates how ``conflate`` can be used to implement random drop of elements when consumer is not able to keep up with the producer.
.. includecode:: code/docs/stream/RateTransformationDocSpec.scala#conflate-sample
@ -158,7 +158,7 @@ Understanding expand
Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. Expand allows to extrapolate a value to be sent as an element to a consumer.
As a simple use of expand here is a flow that sends the same element to consumer when producer does not send any new elements.
As a simple use of ``expand`` here is a flow that sends the same element to consumer when producer does not send any new elements.
.. includecode:: code/docs/stream/RateTransformationDocSpec.scala#expand-last
@ -166,4 +166,4 @@ Expand also allows to keep some state between demand requests from the downstrea
.. includecode:: code/docs/stream/RateTransformationDocSpec.scala#expand-drift
Note that all of the elements coming from upstream will go through expand at least once. This means that the output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise.
Note that all of the elements coming from upstream will go through ``expand`` at least once. This means that the output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise.