diff --git a/akka-docs-dev/rst/images/asyncBoundary.png b/akka-docs-dev/rst/images/asyncBoundary.png new file mode 100644 index 0000000000..337283728d Binary files /dev/null and b/akka-docs-dev/rst/images/asyncBoundary.png differ diff --git a/akka-docs-dev/rst/java/stream-customize.rst b/akka-docs-dev/rst/java/stream-customize.rst index 4ddd97e949..b5cb0a57c0 100644 --- a/akka-docs-dev/rst/java/stream-customize.rst +++ b/akka-docs-dev/rst/java/stream-customize.rst @@ -414,7 +414,7 @@ initialization. The buffer has demand for up to two elements without any downstr The following code example demonstrates a buffer class corresponding to the message sequence chart above. -.. includecode:: code/docs/stream/GraphStageDocSpec.scala#detached +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/GraphStageDocTest.java#detached Thread safety of custom processing stages ========================================= diff --git a/akka-docs-dev/rst/java/stream-flows-and-basics.rst b/akka-docs-dev/rst/java/stream-flows-and-basics.rst index 1dc89ef952..941112663e 100644 --- a/akka-docs-dev/rst/java/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/java/stream-flows-and-basics.rst @@ -49,7 +49,8 @@ can hand it back for further use to an underlying thread-pool. .. _defining-and-running-streams-java: Defining and running streams ----------------------------- +============================ + Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: Source @@ -110,7 +111,7 @@ to refer to the future: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#stream-reuse Defining sources, sinks and flows -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +--------------------------------- The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. 
The following examples show some of the most useful constructs (refer to the API documentation for more details): @@ -122,7 +123,8 @@ There are various ways to wire up different parts of a stream, the following exa .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-connecting Illegal stream elements -^^^^^^^^^^^^^^^^^^^^^^^ +----------------------- + In accordance to the Reactive Streams specification (`Rule 2.13 `_) Akka Streams do not allow ``null`` to be passed through the stream as an element. In case you want to model the concept of absence of a value we recommend using ``akka.japi.Option`` (for Java 6 and 7) or ``java.util.Optional`` which is available since Java 8. @@ -130,7 +132,8 @@ of absence of a value we recommend using ``akka.japi.Option`` (for Java 6 and 7) .. _back-pressure-explained-java: Back-pressure explained ------------------------ +======================= + Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_ specification, which Akka is a founding member of. @@ -164,7 +167,8 @@ with the upstream production rate or not. To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them: Slow Publisher, fast Subscriber -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------- + This is the happy case of course – we do not need to slow down the Publisher in this case. However signalling rates are rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled @@ -180,7 +184,8 @@ As we can see, in this scenario we effectively operate in so called push-mode si elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements. 
Fast Publisher, slow Subscriber -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------- + This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with the rate at which its upstream would like to emit data elements. @@ -198,7 +203,7 @@ this mode of operation is referred to as pull-based back-pressure. .. _stream-materialization-java: Stream Materialization ----------------------- +====================== When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan. Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources @@ -220,8 +225,62 @@ which will be running on the thread pools they have been configured to run on - .. _flow-combine-mat-java: +Operator Fusion +--------------- + +Akka Streams 2.0 contains an initial version of stream operator fusion support. This means that +the processing steps of a flow or stream graph can be executed within the same Actor, which has three +consequences: + + * starting up a stream may take longer than before due to executing the fusion algorithm + * passing elements from one processing stage to the next is a lot faster between fused + stages due to avoiding the asynchronous messaging overhead + * fused stream processing stages no longer run in parallel to each other, meaning that + at most one CPU core is used for each fused part + +The first point can be countered by pre-fusing and then reusing a stream blueprint as sketched below: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#explicit-fusing + +In order to balance the effects of the second and third bullet points you will have to insert asynchronous +boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that +shall communicate with the rest of the graph in an asynchronous fashion.
+ +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-async + +In this example we create two regions within the flow which will be executed in one Actor each. Assuming that adding +and multiplying integers is an extremely costly operation, this will lead to a performance gain since two CPUs can +work on the tasks in parallel. It is important to note that asynchronous boundaries are not singular places within a +flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work +by adding information to the flow graph that has been constructed up to this point: + +| + +.. image:: ../images/asyncBoundary.png + :align: center + :width: 700 + +| + +This means that everything that is inside the red bubble will be executed by one actor and everything outside of it +by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all +processing stages that have been added since then. + +.. warning:: + + Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer + that held a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers + may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer + there; data elements are passed without buffering between fused stages. In those cases where buffering + is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the + ``.buffer()`` combinator; typically a buffer of size 2 is enough to allow a feedback loop to function. + +The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``. +In that case you can still manually fuse those graphs which shall run on fewer Actors.
With the exception of the +:class:`SslTlsStage` and the ``groupBy`` operator all built-in processing stages can be fused. + Combining materialized values -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +----------------------------- Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary to somehow express how these values should be composed to a final value when we plug these stages together. For this, diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 9ba042651c..8e0a3ea344 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -220,4 +220,29 @@ class FlowDocSpec extends AkkaSpec { //#flow-mat-combine } + + "explicit fusing" in { + //#explicit-fusing + import akka.stream.Fusing + + val flow = Flow[Int].map(_ * 2).filter(_ > 500) + val fused = Fusing.aggressive(flow) + + Source.fromIterator { () => Iterator from 0 } + .via(fused) + .take(1000) + //#explicit-fusing + } + + "defining asynchronous boundaries" in { + //#flow-async + import akka.stream.Attributes.asyncBoundary + + Source(List(1, 2, 3)) + .map(_ + 1) + .withAttributes(asyncBoundary) + .map(_ * 2) + .to(Sink.ignore) + //#flow-async + } } diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index acb765107e..285618b5c3 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -49,7 +49,8 @@ can hand it back for further use to an underlying thread-pool. .. _defining-and-running-streams-scala: Defining and running streams ----------------------------- +============================ + Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: Source @@ -114,7 +115,7 @@ to refer to the future: .. 
includecode:: code/docs/stream/FlowDocSpec.scala#stream-reuse Defining sources, sinks and flows -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +--------------------------------- The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following examples show some of the most useful constructs (refer to the API documentation for more details): @@ -126,7 +127,8 @@ There are various ways to wire up different parts of a stream, the following exa .. includecode:: code/docs/stream/FlowDocSpec.scala#flow-connecting Illegal stream elements -^^^^^^^^^^^^^^^^^^^^^^^ +----------------------- + In accordance to the Reactive Streams specification (`Rule 2.13 `_) Akka Streams do not allow ``null`` to be passed through the stream as an element. In case you want to model the concept of absence of a value we recommend using ``scala.Option`` or ``scala.util.Either``. @@ -134,7 +136,8 @@ of absence of a value we recommend using ``scala.Option`` or ``scala.util.Either .. _back-pressure-explained-scala: Back-pressure explained ------------------------ +======================= + Akka Streams implement an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_ specification, which Akka is a founding member of. @@ -168,7 +171,8 @@ with the upstream production rate or not. To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them: Slow Publisher, fast Subscriber -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------- + This is the happy case of course – we do not need to slow down the Publisher in this case. However signalling rates are rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now slower than the Publisher. 
In order to safeguard from these situations, the back-pressure protocol must still be enabled @@ -184,7 +188,8 @@ As we can see, in this scenario we effectively operate in so called push-mode si elements as fast as it can, since the pending demand will be recovered just-in-time while it is emitting elements. Fast Publisher, slow Subscriber -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +------------------------------- + This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with the rate at which its upstream would like to emit data elements. @@ -202,7 +207,7 @@ this mode of operation is referred to as pull-based back-pressure. .. _stream-materialization-scala: Stream Materialization ----------------------- +====================== When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan. Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources @@ -222,10 +227,64 @@ which will be running on the thread pools they have been configured to run on - Reusing *instances* of linear computation stages (Source, Sink, Flow) inside composite Graphs is legal, yet will materialize that stage multiple times. +Operator Fusion +--------------- + +Akka Streams 2.0 contains an initial version of stream operator fusion support. 
This means that +the processing steps of a flow or stream graph can be executed within the same Actor, which has three +consequences: + + * starting up a stream may take longer than before due to executing the fusion algorithm + * passing elements from one processing stage to the next is a lot faster between fused + stages due to avoiding the asynchronous messaging overhead + * fused stream processing stages no longer run in parallel to each other, meaning that + at most one CPU core is used for each fused part + +The first point can be countered by pre-fusing and then reusing a stream blueprint as sketched below: + +.. includecode:: code/docs/stream/FlowDocSpec.scala#explicit-fusing + +In order to balance the effects of the second and third bullet points you will have to insert asynchronous +boundaries manually into your flows and graphs by way of adding ``Attributes.asyncBoundary`` to pieces that +shall communicate with the rest of the graph in an asynchronous fashion. + +.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-async + +In this example we create two regions within the flow which will be executed in one Actor each. Assuming that adding +and multiplying integers is an extremely costly operation, this will lead to a performance gain since two CPUs can +work on the tasks in parallel. It is important to note that asynchronous boundaries are not singular places within a +flow where elements are passed asynchronously (as in other streaming libraries), but instead attributes always work +by adding information to the flow graph that has been constructed up to this point: + +| + +.. image:: ../images/asyncBoundary.png + :align: center + :width: 700 + +| + +This means that everything that is inside the red bubble will be executed by one actor and everything outside of it +by another. This scheme can be applied successively, always having one such boundary enclose the previous ones plus all +processing stages that have been added since then. + +..
warning:: + + Without fusing (i.e. up to version 2.0-M2) each stream processing stage had an implicit input buffer + that held a few elements for efficiency reasons. If your flow graphs contain cycles then these buffers + may have been crucial in order to avoid deadlocks. With fusing these implicit buffers are no longer + there; data elements are passed without buffering between fused stages. In those cases where buffering + is needed in order to allow the stream to run at all, you will have to insert explicit buffers with the + ``.buffer()`` combinator; typically a buffer of size 2 is enough to allow a feedback loop to function. + +The new fusing behavior can be disabled by setting the configuration parameter ``akka.stream.materializer.auto-fusing=off``. +In that case you can still manually fuse those graphs which shall run on fewer Actors. With the exception of the +:class:`SslTlsStage` and the ``groupBy`` operator, all built-in processing stages can be fused. + .. _flow-combine-mat-scala: Combining materialized values -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +----------------------------- Since every processing stage in Akka Streams can provide a materialized value after being materialized, it is necessary to somehow express how these values should be composed to a final value when we plug these stages together. For this,