.. _stream-flow-scala:

#############################
Basics and working with Flows
#############################

Core concepts
=============

Akka Streams is a library to process and transfer a sequence of elements using bounded buffer space. This
latter property is what we refer to as *boundedness* and it is the defining feature of Akka Streams. Translated to
everyday terms it is possible to express a chain (or as we see later, graphs) of processing entities, each executing
independently (and possibly concurrently) from the others while only buffering a limited number of elements at any given
time. This property of bounded buffers is one of the differences from the actor model, where each actor usually has
an unbounded, or a bounded, but dropping mailbox. Akka Stream processing entities have bounded "mailboxes" that
do not drop.

Before we move on, let's define some basic terminology which will be used throughout the entire documentation:

Stream
  An active process that involves moving and transforming data.
Element
  An element is the processing unit of streams. All operations transform and transfer elements from upstream to
  downstream. Buffer sizes are always expressed as number of elements independently from the actual size of the elements.
Back-pressure
  A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively
  slowing down the upstream producer to match their consumption speeds.
  In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*.
Processing Stage
  The common name for all building blocks that build up a Flow or FlowGraph.
  Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like
  :class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``.

Defining and running streams
----------------------------

Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions:

Source
  A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are
  ready to receive them.
Sink
  A processing stage with *exactly one input*, requesting and accepting data elements, possibly slowing down the upstream
  producer of elements.
Flow
  A processing stage which has *exactly one input and output*, which connects its up- and downstreams by
  transforming the data elements flowing through it.
RunnableFlow
  A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.

It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend
a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink,
it will be represented by the ``RunnableFlow`` type, indicating that it is ready to be executed.
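
To get a feel for what this looks like in code, here is a rough sketch (the ``via`` and ``to`` combinators used here to
connect the stages, as well as the import locations, are assumptions about the exact API; the included examples below
show the precise calls)::

  // hypothetical sketch of composing the three basic abstractions
  import akka.stream.scaladsl.{ Flow, Sink, Source }

  val source = Source(1 to 10)             // exactly one output
  val flow   = Flow[Int].map(_ * 2)        // exactly one input and one output
  val sink   = Sink.foreach[Int](println)  // exactly one input

  val compositeSource = source.via(flow)        // attaching a Flow to a Source yields a new Source
  val runnable        = compositeSource.to(sink) // attaching a Sink yields a RunnableFlow
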
It is important to remember that even after constructing the ``RunnableFlow`` by connecting all the source, sink and
different processing stages, no data will flow through it until it is materialized. Materialization is the process of
allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve
starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable,
thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have
one actor prepare the work, and then have it be materialized at some completely different place in the code.

.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps

After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both
sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation
dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result
of the folding process over the stream. In general, a stream can expose multiple materialized values,
but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
there is a convenience method called ``runWith()`` available for ``Sink``, ``Source`` or ``Flow`` requiring, respectively,
a supplied ``Source`` (in order to run a ``Sink``), a ``Sink`` (in order to run a ``Source``) or
both a ``Source`` and a ``Sink`` (in order to run a ``Flow``, since it has neither attached yet).

.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-runWith

It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage,
instead of modifying the existing instance, so while constructing long flows, remember to assign the new value to a variable or run it:

.. includecode:: code/docs/stream/FlowDocSpec.scala#source-immutable

.. note::
  By default Akka Streams elements support **exactly one** downstream processing stage.
  Making fan-out (supporting multiple downstream processing stages) an explicit opt-in feature allows default stream elements to
  be less complex and more efficient. Also it allows for greater flexibility on *how exactly* to handle the multicast scenarios,
  by providing named fan-out elements such as broadcast (signals all down-stream elements) or balance (signals one of available down-stream elements).

In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized value
of the given sink or source.

Since a stream can be materialized multiple times, the ``MaterializedMap`` returned is different for each materialization.
In the example below we create two running materialized instances of the stream that we described in the ``runnable``
variable, and both materializations give us a different ``Future`` from the map even though we used the same ``sink``
to refer to the future:

.. includecode:: code/docs/stream/FlowDocSpec.scala#stream-reuse

Defining sources, sinks and flows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following
examples show some of the most useful constructs (refer to the API documentation for more details):

.. includecode:: code/docs/stream/FlowDocSpec.scala#source-sink

There are various ways to wire up different parts of a stream, the following examples show some of the available options:

.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-connecting

.. _back-pressure-explained-scala:

Back-pressure explained
-----------------------

Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_
specification, which Akka is a founding member of.

.. _Reactive Streams: http://reactive-streams.org/

The user of the library does not have to write any explicit back-pressure handling code — it is built in
and dealt with automatically by all of the provided Akka Streams processing stages. It is possible however to add
explicit buffer stages with overflow strategies that can influence the behaviour of the stream. This is especially important
in complex processing graphs which may even contain loops (which *must* be treated with very special
care, as explained in :ref:`graph-cycles-scala`).
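
A rough sketch of such an explicit buffer stage follows (the ``buffer`` operator, the ``OverflowStrategy`` values and the
import locations shown here are assumptions about the exact API of this version)::

  // hypothetical sketch only
  import akka.stream.OverflowStrategy
  import akka.stream.scaladsl.Flow

  // hold at most 100 elements; when the buffer is full, drop the oldest element
  // instead of back-pressuring the upstream producer
  val buffered = Flow[Int].buffer(100, OverflowStrategy.dropHead)
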
The back-pressure protocol is defined in terms of the number of elements a downstream ``Subscriber`` is able to receive
and buffer, referred to as ``demand``.
The source of data, referred to as ``Publisher`` in Reactive Streams terminology and implemented as ``Source`` in Akka
Streams, guarantees that it will never emit more elements than the received total demand for any given ``Subscriber``.
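
For reference, the heart of that contract is the ``Subscription`` handed over when a ``Subscriber`` attaches to a
``Publisher``. The following is only a Scala paraphrase of the Java interfaces defined in the ``org.reactivestreams``
package, not the literal API::

  // paraphrased sketch of the Reactive Streams contract
  trait Subscription {
    def request(n: Long): Unit  // the Subscriber signals demand for up to n more elements
    def cancel(): Unit          // the Subscriber gives up and wants no further elements
  }

  trait Subscriber[T] {
    def onSubscribe(s: Subscription): Unit
    def onNext(element: T): Unit        // only called while there is outstanding demand
    def onError(cause: Throwable): Unit
    def onComplete(): Unit
  }
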
.. note::
  The Reactive Streams specification defines its protocol in terms of ``Publisher`` and ``Subscriber``.
  These types are **not** meant to be user facing API, instead they serve as the low level building blocks for
  different Reactive Streams implementations.

  Akka Streams implements these concepts as ``Source``, ``Flow`` (referred to as ``Processor`` in Reactive Streams)
  and ``Sink`` without exposing the Reactive Streams interfaces directly.
  If you need to integrate with other Reactive Stream libraries read :ref:`reactive-streams-integration-scala`.

The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode",
since it will switch between push and pull based back-pressure models depending on the downstream being able to cope
with the upstream production rate or not.

To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them:

Slow Publisher, fast Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This is the happy case of course – we do not need to slow down the Publisher. However signalling rates are
rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled
during such situations, however we do not want to pay a high penalty for this safety net being enabled.

The Reactive Streams protocol solves this by having the Subscriber asynchronously signal ``Request(n:Int)`` messages
to the Publisher. The protocol guarantees that the Publisher will never signal *more* elements than the
signalled demand. Since the Subscriber however is currently faster, it will be signalling these Request messages at a higher
rate (and possibly also batching together the demand - requesting multiple elements in one Request signal). This means
that the Publisher should not ever have to wait (be back-pressured) with publishing its incoming elements.

As we can see, in this scenario we effectively operate in so-called push-mode since the Publisher can continue producing
elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements.

Fast Publisher, slow Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with
the rate at which its upstream would like to emit data elements.

Since the ``Publisher`` is not allowed to signal more elements than the pending demand signalled by the ``Subscriber``,
it will have to abide by this back-pressure by applying one of the following strategies:

- not generate elements, if it is able to control their production rate,
- try buffering the elements in a *bounded* manner until more demand is signalled,
- drop elements until more demand is signalled,
- tear down the stream if unable to apply any of the above strategies.

As we can see, this scenario effectively means that the ``Subscriber`` will *pull* the elements from the Publisher –
this mode of operation is referred to as pull-based back-pressure.

.. _stream-materialization-scala:

Stream Materialization
----------------------

When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing,
but is not restricted to that - it could also mean opening files or socket connections etc. – depending on what the stream needs.

Materialization is triggered at so-called "terminal operations". Most notably this includes the various forms of the ``run()``
and ``runWith()`` methods defined on flow elements as well as a small number of special syntactic sugars for running with
well-known sinks, such as ``runForeach(el => ...)`` (being an alias to ``runWith(Sink.foreach(el => ...))``).
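
A minimal sketch of triggering materialization through such a terminal operation follows (the import locations, the
``ActorFlowMaterializer()`` constructor and the names used here are assumptions; the materializer itself is discussed
just below)::

  // hypothetical, minimal example of running a stream
  import akka.actor.ActorSystem
  import akka.stream.ActorFlowMaterializer
  import akka.stream.scaladsl.{ Sink, Source }

  implicit val system = ActorSystem("flow-docs")
  implicit val materializer = ActorFlowMaterializer()

  val numbers = Source(1 to 3).map(_ * 2)      // just a blueprint, nothing runs yet
  numbers.runForeach(println)                  // terminal operation: materializes and runs the stream
  numbers.runWith(Sink.foreach[Int](println))  // equivalent, and materializes the stream once more
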
Materialization is currently performed synchronously on the materializing thread.
The actual stream processing is handled by :ref:`Actors <actor-scala>` started up during the stream's materialization,
which will be running on the thread pools they have been configured to run on - which defaults to the dispatcher set in
:class:`MaterializationSettings` while constructing the :class:`ActorFlowMaterializer`.

.. note::
  Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
  yet will materialize that stage multiple times.

Stream ordering
===============

In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs ``{IA1,IA2,...,IAn}``
"cause" outputs ``{OA1,OA2,...,OAk}`` and inputs ``{IB1,IB2,...,IBm}`` "cause" outputs ``{OB1,OB2,...,OBl}`` and all of
``IAi`` happened before all ``IBi`` then ``OAi`` happens before ``OBi``.

This property is even upheld by async operations such as ``mapAsync``, however an unordered version exists
called ``mapAsyncUnordered`` which does not preserve this ordering.
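
A short sketch of the difference follows (``Flow`` comes from ``akka.stream.scaladsl``, and the exact ``mapAsync``
signatures are an assumption about this version of the API)::

  // hypothetical sketch only
  import akka.stream.scaladsl.Flow
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global

  def lookup(id: Int): Future[String] = Future(s"user-$id")

  // results are emitted in the order the ids arrived,
  // even if a later future happens to complete first
  val ordered = Flow[Int].mapAsync(lookup)

  // results are emitted as soon as each future completes, order is not preserved
  val unordered = Flow[Int].mapAsyncUnordered(lookup)
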
However, in the case of Junctions which handle multiple input streams (e.g. :class:`Merge`) the output order is,
in general, *not defined* for elements arriving on different input ports. That is, a merge-like operation may emit ``Ai``
before emitting ``Bi``, and it is up to its internal logic to decide the order of emitted elements. Specialized elements
such as ``Zip`` however *do guarantee* the order of their outputs, as each output element depends on all upstream elements having
been signalled already – thus the ordering in the case of zipping is defined by this property.

If you find yourself in need of fine-grained control over the order of emitted elements in fan-in
scenarios, consider using :class:`MergePreferred` or :class:`FlexiMerge` – which gives you full control over how the
merge is performed.