!doc: Restructure document pages

This commit is contained in:
Endre Sándor Varga 2014-12-20 14:11:29 +01:00
parent 2d140eee30
commit 4da6e6d0bb
11 changed files with 1069 additions and 248 deletions

View file

@ -1,4 +1,4 @@
.. _stream-cookbook-scala
.. _stream-cookbook-scala:
################
Streams Cookbook

View file

@ -1,81 +0,0 @@
Graph cycles, liveness and deadlocks
------------------------------------
By default :class:`FlowGraph` does not allow (or to be precise, its builder does not allow) the creation of cycles.
The reason for this is that cycles need special considerations to avoid potential deadlocks and other liveness issues.
This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing
graphs.
The first example demonstrates a graph that contains a naive cycle (the presence of cycles is enabled by calling
``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements
to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via
a ``Merge`` junction.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#deadlocked
Running this we observe that after a few numbers have been printed, no more elements are logged to the console -
all processing stops after some time. After some investigation we observe that:
* through merging from ``source`` we increase the number of elements flowing in the cycle
* by broadcasting back to the cycle we do not decrease the number of elements in the cycle
Since Akka Streams (and Reactive Streams in general) guarantee bounded processing (see the "Buffering" section for more
details) it means that only a bounded number of elements are buffered over any time span. Since our cycle gains more and
more elements, eventually all of its internal buffers become full, backpressuring ``source`` forever. To be able
to process more elements from ``source`` elements would need to leave the cycle somehow.
If we modify our feedback loop by replacing the ``Merge`` junction with a ``MergePreferred`` we can avoid the deadlock.
``MergePreferred`` is unfair as it always tries to consume from a preferred input port if there are elements available
before trying the other lower priority input ports. Since we feed back through the preferred port it is always guaranteed
that the elements in the cycles can flow.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#unfair
If we run the example we see that the same sequence of numbers are printed
over and over again, but the processing does not stop. Hence, we avoided the deadlock, but ``source`` is still
backpressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple
of initial elements from ``source``.
.. note::
What we see here is that in certain cases we need to choose between boundedness and liveness. Our first example would
not deadlock if there would be an infinite buffer in the loop, or vice versa, if the elements in the cycle would
be balanced (as many elements are removed as many are injected) then there would be no deadlock.
To make our cycle both live (not deadlocking) and fair we can introduce a dropping element on the feedback arc. In this
case we chose the ``buffer()`` operation giving it a dropping strategy ``OverflowStrategy.dropHead``.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#dropping
If we run this example we see that
* The flow of elements does not stop, there are always elements printed
* We see that some of the numbers are printed several times over time (due to the feedback loop) but on average
the numbers are increasing in the long term
This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles
(cycles where the number of circulating elements are unbounded) is to drop elements. An alternative would be to
define a larger buffer with ``OverflowStrategy.error`` which would fail the stream instead of deadlocking it after
all buffer space has been consumed.
As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We
circumvented this issue by adding a dropping element, but now we want to build a cycle that is balanced from
the beginning instead. To achieve this we modify our first graph by replacing the ``Merge`` junction with a ``ZipWith``.
Since ``ZipWith`` takes one element from ``source`` *and* from the feedback arc to inject one element into the cycle,
we maintain the balance of elements.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#zipping-dead
Still, when we try to run the example it turns out that no element is printed at all! After some investigation we
realize that:
* In order to get the first element from ``source`` into the cycle we need an already existing element in the cycle
* In order to get an initial element in the cycle we need an element from ``source``
These two conditions are a typical "chicken-and-egg" problem. The solution is to inject an initial
element into the cycle that is independent from ``source``. We do this by using a ``Concat`` junction on the backwards
arc that injects a single element using ``Source.single``.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#zipping-live
When we run the above example we see that processing starts and never stops. The important takeaway from this example
is that balanced cycles often need an initial "kick-off" element to be injected into the cycle.

View file

@ -0,0 +1,44 @@
.. _stream-customize-scala:
########################
Custom stream processing
########################
Custom linear processing stages
===============================
Using PushStage
---------------
*TODO*
Using PushPullStage
-------------------
*TODO*
Using StatefulStage
-------------------
*TODO*
Using DetachedStage
-------------------
*TODO*
Custom graph processing junctions
=================================
Using FlexiMerge
----------------
*TODO*
Using FlexiRoute
----------------
*TODO*

View file

@ -0,0 +1,167 @@
.. _stream-flow-scala:
#############################
Basics and working with Flows
#############################
Core concepts
=============
Everything in Akka Streams revolves around a number of core concepts which we introduce in detail in this section.
Akka Streams provide a way for executing bounded processing pipelines, where bounds are expressed as the number of stream
elements in flight and in buffers at any given time. Please note that while this allows to estimate an limit memory use
it is not strictly bound to the size in memory of these elements.
First we define the terminology which will be used though out the entire documentation:
Stream
An active process that involves moving and transforming data.
Element
An element is the unit which is passed through the stream. All operations as well as back-pressure are expressed in
terms of elements.
Back-pressure
A means of flow-control, and most notably adjusting the speed of upstream sources to the consumption speeds of their sinks.
In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*
Processing Stage
The common name for all building blocks that build up a Flow or FlowGraph.
Examples of a processing stage would be Stage (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`,
:class:`DetachedStage`), in terms of which operations like ``map()``, ``filter()`` and others are implemented.
Sources, Flows and Sinks
------------------------
Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions:
Source
A processing stage with *exactly one output*, emitting data elements in response to it's down-stream demand.
Sink
A processing stage with *exactly one input*, generating demand based on it's internal demand management strategy.
Flow
A processing stage which has *exactly one input and output*, which connects it's up and downstreams by (usually)
transforming the data elements flowing through it.
RunnableFlow
A Flow with has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.
It is important to remember that while constructing these processing pipelines by connecting their different processing
stages no data will flow through it until it is materialized. Materialization is the process of allocating all resources
needed to run the computation described by a Flow (in Akka Streams this will often involve starting up Actors).
Thanks to Flows being simply a description of the processing pipeline they are *immutable, thread-safe, and freely shareable*,
which means that it is for example safe to share send between actorsto have one actor prepare the work, and then have it
be materialized at some completely different place in the code.
In order to be able to run a ``Flow[In,Out]`` it must be connected to a ``Sink[In]`` *and* ``Source[Out]`` of matching types.
It is also possible to directly connect a :class:`Sink` to a :class:`Source`.
.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps
The :class:`MaterializedMap` can be used to get materialized values of both sinks and sources out from the running
stream. In general, a stream can expose multiple materialized values, however the very common case of only wanting to
get back a Sinks (in order to read a result) or Sources (in order to cancel or influence it in some way) materialized
values has a small convenience method called ``runWith()``. It is available for ``Sink`` or ``Source`` and ``Flow``, with respectively,
requiring the user to supply a ``Source`` (in order to run a ``Sink``), a ``Sink`` (in order to run a ``Source``) and
both a ``Source`` and a ``Sink`` (in order to run a ``Flow``, since it has neither attached yet).
.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-runWith
It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage,
instead of modifying the existing instance, so while construction long flows, remember to assign the new value to a variable or run it:
.. includecode:: code/docs/stream/FlowDocSpec.scala#source-immutable
.. note::
By default Akka Streams elements support **exactly one** downstream processing stage.
Making fan-out (supporting multiple downstream processing stages) an explicit opt-in feature allows default stream elements to
be less complex and more efficient. Also it allows for greater flexibility on *how exactly* to handle the multicast scenarios,
by providing named fan-out elements such as broadcast (signals all down-stream elements) or balance (signals one of available down-stream elements).
In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized value
of the given sink or source.
.. _back-pressure-explained-scala:
Back-pressure explained
-----------------------
Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the Reactive Streams
specification, which Akka is a founding member of.
As library user you do not have to write any explicit back-pressure handling code in order for it to work - it is built
and dealt with automatically by all of the provided Akka Streams processing stages. However is possible to include
explicit buffers with overflow strategies that can influence the behaviour of the stream. This is especially important
in complex processing graphs which may even sometimes even contain loops (which *must* be treated with very special
care, as explained in :ref:`cycles-scala`).
The back pressure protocol is defined in terms of the number of elements a downstream ``Subscriber`` is able to receive,
referred to as ``demand``. This demand is the *number of elements* receiver of the data, referred to as ``Subscriber``
in Reactive Streams, and implemented by ``Sink`` in Akka Streams is able to safely consume at this point in time.
The source of data referred to as ``Publisher`` in Reactive Streams terminology and implemented as ``Source`` in Akka
Streams guarantees that it will never emit more elements than the received total demand for any given ``Subscriber``.
.. note::
The Reactive Streams specification defines its protocol in terms of **Publishers** and **Subscribers**.
These types are *not* meant to be user facing API, instead they serve as the low level building blocks for
different Reactive Streams implementations.
Akka Streams implements these concepts as **Sources**, **Flows** (referred to as **Processor** in Reactive Streams)
and **Sinks** without exposing the Reactive Streams interfaces directly.
If you need to inter-op between different read :ref:`integration-with-Reactive-Streams-enabled-libraries`.
The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode",
since it will switch between push or pull based back-pressure models depending on if the downstream is able to cope
with the upstreams production rate or not.
To illustrate further let us consider both problem situations and how the back-pressure protocol handles them:
Slow Publisher, fast Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This is the happy case of coursewe do not need to slow down the Publisher in this case. However signalling rates are
rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled
during such situations, however we do not want to pay a high penalty for this safety net being enabled.
The Reactive Streams protocol solves this by asynchronously signalling from the Subscriber to the Publisher
`Request(n:Int)` signals. The protocol guarantees that the Publisher will never signal *more* than the demand it was
signalled. Since the Subscriber however is currently faster, it will be signalling these Request messages at a higher
rate (and possibly also batching together the demand - requesting multiple elements in one Request signal). This means
that the Publisher should not ever have to wait (be back-pressured) with publishing its incoming elements.
As we can see, in this scenario we effectively operate in so called push-mode since the Publisher can continue producing
elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements.
Fast Publisher, slow Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with
the rate at which its upstream would like to emit data elements.
Since the ``Publisher`` is not allowed to signal more elements than the pending demand signalled by the ``Subscriber``,
it will have to abide to this back-pressure by applying one of the below strategies:
- not generate elements, if it is able to control their production rate,
- try buffering the elements in a *bounded* manner until more demand is signalled,
- drop elements until more demand is signalled,
- tear down the stream if unable to apply any of the above strategies.
As we can see, this scenario effectively means that the ``Subscriber`` will *pull* the elements from the Publisher
this mode of operation is referred to as pull-based back-pressure.
.. _stream-materialization-scala:
Stream Materialization
----------------------
**TODO - write me (feel free to move around as well)**
When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing,
but is not restricted to that - it could also mean opening files or socket connections etc. depending on what the stream needs.
Materialization is triggered at so called "terminal operations". Most notably this includes the various forms of the ``run()``
and ``runWith()`` methods defined on flow elements as well as a small number of special syntactic sugars for running with
well-known sinks, such as ``foreach(el => )`` (being an alias to ``runWith(Sink.foreach(el => ))``.
Materialization is currently performed synchronously on the materializing thread.
Tha actual stream processing is handled by :ref:`Actors actor-scala` started up during the streams materialization,
which will be running on the thread pools they have been configured to run on - which defaults to the dispatcher set in
:class:`MaterializationSettings` while constructing the :class:`FlowMaterializer`.
.. note::
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
yet will materialize that stage multiple times.

View file

@ -0,0 +1,211 @@
.. _stream-graph-scala:
###################
Working with Graphs
###################
In Akka Streams computation graphs are not expressed using a fluent DSL like linear computations are, instead they are
written in a more graph-resembling DSL which aims to make translating graph drawings (e.g. from notes taken
from design discussions, or illustrations in protocol specifications) to and from code simpler. In this section we'll
dive into the multiple ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them.
Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.
Considering linear Flows to be like roads, we can picture graph operations as junctions: multiple flows being connected at a single point.
Some graph operations which are common enough and fit the linear style of Flows, such as ``concat`` (which concatenates two
streams, such that the second one is consumed after the first one has completed), may have shorthand methods defined on
:class:`Flow` or :class:`Source` themselves, however you should keep in mind that those are also implemented as graph junctions.
.. _flow-graph-scala:
Constructing Flow Graphs
------------------------
Flow graphs are built from simple Flows which serve as the linear connections within the graphs as well as Junctions
which serve as fan-in and fan-out points for flows. Thanks to the junctions having meaningful types based on their behaviour
and making them explicit elements these elements should be rather straight forward to use.
Akka Streams currently provides these junctions:
* **Fan-out**
- ``Broadcast[T]`` (1 input, n outputs) signals each output given an input signal,
- ``Balance[T]`` (1 input => n outputs), signals one of its output ports given an input signal,
- ``UnZip[A,B]`` (1 input => 2 outputs), which is a specialized element which is able to split a stream of ``(A,B)`` tuples into two streams one type ``A`` and one of type ``B``,
- ``FlexiRoute[In]`` (1 input, n outputs), which enables writing custom fan out elements using a simple DSL,
* **Fan-in**
- ``Merge[In]`` (n inputs , 1 output), picks signals randomly from inputs pushing them one by one to its output,
- ``MergePreferred[In]`` like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``,
- ``ZipWith[A,B,...,Out]`` (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
+ ``Zip[A,B,Out]`` (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream,
- ``Concat[T]`` (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters,
- ``FlexiMerge[Out]`` (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL.
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating
the below hand drawn graph into Akka Streams:
.. image:: ../images/simple-graph-example.png
Such graph is simple to translate to the Graph DSL since each linear element corresponds to a :class:`Flow`,
and each circle corresponds to either a :class:`Junction` or a :class:`Source` or :class:`Sink` if it is beginning
or ending a :class:`Flow`. Junctions must always be created with defined type parameters, as otherwise the ``Nothing`` type
will be inferred and
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph
.. note::
Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a FlowGraph
refers to the same location in the resulting graph).
Notice the ``import FlowGraphImplicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to").
It is also possible to construct graphs without the ``~>`` operator in case you prefer to use the graph builder explicitly:
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph-no-implicits
By looking at the snippets above, it should be apparent that **the** :class:`b:FlowGraphBuilder` **object is mutable**.
It is also used (implicitly) by the ``~>`` operator, also making it a mutable operation as well.
The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles.
Once the FlowGraph has been constructed though, the :class:`FlowGraph` instance *is immutable, thread-safe, and freely shareable*.
Linear Flows however are always immutable and appending an operation to a Flow always returns a new Flow instance.
This means that you can safely re-use one given Flow in multiple places in a processing graph. In the example below
we prepare a graph that consists of two parallel streams, in which we re use the same instance of :class:`Flow`,
yet it will properly be materialized as two connections between the corresponding Sources and Sinks:
.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-reusing-a-flow
.. _partial-flow-graph-scala:
Constructing and combining Partial Flow Graphs
----------------------------------------------
Sometimes it is not possible (or needed) to construct the entire computation graph in one place, but instead construct
all of its different phases in different places and in the end connect them all into a complete graph and run it.
This can be achieved using :class:`PartialFlowGraph`. The reason of representing it as a different type is that a
:class:`FlowGraph` requires all ports to be connected, and if they are not it will throw an exception at construction
time, which helps to avoid simple wiring errors while working with graphs. A partial flow graph however does not perform
this validation, and allows graphs that are not yet fully connected.
A :class:`PartialFlowGraph` is defined as a :class:`FlowGraph` which contains so called "undefined elements",
such as ``UndefinedSink[T]`` or ``UndefinedSource[T]``, which can be reused and plugged into by consumers of that
partial flow graph. Let's imagine we want to provide users with a specialized element that given 3 inputs will pick
the greatest int value of each zipped triple. We'll want to expose 3 input ports (undefined sources) and one output port
(undefined sink).
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#simple-partial-flow-graph
As you can see, first we construct the partial graph that contains all the zipping and comparing of stream
elements, then we import it (all of its nodes and connections) explicitly to the :class:`FlowGraph` instance in which all
the undefined elements are rewired to real sources and sinks. The graph can then be run and yields the expected result.
.. warning::
Please note that a :class:`FlowGraph` is not able to provide compile time type-safety about whether or not all
elements have been properly connected - this validation is performed as a runtime check during the graph's instantiation.
.. _constructing-sources-sinks-flows-from-partial-graphs-scala:
Constructing Sources, Sinks and Flows from a Partial Graphs
-----------------------------------------------------------
Instead of treating a :class:`PartialFlowGraph` as simply a collection of flows and junctions which may not yet all be
connected it is sometimes useful to expose such complex graph as a simpler structure,
such as a :class:`Source`, :class:`Sink` or :class:`Flow`.
In fact, these concepts can be easily expressed as special cases of a partially connected graph:
* :class:`Source` is a partial flow graph with *exactly one* :class:`UndefinedSink`,
* :class:`Sink` is a partial flow graph with *exactly one* :class:`UndefinedSource`,
* :class:`Flow` is a partial flow graph with *exactly one* :class:`UndefinedSource` and *exactly one* :class:`UndefinedSource`.
Being able hide complex graphs inside of simple elements such as Sink / Source / Flow enables you to easily create one
complex element and from there on treat it as simple compound stage for linear computations.
In order to create a Source from a partial flow graph ``Source[T]`` provides a special apply method that takes a function
that must return an ``UndefinedSink[T]``. This undefined sink will become "the sink that must be attached before this Source
can run". Refer to the example below, in which we create a Source that zips together two numbers, to see this graph
construction in action:
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-from-partial-flow-graph
Similarly the same can be done for a ``Sink[T]``, in which case the returned value must be an ``UndefinedSource[T]``.
For defining a ``Flow[T]`` we need to expose both an undefined source and sink:
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph
Graph cycles, liveness and deadlocks
------------------------------------
By default :class:`FlowGraph` does not allow (or to be precise, its builder does not allow) the creation of cycles.
The reason for this is that cycles need special considerations to avoid potential deadlocks and other liveness issues.
This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing
graphs.
The first example demonstrates a graph that contains a naive cycle (the presence of cycles is enabled by calling
``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements
to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via
a ``Merge`` junction.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#deadlocked
Running this we observe that after a few numbers have been printed, no more elements are logged to the console -
all processing stops after some time. After some investigation we observe that:
* through merging from ``source`` we increase the number of elements flowing in the cycle
* by broadcasting back to the cycle we do not decrease the number of elements in the cycle
Since Akka Streams (and Reactive Streams in general) guarantee bounded processing (see the "Buffering" section for more
details) it means that only a bounded number of elements are buffered over any time span. Since our cycle gains more and
more elements, eventually all of its internal buffers become full, backpressuring ``source`` forever. To be able
to process more elements from ``source`` elements would need to leave the cycle somehow.
If we modify our feedback loop by replacing the ``Merge`` junction with a ``MergePreferred`` we can avoid the deadlock.
``MergePreferred`` is unfair as it always tries to consume from a preferred input port if there are elements available
before trying the other lower priority input ports. Since we feed back through the preferred port it is always guaranteed
that the elements in the cycles can flow.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#unfair
If we run the example we see that the same sequence of numbers are printed
over and over again, but the processing does not stop. Hence, we avoided the deadlock, but ``source`` is still
backpressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple
of initial elements from ``source``.
.. note::
What we see here is that in certain cases we need to choose between boundedness and liveness. Our first example would
not deadlock if there would be an infinite buffer in the loop, or vice versa, if the elements in the cycle would
be balanced (as many elements are removed as many are injected) then there would be no deadlock.
To make our cycle both live (not deadlocking) and fair we can introduce a dropping element on the feedback arc. In this
case we chose the ``buffer()`` operation giving it a dropping strategy ``OverflowStrategy.dropHead``.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#dropping
If we run this example we see that
* The flow of elements does not stop, there are always elements printed
* We see that some of the numbers are printed several times over time (due to the feedback loop) but on average
the numbers are increasing in the long term
This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles
(cycles where the number of circulating elements are unbounded) is to drop elements. An alternative would be to
define a larger buffer with ``OverflowStrategy.error`` which would fail the stream instead of deadlocking it after
all buffer space has been consumed.
As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We
circumvented this issue by adding a dropping element, but now we want to build a cycle that is balanced from
the beginning instead. To achieve this we modify our first graph by replacing the ``Merge`` junction with a ``ZipWith``.
Since ``ZipWith`` takes one element from ``source`` *and* from the feedback arc to inject one element into the cycle,
we maintain the balance of elements.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#zipping-dead
Still, when we try to run the example it turns out that no element is printed at all! After some investigation we
realize that:
* In order to get the first element from ``source`` into the cycle we need an already existing element in the cycle
* In order to get an initial element in the cycle we need an element from ``source``
These two conditions are a typical "chicken-and-egg" problem. The solution is to inject an initial
element into the cycle that is independent from ``source``. We do this by using a ``Concat`` junction on the backwards
arc that injects a single element using ``Source.single``.
.. includecode:: code/docs/stream/GraphCyclesSpec.scala#zipping-live
When we run the above example we see that processing starts and never stops. The important takeaway from this example
is that balanced cycles often need an initial "kick-off" element to be injected into the cycle.

View file

@ -0,0 +1,379 @@
.. _stream-integrations-scala:
###########
Integration
###########
Integrating with Actors
=======================
:class:`ActorPublisher` and :class:`ActorSubscriber` are two traits that provides support for
implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with an :class:`Actor`.
These can be consumed by other Reactive Stream libraries or used as a
Akka Streams :class:`Source` or :class:`Sink`.
.. warning::
:class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors,
because if signals of the Reactive Streams protocol (e.g. ``request``) are lost the
the stream may deadlock.
ActorPublisher
^^^^^^^^^^^^^^
Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a
stream publisher that keeps track of the subscription life cycle and requested elements.
Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber:
.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#job-manager
You send elements to the stream by calling ``onNext``. You are allowed to send as many
elements as have been requested by the stream subscriber. This amount can be inquired with
``totalDemand``. It is only allowed to use ``onNext`` when ``isActive`` and ``totalDemand>0``,
otherwise ``onNext`` will throw ``IllegalStateException``.
When the stream subscriber requests more elements the ``ActorPublisher.Request`` message
is delivered to this actor, and you can act on that event. The ``totalDemand``
is updated automatically.
When the stream subscriber cancels the subscription the ``ActorPublisher.Cancel`` message
is delivered to this actor. After that subsequent calls to ``onNext`` will be ignored.
You can complete the stream by calling ``onComplete``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
You can terminate the stream with failure by calling ``onError``. After that you are not allowed to
call ``onNext``, ``onError`` and ``onComplete``.
If you suspect that this ``ActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout``
method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when
the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform
cleanup and stop itself.
If the actor is stopped the stream will be completed, unless it was not already terminated with
failure, completed or canceled.
More detailed information can be found in the API documentation.
This is how it can be used as input :class:`Source` to a :class:`Flow`:
.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage
You can only attach one subscriber to this publisher. Use ``Sink.fanoutPublisher`` to enable
multiple subscribers.
ActorSubscriber
^^^^^^^^^^^^^^^
Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a
stream subscriber with full control of stream back pressure. It will receive
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
messages from the stream. It can also receive other, non-stream messages, in the same way as any actor.
Here is an example of such an actor. It dispatches incoming jobs to child worker actors:
.. includecode:: code/docs/stream/ActorSubscriberDocSpec.scala#worker-pool
Subclass must define the ``RequestStrategy`` to control stream back pressure.
After each incoming message the ``ActorSubscriber`` will automatically invoke
the ``RequestStrategy.requestDemand`` and propagate the returned demand to the stream.
* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself.
* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or
delegated to other actors.
* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with
``ZeroRequestStrategy`` or some other strategy. In that case
you must also call ``request`` when the actor is started or when it is ready, otherwise
it will not receive any elements.
More detailed information can be found in the API documentation.
This is how it can be used as output :class:`Sink` to a :class:`Flow`:
.. includecode:: code/docs/stream/ActorSubscriberDocSpec.scala#actor-subscriber-usage
Integrating with External Services
==================================
Stream transformations and side effects involving external non-stream based services can be
performed with ``mapAsync`` or ``mapAsyncUnordered``.
For example, sending emails to the authors of selected tweets using an external
email service:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-server-send
We start with the tweet stream of authors:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#tweet-authors
Assume that we can lookup their email address using:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-address-lookup
Transforming the stream of authors to a stream of email addresses by using the ``lookupEmail``
service can be done with ``mapAsync``:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-addresses-mapAsync
Finally, sending the emails:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#send-emails
``mapAsync`` is applying the given function that is calling out to the external service to
each of the elements as they pass through this processing step. The function returns a :class:`Future`
and the value of that future will be emitted downstreams. As many futures as requested elements by
downstream may run in parallel and may complete in any order, but the elements that
are emitted downstream are in the same order as received from upstream.
That means that back-pressure works as expected. For example if the ``emailServer.send``
is the bottleneck it will limit the rate at which incoming tweets are retrieved and
email addresses looked up.
Note that ``mapAsync`` preserves the order of the stream elements. In this example the order
is not important and then we can use the more efficient ``mapAsyncUnordered``:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#external-service-mapAsyncUnordered
In the above example the services conveniently returned a :class:`Future` of the result.
If that is not the case you need to wrap the call in a :class:`Future`. If the service call
involves blocking you must also make sure that you run it on a dedicated execution context, to
avoid starvation and disturbance of other tasks in the system.
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-mapAsync
The configuration of the ``"blocking-dispatcher"`` may look something like:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-dispatcher-config
An alternative for blocking calls is to perform them in a ``map`` operation, still using a
dedicated dispatcher for that operation.
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-map
However, that is not exactly the same as ``mapAsync``, since the ``mapAsync`` may run
several calls concurrently, but ``map`` performs them one at a time.
For a service that is exposed as an actor, or if an actor is used as a gateway in front of an
external service, you can use ``ask``:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#save-tweets
Note that if the ``ask`` is not completed within the given timeout the stream is completed with failure.
If that is not desired outcome you can use ``recover`` on the ``ask`` :class:`Future`.
Illustrating ordering and parallelism
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Let us look at another example to get a better understanding of the ordering
and parallelism characteristics of ``mapAsync`` and ``mapAsyncUnordered``.
Several ``mapAsync`` and ``mapAsyncUnordered`` futures may run concurrently.
The number of concurrent futures are limited by the downstream demand.
For example, if 5 elements have been requested by downstream there will be at most 5
futures in progress.
``mapAsync`` emits the future results in the same order as the input elements
were received. That means that completed results are only emitted downstreams
when earlier results have been completed and emitted. One slow call will thereby
delay the results of all successive calls, even though they are completed before
the slow call.
``mapAsyncUnordered`` emits the future results as soon as they are completed, i.e.
it is possible that the elements are not emitted downstream in the same order as
received from upstream. One slow call will thereby not delay the results of faster
successive calls as long as there is downstream demand of several elements.
Here is a fictive service that we can use to illustrate these aspects.
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-service
Elements starting with a lower case character are simulated to take longer time
to process.
Here is how we can use it with ``mapAsync``:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-mapAsync
The output may look like this:
::
before: a
before: B
before: C
before: D
running: a (1)
running: B (2)
before: e
running: C (3)
before: F
running: D (4)
before: g
before: H
completed: C (3)
completed: B (2)
completed: D (1)
completed: a (0)
after: A
after: B
running: e (1)
after: C
after: D
running: F (2)
before: i
before: J
running: g (3)
running: H (4)
completed: H (2)
completed: F (3)
completed: e (1)
completed: g (0)
after: E
after: F
running: i (1)
after: G
after: H
running: J (2)
completed: J (1)
completed: i (0)
after: I
after: J
Note that ``after`` lines are in the same order as the ``before`` lines even
though elements are ``completed`` in a different order. For example ``H``
is ``completed`` before ``g``, but still emitted afterwards.
The numbers in parenthesis illustrates how many calls that are in progress at
the same time. Here the downstream demand and thereby the number of concurrent
calls are limited by the buffer size (4) of the :class:`MaterializerSettings`.
Here is how we can use the same service with ``mapAsyncUnordered``:
.. includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-mapAsyncUnordered
The output may look like this:
::
before: a
before: B
before: C
before: D
running: a (1)
running: B (2)
before: e
running: C (3)
before: F
running: D (4)
before: g
before: H
completed: B (3)
completed: C (1)
completed: D (2)
after: B
after: D
running: e (2)
after: C
running: F (3)
before: i
before: J
completed: F (2)
after: F
running: g (3)
running: H (4)
completed: H (3)
after: H
completed: a (2)
after: A
running: i (3)
running: J (4)
completed: J (3)
after: J
completed: e (2)
after: E
completed: g (1)
after: G
completed: i (0)
after: I
Note that ``after`` lines are not in the same order as the ``before`` lines. For example
``H`` overtakes the slow ``G``.
The numbers in parenthesis illustrates how many calls that are in progress at
the same time. Here the downstream demand and thereby the number of concurrent
calls are limited by the buffer size (4) of the :class:`MaterializerSettings`.
Integrating with Reactive Streams
=================================
`Reactive Streams`_ defines a standard for asynchronous stream processing with non-blocking
back pressure. It makes it possible to plug together stream libraries that adhere to the standard.
Akka Streams is one such library.
An incomplete list of other implementations:
* `Reactor (1.1+)`_
* `RxJava`_
* `Ratpack`_
* `Slick`_
.. _Reactive Streams: http://reactive-streams.org/
.. _Reactor (1.1+): http://github.com/reactor/reactor
.. _RxJava: https://github.com/ReactiveX/RxJavaReactiveStreams
.. _Ratpack: http://www.ratpack.io/manual/current/streams.html
.. _Slick: http://slick.typesafe.com
The two most important interfaces in Reactive Streams are the :class:`Publisher` and :class:`Subscriber`.
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#imports
Let us assume that a library provides a publisher of tweets:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#tweets-publisher
and another library knows how to store author handles in a database:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#author-storage-subscriber
Using an Akka Streams :class:`Flow` we can transform the stream and connect those:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala
:include: authors,connect-all
The :class:`Publisher` is used as an input :class:`Source` to the flow and the
:class:`Subscriber` is used as an output :class:`Sink`.
A :class:`Flow` can also be materialized to a :class:`Subscriber`, :class:`Publisher` pair:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#flow-publisher-subscriber
A publisher can be connected to a subscriber with the ``subscribe`` method.
It is also possible to expose a :class:`Source` as a :class:`Publisher`
by using the ``publisher`` :class:`Sink`:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-publisher
A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second
subscription attempt will be rejected with an :class:`IllegalStateException`.
A publisher that supports multiple subscribers can be created with ``Sink.fanoutPublisher``
instead:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala
:include: author-alert-subscriber,author-storage-subscriber
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-fanoutPublisher
The buffer size controls how far apart the slowest subscriber can be from the fastest subscriber
before slowing down the stream.
To make the picture complete, it is also possible to expose a :class:`Sink` as a :class:`Subscriber`
by using the ``subscriber`` :class:`Source`:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#sink-subscriber

View file

@ -0,0 +1,62 @@
.. _stream-introduction-scala:
############
Introduction
############
Motivation
==========
The way we consume services from the internet today includes many instances of
streaming data, both downloading from a service as well as uploading to it or
peer-to-peer data transfers. Regarding data as a stream of elements instead of
in its entirety is very useful because it matches the way computers send and
receive them (for example via TCP), but it is often also a necessity because
data sets frequently become too large to be handled as a whole. We spread
computations or analyses over large clusters and call it “big data”, where the
whole principle of processing them is by feeding those data sequentially—as a
stream—through some CPUs.
Actors can be seen as dealing with streams as well: they send and receive
series of messages in order to transfer knowledge (or data) from one place to
another. We have found it tedious and error-prone to implement all the proper
measures in order to achieve stable streaming between actors, since in addition
to sending and receiving we also need to take care to not overflow any buffers
or mailboxes in the process. Another pitfall is that Actor messages can be lost
and must be retransmitted in that case lest the stream have holes on the
receiving side. When dealing with streams of elements of a fixed given type,
Actors also do not currently offer good static guarantees that no wiring errors
are made: type-safety could be improved in this case.
For these reasons we decided to bundle up a solution to these problems as an
Akka Streams API. The purpose is to offer an intuitive and safe way to
formulate stream processing setups such that we can then execute them
efficiently and with bounded resource usage—no more OutOfMemoryErrors. In order
to achieve this our streams need to be able to limit the buffering that they
employ, they need to be able to slow down producers if the consumers cannot
keep up. This feature is called back-pressure and is at the core of the
[Reactive Streams](http://reactive-streams.org/) initiative of which Akka is a
founding member. For you this means that the hard problem of propagating and
reacting to back-pressure has been incorporated in the design of Akka Streams
already, so you have one less thing to worry about; it also means that Akka
Streams interoperate seamlessly with all other Reactive Streams implementations
(where Reactive Streams interfaces define the interoperability SPI while
implementations like Akka Streams offer a nice user API).
How to read these docs
======================
Stream processing is a different paradigm to the Actor Model or to Future
composition, therefore it may take some careful study of this subject until you
feel familiar with the tools and techniques. The documentation is here to help
and for best results we recommend the following approach:
* Read the :ref:`quickstart-scala` to get a feel for how streams
look like and what they can do.
* The top-down learners may want to peruse the :ref:`stream-design` at this
point.
* The bottom-up learners may feel more at home rummaging through the
:ref:`stream-cookbook-scala`.
* The other sections can be read sequentially or as needed during the previous
steps, each digging deeper into specific topics.

View file

@ -0,0 +1,7 @@
.. _stream-io-scala:
#########################
Working with streaming IO
#########################
*TODO*

View file

@ -1,166 +1,167 @@
.. _quickstart-scala:
Quick Start: Reactive Tweets
============================
A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some
other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.
We will also consider the problem inherent to all non-blocking streaming solutions "*What if the subscriber is slower
to consume the live stream of data?*" i.e. it is unable to keep up with processing the live data. Traditionally the solution
is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems.
Instead Akka Streams depend on internal backpressure signals that allow to control what should happen in such scenarios.
Here's the data model we'll be working with throughout the quickstart examples:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model
Transforming and consuming simple streams
-----------------------------------------
In order to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`,
which will be responsible for materializing and running the streams we are about to create:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup
The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to
be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this
will be discussed in depth in :ref:`stream-section-configuration`.
Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source
Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or
more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations
that can be used to transform the flowing data, a :class:`Sink` however does not since its the "end of stream" and its
behavior depends on the type of :class:`Sink` used.
In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look
familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
Finally in order to :ref:`materialize <stream-materialization-scala>` and run the stream computation we need to attach
the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on
the :class:``Sink`` `companion object <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2-SNAPSHOT/#akka.stream.scaladsl.Sink$>`_.
For now let's simply print each author:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println
or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`):
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println
Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly,
like this: ``.run(mat)``).
Flattening sequences in streams
-------------------------------
In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes
we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like ``flatMap``
works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat``
combinator:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat
.. note::
The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition.
It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing
due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
our implementation of flatMap (due to the liveness issues).
Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``),
whereas ``flatMap`` would have to operate on streams all the way through.
Broadcasting a stream
---------------------
Now let's say we want to persist all hashtags, as well as all author names from this one live stream.
For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
This means we have to split the source stream into 2 streams which will handle the writing to these different files.
Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its
input port to all of its output ports.
Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation
graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.
FlowGraphs are constructed like this:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast
.. note::
The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported.
Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method.
As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph
using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g``
*it is immutable, thread-safe, and freely shareable*. A graph can can be ``run()`` directly - assuming all
ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s
where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`.
As all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element.
Back-pressure in action
-----------------------
One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
:ref:`back-pressure-explained-scala`.
A typical problem applications (not using Akka Streams) like this often face is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka Streams buffering can
and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
elements*" this can be expressed using the ``buffer`` element:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead
The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
when it receives another element element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best.
Materialized values
-------------------
So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing
values or storing them in some external system. However sometimes we may be interested in some value that can be
obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed.
While this question is not as obvious to give an answer to in case of an infinite stream of tweets (one way to answer
this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized
values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple
lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType``
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count
First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream.
Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``,
finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`FlowMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``,
which can be used to retrieve materialized values from the running stream.
In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map
obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType``
as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure.
The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream,
for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
will be different, as illustrated by this example:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice
Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or
steering these elements which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know
what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline
.. _quickstart-scala:
Quick Start: Reactive Tweets
============================
A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some
other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.
We will also consider the problem inherent to all non-blocking streaming solutions "*What if the subscriber is slower
to consume the live stream of data?*" i.e. it is unable to keep up with processing the live data. Traditionally the solution
is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems.
Instead Akka Streams depend on internal backpressure signals that allow to control what should happen in such scenarios.
Here's the data model we'll be working with throughout the quickstart examples:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model
Transforming and consuming simple streams
-----------------------------------------
In order to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`,
which will be responsible for materializing and running the streams we are about to create:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup
The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to
be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this
will be discussed in depth in :ref:`stream-section-configuration`.
Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source
Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or
more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations
that can be used to transform the flowing data, a :class:`Sink` however does not since its the "end of stream" and its
behavior depends on the type of :class:`Sink` used.
In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look
familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
Finally in order to :ref:`materialize <stream-materialization-scala>` and run the stream computation we need to attach
the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on
the :class:``Sink`` `companion object <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2-SNAPSHOT/#akka.stream.scaladsl.Sink$>`_.
For now let's simply print each author:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println
or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`):
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println
Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly,
like this: ``.run(mat)``).
Flattening sequences in streams
-------------------------------
In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes
we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like ``flatMap``
works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat``
combinator:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat
.. note::
The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition.
It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing
due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
our implementation of flatMap (due to the liveness issues).
Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``),
whereas ``flatMap`` would have to operate on streams all the way through.
Broadcasting a stream
---------------------
Now let's say we want to persist all hashtags, as well as all author names from this one live stream.
For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
This means we have to split the source stream into 2 streams which will handle the writing to these different files.
Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its
input port to all of its output ports.
Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation
graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.
FlowGraphs are constructed like this:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast
.. note::
The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported.
Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method.
As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph
using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g``
*it is immutable, thread-safe, and freely shareable*. A graph can can be ``run()`` directly - assuming all
ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s
where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`.
As all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element.
Back-pressure in action
-----------------------
One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
:ref:`back-pressure-explained-scala`.
A typical problem applications (not using Akka Streams) like this often face is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka Streams buffering can
and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
elements*" this can be expressed using the ``buffer`` element:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead
The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
when it receives another element element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best.
Materialized values
-------------------
So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing
values or storing them in some external system. However sometimes we may be interested in some value that can be
obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed.
While this question is not as obvious to give an answer to in case of an infinite stream of tweets (one way to answer
this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized
values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple
lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType``
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count
First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream.
Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``,
finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`FlowMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``,
which can be used to retrieve materialized values from the running stream.
In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map
obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType``
as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure.
The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream,
for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
will be different, as illustrated by this example:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice
Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or
steering these elements which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know
what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline

View file

@ -0,0 +1,31 @@
.. _stream-rate-scala:
#############################
Buffers and working with rate
#############################
Buffers in Akka Streams
=======================
Internal buffers and their effect
---------------------------------
*TODO*
Explicit user defined buffers
-----------------------------
*TODO*
Rate transformation
===================
Understanding conflate
----------------------
*TODO*
Understanding expand
--------------------
*TODO*