update graph docs

This commit is contained in:
Roland Kuhn 2015-02-26 16:37:16 +01:00
parent 81fb5c14cb
commit 1563e0189b
4 changed files with 88 additions and 65 deletions

View file

@ -20,13 +20,13 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph" in { "build simple graph" in {
//format: OFF //format: OFF
//#simple-flow-graph //#simple-flow-graph
val g = FlowGraph.closed() { implicit b => val g = FlowGraph.closed() { implicit builder: FlowGraph.Builder =>
import FlowGraph.Implicits._ import FlowGraph.Implicits._
val in = Source(1 to 10) val in = Source(1 to 10)
val out = Sink.ignore val out = Sink.ignore
val bcast = b.add(Broadcast[Int](2)) val bcast = builder.add(Broadcast[Int](2))
val merge = b.add(Merge[Int](2)) val merge = builder.add(Merge[Int](2))
val f1, f2, f3, f4 = Flow[Int].map(_ + 10) val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
@ -43,21 +43,21 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph without implicits" in { "build simple graph without implicits" in {
//#simple-flow-graph-no-implicits //#simple-flow-graph-no-implicits
val g = FlowGraph.closed() { b => val g = FlowGraph.closed() { builder: FlowGraph.Builder =>
val in = Source(1 to 10) val in = Source(1 to 10)
val out = Sink.ignore val out = Sink.ignore
val broadcast = b.add(Broadcast[Int](2)) val broadcast = builder.add(Broadcast[Int](2))
val merge = b.add(Merge[Int](2)) val merge = builder.add(Merge[Int](2))
val f1 = Flow[Int].map(_ + 10) val f1 = Flow[Int].map(_ + 10)
val f3 = Flow[Int].map(_.toString) val f3 = Flow[Int].map(_.toString)
val f2 = Flow[Int].map(_ + 20) val f2 = Flow[Int].map(_ + 20)
b.addEdge(b.add(in), broadcast.in) builder.addEdge(builder.add(in), broadcast.in)
b.addEdge(broadcast.out(0), f1, merge.in(0)) builder.addEdge(broadcast.out(0), f1, merge.in(0))
b.addEdge(broadcast.out(1), f2, merge.in(1)) builder.addEdge(broadcast.out(1), f2, merge.in(1))
b.addEdge(merge.out, f3, b.add(out)) builder.addEdge(merge.out, f3, builder.add(out))
} }
//#simple-flow-graph-no-implicits //#simple-flow-graph-no-implicits
@ -67,12 +67,12 @@ class FlowGraphDocSpec extends AkkaSpec {
"flow connection errors" in { "flow connection errors" in {
intercept[IllegalArgumentException] { intercept[IllegalArgumentException] {
//#simple-graph //#simple-graph
FlowGraph.closed() { implicit b => FlowGraph.closed() { implicit builder =>
import FlowGraph.Implicits._ import FlowGraph.Implicits._
val source1 = Source(1 to 10) val source1 = Source(1 to 10)
val source2 = Source(1 to 10) val source2 = Source(1 to 10)
val zip = b.add(Zip[Int, Int]()) val zip = builder.add(Zip[Int, Int]())
source1 ~> zip.in0 source1 ~> zip.in0
source2 ~> zip.in1 source2 ~> zip.in1
@ -94,10 +94,10 @@ class FlowGraphDocSpec extends AkkaSpec {
// format: OFF // format: OFF
val g = val g =
//#flow-graph-reusing-a-flow //#flow-graph-reusing-a-flow
FlowGraph.closed(topHeadSink, bottomHeadSink)((_, _)) { implicit b => FlowGraph.closed(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
(topHS, bottomHS) => (topHS, bottomHS) =>
import FlowGraph.Implicits._ import FlowGraph.Implicits._
val broadcast = b.add(Broadcast[Int](2)) val broadcast = builder.add(Broadcast[Int](2))
Source.single(1) ~> broadcast.in Source.single(1) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHS.inlet broadcast.out(0) ~> sharedDoubler ~> topHS.inlet
@ -207,12 +207,13 @@ class FlowGraphDocSpec extends AkkaSpec {
import FanInShape.Name import FanInShape.Name
import FanInShape.Init import FanInShape.Init
case class PriorityWorkerPoolShape2[In, Out]( class PriorityWorkerPoolShape2[In, Out](_init: Init[Out] = Name("PriorityWorkerPool"))
_init: Init[Out] = Name("PriorityWorkerPool")) extends FanInShape2[In, In, Out](_init) { extends FanInShape[Out](_init) {
protected override def construct(i: Init[Out]) = new PriorityWorkerPoolShape2(i)
def jobsIn: Inlet[In] = in0 val jobsIn = newInlet[In]("jobsIn")
def priorityJobsIn: Inlet[In] = in1 val priorityJobsIn = newInlet[In]("priorityJobsIn")
def resultsOut: Outlet[Out] = out // Outlet[Out] with name "out" is automatically created
} }
//#flow-graph-components-shape2 //#flow-graph-components-shape2

View file

@ -89,11 +89,12 @@ class GraphCyclesSpec extends AkkaSpec {
val zip = b.add(ZipWith((left: Int, right: Int) => left)) val zip = b.add(ZipWith((left: Int, right: Int) => left))
val bcast = b.add(Broadcast[Int](2)) val bcast = b.add(Broadcast[Int](2))
val concat = b.add(Concat[Int]()) val concat = b.add(Concat[Int]())
val start = Source.single(0)
source ~> zip.in0 source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore() zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore()
zip.in1 <~ concat <~ bcast zip.in1 <~ concat <~ start
concat <~ Source.single(0) concat <~ bcast
} }
//#zipping-live //#zipping-live
// format: ON // format: ON

View file

@ -60,8 +60,8 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
def ints = Source(() => Iterator.from(1)) def ints = Source(() => Iterator.from(1))
// connect the graph // connect the graph
ints ~> Flow[Int].filter(_ % 2 != 0) ~> zip.in0 ints.filter(_ % 2 != 0) ~> zip.in0
ints ~> Flow[Int].filter(_ % 2 == 0) ~> zip.in1 ints.filter(_ % 2 == 0) ~> zip.in1
// expose port // expose port
zip.out zip.out
@ -82,8 +82,8 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val zip = b.add(Zip[Int, String]()) val zip = b.add(Zip[Int, String]())
// connect the graph // connect the graph
broadcast.out(0) ~> Flow[Int].map(identity) ~> zip.in0 broadcast.out(0).map(identity) ~> zip.in0
broadcast.out(1) ~> Flow[Int].map(_.toString) ~> zip.in1 broadcast.out(1).map(_.toString) ~> zip.in1
// expose ports // expose ports
(broadcast.in, zip.out) (broadcast.in, zip.out)

View file

@ -6,7 +6,7 @@ Working with Graphs
In Akka Streams computation graphs are not expressed using a fluent DSL like linear computations are, instead they are In Akka Streams computation graphs are not expressed using a fluent DSL like linear computations are, instead they are
written in a more graph-resembling DSL which aims to make translating graph drawings (e.g. from notes taken written in a more graph-resembling DSL which aims to make translating graph drawings (e.g. from notes taken
from design discussions, or illustrations in protocol specifications) to and from code simpler. In this section we'll from design discussions, or illustrations in protocol specifications) to and from code simpler. In this section we’ll
dive into the multiple ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them. dive into the multiple ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them.
Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations. Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.
@ -19,6 +19,7 @@ streams, such that the second one is consumed after the first one has completed)
Constructing Flow Graphs Constructing Flow Graphs
------------------------ ------------------------
Flow graphs are built from simple Flows which serve as the linear connections within the graphs as well as junctions Flow graphs are built from simple Flows which serve as the linear connections within the graphs as well as junctions
which serve as fan-in and fan-out points for Flows. Thanks to the junctions having meaningful types based on their behaviour which serve as fan-in and fan-out points for Flows. Thanks to the junctions having meaningful types based on their behaviour
and making them explicit elements these elements should be rather straightforward to use. and making them explicit elements these elements should be rather straightforward to use.
@ -27,19 +28,19 @@ Akka Streams currently provide these junctions:
* **Fan-out** * **Fan-out**
- ``Broadcast[T]`` (1 input, n outputs) signals each output given an input signal, - ``Broadcast[T]`` *(1 input, N outputs)* given an input element emits to each output
- ``Balance[T]`` (1 input => n outputs), signals one of its output ports given an input signal, - ``Balance[T]`` *(1 input, N outputs)* given an input element emits to one of its output ports
- ``UnZip[A,B]`` (1 input => 2 outputs), which is a specialized element which is able to split a stream of ``(A,B)`` tuples into two streams one type ``A`` and one of type ``B``, - ``UnZip[A,B]`` *(1 input, 2 outputs)* splits a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B``
- ``FlexiRoute[In]`` (1 input, n outputs), which enables writing custom fan out elements using a simple DSL, - ``FlexiRoute[In]`` *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL
* **Fan-in** * **Fan-in**
- ``Merge[In]`` (n inputs , 1 output), picks signals randomly from inputs pushing them one by one to its output, - ``Merge[In]`` *(N inputs, 1 output)* picks randomly from inputs pushing them one by one to its output
- ``MergePreferred[In]`` like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``, - ``MergePreferred[In]`` like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``
- ``ZipWith[A,B,...,Out]`` (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output, - ``ZipWith[A,B,...,Out]`` *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element
- ``Zip[A,B]`` (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream, - ``Zip[A,B]`` *(2 inputs, 1 output)* is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream
- ``Concat[A]`` (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters, - ``Concat[A]`` *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one)
- ``FlexiMerge[Out]`` (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL. - ``FlexiMerge[Out]`` *(N inputs, 1 output)* enables writing custom fan-in elements using a simple DSL
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating
@ -58,20 +59,30 @@ will be inferred.
Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a FlowGraph Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a FlowGraph
refers to the same location in the resulting graph). refers to the same location in the resulting graph).
Notice the ``import FlowGraphImplicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to"). Notice the ``import FlowGraph.Implicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to")
and its inverted counterpart ``<~`` (for noting down flows in the opposite direction where appropriate).
It is also possible to construct graphs without the ``~>`` operator in case you prefer to use the graph builder explicitly: It is also possible to construct graphs without the ``~>`` operator in case you prefer to use the graph builder explicitly:
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph-no-implicits .. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph-no-implicits
By looking at the snippets above, it should be apparent that the :class:`FlowGraphBuilder` object is *mutable*. By looking at the snippets above, it should be apparent that the :class:`FlowGraph.Builder` object is *mutable*.
It is also used (implicitly) by the ``~>`` operator, also making it a mutable operation as well. It is used (implicitly) by the ``~>`` operator, making it a mutable operation as well.
The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles. The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles.
Once the FlowGraph has been constructed though, the :class:`FlowGraph` instance *is immutable, thread-safe, and freely shareable*. Once the FlowGraph has been constructed though, the :class:`FlowGraph` instance *is immutable, thread-safe, and freely shareable*.
The same is true of all flow pieces—sources, sinks, and flows—once they are constructed.
This means that you can safely re-use one given Flow in multiple places in a processing graph.
Linear Flows however are always immutable and appending an operation to a Flow always returns a new Flow instance. We have seen examples of such re-use already above: the merge and broadcast junctions were imported
This means that you can safely re-use one given Flow in multiple places in a processing graph. In the example below into the graph using ``builder.add(...)``, an operation that will make a copy of the blueprint that
we prepare a graph that consists of two parallel streams, in which we re-use the same instance of :class:`Flow`, is passed to it and return the inlets and outlets of the resulting copy so that they can be wired up.
yet it will properly be materialized as two connections between the corresponding Sources and Sinks: Another alternative is to pass existing graphs—of any shape—into the factory method that produces a
new graph. The difference between these approaches is that importing using ``builder.add(...)`` ignores the
materialized value of the imported graph while importing via the factory method allows its inclusion;
for more details see :ref:`stream-materialization-scala`.
In the example below we prepare a graph that consists of two parallel streams,
in which we re-use the same instance of :class:`Flow`, yet it will properly be
materialized as two connections between the corresponding Sources and Sinks:
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-reusing-a-flow .. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-reusing-a-flow
@ -82,53 +93,59 @@ Constructing and combining Partial Flow Graphs
Sometimes it is not possible (or needed) to construct the entire computation graph in one place, but instead construct Sometimes it is not possible (or needed) to construct the entire computation graph in one place, but instead construct
all of its different phases in different places and in the end connect them all into a complete graph and run it. all of its different phases in different places and in the end connect them all into a complete graph and run it.
This can be achieved using :class:`PartialFlowGraph`. The reason of representing it as a different type is that a This can be achieved using ``FlowGraph.partial`` instead of
:class:`FlowGraph` requires all ports to be connected, and if they are not it will throw an exception at construction ``FlowGraph.closed``, which will return a ``Graph`` instead of a
time, which helps to avoid simple wiring errors while working with graphs. A partial flow graph however does not perform ``RunnableFlow``. The reason for representing it as a different type is that a
this validation, and allows graphs that are not yet fully connected. :class:`RunnableFlow` requires all ports to be connected, and if they are not
it will throw an exception at construction time, which helps to avoid simple
wiring errors while working with graphs. A partial flow graph however allows
you to return the set of yet to be connected ports from the code block that
performs the internal wiring.
A :class:`PartialFlowGraph` is defined as a :class:`FlowGraph` which contains so called "undefined elements", Let's imagine we want to provide users with a specialized element that given 3 inputs will pick
such as ``UndefinedSink[T]`` or ``UndefinedSource[T]``, which can be reused and plugged into by consumers of that the greatest int value of each zipped triple. We'll want to expose 3 input ports (unconnected sources) and one output port
partial flow graph. Let's imagine we want to provide users with a specialized element that given 3 inputs will pick (unconnected sink).
the greatest int value of each zipped triple. We'll want to expose 3 input ports (undefined sources) and one output port
(undefined sink).
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#simple-partial-flow-graph .. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#simple-partial-flow-graph
As you can see, first we construct the partial graph that contains all the zipping and comparing of stream As you can see, first we construct the partial graph that contains all the zipping and comparing of stream
elements, then we import it (all of its nodes and connections) explicitly to the :class:`FlowGraph` instance in which all elements. This partial graph will have three inputs and one output, wherefore we use the :class:`UniformFanInShape`.
Then we import it (all of its nodes and connections) explicitly into the closed graph built in the second step in which all
the undefined elements are rewired to real sources and sinks. The graph can then be run and yields the expected result. the undefined elements are rewired to real sources and sinks. The graph can then be run and yields the expected result.
.. warning:: .. warning::
Please note that a :class:`FlowGraph` is not able to provide compile time type-safety about whether or not all Please note that a :class:`FlowGraph` is not able to provide compile time type-safety about whether or not all
elements have been properly connected - this validation is performed as a runtime check during the graph's instantiation. elements have been properly connected - this validation is performed as a runtime check during the graph's instantiation.
A partial flow graph also verifies that all ports are either connected or part of the returned :class:`Shape`.
.. _constructing-sources-sinks-flows-from-partial-graphs-scala: .. _constructing-sources-sinks-flows-from-partial-graphs-scala:
Constructing Sources, Sinks and Flows from Partial Graphs Constructing Sources, Sinks and Flows from Partial Graphs
--------------------------------------------------------- ---------------------------------------------------------
Instead of treating a :class:`PartialFlowGraph` as simply a collection of flows and junctions which may not yet all be Instead of treating a partial flow graph as simply a collection of flows and junctions which may not yet all be
connected it is sometimes useful to expose such a complex graph as a simpler structure, connected it is sometimes useful to expose such a complex graph as a simpler structure,
such as a :class:`Source`, :class:`Sink` or :class:`Flow`. such as a :class:`Source`, :class:`Sink` or :class:`Flow`.
In fact, these concepts can be easily expressed as special cases of a partially connected graph: In fact, these concepts can be easily expressed as special cases of a partially connected graph:
* :class:`Source` is a partial flow graph with *exactly one* :class:`UndefinedSink`, * :class:`Source` is a partial flow graph with *exactly one* output, that is it returns a :class:`SourceShape`.
* :class:`Sink` is a partial flow graph with *exactly one* :class:`UndefinedSource`, * :class:`Sink` is a partial flow graph with *exactly one* input, that is it returns a :class:`SinkShape`.
* :class:`Flow` is a partial flow graph with *exactly one* :class:`UndefinedSource` and *exactly one* :class:`UndefinedSource`. * :class:`Flow` is a partial flow graph with *exactly one* input and *exactly one* output, that is it returns a :class:`FlowShape`.
Being able to hide complex graphs inside of simple elements such as Sink / Source / Flow enables you to easily create one Being able to hide complex graphs inside of simple elements such as Sink / Source / Flow enables you to easily create one
complex element and from there on treat it as simple compound stage for linear computations. complex element and from there on treat it as simple compound stage for linear computations.
In order to create a Source from a partial flow graph ``Source`` provides a special apply method that takes a function In order to create a Source from a partial flow graph ``Source`` provides a special apply method that takes a function
that must return an ``UndefinedSink``. This undefined sink will become "the sink that must be attached before this Source that must return an :class:`Outlet[T]`. This unconnected sink will become “the sink that must be attached before this Source
can run". Refer to the example below, in which we create a Source that zips together two numbers, to see this graph can run”. Refer to the example below, in which we create a Source that zips together two numbers, to see this graph
construction in action: construction in action:
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-from-partial-flow-graph .. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-from-partial-flow-graph
Similarly the same can be done for a ``Sink[T]``, in which case the returned value must be an ``UndefinedSource[T]``. Similarly the same can be done for a ``Sink[T]``, in which case the returned value must be an ``Inlet[T]``.
For defining a ``Flow[T]`` we need to expose both an undefined source and sink: For defining a ``Flow[T]`` we need to expose both an inlet and an outlet:
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph .. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph
@ -159,7 +176,7 @@ boilerplate
* :class:`FanInShape1`, :class:`FanInShape2`, ..., :class:`FanOutShape1`, :class:`FanOutShape2`, ... for junctions * :class:`FanInShape1`, :class:`FanInShape2`, ..., :class:`FanOutShape1`, :class:`FanOutShape2`, ... for junctions
with multiple input (or output) ports of different types. with multiple input (or output) ports of different types.
Since our shape has two input ports and one output port, we can just reuse the :class:`FanInShape2` class to define Since our shape has two input ports and one output port, we can just use the :class:`FanInShape` DSL to define
our custom shape: our custom shape:
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-shape2 .. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-components-shape2
@ -182,16 +199,20 @@ using ``add()`` twice.
Graph cycles, liveness and deadlocks Graph cycles, liveness and deadlocks
------------------------------------ ------------------------------------
By default :class:`FlowGraph` does not allow (or to be precise, its builder does not allow) the creation of cycles. Cycles in bounded flow graphs need special considerations to avoid potential deadlocks and other liveness issues.
The reason for this is that cycles need special considerations to avoid potential deadlocks and other liveness issues.
This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing
graphs. graphs.
The first example demonstrates a graph that contains a naive cycle (the presence of cycles is enabled by calling The first example demonstrates a graph that contains a naïve cycle (the presence of cycles is enabled by calling
``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements ``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements
to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via
a ``Merge`` junction. a ``Merge`` junction.
.. note::
The graph DSL allows the connection arrows to be reversed, which is particularly handy when writing cycles—as we will
see there are cases where this is very helpful.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#deadlocked .. includecode:: code/docs/stream/GraphCyclesSpec.scala#deadlocked
Running this we observe that after a few numbers have been printed, no more elements are logged to the console - Running this we observe that after a few numbers have been printed, no more elements are logged to the console -