Merge pull request #17980 from 2m/wip-17337-composition-docs-drewhk
+doc #17337: Document composability and modularity (with java code)
BIN
akka-docs-dev/rst/images/compose_attributes.png
Normal file
|
After Width: | Height: | Size: 27 KiB |
BIN
akka-docs-dev/rst/images/compose_composites.png
Normal file
|
After Width: | Height: | Size: 24 KiB |
BIN
akka-docs-dev/rst/images/compose_graph.png
Normal file
|
After Width: | Height: | Size: 14 KiB |
BIN
akka-docs-dev/rst/images/compose_graph_flow.png
Normal file
|
After Width: | Height: | Size: 10 KiB |
BIN
akka-docs-dev/rst/images/compose_graph_partial.png
Normal file
|
After Width: | Height: | Size: 11 KiB |
BIN
akka-docs-dev/rst/images/compose_graph_shape.png
Normal file
|
After Width: | Height: | Size: 15 KiB |
BIN
akka-docs-dev/rst/images/compose_mat.png
Normal file
|
After Width: | Height: | Size: 20 KiB |
BIN
akka-docs-dev/rst/images/compose_nested_flow.png
Normal file
|
After Width: | Height: | Size: 9.4 KiB |
BIN
akka-docs-dev/rst/images/compose_nested_flow_opaque.png
Normal file
|
After Width: | Height: | Size: 7.5 KiB |
BIN
akka-docs-dev/rst/images/compose_shapes.png
Normal file
|
After Width: | Height: | Size: 9.4 KiB |
BIN
akka-docs-dev/rst/images/composition.png
Normal file
|
After Width: | Height: | Size: 61 KiB |
6771
akka-docs-dev/rst/images/composition.svg
Normal file
|
After Width: | Height: | Size: 291 KiB |
310
akka-docs-dev/rst/java/stream-composition.rst
Normal file
|
|
@ -0,0 +1,310 @@
|
||||||
|
.. _composition-java:
|
||||||
|
|
||||||
|
Modularity, Composition and Hierarchy
|
||||||
|
=====================================
|
||||||
|
|
||||||
|
Akka Streams provide a uniform model of stream processing graphs, which allows flexible composition of reusable
|
||||||
|
components. In this chapter we show how these look like from the conceptual and API perspective, demonstrating
|
||||||
|
the modularity aspects of the library.
|
||||||
|
|
||||||
|
Basics of composition and modularity
|
||||||
|
------------------------------------
|
||||||
|
|
||||||
|
Every processing stage used in Akka Streams can be imagined as a "box" with input and output ports where elements to
|
||||||
|
be processed arrive and leave the stage. In this view, a :class:`Source` is nothing else than a "box" with a single
|
||||||
|
output port, or, a :class:`BidiFlow` is a "box" with exactly two input and two output ports. In the figure below
|
||||||
|
we illustrate the most common used stages viewed as "boxes".
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_shapes.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
The *linear* stages are :class:`Source`, :class:`Sink`
|
||||||
|
and :class:`Flow`, as these can be used to compose strict chains of processing stages.
|
||||||
|
Fan-in and Fan-out stages have usually multiple input or multiple output ports, therefore they allow to build
|
||||||
|
more complex graph layouts, not just chains. :class:`BidiFlow` stages are usually useful in IO related tasks, where
|
||||||
|
there are input and output channels to be handled. Due to the specific shape of :class:`BidiFlow` it is easy to
|
||||||
|
stack them on top of each other to build a layered protocol for example. The ``TLS`` support in Akka is for example
|
||||||
|
implemented as a :class:`BidiFlow`.
|
||||||
|
|
||||||
|
These reusable components already allow the creation of complex processing networks. What we
|
||||||
|
have seen so far does not implement modularity though. It is desirable for example to package up a larger graph entity into
|
||||||
|
a reusable component which hides its internals only exposing the ports that are meant to the users of the module
|
||||||
|
to interact with. One good example is the ``Http`` server component, which is encoded internally as a
|
||||||
|
:class:`BidiFlow` which interfaces with the client TCP connection using an input-output port pair accepting and sending
|
||||||
|
:class:`ByteString` s, while its upper ports emit and receive :class:`HttpRequest` and :class:`HttpResponse` instances.
|
||||||
|
|
||||||
|
The following figure demonstrates various composite stages, that contain various other type of stages internally, but
|
||||||
|
hiding them behind a *shape* that looks like a :class:`Source`, :class:`Flow`, etc.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_composites.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
One interesting example above is a :class:`Flow` which is composed of a disconnected :class:`Sink` and :class:`Source`.
|
||||||
|
This can be achieved by using the ``wrap()`` constructor method on :class:`Flow` which takes the two parts as
|
||||||
|
parameters.
|
||||||
|
|
||||||
|
The example :class:`BidiFlow` demonstrates that internally a module can be of arbitrary complexity, and the exposed
|
||||||
|
ports can be wired in flexible ways. The only constraint is that all the ports of enclosed modules must be either
|
||||||
|
connected to each other, or exposed as interface ports, and the number of such ports needs to match the requirement
|
||||||
|
of the shape, for example a :class:`Source` allows only one exposed output port, the rest of the internal ports must
|
||||||
|
be properly connected.
|
||||||
|
|
||||||
|
These mechanics allow arbitrary nesting of modules. For example the following figure demonstrates a :class:`RunnableGraph`
|
||||||
|
that is built from a composite :class:`Source` and a composite :class:`Sink` (which in turn contains a composite
|
||||||
|
:class:`Flow`).
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_nested_flow.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
The above diagram contains one more shape that we have not seen yet, which is called :class:`RunnableGraph`. It turns
|
||||||
|
out, that if we wire all exposed ports together, so that no more open ports remain, we get a module that is *closed*.
|
||||||
|
This is what the :class:`RunnableGraph` class represents. This is the shape that a :class:`Materializer` can take
|
||||||
|
and turn into a network of running entities that perform the task described. In fact, a :class:`RunnableGraph` is a
|
||||||
|
module itself, and (maybe somewhat surprisingly) it can be used as part of larger graphs. It is rarely useful to embed
|
||||||
|
a closed graph shape in a larger graph (since it becomes an isolated island as there are no open port for communication
|
||||||
|
with the rest of the graph), but this demonstrates the uniform underlying model.
|
||||||
|
|
||||||
|
If we try to build a code snippet that corresponds to the above diagram, our first try might look like this:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#non-nested-flow
|
||||||
|
|
||||||
|
It is clear however that there is no nesting present in our first attempt, since the library cannot figure out
|
||||||
|
where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the
|
||||||
|
DSL provided by the :class:`Flow`, :class:`Source`, :class:`Sink` classes then nesting can be achieved by calling one of the
|
||||||
|
methods ``withAttributes()`` or ``named()`` (where the latter is just a shorthand for adding a name attribute).
|
||||||
|
|
||||||
|
The following code demonstrates how to achieve the desired nesting:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#nested-flow
|
||||||
|
|
||||||
|
Once we have hidden the internals of our components, they act like any other built-in component of similar shape. If
|
||||||
|
we hide some of the internals of our composites, the result looks just like if any other predefine component has been
|
||||||
|
used:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_nested_flow_opaque.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
If we look at usage of built-in components, and our custom components, there is no difference in usage as the code
|
||||||
|
snippet below demonstrates.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#reuse
|
||||||
|
|
||||||
|
Composing complex systems
|
||||||
|
-------------------------
|
||||||
|
|
||||||
|
In the previous section we explored the possibility of composition, and hierarchy, but we stayed away from non-linear,
|
||||||
|
generalized graph components. There is nothing in Akka Streams though that enforces that stream processing layouts
|
||||||
|
can only be linear. The DSL for :class:`Source` and friends is optimized for creating such linear chains, as they are
|
||||||
|
the most common in practice. There is a more advanced DSL for building complex graphs, that can be used if more
|
||||||
|
flexibility is needed. We will see that the difference between the two DSLs is only on the surface: the concepts they
|
||||||
|
operate on are uniform across all DSLs and fit together nicely.
|
||||||
|
|
||||||
|
As a first example, let's look at a more complex layout:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_graph.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
The diagram shows a :class:`RunnableGraph` (remember, if there are no unwired ports, the graph is closed, and therefore
|
||||||
|
can be materialized) that encapsulates a non-trivial stream processing network. It contains fan-in, fan-out stages,
|
||||||
|
directed and non-directed cycles. The ``closed()`` method of the :class:`FlowGraph` factory object allows the creation of a
|
||||||
|
general closed graph. For example the network on the diagram can be realized like this:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#complex-graph
|
||||||
|
|
||||||
|
In the code above we used the implicit port numbering feature to make the graph more readable and similar to the diagram.
|
||||||
|
It is possible to refer to the ports, so another version might look like this:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#complex-graph-alt
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but
|
||||||
|
the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL.
|
||||||
|
The way to do it is to use the ``partial()`` method on :class:`FlowGraph` factory. If we remove the sources and sinks
|
||||||
|
from the previous example, what remains is a partial graph:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_graph_partial.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
We can recreate a similar graph in code, using the DSL in a similar way than before:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#partial-graph
|
||||||
|
|
||||||
|
The only new addition is the return value of the builder block, which is a :class:`Shape`. All graphs (including
|
||||||
|
:class:`Source`, :class:`BidiFlow`, etc) have a shape, which encodes the *typed* ports of the module. In our example
|
||||||
|
there is exactly one input and output port left, so we can declare it to have a :class:`FlowShape` by returning an
|
||||||
|
instance of it. While it is possible to create new :class:`Shape` types, it is usually recommended to use one of the
|
||||||
|
matching built-in ones.
|
||||||
|
|
||||||
|
The resulting graph is already a properly wrapped module, so there is no need to call `named()` to encapsulate the graph, but
|
||||||
|
it is a good practice to give names to modules to help debugging.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_graph_shape.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
Since our partial graph has the right shape, it can be already used in the simpler, linear DSL:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#partial-use
|
||||||
|
|
||||||
|
It is not possible to use it as a :class:`Flow` yet, though (i.e. we cannot call ``.filter()`` on it), but :class:`Flow`
|
||||||
|
has a ``wrap()`` method that just adds the DSL to a :class:`FlowShape`. There are similar methods on :class:`Source`,
|
||||||
|
:class:`Sink` and :class:`BidiShape`, so it is easy to get back to the simpler DSL if a graph has the right shape.
|
||||||
|
For convenience, it is also possible to skip the partial graph creation, and use one of the convenience creator methods.
|
||||||
|
To demonstrate this, we will create the following graph:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_graph_flow.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
The code version of the above closed graph might look like this:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#partial-flow-dsl
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
All graph builder sections check if the resulting graph has all ports connected except the exposed ones and will
|
||||||
|
throw an exception if this is violated.
|
||||||
|
|
||||||
|
We are still in debt of demonstrating that :class:`RunnableGraph` is a component just like any other, which can
|
||||||
|
be embedded in graphs. In the following snippet we embed one closed graph in another:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#embed-closed
|
||||||
|
|
||||||
|
The type of the imported module indicates that the imported module has a :class:`ClosedShape`, and so we are not
|
||||||
|
able to wire it to anything else inside the enclosing closed graph. Nevertheless, this "island" is embedded properly,
|
||||||
|
and will be materialized just like any other module that is part of the graph.
|
||||||
|
|
||||||
|
As we have demonstrated, the two DSLs are fully interoperable, as they encode a similar nested structure of "boxes with
|
||||||
|
ports", it is only the DSLs that differ to be as much powerful as possible on the given abstraction level. It is possible
|
||||||
|
to embed complex graphs in the fluid DSL, and it is just as easy to import and embed a :class:`Flow`, etc, in a larger,
|
||||||
|
complex structure.
|
||||||
|
|
||||||
|
We have also seen, that every module has a :class:`Shape` (for example a :class:`Sink` has a :class:`SinkShape`)
|
||||||
|
independently which DSL was used to create it. This uniform representation enables the rich composability of various
|
||||||
|
stream processing entities in a convenient way.
|
||||||
|
|
||||||
|
Materialized values
|
||||||
|
-------------------
|
||||||
|
|
||||||
|
After realizing that :class:`RunnableGraph` is nothing more than a module with no unused ports (it is an island), it becomes clear that
|
||||||
|
after materialization the only way to communicate with the running stream processing logic is via some side-channel.
|
||||||
|
This side channel is represented as a *materialized value*. The situation is similar to :class:`Actor` s, where the
|
||||||
|
:class:`Props` instance describes the actor logic, but it is the call to ``actorOf()`` that creates an actually running
|
||||||
|
actor, and returns an :class:`ActorRef` that can be used to communicate with the running actor itself. Since the
|
||||||
|
:class:`Props` can be reused, each call will return a different reference.
|
||||||
|
|
||||||
|
When it comes to streams, each materialization creates a new running network corresponding to the blueprint that was
|
||||||
|
encoded in the provided :class:`RunnableGraph`. To be able to interact with the running network, each materialization
|
||||||
|
needs to return a different object that provides the necessary interaction capabilities. In other words, the
|
||||||
|
:class:`RunnableGraph` can be seen as a factory, which creates:
|
||||||
|
|
||||||
|
* a network of running processing entities, inaccessible from the outside
|
||||||
|
* a materialized value, optionally providing a controlled interaction capability with the network
|
||||||
|
|
||||||
|
Unlike actors though, each of the processing stages might provide a materialized value, so when we compose multiple
|
||||||
|
stages or modules, we need to combine the materialized value as well (there are default rules which make this easier,
|
||||||
|
for example `to()` and `via()` takes care of the most common case of taking the materialized value to the left.
|
||||||
|
See :ref:`flow-combine-mat-scala` for details). We demonstrate how this works by a code example and a diagram which
|
||||||
|
graphically demonstrates what is happening.
|
||||||
|
|
||||||
|
The propagation of the individual materialized values from the enclosed modules towards the top will look like this:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_mat.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
To implement the above, first, we create a composite :class:`Source`, where the enclosed :class:`Source` have a
|
||||||
|
materialized type of :class:`Promise<BoxedUnit>`. By using the combiner function ``Keep.left()``, the resulting materialized
|
||||||
|
type is of the nested module (indicated by the color *red* on the diagram):
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#mat-combine-1
|
||||||
|
|
||||||
|
Next, we create a composite :class:`Flow` from two smaller components. Here, the second enclosed :class:`Flow` has a
|
||||||
|
materialized type of :class:`Future<OutgoingConnection>`, and we propagate this to the parent by using ``Keep.right()``
|
||||||
|
as the combiner function (indicated by the color *yellow* on the diagram):
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#mat-combine-2
|
||||||
|
|
||||||
|
As a third step, we create a composite :class:`Sink`, using our ``nestedFlow`` as a building block. In this snippet, both
|
||||||
|
the enclosed :class:`Flow` and the folding :class:`Sink` has a materialized value that is interesting for us, so
|
||||||
|
we use ``Keep.both()`` to get a :class:`Pair` of them as the materialized type of ``nestedSink`` (indicated by the color
|
||||||
|
*blue* on the diagram)
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#mat-combine-3
|
||||||
|
|
||||||
|
As the last example, we wire together ``nestedSource`` and ``nestedSink`` and we use a custom combiner function to
|
||||||
|
create a yet another materialized type of the resulting :class:`RunnableGraph`. This combiner function just ignores
|
||||||
|
the :class:`Future<Sink>` part, and wraps the other two values in a custom case class :class:`MyClass`
|
||||||
|
(indicated by color *purple* on the diagram):
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#mat-combine-4a
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#mat-combine-4b
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
The nested structure in the above example is not necessary for combining the materialized values, it just
|
||||||
|
demonstrates how the two features work together. See :ref:`flow-combine-mat-java` for further examples
|
||||||
|
of combining materialized values without nesting and hierarchy involved.
|
||||||
|
|
||||||
|
Attributes
|
||||||
|
----------
|
||||||
|
|
||||||
|
We have seen that we can use ``named()`` to introduce a nesting level in the fluid DSL (and also explicit nesting by using
|
||||||
|
``partial()`` from :class:`FlowGraph`). Apart from having the effect of adding a nesting level, ``named()`` is actually
|
||||||
|
a shorthand for calling ``withAttributes(Attributes.name("someName"))``. Attributes provide a way to fine-tune certain
|
||||||
|
aspects of the materialized running entity. For example buffer sizes can be controlled via attributes (see
|
||||||
|
:ref:`stream-buffers-scala`). When it comes to hierarchic composition, attributes are inherited by nested modules,
|
||||||
|
unless they override them with a custom value.
|
||||||
|
|
||||||
|
The code below, a modification of an earlier example sets the ``inputBuffer`` attribute on certain modules, but not
|
||||||
|
on others:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/CompositionDocTest.java#attributes-inheritance
|
||||||
|
|
||||||
|
The effect is, that each module inherits the ``inputBuffer`` attribute from its enclosing parent, unless it has
|
||||||
|
the same attribute explicitly set. ``nestedSource`` gets the default attributes from the materializer itself. ``nestedSink``
|
||||||
|
on the other hand has this attribute set, so it will be used by all nested modules. ``nestedFlow`` will inherit from ``nestedSource``
|
||||||
|
except the ``map`` stage which has again an explicitly provided attribute overriding the inherited one.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_attributes.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
This diagram illustrates the inheritance process for the example code (representing the materializer default attributes
|
||||||
|
as the color *red*, the attributes set on ``nestedSink`` as *blue* and the attributes set on ``nestedFlow`` as *green*).
|
||||||
|
|
@ -212,6 +212,8 @@ which will be running on the thread pools they have been configured to run on -
|
||||||
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
|
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
|
||||||
yet will materialize that stage multiple times.
|
yet will materialize that stage multiple times.
|
||||||
|
|
||||||
|
.. _flow-combine-mat-java:
|
||||||
|
|
||||||
Combining materialized values
|
Combining materialized values
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ Streams
|
||||||
../stream-design
|
../stream-design
|
||||||
stream-flows-and-basics
|
stream-flows-and-basics
|
||||||
stream-graphs
|
stream-graphs
|
||||||
|
stream-composition
|
||||||
stream-rate
|
stream-rate
|
||||||
stream-customize
|
stream-customize
|
||||||
stream-integrations
|
stream-integrations
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,244 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package docs.stream
|
||||||
|
|
||||||
|
import akka.stream._
|
||||||
|
import akka.stream.scaladsl.Tcp.OutgoingConnection
|
||||||
|
import akka.stream.scaladsl._
|
||||||
|
import akka.stream.testkit.AkkaSpec
|
||||||
|
import akka.util.ByteString
|
||||||
|
|
||||||
|
import scala.concurrent.{ Future, Promise }
|
||||||
|
|
||||||
|
class CompositionDocSpec extends AkkaSpec {
|
||||||
|
|
||||||
|
implicit val ec = system.dispatcher
|
||||||
|
implicit val mat = ActorMaterializer()
|
||||||
|
|
||||||
|
"nonnested flow" in {
|
||||||
|
//#non-nested-flow
|
||||||
|
Source.single(0)
|
||||||
|
.map(_ + 1)
|
||||||
|
.filter(_ != 0)
|
||||||
|
.map(_ - 2)
|
||||||
|
.to(Sink.fold(0)(_ + _))
|
||||||
|
|
||||||
|
// ... where is the nesting?
|
||||||
|
//#non-nested-flow
|
||||||
|
}
|
||||||
|
|
||||||
|
"nested flow" in {
|
||||||
|
//#nested-flow
|
||||||
|
val nestedSource =
|
||||||
|
Source.single(0) // An atomic source
|
||||||
|
.map(_ + 1) // an atomic processing stage
|
||||||
|
.named("nestedSource") // wraps up the current Source and gives it a name
|
||||||
|
|
||||||
|
val nestedFlow =
|
||||||
|
Flow[Int].filter(_ != 0) // an atomic processing stage
|
||||||
|
.map(_ - 2) // another atomic processing stage
|
||||||
|
.named("nestedFlow") // wraps up the Flow, and gives it a name
|
||||||
|
|
||||||
|
val nestedSink =
|
||||||
|
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
|
||||||
|
.named("nestedSink") // wrap it up
|
||||||
|
|
||||||
|
// Create a RunnableGraph
|
||||||
|
val runnableGraph = nestedSource.to(nestedSink)
|
||||||
|
//#nested-flow
|
||||||
|
}
|
||||||
|
|
||||||
|
"reusing components" in {
|
||||||
|
val nestedSource =
|
||||||
|
Source.single(0) // An atomic source
|
||||||
|
.map(_ + 1) // an atomic processing stage
|
||||||
|
.named("nestedSource") // wraps up the current Source and gives it a name
|
||||||
|
|
||||||
|
val nestedFlow =
|
||||||
|
Flow[Int].filter(_ != 0) // an atomic processing stage
|
||||||
|
.map(_ - 2) // another atomic processing stage
|
||||||
|
.named("nestedFlow") // wraps up the Flow, and gives it a name
|
||||||
|
|
||||||
|
val nestedSink =
|
||||||
|
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
|
||||||
|
.named("nestedSink") // wrap it up
|
||||||
|
|
||||||
|
//#reuse
|
||||||
|
// Create a RunnableGraph from our components
|
||||||
|
val runnableGraph = nestedSource.to(nestedSink)
|
||||||
|
|
||||||
|
// Usage is uniform, no matter if modules are composite or atomic
|
||||||
|
val runnableGraph2 = Source.single(0).to(Sink.fold(0)(_ + _))
|
||||||
|
//#reuse
|
||||||
|
}
|
||||||
|
|
||||||
|
"complex graph" in {
|
||||||
|
// format: OFF
|
||||||
|
//#complex-graph
|
||||||
|
import FlowGraph.Implicits._
|
||||||
|
FlowGraph.closed() { implicit builder =>
|
||||||
|
val A: Outlet[Int] = builder.add(Source.single(0))
|
||||||
|
val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
|
||||||
|
val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
|
||||||
|
val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
|
||||||
|
val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
|
||||||
|
val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
|
||||||
|
val G: Inlet[Any] = builder.add(Sink.foreach(println))
|
||||||
|
|
||||||
|
C <~ F
|
||||||
|
A ~> B ~> C ~> F
|
||||||
|
B ~> D ~> E ~> F
|
||||||
|
E ~> G
|
||||||
|
}
|
||||||
|
//#complex-graph
|
||||||
|
|
||||||
|
//#complex-graph-alt
|
||||||
|
import FlowGraph.Implicits._
|
||||||
|
FlowGraph.closed() { implicit builder =>
|
||||||
|
val B = builder.add(Broadcast[Int](2))
|
||||||
|
val C = builder.add(Merge[Int](2))
|
||||||
|
val E = builder.add(Balance[Int](2))
|
||||||
|
val F = builder.add(Merge[Int](2))
|
||||||
|
|
||||||
|
Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
|
||||||
|
C.in(0) <~ F.out
|
||||||
|
|
||||||
|
B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
|
||||||
|
E.out(1) ~> Sink.foreach(println)
|
||||||
|
}
|
||||||
|
//#complex-graph-alt
|
||||||
|
// format: ON
|
||||||
|
}
|
||||||
|
|
||||||
|
"partial graph" in {
|
||||||
|
// format: OFF
|
||||||
|
//#partial-graph
|
||||||
|
import FlowGraph.Implicits._
|
||||||
|
val partial = FlowGraph.partial() { implicit builder =>
|
||||||
|
val B = builder.add(Broadcast[Int](2))
|
||||||
|
val C = builder.add(Merge[Int](2))
|
||||||
|
val E = builder.add(Balance[Int](2))
|
||||||
|
val F = builder.add(Merge[Int](2))
|
||||||
|
|
||||||
|
C <~ F
|
||||||
|
B ~> C ~> F
|
||||||
|
B ~> Flow[Int].map(_ + 1) ~> E ~> F
|
||||||
|
FlowShape(B.in, E.out(1))
|
||||||
|
}.named("partial")
|
||||||
|
//#partial-graph
|
||||||
|
// format: ON
|
||||||
|
|
||||||
|
//#partial-use
|
||||||
|
Source.single(0).via(partial).to(Sink.ignore)
|
||||||
|
//#partial-use
|
||||||
|
|
||||||
|
// format: OFF
|
||||||
|
//#partial-flow-dsl
|
||||||
|
// Convert the partial graph of FlowShape to a Flow to get
|
||||||
|
// access to the fluid DSL (for example to be able to call .filter())
|
||||||
|
val flow = Flow.wrap(partial)
|
||||||
|
|
||||||
|
// Simple way to create a graph backed Source
|
||||||
|
val source = Source() { implicit builder =>
|
||||||
|
val merge = builder.add(Merge[Int](2))
|
||||||
|
Source.single(0) ~> merge
|
||||||
|
Source(List(2, 3, 4)) ~> merge
|
||||||
|
|
||||||
|
// Exposing exactly one output port
|
||||||
|
merge.out
|
||||||
|
}
|
||||||
|
|
||||||
|
// Building a Sink with a nested Flow, using the fluid DSL
|
||||||
|
val sink = {
|
||||||
|
val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")
|
||||||
|
nestedFlow.to(Sink.head)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Putting all together
|
||||||
|
val closed = source.via(flow.filter(_ > 1)).to(sink)
|
||||||
|
//#partial-flow-dsl
|
||||||
|
// format: ON
|
||||||
|
}
|
||||||
|
|
||||||
|
"closed graph" in {
|
||||||
|
//#embed-closed
|
||||||
|
val closed1 = Source.single(0).to(Sink.foreach(println))
|
||||||
|
val closed2 = FlowGraph.closed() { implicit builder =>
|
||||||
|
val embeddedClosed: ClosedShape = builder.add(closed1)
|
||||||
|
}
|
||||||
|
//#embed-closed
|
||||||
|
}
|
||||||
|
|
||||||
|
"materialized values" in {
|
||||||
|
//#mat-combine-1
|
||||||
|
// Materializes to Promise[Unit] (red)
|
||||||
|
val source: Source[Int, Promise[Unit]] = Source.lazyEmpty[Int]
|
||||||
|
|
||||||
|
// Materializes to Unit (black)
|
||||||
|
val flow1: Flow[Int, Int, Unit] = Flow[Int].take(100)
|
||||||
|
|
||||||
|
// Materializes to Promise[Unit] (red)
|
||||||
|
val nestedSource: Source[Int, Promise[Unit]] =
|
||||||
|
source.viaMat(flow1)(Keep.left).named("nestedSource")
|
||||||
|
//#mat-combine-1
|
||||||
|
|
||||||
|
//#mat-combine-2
|
||||||
|
// Materializes to Unit (orange)
|
||||||
|
val flow2: Flow[Int, ByteString, Unit] = Flow[Int].map { i => ByteString(i.toString) }
|
||||||
|
|
||||||
|
// Materializes to Future[OutgoingConnection] (yellow)
|
||||||
|
val flow3: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
|
||||||
|
Tcp().outgoingConnection("localhost", 8080)
|
||||||
|
|
||||||
|
// Materializes to Future[OutgoingConnection] (yellow)
|
||||||
|
val nestedFlow: Flow[Int, ByteString, Future[OutgoingConnection]] =
|
||||||
|
flow2.viaMat(flow3)(Keep.right).named("nestedFlow")
|
||||||
|
//#mat-combine-2
|
||||||
|
|
||||||
|
//#mat-combine-3
|
||||||
|
// Materializes to Future[String] (green)
|
||||||
|
val sink: Sink[ByteString, Future[String]] = Sink.fold("")(_ + _.utf8String)
|
||||||
|
|
||||||
|
// Materializes to (Future[OutgoingConnection], Future[String]) (blue)
|
||||||
|
val nestedSink: Sink[Int, (Future[OutgoingConnection], Future[String])] =
|
||||||
|
nestedFlow.toMat(sink)(Keep.both)
|
||||||
|
//#mat-combine-3
|
||||||
|
|
||||||
|
//#mat-combine-4
|
||||||
|
case class MyClass(private val p: Promise[Unit], conn: OutgoingConnection) {
|
||||||
|
def close() = p.success(())
|
||||||
|
}
|
||||||
|
|
||||||
|
def f(p: Promise[Unit],
|
||||||
|
rest: (Future[OutgoingConnection], Future[String])): Future[MyClass] = {
|
||||||
|
|
||||||
|
val connFuture = rest._1
|
||||||
|
connFuture.map(MyClass(p, _))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Materializes to Future[MyClass] (purple)
|
||||||
|
val runnableGraph: RunnableGraph[Future[MyClass]] =
|
||||||
|
nestedSource.toMat(nestedSink)(f)
|
||||||
|
//#mat-combine-4
|
||||||
|
}
|
||||||
|
|
||||||
|
"attributes" in {
|
||||||
|
//#attributes-inheritance
|
||||||
|
import Attributes._
|
||||||
|
val nestedSource =
|
||||||
|
Source.single(0)
|
||||||
|
.map(_ + 1)
|
||||||
|
.named("nestedSource") // Wrap, no inputBuffer set
|
||||||
|
|
||||||
|
val nestedFlow =
|
||||||
|
Flow[Int].filter(_ != 0)
|
||||||
|
.via(Flow[Int].map(_ - 2).withAttributes(inputBuffer(4, 4))) // override
|
||||||
|
.named("nestedFlow") // Wrap, no inputBuffer set
|
||||||
|
|
||||||
|
val nestedSink =
|
||||||
|
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
|
||||||
|
.withAttributes(name("nestedSink") and inputBuffer(3, 3)) // override
|
||||||
|
//#attributes-inheritance
|
||||||
|
}
|
||||||
|
}
|
||||||
311
akka-docs-dev/rst/scala/stream-composition.rst
Normal file
|
|
@ -0,0 +1,311 @@
|
||||||
|
.. _composition-scala:
|
||||||
|
|
||||||
|
Modularity, Composition and Hierarchy
|
||||||
|
=====================================
|
||||||
|
|
||||||
|
Akka Streams provide a uniform model of stream processing graphs, which allows flexible composition of reusable
|
||||||
|
components. In this chapter we show how these look like from the conceptual and API perspective, demonstrating
|
||||||
|
the modularity aspects of the library.
|
||||||
|
|
||||||
|
Basics of composition and modularity
|
||||||
|
------------------------------------
|
||||||
|
|
||||||
|
Every processing stage used in Akka Streams can be imagined as a "box" with input and output ports where elements to
|
||||||
|
be processed arrive and leave the stage. In this view, a :class:`Source` is nothing else than a "box" with a single
|
||||||
|
output port, or, a :class:`BidiFlow` is a "box" with exactly two input and two output ports. In the figure below
|
||||||
|
we illustrate the most common used stages viewed as "boxes".
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_shapes.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
The *linear* stages are :class:`Source`, :class:`Sink`
|
||||||
|
and :class:`Flow`, as these can be used to compose strict chains of processing stages.
|
||||||
|
Fan-in and Fan-out stages have usually multiple input or multiple output ports, therefore they allow to build
|
||||||
|
more complex graph layouts, not just chains. :class:`BidiFlow` stages are usually useful in IO related tasks, where
|
||||||
|
there are input and output channels to be handled. Due to the specific shape of :class:`BidiFlow` it is easy to
|
||||||
|
stack them on top of each other to build a layered protocol for example. The ``TLS`` support in Akka is for example
|
||||||
|
implemented as a :class:`BidiFlow`.
|
||||||
|
|
||||||
|
These reusable components already allow the creation of complex processing networks. What we
|
||||||
|
have seen so far does not implement modularity though. It is desirable for example to package up a larger graph entity into
|
||||||
|
a reusable component which hides its internals only exposing the ports that are meant to the users of the module
|
||||||
|
to interact with. One good example is the ``Http`` server component, which is encoded internally as a
|
||||||
|
:class:`BidiFlow` which interfaces with the client TCP connection using an input-output port pair accepting and sending
|
||||||
|
:class:`ByteString` s, while its upper ports emit and receive :class:`HttpRequest` and :class:`HttpResponse` instances.
|
||||||
|
|
||||||
|
The following figure demonstrates various composite stages, that contain various other type of stages internally, but
|
||||||
|
hiding them behind a *shape* that looks like a :class:`Source`, :class:`Flow`, etc.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_composites.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
One interesting example above is a :class:`Flow` which is composed of a disconnected :class:`Sink` and :class:`Source`.
|
||||||
|
This can be achieved by using the ``wrap()`` constructor method on :class:`Flow` which takes the two parts as
|
||||||
|
parameters.
|
||||||
|
|
||||||
|
The example :class:`BidiFlow` demonstrates that internally a module can be of arbitrary complexity, and the exposed
|
||||||
|
ports can be wired in flexible ways. The only constraint is that all the ports of enclosed modules must be either
|
||||||
|
connected to each other, or exposed as interface ports, and the number of such ports needs to match the requirement
|
||||||
|
of the shape, for example a :class:`Source` allows only one exposed output port, the rest of the internal ports must
|
||||||
|
be properly connected.
|
||||||
|
|
||||||
|
These mechanics allow arbitrary nesting of modules. For example the following figure demonstrates a :class:`RunnableGraph`
|
||||||
|
that is built from a composite :class:`Source` and a composite :class:`Sink` (which in turn contains a composite
|
||||||
|
:class:`Flow`).
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_nested_flow.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
The above diagram contains one more shape that we have not seen yet, which is called :class:`RunnableGraph`. It turns
|
||||||
|
out, that if we wire all exposed ports together, so that no more open ports remain, we get a module that is *closed*.
|
||||||
|
This is what the :class:`RunnableGraph` class represents. This is the shape that a :class:`Materializer` can take
|
||||||
|
and turn into a network of running entities that perform the task described. In fact, a :class:`RunnableGraph` is a
|
||||||
|
module itself, and (maybe somewhat surprisingly) it can be used as part of larger graphs. It is rarely useful to embed
|
||||||
|
a closed graph shape in a larger graph (since it becomes an isolated island as there are no open port for communication
|
||||||
|
with the rest of the graph), but this demonstrates the uniform underlying model.
|
||||||
|
|
||||||
|
If we try to build a code snippet that corresponds to the above diagram, our first try might look like this:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#non-nested-flow
|
||||||
|
|
||||||
|
It is clear however that there is no nesting present in our first attempt, since the library cannot figure out
|
||||||
|
where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the
|
||||||
|
DSL provided by the :class:`Flow`, :class:`Source`, :class:`Sink` classes then nesting can be achieved by calling one of the
|
||||||
|
methods ``withAttributes()`` or ``named()`` (where the latter is just a shorthand for adding a name attribute).
|
||||||
|
|
||||||
|
The following code demonstrates how to achieve the desired nesting:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#nested-flow
|
||||||
|
|
||||||
|
Once we have hidden the internals of our components, they act like any other built-in component of similar shape. If
|
||||||
|
we hide some of the internals of our composites, the result looks just like if any other predefine component has been
|
||||||
|
used:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_nested_flow_opaque.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
If we look at usage of built-in components, and our custom components, there is no difference in usage as the code
|
||||||
|
snippet below demonstrates.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#reuse
|
||||||
|
|
||||||
|
Composing complex systems
|
||||||
|
-------------------------
|
||||||
|
|
||||||
|
In the previous section we explored the possibility of composition, and hierarchy, but we stayed away from non-linear,
|
||||||
|
generalized graph components. There is nothing in Akka Streams though that enforces that stream processing layouts
|
||||||
|
can only be linear. The DSL for :class:`Source` and friends is optimized for creating such linear chains, as they are
|
||||||
|
the most common in practice. There is a more advanced DSL for building complex graphs, that can be used if more
|
||||||
|
flexibility is needed. We will see that the difference between the two DSLs is only on the surface: the concepts they
|
||||||
|
operate on are uniform across all DSLs and fit together nicely.
|
||||||
|
|
||||||
|
As a first example, let's look at a more complex layout:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_graph.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
The diagram shows a :class:`RunnableGraph` (remember, if there are no unwired ports, the graph is closed, and therefore
|
||||||
|
can be materialized) that encapsulates a non-trivial stream processing network. It contains fan-in, fan-out stages,
|
||||||
|
directed and non-directed cycles. The ``closed()`` method of the :class:`FlowGraph` object allows the creation of a
|
||||||
|
general closed graph. For example the network on the diagram can be realized like this:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#complex-graph
|
||||||
|
|
||||||
|
In the code above we used the implicit port numbering feature (to make the graph more readable and similar to the diagram)
|
||||||
|
and we imported :class:`Source` s, :class:`Sink` s and :class:`Flow` s explicitly. It is possible to refer to the ports
|
||||||
|
explicitly, and it is not necessary to import our linear stages via ``add()``, so another version might look like this:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#complex-graph-alt
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but
|
||||||
|
the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL.
|
||||||
|
The way to do it is to use the ``partial()`` factory method on :class:`FlowGraph`. If we remove the sources and sinks
|
||||||
|
from the previous example, what remains is a partial graph:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_graph_partial.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
We can recreate a similar graph in code, using the DSL in a similar way than before:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#partial-graph
|
||||||
|
|
||||||
|
The only new addition is the return value of the builder block, which is a :class:`Shape`. All graphs (including
|
||||||
|
:class:`Source`, :class:`BidiFlow`, etc) have a shape, which encodes the *typed* ports of the module. In our example
|
||||||
|
there is exactly one input and output port left, so we can declare it to have a :class:`FlowShape` by returning an
|
||||||
|
instance of it. While it is possible to create new :class:`Shape` types, it is usually recommended to use one of the
|
||||||
|
matching built-in ones.
|
||||||
|
|
||||||
|
The resulting graph is already a properly wrapped module, so there is no need to call `named()` to encapsulate the graph, but
|
||||||
|
it is a good practice to give names to modules to help debugging.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_graph_shape.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
Since our partial graph has the right shape, it can be already used in the simpler, linear DSL:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#partial-use
|
||||||
|
|
||||||
|
It is not possible to use it as a :class:`Flow` yet, though (i.e. we cannot call ``.filter()`` on it), but :class:`Flow`
|
||||||
|
has a ``wrap()`` method that just adds the DSL to a :class:`FlowShape`. There are similar methods on :class:`Source`,
|
||||||
|
:class:`Sink` and :class:`BidiShape`, so it is easy to get back to the simpler DSL if a graph has the right shape.
|
||||||
|
For convenience, it is also possible to skip the partial graph creation, and use one of the convenience creator methods.
|
||||||
|
To demonstrate this, we will create the following graph:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_graph_flow.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
The code version of the above closed graph might look like this:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#partial-flow-dsl
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
All graph builder sections check if the resulting graph has all ports connected except the exposed ones and will
|
||||||
|
throw an exception if this is violated.
|
||||||
|
|
||||||
|
We are still in debt of demonstrating that :class:`RunnableGraph` is a component just like any other, which can
|
||||||
|
be embedded in graphs. In the following snippet we embed one closed graph in another:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#embed-closed
|
||||||
|
|
||||||
|
The type of the imported module indicates that the imported module has a :class:`ClosedShape`, and so we are not
|
||||||
|
able to wire it to anything else inside the enclosing closed graph. Nevertheless, this "island" is embedded properly,
|
||||||
|
and will be materialized just like any other module that is part of the graph.
|
||||||
|
|
||||||
|
As we have demonstrated, the two DSLs are fully interoperable, as they encode a similar nested structure of "boxes with
|
||||||
|
ports", it is only the DSLs that differ to be as much powerful as possible on the given abstraction level. It is possible
|
||||||
|
to embed complex graphs in the fluid DSL, and it is just as easy to import and embed a :class:`Flow`, etc, in a larger,
|
||||||
|
complex structure.
|
||||||
|
|
||||||
|
We have also seen, that every module has a :class:`Shape` (for example a :class:`Sink` has a :class:`SinkShape`)
|
||||||
|
independently which DSL was used to create it. This uniform representation enables the rich composability of various
|
||||||
|
stream processing entities in a convenient way.
|
||||||
|
|
||||||
|
Materialized values
|
||||||
|
-------------------
|
||||||
|
|
||||||
|
After realizing that :class:`RunnableGraph` is nothing more than a module with no unused ports (it is an island), it becomes clear that
|
||||||
|
after materialization the only way to communicate with the running stream processing logic is via some side-channel.
|
||||||
|
This side channel is represented as a *materialized value*. The situation is similar to :class:`Actor` s, where the
|
||||||
|
:class:`Props` instance describes the actor logic, but it is the call to ``actorOf()`` that creates an actually running
|
||||||
|
actor, and returns an :class:`ActorRef` that can be used to communicate with the running actor itself. Since the
|
||||||
|
:class:`Props` can be reused, each call will return a different reference.
|
||||||
|
|
||||||
|
When it comes to streams, each materialization creates a new running network corresponding to the blueprint that was
|
||||||
|
encoded in the provided :class:`RunnableGraph`. To be able to interact with the running network, each materialization
|
||||||
|
needs to return a different object that provides the necessary interaction capabilities. In other words, the
|
||||||
|
:class:`RunnableGraph` can be seen as a factory, which creates:
|
||||||
|
|
||||||
|
* a network of running processing entities, inaccessible from the outside
|
||||||
|
* a materialized value, optionally providing a controlled interaction capability with the network
|
||||||
|
|
||||||
|
Unlike actors though, each of the processing stages might provide a materialized value, so when we compose multiple
|
||||||
|
stages or modules, we need to combine the materialized value as well (there are default rules which make this easier,
|
||||||
|
for example `to()` and `via()` takes care of the most common case of taking the materialized value to the left.
|
||||||
|
See :ref:`flow-combine-mat-scala` for details). We demonstrate how this works by a code example and a diagram which
|
||||||
|
graphically demonstrates what is happening.
|
||||||
|
|
||||||
|
The propagation of the individual materialized values from the enclosed modules towards the top will look like this:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_mat.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
To implement the above, first, we create a composite :class:`Source`, where the enclosed :class:`Source` have a
|
||||||
|
materialized type of :class:`Promise[Unit]`. By using the combiner function ``Keep.left``, the resulting materialized
|
||||||
|
type is of the nested module (indicated by the color *red* on the diagram):
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#mat-combine-1
|
||||||
|
|
||||||
|
Next, we create a composite :class:`Flow` from two smaller components. Here, the second enclosed :class:`Flow` has a
|
||||||
|
materialized type of :class:`Future[OutgoingConnection]`, and we propagate this to the parent by using ``Keep.right``
|
||||||
|
as the combiner function (indicated by the color *yellow* on the diagram):
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#mat-combine-2
|
||||||
|
|
||||||
|
As a third step, we create a composite :class:`Sink`, using our ``nestedFlow`` as a building block. In this snippet, both
|
||||||
|
the enclosed :class:`Flow` and the folding :class:`Sink` has a materialized value that is interesting for us, so
|
||||||
|
we use ``Keep.both`` to get a :class:`Pair` of them as the materialized type of ``nestedSink`` (indicated by the color
|
||||||
|
*blue* on the diagram)
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#mat-combine-3
|
||||||
|
|
||||||
|
As the last example, we wire together ``nestedSource`` and ``nestedSink`` and we use a custom combiner function to
|
||||||
|
create a yet another materialized type of the resulting :class:`RunnableGraph`. This combiner function just ignores
|
||||||
|
the :class:`Future[Sink]` part, and wraps the other two values in a custom case class :class:`MyClass`
|
||||||
|
(indicated by color *purple* on the diagram):
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#mat-combine-4
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
The nested structure in the above example is not necessary for combining the materialized values, it just
|
||||||
|
demonstrates how the two features work together. See :ref:`flow-combine-mat-scala` for further examples
|
||||||
|
of combining materialized values without nesting and hierarchy involved.
|
||||||
|
|
||||||
|
Attributes
|
||||||
|
----------
|
||||||
|
|
||||||
|
We have seen that we can use ``named()`` to introduce a nesting level in the fluid DSL (and also explicit nesting by using
|
||||||
|
``partial()`` from :class:`FlowGraph`). Apart from having the effect of adding a nesting level, ``named()`` is actually
|
||||||
|
a shorthand for calling ``withAttributes(Attributes.name("someName"))``. Attributes provide a way to fine-tune certain
|
||||||
|
aspects of the materialized running entity. For example buffer sizes can be controlled via attributes (see
|
||||||
|
:ref:`stream-buffers-scala`). When it comes to hierarchic composition, attributes are inherited by nested modules,
|
||||||
|
unless they override them with a custom value.
|
||||||
|
|
||||||
|
The code below, a modification of an earlier example sets the ``inputBuffer`` attribute on certain modules, but not
|
||||||
|
on others:
|
||||||
|
|
||||||
|
.. includecode:: code/docs/stream/CompositionDocSpec.scala#attributes-inheritance
|
||||||
|
|
||||||
|
The effect is, that each module inherits the ``inputBuffer`` attribute from its enclosing parent, unless it has
|
||||||
|
the same attribute explicitly set. ``nestedSource`` gets the default attributes from the materializer itself. ``nestedSink``
|
||||||
|
on the other hand has this attribute set, so it will be used by all nested modules. ``nestedFlow`` will inherit from ``nestedSource``
|
||||||
|
except the ``map`` stage which has again an explicitly provided attribute overriding the inherited one.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
.. image:: ../images/compose_attributes.png
|
||||||
|
:align: center
|
||||||
|
|
||||||
|
|
|
||||||
|
|
||||||
|
This diagram illustrates the inheritance process for the example code (representing the materializer default attributes
|
||||||
|
as the color *red*, the attributes set on ``nestedSink`` as *blue* and the attributes set on ``nestedFlow`` as *green*).
|
||||||
|
|
@ -215,6 +215,8 @@ which will be running on the thread pools they have been configured to run on -
|
||||||
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
|
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
|
||||||
yet will materialize that stage multiple times.
|
yet will materialize that stage multiple times.
|
||||||
|
|
||||||
|
.. _flow-combine-mat-scala:
|
||||||
|
|
||||||
Combining materialized values
|
Combining materialized values
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ Streams
|
||||||
../stream-design
|
../stream-design
|
||||||
stream-flows-and-basics
|
stream-flows-and-basics
|
||||||
stream-graphs
|
stream-graphs
|
||||||
|
stream-composition
|
||||||
stream-rate
|
stream-rate
|
||||||
stream-customize
|
stream-customize
|
||||||
stream-integrations
|
stream-integrations
|
||||||
|
|
|
||||||