#19235 document fusing

This commit is contained in:
Roland Kuhn 2015-12-22 22:10:34 +01:00
parent a733096564
commit 939319e84d
5 changed files with 160 additions and 17 deletions

View file

@ -220,4 +220,29 @@ class FlowDocSpec extends AkkaSpec {
//#flow-mat-combine
}
"explicit fusing" in {
//#explicit-fusing
import akka.stream.Fusing
val flow = Flow[Int].map(_ * 2).filter(_ > 500)
val fused = Fusing.aggressive(flow)
Source.fromIterator { () => Iterator from 0 }
.via(fused)
.take(1000)
//#explicit-fusing
}
"defining asynchronous boundaries" in {
//#flow-async
import akka.stream.Attributes.asyncBoundary
Source(List(1, 2, 3))
.map(_ + 1)
.withAttributes(asyncBoundary)
.map(_ * 2)
.to(Sink.ignore)
//#flow-async
}
}

View file

@ -49,7 +49,8 @@ can hand it back for further use to an underlying thread-pool.
.. _defining-and-running-streams-scala:
Defining and running streams
----------------------------
============================
Linear processing pipelines can be expressed in Akka Streams using the following core abstractions:
Source
@ -114,7 +115,7 @@ to refer to the future:
.. includecode:: code/docs/stream/FlowDocSpec.scala#stream-reuse
Defining sources, sinks and flows
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
---------------------------------
The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following
examples show some of the most useful constructs (refer to the API documentation for more details):
@ -126,7 +127,8 @@ There are various ways to wire up different parts of a stream, the following exa
.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-connecting
Illegal stream elements
^^^^^^^^^^^^^^^^^^^^^^^
-----------------------
In accordance to the Reactive Streams specification (`Rule 2.13 <https://github.com/reactive-streams/reactive-streams-jvm#2.13>`_)
Akka Streams do not allow ``null`` to be passed through the stream as an element. In case you want to model the concept
of absence of a value we recommend using ``scala.Option`` or ``scala.util.Either``.
@ -134,7 +136,8 @@ of absence of a value we recommend using ``scala.Option`` or ``scala.util.Either
.. _back-pressure-explained-scala:
Back-pressure explained
-----------------------
=======================
Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_
specification, which Akka is a founding member of.
@ -168,7 +171,8 @@ with the upstream production rate or not.
To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them:
Slow Publisher, fast Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-------------------------------
This is the happy case of course we do not need to slow down the Publisher in this case. However signalling rates are
rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled
@ -184,7 +188,8 @@ As we can see, in this scenario we effectively operate in so called push-mode si
elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements.
Fast Publisher, slow Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-------------------------------
This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with
the rate at which its upstream would like to emit data elements.
@ -202,7 +207,7 @@ this mode of operation is referred to as pull-based back-pressure.
.. _stream-materialization-scala:
Stream Materialization
----------------------
======================
When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
@ -222,10 +227,64 @@ which will be running on the thread pools they have been configured to run on -
Reusing *instances* of linear computation stages (Source, Sink, Flow) inside composite Graphs is legal,
yet will materialize that stage multiple times.
Operator Fusion
---------------
Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that
the processing steps of a flow or stream graph can be executed within the same Actor and has three
consequences:
* starting up a stream may take longer than before due to executing the fusion algorithm
* passing elements from one processing stage to the next is a lot faster between fused
stages due to avoiding the asynchronous messaging overhead
* fused stream processing stages do no longer run in parallel to each other, meaning that
only up to one CPU core is used for each fused part
The first point can be countered by pre-fusing and then reusing a stream blueprint as sketched below:
.. includecode:: code/docs/stream/FlowDocSpec.scala#explicit-fusing
In order to balance the effects of the second and third bullet points you will have to insert asynchronous
boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that
shall communicate with the rest of the graph in an asynchronous fashion.
.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-async
In this example we create two regions within the flow which will be executed in one Actor each—assuming that adding
and multiplying integers is an extremely costly operation this will lead to a performance gain since two CPUs can
work on the tasks in parallel. It is important to note that asynchronous boundaries are not singular places within a
flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work
by adding information to the flow graph that has been constructed up to this point:
|
.. image:: ../images/asyncBoundary.png
:align: center
:width: 700
|
This means that everything that is inside the red bubble will be executed by one actor and everything outside of it
by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all
processing stages that have been added since them.
.. warning::
Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer
that holds a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers
may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer
there, data elements are passed without buffering between fused stages. In those cases where buffering
is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the
``.buffer()`` combinator—typically a buffer of size 2 is enough to allow a feedback loop to function.
The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``.
In that case you can still manually fuse those graphs which shall run on less Actors. With the exception of the
:class:`SslTlsStage` and the ``groupBy`` operator all built-in processing stages can be fused.
.. _flow-combine-mat-scala:
Combining materialized values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-----------------------------
Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary
to somehow express how these values should be composed to a final value when we plug these stages together. For this,