Merged the document from java/stream/stages-overview and scala/stream… (#23085)

* Merged the document from java/stream/stages-overview and scala/stream/stages-overview #23084
* Merged the document from java/stream/stream-composition.md and scala/stream/stream-composition.md #23086
This commit is contained in:
Atiq Sayyed 2017-06-08 00:24:12 +07:00 committed by Arnout Engelen
parent 168b40e40c
commit d5155755db
6 changed files with 224 additions and 2423 deletions

# Modularity, Composition and Hierarchy
Akka Streams provide a uniform model of stream processing graphs, which allows flexible composition of reusable
components. In this chapter we show what these look like from the conceptual and API perspective, demonstrating
the modularity aspects of the library.
## Basics of composition and modularity
Every processing stage used in Akka Streams can be imagined as a "box" with input and output ports where elements to
be processed arrive and leave the stage. In this view, a `Source` is nothing more than a "box" with a single
output port, while a `BidiFlow` is a "box" with exactly two input and two output ports. In the figure below
we illustrate the most commonly used stages viewed as "boxes".
![compose_shapes.png](../../images/compose_shapes.png)
The *linear* stages are `Source`, `Sink`
and `Flow`, as these can be used to compose strict chains of processing stages.
Fan-in and fan-out stages usually have multiple input or multiple output ports, and therefore allow building
more complex graph layouts, not just chains. `BidiFlow` stages are typically useful in IO-related tasks, where
there are input and output channels to be handled. Due to the specific shape of `BidiFlow` it is easy to
stack them on top of each other to build, for example, a layered protocol. The `TLS` support in Akka is
implemented as a `BidiFlow`.
These reusable components already allow the creation of complex processing networks. What we
have seen so far does not implement modularity though. It is desirable, for example, to package up a larger graph entity into
a reusable component which hides its internals, exposing only the ports that users of the module
are meant to interact with. One good example is the `Http` server component, which is encoded internally as a
`BidiFlow` that interfaces with the client TCP connection using an input-output port pair accepting and sending
`ByteString` s, while its upper ports emit and receive `HttpRequest` and `HttpResponse` instances.
The following figure demonstrates various composite stages that contain other types of stages internally,
hiding them behind a *shape* that looks like a `Source`, `Flow`, etc.
![compose_composites.png](../../images/compose_composites.png)
One interesting example above is a `Flow` which is composed of a disconnected `Sink` and `Source`.
This can be achieved by using the `fromSinkAndSource()` constructor method on `Flow` which takes the two parts as
parameters.
Please note that when combining a `Flow` using that method, the termination signals are not carried
"through", as the `Sink` and `Source` are assumed to be fully independent. If, however, you want to construct
a `Flow` like this but need the termination events to trigger "the other side" of the composite flow, you can use
`CoupledTerminationFlow.fromSinkAndSource` which does just that. For example, cancellation of the composite flow's
source-side will then lead to completion of its sink-side. Read `CoupledTerminationFlow`'s scaladoc for a
detailed explanation of how this works.
The example `BidiFlow` demonstrates that internally a module can be of arbitrary complexity, and the exposed
ports can be wired in flexible ways. The only constraint is that all the ports of enclosed modules must be either
connected to each other, or exposed as interface ports, and the number of such ports needs to match the requirements
of the shape; for example, a `Source` allows only one exposed output port, and the rest of the internal ports must
be properly connected.
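The port constraint just described can be captured in a toy model (illustrative only, not Akka's actual `Shape` API): a shape fixes how many ports a module may leave exposed.

```java
// Toy model of the constraint described above: a shape is just the count of
// exposed input and output ports, and each kind of module demands a specific
// combination. This is NOT Akka's Shape API, only a conceptual sketch.
class ShapeSketch {
    final int inlets;   // number of exposed input ports
    final int outlets;  // number of exposed output ports

    ShapeSketch(int inlets, int outlets) {
        this.inlets = inlets;
        this.outlets = outlets;
    }

    // A source-like module exposes no inputs and exactly one output.
    boolean isSourceLike() { return inlets == 0 && outlets == 1; }

    // A flow-like module exposes exactly one input and one output.
    boolean isFlowLike() { return inlets == 1 && outlets == 1; }

    // A closed (runnable) module exposes no ports at all.
    boolean isClosed() { return inlets == 0 && outlets == 0; }
}
```

All remaining internal ports must be wired to each other; only the counts above may remain exposed.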
These mechanics allow arbitrary nesting of modules. For example the following figure demonstrates a `RunnableGraph`
that is built from a composite `Source` and a composite `Sink` (which in turn contains a composite
`Flow`).
![compose_nested_flow.png](../../images/compose_nested_flow.png)
The above diagram contains one more shape that we have not seen yet, which is called `RunnableGraph`. It turns
out that if we wire all exposed ports together, so that no more open ports remain, we get a module that is *closed*.
This is what the `RunnableGraph` class represents. This is the shape that a `Materializer` can take
and turn into a network of running entities that perform the task described. In fact, a `RunnableGraph` is a
module itself, and (maybe somewhat surprisingly) it can be used as part of larger graphs. It is rarely useful to embed
a closed graph shape in a larger graph (since it becomes an isolated island, as there are no open ports for communication
with the rest of the graph), but this demonstrates the uniform underlying model.
If we try to build a code snippet that corresponds to the above diagram, our first try might look like this:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #non-nested-flow }
It is clear however that there is no nesting present in our first attempt: since the library cannot figure out
where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the
DSL provided by the `Flow`, `Source` and `Sink` classes, then nesting can be achieved by calling one of the
methods `withAttributes()` or `named()` (where the latter is just a shorthand for adding a name attribute).
The following code demonstrates how to achieve the desired nesting:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #nested-flow }
Once we have hidden the internals of our components, they act like any other built-in component of similar shape. If
we hide some of the internals of our composites, the result looks just as if any other predefined component had been
used:
![compose_nested_flow_opaque.png](../../images/compose_nested_flow_opaque.png)
If we look at the usage of built-in components and our custom components, there is no difference in usage, as the code
snippet below demonstrates.
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #reuse }
## Composing complex systems
In the previous section we explored the possibilities of composition and hierarchy, but we stayed away from non-linear,
generalized graph components. There is nothing in Akka Streams though that enforces that stream processing layouts
can only be linear. The DSL for `Source` and friends is optimized for creating such linear chains, as they are
the most common in practice. There is a more advanced DSL for building complex graphs, which can be used if more
flexibility is needed. We will see that the difference between the two DSLs is only on the surface: the concepts they
operate on are uniform across all DSLs and fit together nicely.
As a first example, let's look at a more complex layout:
![compose_graph.png](../../images/compose_graph.png)
The diagram shows a `RunnableGraph` (remember, if there are no unwired ports, the graph is closed, and therefore
can be materialized) that encapsulates a non-trivial stream processing network. It contains fan-in and fan-out stages,
directed and non-directed cycles. The `runnable()` method of the `GraphDSL` factory object allows the creation of a
general, closed, and runnable graph. For example, the network on the diagram can be realized like this:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph }
In the code above we used the implicit port numbering feature to make the graph more readable and similar to the diagram.
It is possible to refer to the ports, so another version might look like this:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph-alt }
Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but
the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL.
The way to do it is to use the `create()` method on `GraphDSL` factory. If we remove the sources and sinks
from the previous example, what remains is a partial graph:
![compose_graph_partial.png](../../images/compose_graph_partial.png)
We can recreate a similar graph in code, using the DSL in a similar way as before:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-graph }
The only new addition is the return value of the builder block, which is a `Shape`. All graphs (including
`Source`, `BidiFlow`, etc) have a shape, which encodes the *typed* ports of the module. In our example
there is exactly one input and output port left, so we can declare it to have a `FlowShape` by returning an
instance of it. While it is possible to create new `Shape` types, it is usually recommended to use one of the
matching built-in ones.
The resulting graph is already a properly wrapped module, so there is no need to call *named()* to encapsulate the graph, but
it is good practice to give names to modules to help debugging.
![compose_graph_shape.png](../../images/compose_graph_shape.png)
Since our partial graph has the right shape, it can already be used in the simpler, linear DSL:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-use }
It is not possible to use it as a `Flow` yet, though (i.e. we cannot call `.filter()` on it), but `Flow`
has a `fromGraph()` method that just adds the DSL to a `FlowShape`. There are similar methods on `Source`,
`Sink` and `BidiFlow`, so it is easy to get back to the simpler DSL if a graph has the right shape.
For convenience, it is also possible to skip the partial graph creation, and use one of the convenience creator methods.
To demonstrate this, we will create the following graph:
![compose_graph_flow.png](../../images/compose_graph_flow.png)
The code version of the above closed graph might look like this:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-flow-dsl }
@@@ note
All graph builder sections check whether the resulting graph has all ports connected except the exposed ones, and will
throw an exception if this is violated.
@@@
We still owe a demonstration that `RunnableGraph` is a component just like any other, which can
be embedded in graphs. In the following snippet we embed one closed graph in another:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #embed-closed }
The type of the imported module indicates that the imported module has a `ClosedShape`, and so we are not
able to wire it to anything else inside the enclosing closed graph. Nevertheless, this "island" is embedded properly,
and will be materialized just like any other module that is part of the graph.
As we have demonstrated, the two DSLs are fully interoperable, as they encode a similar nested structure of "boxes with
ports"; the DSLs only differ in order to be as powerful as possible on their given abstraction level. It is possible
to embed complex graphs in the fluid DSL, and it is just as easy to import and embed a `Flow`, etc., in a larger,
complex structure.
We have also seen that every module has a `Shape` (for example a `Sink` has a `SinkShape`)
independently of which DSL was used to create it. This uniform representation enables the rich composability of various
stream processing entities in a convenient way.
## Materialized values
After realizing that `RunnableGraph` is nothing more than a module with no unused ports (it is an island), it becomes clear that
after materialization the only way to communicate with the running stream processing logic is via some side-channel.
This side channel is represented as a *materialized value*. The situation is similar to `Actor` s, where the
`Props` instance describes the actor logic, but it is the call to `actorOf()` that creates an actually running
actor, and returns an `ActorRef` that can be used to communicate with the running actor itself. Since the
`Props` can be reused, each call will return a different reference.
When it comes to streams, each materialization creates a new running network corresponding to the blueprint that was
encoded in the provided `RunnableGraph`. To be able to interact with the running network, each materialization
needs to return a different object that provides the necessary interaction capabilities. In other words, the
`RunnableGraph` can be seen as a factory, which creates:
* a network of running processing entities, inaccessible from the outside
* a materialized value, optionally providing a controlled interaction capability with the network
Unlike actors though, each of the processing stages might provide a materialized value, so when we compose multiple
stages or modules, we need to combine the materialized values as well (there are default rules which make this easier,
for example *to()* and *via()* take care of the most common case of taking the materialized value to the left.
See @ref:[Combining materialized values](../stream/stream-flows-and-basics.md#flow-combine-mat) for details). We demonstrate how this works with a code example and a diagram which
graphically demonstrates what is happening.
The propagation of the individual materialized values from the enclosed modules towards the top will look like this:
![compose_mat.png](../../images/compose_mat.png)
To implement the above, first we create a composite `Source`, where the enclosed `Source` has a
materialized type of `CompletableFuture<Optional<Integer>>`. By using the combiner function `Keep.left()`, the resulting materialized
type is that of the nested module (indicated by the color *red* on the diagram):
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-1 }
Next, we create a composite `Flow` from two smaller components. Here, the second enclosed `Flow` has a
materialized type of `CompletionStage<OutgoingConnection>`, and we propagate this to the parent by using `Keep.right()`
as the combiner function (indicated by the color *yellow* on the diagram):
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-2 }
As a third step, we create a composite `Sink`, using our `nestedFlow` as a building block. In this snippet, both
the enclosed `Flow` and the folding `Sink` have materialized values that are interesting for us, so
we use `Keep.both()` to get a `Pair` of them as the materialized type of `nestedSink` (indicated by the color
*blue* on the diagram):
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-3 }
As the last example, we wire together `nestedSource` and `nestedSink` and use a custom combiner function to
create yet another materialized type of the resulting `RunnableGraph`. This combiner function just ignores
the `CompletionStage<Sink>` part, and wraps the other two values in a custom case class `MyClass`
(indicated by the color *purple* on the diagram):
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-4a }
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-4b }
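The semantics of the combiner functions used throughout these steps can be sketched in plain Java (an illustrative analogy, not Akka's actual `Keep` implementation): keep the left materialized value, keep the right one, or keep both as a pair.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.function.BiFunction;

// Sketch of the three standard materialized-value combiners (illustrative
// only, not the Akka implementation).
class KeepSketch {
    // Keep the materialized value of the left (earlier) stage.
    static <L, R> BiFunction<L, R, L> left() {
        return (l, r) -> l;
    }

    // Keep the materialized value of the right (later) stage.
    static <L, R> BiFunction<L, R, R> right() {
        return (l, r) -> r;
    }

    // Keep both values as a pair.
    static <L, R> BiFunction<L, R, Map.Entry<L, R>> both() {
        return (l, r) -> new SimpleEntry<>(l, r);
    }
}
```

A custom combiner, like the one producing `MyClass` above, is just another such binary function with an arbitrary result type.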
@@@ note
The nested structure in the above example is not necessary for combining the materialized values, it just
demonstrates how the two features work together. See @ref:[Operator Fusion](stream-flows-and-basics.md#operator-fusion) for further examples
of combining materialized values without nesting and hierarchy involved.
@@@
## Attributes
We have seen that we can use `named()` to introduce a nesting level in the fluid DSL (and also explicit nesting by using
`create()` from `GraphDSL`). Apart from having the effect of adding a nesting level, `named()` is actually
a shorthand for calling `withAttributes(Attributes.name("someName"))`. Attributes provide a way to fine-tune certain
aspects of the materialized running entity. For example buffer sizes for asynchronous stages can be controlled via
attributes (see @ref:[Buffers for asynchronous stages](stream-rate.md#async-stream-buffers)). When it comes to hierarchical composition, attributes are inherited
by nested modules, unless they override them with a custom value.
The code below, a modification of an earlier example, sets the `inputBuffer` attribute on certain modules, but not
on others:
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #attributes-inheritance }
The effect is that each module inherits the `inputBuffer` attribute from its enclosing parent, unless it has
the same attribute explicitly set. `nestedSource` gets the default attributes from the materializer itself. `nestedSink`
on the other hand has this attribute set, so it will be used by all its nested modules. `nestedFlow` will inherit from `nestedSink`,
except for the `map` stage, which again has an explicitly provided attribute overriding the inherited one.
![compose_attributes.png](../../images/compose_attributes.png)
This diagram illustrates the inheritance process for the example code (representing the materializer default attributes
as the color *red*, the attributes set on `nestedSink` as *blue* and the attributes set on `nestedFlow` as *green*).
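The resolution rule illustrated in the diagram can be sketched with a plain-Java analogy (illustrative only, not Akka's `Attributes` implementation): a module uses its locally set value if present, and otherwise falls back to its enclosing module.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of hierarchical attribute resolution: a module resolves an attribute
// locally first, and otherwise inherits it from the enclosing (parent) module.
// This is a conceptual analogy, not the Akka Attributes class.
class AttrSketch {
    private final AttrSketch parent;
    private final Map<String, Integer> attrs = new HashMap<>();

    AttrSketch(AttrSketch parent) { this.parent = parent; }

    AttrSketch set(String name, int value) {
        attrs.put(name, value);
        return this;
    }

    Integer get(String name) {
        Integer own = attrs.get(name);
        if (own != null) return own;               // explicitly set here
        return parent == null ? null : parent.get(name);  // inherited
    }
}
```

Chaining materializer defaults, `nestedSink`, `nestedFlow` and the `map` stage as nested scopes reproduces the inheritance behavior described above.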

# Basics and working with Flows
<a id="core-concepts"></a>
## Core concepts
Akka Streams is a library to process and transfer a sequence of elements using bounded buffer space. This
latter property is what we refer to as *boundedness* and it is the defining feature of Akka Streams. Translated to
everyday terms, it is possible to express a chain (or as we see later, graphs) of processing entities, each executing
independently (and possibly concurrently) of the others while only buffering a limited number of elements at any given
time. This property of bounded buffers is one of the differences from the actor model, where each actor usually has
an unbounded, or a bounded but dropping, mailbox. Akka Stream processing entities have bounded "mailboxes" that
do not drop.
Before we move on, let's define some basic terminology which will be used throughout the entire documentation:
Stream
: An active process that involves moving and transforming data.
Element
: An element is the processing unit of streams. All operations transform and transfer elements from upstream to
downstream. Buffer sizes are always expressed as a number of elements, independently of the actual size of the elements.
Back-pressure
: A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively
slowing down the upstream producer to match their consumption speeds.
In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*.
Non-Blocking
: Means that a certain operation does not hinder the progress of the calling thread, even if it takes a long time to
finish the requested operation.
Graph
: A description of a stream processing topology, defining the pathways through which elements shall flow when the stream
is running.
Processing Stage
: The common name for all building blocks that build up a Graph.
Examples of a processing stage would be operations like `map()`, `filter()`, custom `GraphStage` s and graph
junctions like `Merge` or `Broadcast`. For the full list of built-in processing stages see @ref:[stages overview](stages-overview.md).
When we talk about *asynchronous, non-blocking backpressure* we mean that the processing stages available in Akka
Streams will not use blocking calls but asynchronous message passing to exchange messages between each other, and they
will use asynchronous means to slow down a fast producer, without blocking its thread. This is a thread-pool friendly
design, since entities that need to wait (a fast producer waiting on a slow consumer) will not block the thread but
can hand it back for further use to an underlying thread-pool.
<a id="defining-and-running-streams"></a>
## Defining and running streams
Linear processing pipelines can be expressed in Akka Streams using the following core abstractions:
Source
: A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are
ready to receive them.
Sink
: A processing stage with *exactly one input*, requesting and accepting data elements, possibly slowing down the upstream
producer of elements.
Flow
: A processing stage which has *exactly one input and output*, which connects its up- and downstreams by
transforming the data elements flowing through it.
RunnableGraph
: A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be `run()`.
It is possible to attach a `Flow` to a `Source` resulting in a composite source, and it is also possible to prepend
a `Flow` to a `Sink` to get a new sink. After a stream is properly terminated by having both a source and a sink,
it will be represented by the `RunnableGraph` type, indicating that it is ready to be executed.
It is important to remember that even after constructing the `RunnableGraph` by connecting all the source, sink and
different processing stages, no data will flow through it until it is materialized. Materialization is the process of
allocating all resources needed to run the computation described by a Graph (in Akka Streams this will often involve
starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable,
thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have
one actor prepare the work, and then have it be materialized at some completely different place in the code.
@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materialization-in-steps }
After running (materializing) the `RunnableGraph` we get a special container object, the `MaterializedMap`. Both
sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation
dependent. For example a `FoldSink` will make a `CompletionStage` available in this map which will represent the result
of the folding process over the stream. In general, a stream can expose multiple materialized values,
but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
there is a convenience method called `runWith()` available for `Sink`, `Source` or `Flow` requiring, respectively,
a supplied `Source` (in order to run a `Sink`), a `Sink` (in order to run a `Source`) or
both a `Source` and a `Sink` (in order to run a `Flow`, since it has neither attached yet).
@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materialization-runWith }
It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage,
instead of modifying the existing instance, so while constructing long flows, remember to assign the new value to a variable or run it:
@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-immutable }
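The blueprint idea can also be made concrete with a self-contained plain-Java analogy (not the Akka API): a description is immutable, every transformation returns a new description, and nothing actually executes until it is explicitly run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Immutable "blueprint" sketch: map() returns a NEW description and leaves the
// original untouched; run() is the only point where work happens. This is a
// conceptual analogy, not Akka's implementation.
class BlueprintSketch {
    private final List<UnaryOperator<Integer>> steps;

    private BlueprintSketch(List<UnaryOperator<Integer>> steps) {
        this.steps = steps;
    }

    static BlueprintSketch empty() {
        return new BlueprintSketch(new ArrayList<>());
    }

    // Append a step by copying: the receiver is never mutated, so blueprints
    // are safe to share freely (e.g. between actors).
    BlueprintSketch map(UnaryOperator<Integer> f) {
        List<UnaryOperator<Integer>> copy = new ArrayList<>(steps);
        copy.add(f);
        return new BlueprintSketch(copy);
    }

    // "Materialize": actually execute the described steps on an input.
    int run(int input) {
        int acc = input;
        for (UnaryOperator<Integer> f : steps) acc = f.apply(acc);
        return acc;
    }
}
```

Because `map` copies rather than mutates, forgetting to use its return value silently discards the new step, which is exactly the pitfall the snippet above warns about.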
@@@ note
By default Akka Streams elements support **exactly one** downstream processing stage.
Making fan-out (supporting multiple downstream processing stages) an explicit opt-in feature allows default stream elements to
be less complex and more efficient. It also allows for greater flexibility on *how exactly* to handle multicast scenarios,
by providing named fan-out elements such as broadcast (signals all downstream elements) or balance (signals one of the available downstream elements).
@@@
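The difference between the two fan-out strategies mentioned in the note can be sketched in plain Java (a conceptual sketch only, not the Akka `Broadcast` and `Balance` stages):

```java
import java.util.List;

// Conceptual sketch of two fan-out strategies: broadcast copies every element
// to all downstreams, while balance gives each element to exactly one of them
// (round-robin here for simplicity; the real stage picks an available one).
class FanOutSketch {
    static <T> void broadcast(List<T> in, List<List<T>> outs) {
        for (T element : in) {
            for (List<T> out : outs) out.add(element);
        }
    }

    static <T> void balance(List<T> in, List<List<T>> outs) {
        int i = 0;
        for (T element : in) {
            outs.get(i++ % outs.size()).add(element);
        }
    }
}
```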
In the above example we used the `runWith` method, which both materializes the stream and returns the materialized value
of the given sink or source.
Since a stream can be materialized multiple times, the `MaterializedMap` returned is different for each materialization.
In the example below we create two running materialized instances of the stream that we described in the `runnable`
variable, and both materializations give us a different `CompletionStage` from the map even though we used the same `sink`
to refer to the future:
@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #stream-reuse }
### Defining sources, sinks and flows
The objects `Source` and `Sink` define various ways to create sources and sinks of elements. The following
examples show some of the most useful constructs (refer to the API documentation for more details):
@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-sink }
There are various ways to wire up different parts of a stream, the following examples show some of the available options:
@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-connecting }
### Illegal stream elements
In accordance with the Reactive Streams specification ([Rule 2.13](https://github.com/reactive-streams/reactive-streams-jvm#2.13)),
Akka Streams do not allow `null` to be passed through the stream as an element. If you want to model the concept
of absence of a value, we recommend using `java.util.Optional`, which is available since Java 8.
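A minimal plain-Java illustration of this recommendation (the lookup function here is hypothetical):

```java
import java.util.Optional;

// Wrap a possibly-null result in Optional before it enters a stream, so the
// stream itself never carries null elements. The lookup logic is a made-up
// placeholder for any nullable API.
class AbsenceSketch {
    static Optional<String> lookup(String key) {
        String raw = "known".equals(key) ? "value" : null;  // may be null
        return Optional.ofNullable(raw);
    }
}
```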
<a id="back-pressure-explained"></a>
## Back-pressure explained
Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the [Reactive Streams](http://reactive-streams.org/)
specification, which Akka is a founding member of.
The user of the library does not have to write any explicit back-pressure handling code — it is built in
and dealt with automatically by all of the provided Akka Streams processing stages. It is possible however to add
explicit buffer stages with overflow strategies that can influence the behaviour of the stream. This is especially important
in complex processing graphs which may even contain loops (which *must* be treated with very special
care, as explained in @ref:[Graph cycles, liveness and deadlocks](stream-graphs.md#graph-cycles)).
The back pressure protocol is defined in terms of the number of elements a downstream `Subscriber` is able to receive
and buffer, referred to as `demand`.
The source of data, referred to as `Publisher` in Reactive Streams terminology and implemented as `Source` in Akka
Streams, guarantees that it will never emit more elements than the received total demand for any given `Subscriber`.
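This guarantee can be illustrated with a single-threaded plain-Java sketch (conceptual only; the real protocol is asynchronous and this is not Akka's implementation): the publisher keeps a demand counter and never emits beyond it.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded sketch of the demand contract: the number of emitted
// elements is always bounded by the total demand signalled so far.
class DemandSketch {
    private long demand = 0;  // outstanding demand signalled by the subscriber
    private int next = 0;     // next element the publisher would produce
    final List<Integer> received = new ArrayList<>();

    // Subscriber side: signal readiness for n more elements.
    void request(long n) {
        demand += n;
        pump();
    }

    // Publisher side: emit elements, but never more than the outstanding demand.
    private void pump() {
        while (demand > 0) {
            received.add(next++);
            demand--;
        }
    }
}
```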
@@@ note
The Reactive Streams specification defines its protocol in terms of `Publisher` and `Subscriber`.
These types are **not** meant to be user facing API, instead they serve as the low level building blocks for
different Reactive Streams implementations.
Akka Streams implements these concepts as `Source`, `Flow` (referred to as `Processor` in Reactive Streams)
and `Sink` without exposing the Reactive Streams interfaces directly.
If you need to integrate with other Reactive Stream libraries read @ref:[Integrating with Reactive Streams](stream-integrations.md#reactive-streams-integration).
@@@
The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode",
since it will switch between push-based and pull-based back-pressure models depending on whether the downstream is able
to cope with the upstream production rate or not.
To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them:
### Slow Publisher, fast Subscriber
This is the happy case; we do not need to slow down the Publisher here. However, signalling rates are
rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
slower than the Publisher. In order to safeguard against such situations, the back-pressure protocol must still be enabled
during such situations, however we do not want to pay a high penalty for this safety net being enabled.
The Reactive Streams protocol solves this by asynchronously signalling `Request(int n)` from the Subscriber to the
Publisher. The protocol guarantees that the Publisher will never signal *more* elements than the
signalled demand. Since the Subscriber, however, is currently faster, it will be signalling these Request messages at a higher
rate (and possibly also batching together the demand, requesting multiple elements in one Request signal). This means
that the Publisher should never have to wait (be back-pressured) before publishing its incoming elements.
As we can see, in this scenario we effectively operate in so-called push mode, since the Publisher can continue producing
elements as fast as it can, as the pending demand will be recovered just-in-time while it is emitting elements.
### Fast Publisher, slow Subscriber
This is the case when back-pressuring the `Publisher` is required, because the `Subscriber` is not able to cope with
the rate at which its upstream would like to emit data elements.
Since the `Publisher` is not allowed to signal more elements than the pending demand signalled by the `Subscriber`,
it will have to abide by this back-pressure by applying one of the following strategies:
* not generate elements, if it is able to control their production rate,
* try buffering the elements in a *bounded* manner until more demand is signalled,
* drop elements until more demand is signalled,
* tear down the stream if unable to apply any of the above strategies.
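The second and third strategies can be sketched together in plain Java (illustrative only, not Akka's implementation): buffer in a bounded queue while there is no demand, and drop once the buffer is full.

```java
import java.util.ArrayDeque;

// Bounded buffering with a drop-newest overflow policy, one of several
// possible drop strategies. Conceptual sketch only.
class BoundedBufferSketch {
    private final ArrayDeque<Integer> buffer = new ArrayDeque<>();
    private final int capacity;
    int dropped = 0;

    BoundedBufferSketch(int capacity) { this.capacity = capacity; }

    // Called when an element arrives but downstream has signalled no demand:
    // buffer while there is room, otherwise drop rather than grow unboundedly.
    void offer(int element) {
        if (buffer.size() < capacity) buffer.add(element);
        else dropped++;
    }

    // Called when downstream signals demand again; may return null if empty.
    Integer poll() { return buffer.poll(); }

    int size() { return buffer.size(); }
}
```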
As we can see, this scenario effectively means that the `Subscriber` will *pull* the elements from the Publisher;
this mode of operation is referred to as pull-based back-pressure.
<a id="stream-materialization"></a>
## Stream Materialization
When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing,
but is not restricted to that—it could also mean opening files or socket connections etc.—depending on what the stream needs.
Materialization is triggered at so-called "terminal operations". Most notably this includes the various forms of the `run()`
and `runWith()` methods defined on `Source` or `Flow` elements, as well as a small number of special syntactic sugars for running with
well-known sinks, such as `runForeach(el -> ...)` (being an alias to `runWith(Sink.foreach(el -> ...))`).
Materialization is currently performed synchronously on the materializing thread.
The actual stream processing is handled by actors started up during the stream's materialization,
which will be running on the thread pools they have been configured to run on; this defaults to the dispatcher set in
`MaterializationSettings` while constructing the `ActorMaterializer`.
@@@ note
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside composite Graphs is legal,
yet will materialize that stage multiple times.
@@@
<a id="operator-fusion"></a>
### Operator Fusion
By default Akka Streams will fuse the stream operators. This means that the processing steps of a flow or
stream graph can be executed within the same Actor, which has two consequences:
* passing elements from one processing stage to the next is a lot faster between fused
stages due to avoiding the asynchronous messaging overhead
* fused stream processing stages do not run in parallel to each other, meaning that
only up to one CPU core is used for each fused part
To allow for parallel processing you will have to insert asynchronous boundaries manually into your flows and
graphs by way of adding `Attributes.asyncBoundary` using the method `async` on `Source`, `Sink` and `Flow`
to pieces that shall communicate with the rest of the graph in an asynchronous fashion.
@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-async }
In this example we create two regions within the flow which will be executed in one Actor each. Assuming that adding
and multiplying integers is an extremely costly operation, this will lead to a performance gain, since two CPUs can
work on the tasks in parallel. It is important to note that asynchronous boundaries are not singular places within a
flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work
by adding information to the flow graph that has been constructed up to this point:
![asyncBoundary.png](../../images/asyncBoundary.png)
This means that everything that is inside the red bubble will be executed by one actor and everything outside of it
by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all
processing stages that have been added since then.
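The successive-boundary scheme described above can be sketched as follows (a hedged, self-contained example with illustrative names; each `async()` call marks everything built up to that point as one island executed by its own actor):

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class AsyncBoundaryExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("async-demo");
    final Materializer mat = ActorMaterializer.create(system);

    Source.range(1, 1000)
        .map(x -> x + 1)  // stage before the boundary: executed by one actor
        .async()          // asynchronous boundary: everything above is one island
        .map(x -> x * 2)  // executed by a second actor, in parallel with the first
        .runWith(Sink.ignore(), mat)
        .thenRun(system::terminate);
  }
}
```

Adding a second `async()` further downstream would enclose the previous boundary plus all stages added since, yielding a third island.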
@@@ warning
Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer
that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers
may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer
there; data elements are passed without buffering between fused stages. In those cases where buffering
is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the
`.buffer()` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function.
@@@
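As an illustrative sketch of the `buffer` combinator mentioned in the warning (names are illustrative, and the buffer is shown here on a linear flow purely for brevity; in a cyclic graph the stage would sit inside the feedback loop):

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class ExplicitBufferExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("buffer-demo");
    final Materializer mat = ActorMaterializer.create(system);

    Source.range(1, 100)
        .buffer(2, OverflowStrategy.backpressure()) // explicit 2-element buffer
        .map(x -> x * 2)
        .runWith(Sink.ignore(), mat)
        .thenRun(system::terminate);
  }
}
```

`OverflowStrategy.backpressure()` makes the buffer slow down upstream once it is full, which is the behavior a feedback loop typically needs.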
The new fusing behavior can be disabled by setting the configuration parameter `akka.stream.materializer.auto-fusing=off`.
In that case you can still manually fuse those graphs which shall run on less Actors. With the exception of the
`SslTlsStage` and the `groupBy` operator all built-in processing stages can be fused.
### Combining materialized values
Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary
to somehow express how these values should be composed into a final value when we plug these stages together. For this,
many combinator methods have variants that take an additional argument, a function, that will be used to combine the
resulting values. Some examples of using these combiners are illustrated in the example below.
@@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-mat-combine }
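As a standalone sketch of the same idea (names are illustrative): `toMat` takes a combiner function, and `Keep.right()` selects the sink's materialized value as the result of `run()`:

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;

public class MatCombineExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("mat-demo");
    final Materializer mat = ActorMaterializer.create(system);

    final Source<Integer, NotUsed> source = Source.range(1, 10);
    final Sink<Integer, CompletionStage<Integer>> sum =
        Sink.fold(0, (acc, x) -> acc + x);

    // Keep.right() keeps the sink's materialized value (the folded sum)
    final CompletionStage<Integer> result = source.toMat(sum, Keep.right()).run(mat);

    result.thenAccept(total -> {
      System.out.println(total); // 55
      system.terminate();
    });
  }
}
```

`Keep.left()`, `Keep.both()` or an arbitrary combiner function can be used instead when the other side's materialized value (or both) is needed.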
@@@ note
In Graphs it is possible to access the materialized value from inside the stream processing graph. For details see @ref:[Accessing the materialized value inside the Graph](stream-graphs.md#graph-matvalue).
@@@
## Stream ordering
In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs `{IA1,IA2,...,IAn}`
"cause" outputs `{OA1,OA2,...,OAk}` and inputs `{IB1,IB2,...,IBm}` "cause" outputs `{OB1,OB2,...,OBl}` and all of
`IAi` happened before all `IBi` then `OAi` happens before `OBi`.
This property is upheld even by async operations such as `mapAsync`; however, an unordered version called
`mapAsyncUnordered` exists which does not preserve this ordering.
However, in the case of Junctions which handle multiple input streams (e.g. `Merge`) the output order is,
in general, *not defined* for elements arriving on different input ports. That is, a merge-like operation may emit `Ai`
before emitting `Bi`, and it is up to its internal logic to decide the order of emitted elements. Specialized elements
such as `Zip`, however, *do guarantee* their output order, as each output element depends on all upstream elements having
been signalled already; thus the ordering in the case of zipping is defined by this property.
If you find yourself in need of fine-grained control over the order of emitted elements in fan-in
scenarios, consider using `MergePreferred` or `GraphStage`, which give you full control over how the
merge is performed.
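The order-preservation guarantee of `mapAsync` can be demonstrated with a small sketch (illustrative names; `CompletableFuture` stands in for any asynchronous computation): even though up to four lookups run concurrently, results are emitted in input order.

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletableFuture;

public class OrderingExample {
  public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("order-demo");
    final Materializer mat = ActorMaterializer.create(system);

    Source.range(1, 5)
        // up to 4 futures in flight at once, but emission order matches input order
        .mapAsync(4, i -> CompletableFuture.supplyAsync(() -> i * 10))
        .runWith(Sink.seq(), mat)
        .thenAccept(list -> {
          System.out.println(list); // [10, 20, 30, 40, 50] -- order preserved
          system.terminate();
        });
  }
}
```

Swapping `mapAsync` for `mapAsyncUnordered` in the same sketch would emit the same elements, but in whatever order the futures happen to complete.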
@@ -4,14 +4,14 @@
 ## Source stages

-These built-in sources are available from `akka.stream.scaladsl.Source`:
+These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] @java[`akka.stream.javadsl.Source`]:

 ---------------------------------------------------------------

 ### fromIterator

 Stream the values from an `Iterator`, requesting the next value when there is demand. The iterator will be created anew
-for each materialization, which is the reason the method takes a function rather than an iterator directly.
+for each materialization, which is the reason the @scala[`method`] @java[`factory`] takes a @scala[`function`] @java[`Creator`] rather than an `Iterator` directly.

 If the iterator perform blocking operations, make sure to run it on a separate dispatcher.
@@ -21,6 +21,8 @@ If the iterator perform blocking operations, make sure to run it on a separate d
 ---------------------------------------------------------------

+@@@ div { .group-scala }
+
 ### apply

 Stream the values of an `immutable.Seq`.
@@ -29,6 +31,17 @@ Stream the values of an `immutable.Seq`.
 **completes** when the last element of the seq has been emitted

+@@@
+
+@@@ div { .group-java }
+
+### from
+
+Stream the values of an `Iterable`. Make sure the `Iterable` is immutable or at least not modified after being used
+as a source.
+
+@@@
+
 ---------------------------------------------------------------

 ### single
@@ -88,7 +101,7 @@ If the future fails the stream is failed with that exception.
 ### fromCompletionStage

-Send the single value of the Java `CompletionStage` when it completes and there is demand.
+Send the single value of the `CompletionStage` when it completes and there is demand.
 If the future fails the stream is failed with that exception.

 **emits** the future completes
@@ -121,8 +134,8 @@ If the *completion* fails the stream is failed with that exception.
 ### unfold

-Stream the result of a function as long as it returns a `Some`, the value inside the option
-consists of a tuple where the first value is a state passed back into the next call to the function allowing
+Stream the result of a function as long as it returns a @scala[`Some`] @java[`Optional`], the value inside the option
+consists of a @scala[tuple] @java[pair] where the first value is a state passed back into the next call to the function allowing
 to pass a state. The first invocation of the provided fold function will receive the `zero` state.

 Can be used to implement many stateful sources without having to touch the more low level `GraphStage` API.
@@ -135,14 +148,14 @@ Can be used to implement many stateful sources without having to touch the more
 ### unfoldAsync

-Just like `unfold` but the fold function returns a `Future` which will cause the source to
+Just like `unfold` but the fold function returns a @scala[`Future`] @java[`CompletionStage`] which will cause the source to
 complete or emit when it completes.

 Can be used to implement many stateful sources without having to touch the more low level `GraphStage` API.

 **emits** when there is demand and unfold state returned future completes with some value

-**completes** when the future returned by the unfold function completes with an empty value
+**completes** when the @scala[future] @java[CompletionStage] returned by the unfold function completes with an empty value

 ---------------------------------------------------------------
@@ -159,8 +172,8 @@ an API but there are no elements to emit.
 ### maybe

-Materialize a `Promise[Option[T]]` that if completed with a `Some[T]` will emit that *T* and then complete
-the stream, or if completed with `None` complete the stream right away.
+Materialize a @scala[`Promise[Option[T]]`] @java[`CompletionStage`] that if completed with a @scala[`Some[T]`] @java[`Optional`]
+will emit that *T* and then complete the stream, or if completed with @scala[`None`] @java[`empty Optional`] complete the stream right away.

 **emits** when the returned promise is completed with some value
@@ -206,7 +219,17 @@ elements or failing the stream, the strategy is chosen by the user.
 **emits** when there is demand and there are messages in the buffer or a message is sent to the actorref

-**completes** when the actorref is sent `akka.actor.Status.Success` or `PoisonPill`
+**completes** when the `ActorRef` is sent `akka.actor.Status.Success` or `PoisonPill`
+
+---------------------------------------------------------------
+
+### range
+
+Emit each integer in a range, with an option to take bigger steps than 1.
+
+**emits** when there is demand, the next value
+
+**completes** when the end of the range has been reached

 ---------------------------------------------------------------
@@ -224,7 +247,7 @@ Combine several sources, using a given strategy such as merge or concat, into on
 Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.

-**emits** when there is demand and read function returns value
+**emits** when there is demand and read @scala[function] @java[method] returns value

 **completes** when read function returns `None`
@@ -233,11 +256,11 @@ Wrap any resource that can be opened, queried for next element (in a blocking wa
 ### unfoldResourceAsync

 Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source.
-Functions return `Future` to achieve asynchronous processing
+Functions return @scala[`Future`] @java[`CompletionStage`] to achieve asynchronous processing

-**emits** when there is demand and `Future` from read function returns value
+**emits** when there is demand and @scala[`Future`] @java[`CompletionStage`] from read function returns value

-**completes** when `Future` from read function returns `None`
+**completes** when @scala[`Future`] @java[`CompletionStage`] from read function returns `None`

 ---------------------------------------------------------------
@@ -290,14 +313,14 @@ Combine the elements of multiple streams into a stream of sequences using a comb
 ## Sink stages

-These built-in sinks are available from `akka.stream.scaladsl.Sink`:
+These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @java[`akka.stream.javadsl.Sink`]:

 ---------------------------------------------------------------

 ### head

-Materializes into a `Future` which completes with the first value arriving,
-after this the stream is canceled. If no element is emitted, the future is be failed.
+Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving,
+after this the stream is canceled. If no element is emitted, the @scala[`Future`] @java[`CompletionStage`] is failed.

 **cancels** after receiving one element
@@ -307,8 +330,8 @@ after this the stream is canceled. If no element is emitted, the future is be fa
 ### headOption

-Materializes into a `Future[Option[T]]` which completes with the first value arriving wrapped in a `Some`,
-or a `None` if the stream completes without any elements emitted.
+Materializes into a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the first value arriving wrapped in @scala[`Some`] @java[`Optional`],
+or @scala[a `None`] @java[an empty Optional] if the stream completes without any elements emitted.

 **cancels** after receiving one element
@@ -318,8 +341,8 @@ or a `None` if the stream completes without any elements emitted.
 ### last

-Materializes into a `Future` which will complete with the last value emitted when the stream
-completes. If the stream completes with no elements the future is failed.
+Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with the last value emitted when the stream
+completes. If the stream completes with no elements the @scala[`Future`] @java[`CompletionStage`] is failed.

 **cancels** never
@@ -329,9 +352,9 @@ completes. If the stream completes with no elements the future is failed.
 ### lastOption

-Materialize a `Future[Option[T]]` which completes with the last value
-emitted wrapped in an `Some` when the stream completes. if the stream completes with no elements the future is
-completed with `None`.
+Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the last value
+emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes. if the stream completes with no elements the `CompletionStage` is
+completed with @scala[`None`] @java[an empty `Optional`].

 **cancels** never
@@ -360,8 +383,8 @@ Immediately cancel the stream
 ### seq

-Collect values emitted from the stream into a collection, the collection is available through a `Future` or
-which completes when the stream completes. Note that the collection is bounded to `Int.MaxValue`,
+Collect values emitted from the stream into a collection, the collection is available through a @scala[`Future`] @java[`CompletionStage`] or
+which completes when the stream completes. Note that the collection is bounded to @scala[`Int.MaxValue`] @java[`Integer.MAX_VALUE`],
 if more element are emitted the sink will cancel the stream

 **cancels** If too many values are collected
@@ -372,7 +395,7 @@ if more element are emitted the sink will cancel the stream
 Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure.

-The sink materializes into a `Future[Option[Done]]` which completes when the
+The sink materializes into a @scala[`Future[Option[Done]]`] @java[`CompletionStage<Optional<Done>`] which completes when the
 stream completes, or fails if the stream fails.

 Note that it is not safe to mutate state from the procedure.
@@ -430,7 +453,7 @@ a buffer in case stream emitting elements faster than queue pulling them.
 Fold over emitted element with a function, where each invocation will get the new element and the result from the
 previous fold invocation. The first invocation will be provided the `zero` value.

-Materializes into a future that will complete with the last state when the stream has completed.
+Materializes into a @scala[`Future`] @java[`CompletionStage`] that will complete with the last state when the stream has completed.

 This stage allows combining values into a result without a global mutable state by instead passing the state along
 between invocations.
@@ -446,7 +469,7 @@ between invocations.
 Apply a reduction function on the incoming elements and pass the result to the next invocation. The first invocation
 receives the two first elements of the flow.

-Materializes into a future that will be completed by the last result of the reduction function.
+Materializes into a @scala[`Future`] @java[`CompletionStage`] that will be completed by the last result of the reduction function.

 **cancels** never
@@ -525,7 +548,7 @@ dispatcher configured through the `akka.stream.blocking-io-dispatcher`.
 Create a sink that wraps an `OutputStream`. Takes a function that produces an `OutputStream`, when the sink is
 materialized the function will be called and bytes sent to the sink will be written to the returned `OutputStream`.

-Materializes into a `Future` which will complete with a `IOResult` when the stream
+Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with a `IOResult` when the stream
 completes.

 Note that a flow can be materialized multiple times, so the function producing the `OutputStream` must be able
@@ -551,7 +574,7 @@ The `InputStream` will be ended when the stream flowing into this `Sink` complet
 Create a source that wraps an `InputStream`. Takes a function that produces an `InputStream`, when the source is
 materialized the function will be called and bytes from the `InputStream` will be emitted into the stream.

-Materializes into a `Future` which will complete with a `IOResult` when the stream
+Materializes into a @scala[`Future`] @java[`CompletionStage`] which will complete with a `IOResult` when the stream
 completes.

 Note that a flow can be materialized multiple times, so the function producing the `InputStream` must be able
@@ -593,7 +616,7 @@ downstream on demand.
 ### javaCollector

-Create a sink which materializes into a `Future` which will be completed with a result of the Java 8 `Collector`
+Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector`
 transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams.

 The `Collector` will trigger demand downstream. Elements emitted through the stream will be accumulated into a mutable
 result container, optionally transformed into a final representation after all input elements have been processed.
@@ -606,7 +629,7 @@ to handle multiple invocations.
 ### javaCollectorParallelUnordered

-Create a sink which materializes into a `Future` which will be completed with a result of the Java 8 `Collector`
+Create a sink which materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with a result of the Java 8 `Collector`
 transformation and reduction operations. This allows usage of Java 8 streams transformations for reactive streams.

 The `Collector` is triggering demand downstream. Elements emitted through the stream will be accumulated into a mutable
 result container, optionally transformed into a final representation after all input elements have been processed.
@@ -627,7 +650,7 @@ Sources and sinks for reading and writing files can be found on `FileIO`.
 ### fromPath

-Emit the contents of a file, as `ByteString` s, materializes into a `Future` which will be completed with
+Emit the contents of a file, as `ByteString` s, materializes into a @scala[`Future`] @java[`CompletionStage`]` which will be completed with
 a `IOResult` upon reaching the end of the file or if there is a failure.

 ---------------------------------------------------------------
@@ -786,13 +809,13 @@ the second element is required from downstream.
 ### scanAsync

-Just like `scan` but receiving a function that results in a `Future` to the next value.
+Just like `scan` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.

-**emits** when the `Future` resulting from the function scanning the element resolves to the next value
+**emits** when the @scala[`Future`] @java[`CompletionStage`] resulting from the function scanning the element resolves to the next value

 **backpressures** when downstream backpressures

-**completes** when upstream completes and the last `Future` is resolved
+**completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved

 ---------------------------------------------------------------
@@ -811,13 +834,13 @@ complete the current value is emitted downstream.
 ### foldAsync

-Just like `fold` but receiving a function that results in a `Future` to the next value.
+Just like `fold` but receiving a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.

-**emits** when upstream completes and the last `Future` is resolved
+**emits** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved

 **backpressures** when downstream backpressures

-**completes** when upstream completes and the last `Future` is resolved
+**completes** when upstream completes and the last @scala[`Future`] @java[`CompletionStage`] is resolved

 ---------------------------------------------------------------
@@ -1012,7 +1035,7 @@ Evaluated cost of each element defines how many elements will be allowed to trav
 Log elements flowing through the stream as well as completion and erroring. By default element and
 completion signals are logged on debug level, and errors are logged on Error level.

-This can be changed by calling `Attributes.logLevels(...)` on the given Flow.
+This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow.

 **emits** when upstream emits
@@ -1078,38 +1101,38 @@ The order in which the *in* and *out* sides receive their respective completion
 ## Asynchronous processing stages

 These stages encapsulate an asynchronous computation, properly handling backpressure while taking care of the asynchronous
-operation at the same time (usually handling the completion of a Future).
+operation at the same time (usually handling the completion of a @scala[`Future`] @java[`CompletionStage`]).

 ---------------------------------------------------------------

 ### mapAsync

-Pass incoming elements to a function that return a `Future` result. When the future arrives the result is passed
+Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result. When the @scala[`Future`] @java[`CompletionStage`] arrives the result is passed
 downstream. Up to `n` elements can be processed concurrently, but regardless of their completion time the incoming
 order will be kept when results complete. For use cases where order does not mather `mapAsyncUnordered` can be used.

-If a Future fails, the stream also fails (unless a different supervision strategy is applied)
+If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied)

-**emits** when the Future returned by the provided function finishes for the next element in sequence
+**emits** when the @scala[`Future`] @java[`CompletionStage`] returned by the provided function finishes for the next element in sequence

-**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
+**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures

-**completes** when upstream completes and all futures has been completed and all elements has been emitted
+**completes** when upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] has been completed and all elements has been emitted

 ---------------------------------------------------------------

 ### mapAsyncUnordered

-Like `mapAsync` but `Future` results are passed downstream as they arrive regardless of the order of the elements
+Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements
 that triggered them.

-If a Future fails, the stream also fails (unless a different supervision strategy is applied)
+If a @scala[`Future`] @java[`CompletionStage`] fails, the stream also fails (unless a different supervision strategy is applied)

-**emits** any of the Futures returned by the provided function complete
+**emits** any of the @scala[`Future` s] @java[`CompletionStage` s] returned by the provided function complete

-**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
+**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures

-**completes** upstream completes and all futures has been completed and all elements has been emitted
+**completes** upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] has been completed and all elements has been emitted

 ---------------------------------------------------------------
@@ -1560,7 +1583,7 @@ Merge multiple sources. Prefer one source if all sources has elements ready.
 ### zip

-Combines elements from each of multiple sources into tuples and passes the tuples downstream.
+Combines elements from each of multiple sources into @scala[tuples] @java[*Pair*] and passes the @scala[tuples] @java[pairs] downstream.

 **emits** when all of the inputs have an element available
@@ -1744,7 +1767,7 @@ partitioner function applied to the element.
 ### watchTermination

-Materializes to a `Future` that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed.
+Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed.

 The stage otherwise passes through elements unchanged.

 **emits** when input has an element available
@ -66,7 +66,12 @@ with the rest of the graph), but this demonstrates the uniform underlying model.
If we try to build a code snippet that corresponds to the above diagram, our first try might look like this: If we try to build a code snippet that corresponds to the above diagram, our first try might look like this:
@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #non-nested-flow } Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #non-nested-flow }
Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #non-nested-flow }
It is clear however that there is no nesting present in our first attempt, since the library cannot figure out It is clear however that there is no nesting present in our first attempt, since the library cannot figure out
where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the
@ -75,7 +80,11 @@ methods `withAttributes()` or `named()` (where the latter is just a shorthand fo
The following code demonstrates how to achieve the desired nesting: The following code demonstrates how to achieve the desired nesting:
@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #nested-flow } Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #nested-flow }
Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #nested-flow }
Once we have hidden the internals of our components, they act like any other built-in component of similar shape. If
we hide some of the internals of our composites, the result looks just as if any other predefined component had been
used:
If we look at the usage of built-in components and our custom components, there is no difference, as the code
snippet below demonstrates.
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #reuse }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #reuse }
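To make this concrete, here is a minimal, self-contained sketch of reusing a named composite `Flow`, assuming Akka 2.5-era `javadsl` APIs (the class name `ReuseSketch` and the setup code are illustrative, not taken from the snippet files referenced above):

```java
import java.util.List;
import java.util.concurrent.CompletionStage;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class ReuseSketch {
  // A composite Flow; once named, it is a nested module usable like a built-in stage.
  static final Flow<Integer, Integer, NotUsed> nestedFlow =
      Flow.of(Integer.class).filter(x -> x != 0).map(x -> x - 2).named("nestedFlow");

  static List<Integer> run() throws Exception {
    final ActorSystem system = ActorSystem.create("reuse-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      // The custom component plugs in with via(), exactly like a predefined one:
      final CompletionStage<List<Integer>> result =
          Source.range(1, 5).via(nestedFlow).runWith(Sink.seq(), mat);
      return result.toCompletableFuture().get();
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // [-1, 0, 1, 2, 3]
  }
}
```

Note that `nestedFlow` can be reused in any number of streams; each `runWith` materializes it anew.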
## Composing complex systems
directed and non-directed cycles. The `runnable()` method of the `GraphDSL` object allows the creation of a
general, closed, and runnable graph. For example the network on the diagram can be realized like this:
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #complex-graph }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph }
In the code above we used the implicit port numbering feature (to make the graph more readable and similar to the diagram)
and we imported `Source` s, `Sink` s and `Flow` s explicitly. It is possible to refer to the ports
explicitly, and it is not necessary to import our linear stages via `add()`, so another version might look like this:
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #complex-graph-alt }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #complex-graph-alt }
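A closed graph of this kind can be sketched as follows. This is a simplified, hypothetical variant (one source, two transformation paths, one sink), assuming the Akka 2.5-era `javadsl` graph builder API; it is not the diagram's exact network:

```java
import java.util.concurrent.CompletionStage;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.ClosedShape;
import akka.stream.Materializer;
import akka.stream.Outlet;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Merge;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class ComplexGraphSketch {
  static int run() throws Exception {
    final ActorSystem system = ActorSystem.create("graph-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      final Sink<Integer, CompletionStage<Integer>> sumSink =
          Sink.<Integer, Integer>fold(0, (acc, x) -> acc + x);

      // One source fans out into two transformation paths, merged again into the sink.
      final RunnableGraph<CompletionStage<Integer>> g =
          RunnableGraph.fromGraph(GraphDSL.create(sumSink, (builder, out) -> {
            final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
            final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));
            final Outlet<Integer> src = builder.add(Source.range(1, 3)).out();
            builder.from(src).viaFanOut(bcast)
                   .via(builder.add(Flow.of(Integer.class).map(x -> x * 10)))
                   .viaFanIn(merge).to(out);
            builder.from(bcast)
                   .via(builder.add(Flow.of(Integer.class).map(x -> x + 1)))
                   .toFanIn(merge);
            return ClosedShape.getInstance();
          }));

      return g.run(mat).toCompletableFuture().get(); // (10+20+30) + (2+3+4) = 69
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run());
  }
}
```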
Similar to the case in the first section, so far we have not considered modularity. We created a complex graph, but
the layout is flat, not modularized. We will modify our example, and create a reusable component with the graph DSL.
We can recreate a similar graph in code, using the DSL in a similar way as before:
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-graph }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-graph }
The only new addition is the return value of the builder block, which is a `Shape`. All graphs (including
`Source`, `BidiFlow`, etc) have a shape, which encodes the *typed* ports of the module. In our example
Since our partial graph has the right shape, it can already be used in the simpler, linear DSL:
@@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-use } Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-use }
Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-use }
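As an end-to-end illustration, the following sketch builds a trivial partial graph with a `FlowShape` and then wraps it with `Flow.fromGraph` so that the linear operators become available (assuming Akka 2.5-era `javadsl`; the doubling stage and names are illustrative):

```java
import java.util.List;
import java.util.concurrent.CompletionStage;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class PartialGraphSketch {
  static List<Integer> run() throws Exception {
    final ActorSystem system = ActorSystem.create("partial-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      // A partial graph exposing a FlowShape; here it simply doubles each element.
      final Graph<FlowShape<Integer, Integer>, NotUsed> partial =
          GraphDSL.create(builder -> {
            final FlowShape<Integer, Integer> doubler =
                builder.add(Flow.of(Integer.class).map(x -> x * 2));
            return FlowShape.of(doubler.in(), doubler.out());
          });

      // Flow.fromGraph turns the shape into a real Flow, so filter() etc. apply:
      final Flow<Integer, Integer, NotUsed> flow =
          Flow.fromGraph(partial).filter(x -> x > 2);

      final CompletionStage<List<Integer>> result =
          Source.range(1, 3).via(flow).runWith(Sink.seq(), mat);
      return result.toCompletableFuture().get();
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // [4, 6]
  }
}
```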
It is not possible to use it as a `Flow` yet, though (i.e. we cannot call `.filter()` on it), but `Flow`
has a `fromGraph()` method that just adds the DSL to a `FlowShape`. There are similar methods on `Source`,
The code version of the above closed graph might look like this:
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #partial-flow-dsl }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #partial-flow-dsl }
@@@ note
We still owe a demonstration that `RunnableGraph` is a component just like any other, which can
be embedded in graphs. In the following snippet we embed one closed graph in another:
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #embed-closed }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #embed-closed }
The type of the imported module indicates that it has a `ClosedShape`, and so we are not
able to wire it to anything else inside the enclosing closed graph. Nevertheless, this "island" is embedded properly,
Unlike actors though, each of the processing stages might provide a materialized value, so when we compose multiple
stages or modules, we need to combine the materialized value as well (there are default rules which make this easier,
for example *to()* and *via()* take care of the most common case of taking the materialized value to the left.
See @ref:[Combining materialized values](stream-flows-and-basics.md#flow-combine-mat) for details).
We demonstrate how this works with a code example and a diagram that graphically shows what is happening.
The propagation of the individual materialized values from the enclosed modules towards the top will look like this:

![compose_mat.png](../../images/compose_mat.png)
To implement the above, first we create a composite `Source`, where the enclosed `Source` has a
materialized type of @scala[`Promise[Option[Int]]`] @java[`CompletableFuture<Optional<Integer>>`]. By using the combiner function `Keep.left`, the resulting materialized
type is that of the nested module (indicated by the color *red* on the diagram):
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-1 }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-1 }
Next, we create a composite `Flow` from two smaller components. Here, the second enclosed `Flow` has a
materialized type of @scala[`Future[OutgoingConnection]`] @java[`CompletionStage<OutgoingConnection>`], and we propagate this to the parent by using `Keep.right`
as the combiner function (indicated by the color *yellow* on the diagram):
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-2 }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-2 }
As a third step, we create a composite `Sink`, using our `nestedFlow` as a building block. In this snippet, both
the enclosed `Flow` and the folding `Sink` have a materialized value that is interesting for us, so
we use `Keep.both` to get a `Pair` of them as the materialized type of `nestedSink` (indicated by the color
*blue* on the diagram):
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-3 }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-3 }
As the last example, we wire together `nestedSource` and `nestedSink` and we use a custom combiner function to
create yet another materialized type of the resulting `RunnableGraph`. This combiner function just ignores
the @scala[`Future[Sink]`] @java[`CompletionStage<Sink>`] part, and wraps the other two values in a custom case class `MyClass`
(indicated by the color *purple* on the diagram):
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #mat-combine-4 }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-4a }
@@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #mat-combine-4b }
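The `Keep.left`/`Keep.both` mechanics can be sketched compactly in one runnable example, assuming Akka 2.5-era `javadsl` (the doubling flow and the names are illustrative; the real snippets use a Tcp connection instead):

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class MatCombineSketch {
  static int run() throws Exception {
    final ActorSystem system = ActorSystem.create("mat-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      // Source.maybe materializes a CompletableFuture<Optional<Integer>>;
      // Keep.left propagates exactly that value past the inner flow.
      final Source<Integer, CompletableFuture<Optional<Integer>>> nestedSource =
          Source.<Integer>maybe()
              .viaMat(Flow.of(Integer.class).map(x -> x * 2), Keep.left());

      final Sink<Integer, CompletionStage<Integer>> foldSink =
          Sink.<Integer, Integer>fold(0, (acc, x) -> acc + x);

      // Keep.both pairs up both materialized values.
      final RunnableGraph<Pair<CompletableFuture<Optional<Integer>>, CompletionStage<Integer>>>
          runnable = nestedSource.toMat(foldSink, Keep.both());

      final Pair<CompletableFuture<Optional<Integer>>, CompletionStage<Integer>> m =
          runnable.run(mat);
      m.first().complete(Optional.of(21)); // feed the single element from the outside
      return m.second().toCompletableFuture().get(); // 21 * 2 = 42
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run());
  }
}
```

A custom combiner (the `MyClass` case in the text) would simply be a `mapMaterializedValue` over this pair.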
@@@ note
The code below, a modification of an earlier example, sets the `inputBuffer` attribute on certain modules, but not
on others:
Scala
: @@snip [CompositionDocSpec.scala]($code$/scala/docs/stream/CompositionDocSpec.scala) { #attributes-inheritance }

Java
: @@snip [CompositionDocTest.java]($code$/java/jdocs/stream/CompositionDocTest.java) { #attributes-inheritance }
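Setting such an attribute explicitly can be sketched as follows (a hypothetical sketch assuming Akka 2.5-era `javadsl`; the buffer sizes and names are illustrative). Modules that do not call `withAttributes` inherit the value from their enclosing parent:

```java
import java.util.List;
import java.util.concurrent.CompletionStage;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Attributes;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class AttributesSketch {
  // Explicit inputBuffer attribute on the inner flow; enclosing modules that do
  // not set it inherit it from their parent (ultimately the materializer defaults).
  static final Flow<Integer, Integer, NotUsed> nestedFlow =
      Flow.of(Integer.class).filter(x -> x != 0).map(x -> x - 2)
          .withAttributes(Attributes.inputBuffer(1, 1))
          .named("nestedFlow");

  static List<Integer> run() throws Exception {
    final ActorSystem system = ActorSystem.create("attributes-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      // Attributes change execution details (buffer sizes), not the element flow:
      final CompletionStage<List<Integer>> result =
          Source.range(1, 3).via(nestedFlow).runWith(Sink.seq(), mat);
      return result.toCompletableFuture().get();
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // [-1, 0, 1]
  }
}
```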
The effect is that each module inherits the `inputBuffer` attribute from its enclosing parent, unless it has
the same attribute explicitly set. `nestedSource` gets the default attributes from the materializer itself. `nestedSink`


Thanks to Flows being simply a description of the processing pipeline they are *immutable,
thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have
one actor prepare the work, and then have it be materialized at some completely different place in the code.
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materialization-in-steps }

Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materialization-in-steps }
@@@ div { .group-scala }
After running (materializing) the `RunnableGraph[T]` we get back the materialized value of type T. Every stream processing
stage can produce a materialized value, and it is the responsibility of the user to combine them to a new type.
In the above example we used `toMat` to indicate that we want to transform the materialized value of the source and
sink, and we used the convenience function `Keep.right` to say that we are only interested in the materialized value
of the sink.
In our example the `FoldSink` materializes a value of type `Future` which will represent the result
of the folding process over the stream. In general, a stream can expose multiple materialized values,
but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
there is a convenience method called `runWith()` available for `Sink`, `Source` or `Flow` requiring, respectively,
a supplied `Source` (in order to run a `Sink`), a `Sink` (in order to run a `Source`) or
both a `Source` and a `Sink` (in order to run a `Flow`, since it has neither attached yet).
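The three `runWith` variants can be sketched together, assuming Akka 2.5-era `javadsl` (the fold sink and names are illustrative):

```java
import java.util.concurrent.CompletionStage;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class RunWithSketch {
  static int[] run() throws Exception {
    final ActorSystem system = ActorSystem.create("runwith-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      final Source<Integer, NotUsed> source = Source.range(1, 10);
      final Sink<Integer, CompletionStage<Integer>> fold =
          Sink.<Integer, Integer>fold(0, (acc, x) -> acc + x);

      // Running a Source with a supplied Sink yields the Sink's materialized value:
      final int sum = source.runWith(fold, mat).toCompletableFuture().get();

      // A Flow has neither side attached, so its runWith takes both a Source and a
      // Sink and returns both materialized values as a Pair:
      final Pair<NotUsed, CompletionStage<Integer>> both =
          Flow.of(Integer.class).map(x -> x + 1).runWith(source, fold, mat);
      final int shifted = both.second().toCompletableFuture().get();

      return new int[] { sum, shifted };
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    final int[] r = run();
    System.out.println(r[0] + " " + r[1]); // 55 65
  }
}
```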
@@@
@@@ div { .group-java }
After running (materializing) the `RunnableGraph` we get a special container object, the `MaterializedMap`. Both
sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation
dependent.
For example a `FoldSink` will make a `CompletionStage` available in this map which will represent the result
of the folding process over the stream. In general, a stream can expose multiple materialized values,
but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
there is a convenience method called `runWith()` available for `Sink`, `Source` or `Flow` requiring, respectively,
a supplied `Source` (in order to run a `Sink`), a `Sink` (in order to run a `Source`) or
both a `Source` and a `Sink` (in order to run a `Flow`, since it has neither attached yet).
@@@
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materialization-runWith }
Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materialization-runWith }
It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage,
instead of modifying the existing instance, so while constructing long flows, remember to assign the new value to a variable or run it:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #source-immutable }

Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-immutable }
@@@ note
In the above example we used the `runWith` method, which both materializes the stream and returns the materialized value
of the given sink or source.
Since a stream can be materialized multiple times, the @scala[materialized value will also be calculated anew] @java[`MaterializedMap` returned is different] for each such
materialization, usually leading to different values being returned each time.
In the example below we create two running materialized instances of the stream that we described in the `runnable`
variable, and both materializations give us a different @scala[`Future`] @java[`CompletionStage`] from the map even though we used the same `sink`
to refer to the future:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #stream-reuse }

Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #stream-reuse }
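The re-materialization behaviour can be sketched like this, assuming Akka 2.5-era `javadsl` (names are illustrative). Each `run()` starts a fresh stream instance with its own result:

```java
import java.util.concurrent.CompletionStage;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class ReMaterializeSketch {
  static int run() throws Exception {
    final ActorSystem system = ActorSystem.create("rematerialize-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      final Sink<Integer, CompletionStage<Integer>> fold =
          Sink.<Integer, Integer>fold(0, (acc, x) -> acc + x);
      final RunnableGraph<CompletionStage<Integer>> runnable =
          Source.range(1, 10).toMat(fold, Keep.right());

      // Each run() materializes the stream anew, yielding a fresh CompletionStage:
      final CompletionStage<Integer> sum1 = runnable.run(mat);
      final CompletionStage<Integer> sum2 = runnable.run(mat);
      // sum1 and sum2 are distinct objects; here both happen to complete with 55.
      return sum1.toCompletableFuture().get() + sum2.toCompletableFuture().get();
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // 110
  }
}
```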
### Defining sources, sinks and flows
The objects `Source` and `Sink` define various ways to create sources and sinks of elements. The following
examples show some of the most useful constructs (refer to the API documentation for more details):
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #source-sink }

Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #source-sink }
There are various ways to wire up different parts of a stream; the following examples show some of the available options:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-connecting }

Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-connecting }
### Illegal stream elements
In accordance with the Reactive Streams specification ([Rule 2.13](https://github.com/reactive-streams/reactive-streams-jvm#2.13))
Akka Streams do not allow `null` to be passed through the stream as an element. In case you want to model the concept
of absence of a value we recommend using @scala[`scala.Option` or `scala.util.Either`] @java[`java.util.Optional` which is available since Java 8].
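Modelling absence with `Optional` instead of `null` can be sketched like this (assuming Akka 2.5-era `javadsl`; the data and names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class OptionalSketch {
  static List<Integer> run() throws Exception {
    final ActorSystem system = ActorSystem.create("optional-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      // Absence is represented as Optional.empty(), never as a null element:
      final Source<Optional<Integer>, NotUsed> maybeValues =
          Source.from(Arrays.asList(Optional.of(1), Optional.<Integer>empty(), Optional.of(3)));

      // Drop the empty values and unwrap the present ones downstream:
      final CompletionStage<List<Integer>> result =
          maybeValues.filter(Optional::isPresent).map(Optional::get).runWith(Sink.seq(), mat);
      return result.toCompletableFuture().get();
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run()); // [1, 3]
  }
}
```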
<a id="back-pressure-explained"></a>
## Back-pressure explained

Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the [Reactive Streams](http://reactive-streams.org/)
during such situations, however we do not want to pay a high penalty for this safety net being enabled.
The Reactive Streams protocol solves this by asynchronously signalling from the Subscriber to the Publisher
@scala[`Request(n:Int)`] @java[`Request(int n)`] signals. The protocol guarantees that the Publisher will never signal *more* elements than the
signalled demand. Since the Subscriber however is currently faster, it will be signalling these Request messages at a higher
rate (and possibly also batching together the demand - requesting multiple elements in one Request signal). This means
that the Publisher should not ever have to wait (be back-pressured) with publishing its incoming elements.
Materialization is triggered at so called "terminal operations". Most notably this includes the various forms of the `run()`
and `runWith()` methods defined on `Source` and `Flow` elements as well as a small number of special syntactic sugars for running with
well-known sinks, such as @scala[`runForeach(el => ...)`] @java[`runForeach(el -> ...)`] (being an alias to @scala[`runWith(Sink.foreach(el => ...))`] @java[`runWith(Sink.foreach(el -> ...))`]).
Materialization is currently performed synchronously on the materializing thread.
The actual stream processing is handled by actors started up during the stream's materialization,
graphs by way of adding `Attributes.asyncBoundary` using the method `async` on `Source`, `Sink` and `Flow`
to pieces that shall communicate with the rest of the graph in an asynchronous fashion.
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-async }

Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-async }
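An asynchronous boundary of this kind can be sketched as follows, assuming Akka 2.5-era `javadsl` (the arithmetic stages are illustrative). Everything before `async()` and everything after it form two fusion islands, each executed by its own actor:

```java
import java.util.concurrent.CompletionStage;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class AsyncBoundarySketch {
  static int run() throws Exception {
    final ActorSystem system = ActorSystem.create("async-sketch");
    final Materializer mat = ActorMaterializer.create(system);
    try {
      final CompletionStage<Integer> sum =
          Source.range(1, 1000)
              .map(x -> x + 1)  // first island
              .async()          // asynchronous boundary: elements cross between actors here
              .map(x -> x * 2)  // second island, running in parallel with the first
              .runWith(Sink.<Integer, Integer>fold(0, (acc, x) -> acc + x), mat);
      return sum.toCompletableFuture().get(); // 2 * (500500 + 1000) = 1003000
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run());
  }
}
```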
In this example we create two regions within the flow which will be executed in one Actor each—assuming that adding
and multiplying integers is an extremely costly operation this will lead to a performance gain since two CPUs can
work on the tasks in parallel.
many combinator methods have variants that take an additional argument, a function, that will be used to combine the
resulting values. Some examples of using these combiners are illustrated in the example below.
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #flow-mat-combine }

Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #flow-mat-combine }
@@@ note