From 685da0b80a8fbe50194a2d2b2be4a015151ad185 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 5 Feb 2015 13:30:14 +0100 Subject: [PATCH] =str #16652 Add java docs for 'Basics and working with Flows' --- akka-docs-dev/rst/java/stream-customize.rst | 2 +- .../rst/java/stream-flows-and-basics.rst | 217 ++++++++++++++++++ .../scala/code/docs/stream/FlowDocSpec.scala | 2 +- akka-docs-dev/rst/scala/stream-customize.rst | 2 +- .../rst/scala/stream-flows-and-basics.rst | 1 - 5 files changed, 220 insertions(+), 4 deletions(-) create mode 100644 akka-docs-dev/rst/java/stream-flows-and-basics.rst diff --git a/akka-docs-dev/rst/java/stream-customize.rst b/akka-docs-dev/rst/java/stream-customize.rst index bc64bafe09..4c3dcfac48 100644 --- a/akka-docs-dev/rst/java/stream-customize.rst +++ b/akka-docs-dev/rst/java/stream-customize.rst @@ -109,7 +109,7 @@ emit additional elementss and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` e The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that our push/pull structure that was illustrated in the figure of our custom processing chain does not - apply to termination. Unlike our neat model that is analogous to a ball that bounces back-and-forth in a + apply to termination. Our neat model that is analogous to a ball that bounces back-and-forth in a pipe (it bounces back on ``Filter``, ``Duplicator`` for example) cannot describe the termination signals. 
By calling ``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was diff --git a/akka-docs-dev/rst/java/stream-flows-and-basics.rst b/akka-docs-dev/rst/java/stream-flows-and-basics.rst new file mode 100644 index 0000000000..5ffc1c027b --- /dev/null +++ b/akka-docs-dev/rst/java/stream-flows-and-basics.rst @@ -0,0 +1,217 @@ +.. _stream-flow-java: + ############################# +Basics and working with Flows +############################# + +Core concepts +============= + +Akka Streams is a library to process and transfer a sequence of elements using bounded buffer space. This +latter property is what we refer to as *boundedness* and it is the defining feature of Akka Streams. Translated to +everyday terms, it is possible to express a chain (or as we see later, graphs) of processing entities, each executing +independently (and possibly concurrently) from the others while only buffering a limited number of elements at any given +time. This property of bounded buffers is one of the differences from the actor model, where each actor usually has +an unbounded, or a bounded but dropping, mailbox. Akka Stream processing entities have bounded "mailboxes" that +do not drop. + +Before we move on, let's define some basic terminology which will be used throughout the entire documentation: + +Stream + An active process that involves moving and transforming data. +Element + An element is the processing unit of streams. All operations transform and transfer elements from upstream to + downstream. Buffer sizes are always expressed as a number of elements independently from the actual size of the elements. +Back-pressure + A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively + slowing down the upstream producer to match their consumption speeds.
+ In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*. +Processing Stage + The common name for all building blocks that build up a Flow or FlowGraph. + Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()`` like + :class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage` and graph junctions like ``Merge`` or ``Broadcast``. + +Defining and running streams +---------------------------- +Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: + +Source + A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are + ready to receive them. +Sink + A processing stage with *exactly one input*, requesting and accepting data elements, possibly slowing down the upstream + producer of elements. +Flow + A processing stage which has *exactly one input and output*, which connects its up- and downstreams by + transforming the data elements flowing through it. +RunnableFlow + A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``. + +It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend +a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink, +it will be represented by the ``RunnableFlow`` type, indicating that it is ready to be executed. + +It is important to remember that even after constructing the ``RunnableFlow`` by connecting all the source, sink and +different processing stages, no data will flow through it until it is materialized. Materialization is the process of +allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve +starting up Actors).
Thanks to Flows being simply a description of the processing pipeline, they are *immutable, +thread-safe, and freely shareable*, which means that it is, for example, safe to share and send them between actors, to have +one actor prepare the work, and then have it be materialized at some completely different place in the code. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#materialization-in-steps + +After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both +sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation-dependent. For example, a ``FoldSink`` will make a ``Future`` available in this map which will represent the result +of the folding process over the stream. In general, a stream can expose multiple materialized values, +but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason +there is a convenience method called ``runWith()`` available for ``Sink``, ``Source`` or ``Flow`` requiring, respectively, +a supplied ``Source`` (in order to run a ``Sink``), a ``Sink`` (in order to run a ``Source``) or +both a ``Source`` and a ``Sink`` (in order to run a ``Flow``, since it has neither attached yet). + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#materialization-runWith + +It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage +instead of modifying the existing instance, so while constructing long flows, remember to assign the new value to a variable or run it: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#source-immutable + +.. note:: + By default Akka Streams elements support **exactly one** downstream processing stage.
+ Making fan-out (supporting multiple downstream processing stages) an explicit opt-in feature allows default stream elements to + be less complex and more efficient. It also allows for greater flexibility on *how exactly* to handle the multicast scenarios, + by providing named fan-out elements such as broadcast (signals all downstream elements) or balance (signals one of the available downstream elements). + +In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized value +of the given sink or source. + +Since a stream can be materialized multiple times, the ``MaterializedMap`` returned is different for each materialization. +In the example below we create two running materialized instances of the stream that we described in the ``runnable`` +variable, and both materializations give us a different ``Future`` from the map even though we used the same ``sink`` +to refer to the future: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#stream-reuse + +Defining sources, sinks and flows +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following +examples show some of the most useful constructs (refer to the API documentation for more details): + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#source-sink + +There are various ways to wire up different parts of a stream; the following examples show some of the available options: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowDocTest.java#flow-connecting + + +.. _back-pressure-explained-java: + +Back-pressure explained +----------------------- +Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_ +specification, which Akka is a founding member of.
+ +.. _Reactive Streams: http://reactive-streams.org/ + +The user of the library does not have to write any explicit back-pressure handling code — it is built in +and dealt with automatically by all of the provided Akka Streams processing stages. It is, however, possible to add +explicit buffer stages with overflow strategies that can influence the behaviour of the stream. This is especially important +in complex processing graphs which may even contain loops (which *must* be treated with very special +care, as explained in :ref:`graph-cycles-java`). + +The back-pressure protocol is defined in terms of the number of elements a downstream ``Subscriber`` is able to receive +and buffer, referred to as ``demand``. +The source of data, referred to as ``Publisher`` in Reactive Streams terminology and implemented as ``Source`` in Akka +Streams, guarantees that it will never emit more elements than the received total demand for any given ``Subscriber``. + +.. note:: + + The Reactive Streams specification defines its protocol in terms of ``Publisher`` and ``Subscriber``. + These types are **not** meant to be user-facing API; instead they serve as the low-level building blocks for + different Reactive Streams implementations. + + Akka Streams implements these concepts as ``Source``, ``Flow`` (referred to as ``Processor`` in Reactive Streams) + and ``Sink`` without exposing the Reactive Streams interfaces directly. + If you need to integrate with other Reactive Streams libraries, read :ref:`reactive-streams-integration-java`. + +The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode", +since it will switch between push- and pull-based back-pressure models depending on the downstream being able to cope +with the upstream production rate or not.
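The switching between the two modes can be sketched with a tiny stand-alone model (plain Java, *not* the Akka Streams implementation; the class name and bookkeeping here are invented for illustration). The producer keeps a running count of outstanding demand and may only emit while that count is positive:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the Reactive Streams demand protocol (illustration only):
// the producer may emit one element per unit of signalled demand, never more.
class ToyPublisher {
    private long demand = 0;                       // pending demand from the subscriber
    private final List<Integer> emitted = new ArrayList<>();
    private int next = 0;                          // next element to produce

    // Subscriber -> Publisher: Request(n). While demand is positive the
    // publisher effectively operates in "push mode"; once demand reaches
    // zero it is back-pressured and must wait for the next request.
    void request(long n) {
        demand += n;
        while (demand > 0) {
            emitted.add(next++);
            demand--;
        }
    }

    List<Integer> emitted() { return emitted; }
}
```

A fast subscriber keeps demand above zero (possibly batching several elements into one request), so the publisher never has to wait; a slow subscriber lets demand drain to zero, which forces the publisher to pause — the pull side of the protocol.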
+ +To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them: + +Slow Publisher, fast Subscriber +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +This is the happy case of course – we do not need to slow down the Publisher. However, signalling rates are +rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now +slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled +during such situations; however, we do not want to pay a high penalty for this safety net being enabled. + +The Reactive Streams protocol solves this by the Subscriber asynchronously signalling ``Request(int n)`` messages to the Publisher. The protocol guarantees that the Publisher will never signal *more* elements than the +signalled demand. Since the Subscriber is currently faster, however, it will be signalling these Request messages at a higher +rate (and possibly also batching together the demand - requesting multiple elements in one Request signal). This means +that the Publisher should never have to wait (be back-pressured) with publishing its incoming elements. + +As we can see, in this scenario we effectively operate in so-called push mode, since the Publisher can continue producing +elements as fast as it can, because the pending demand will be recovered just-in-time while it is emitting elements. + +Fast Publisher, slow Subscriber +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with +the rate at which its upstream would like to emit data elements.
+ +Since the ``Publisher`` is not allowed to signal more elements than the pending demand signalled by the ``Subscriber``, +it will have to abide by this back-pressure by applying one of the following strategies: + +- not generate elements, if it is able to control their production rate, +- try buffering the elements in a *bounded* manner until more demand is signalled, +- drop elements until more demand is signalled, +- tear down the stream if unable to apply any of the above strategies. + +As we can see, this scenario effectively means that the ``Subscriber`` will *pull* the elements from the Publisher – +this mode of operation is referred to as pull-based back-pressure. + +.. _stream-materialization-java: + +Stream Materialization +---------------------- + +When constructing flows and graphs in Akka Streams, think of them as preparing a blueprint, an execution plan. +Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources +it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing, +but is not restricted to that - it could also mean opening files or socket connections etc. – depending on what the stream needs. + +Materialization is triggered by so-called "terminal operations". Most notably this includes the various forms of the ``run()`` +and ``runWith()`` methods defined on flow elements as well as a small number of special syntactic sugars for running with +well-known sinks, such as ``runForeach(el -> )`` (being an alias to ``runWith(Sink.foreach(el -> ))``). + +Materialization is currently performed synchronously on the materializing thread.
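The "blueprint versus running instance" distinction can be sketched in plain Java (illustration only; ``Blueprint`` is an invented name, not Akka API). The description is immutable and can be run any number of times; per-run state exists only inside ``run()``, mirroring how each materialization of a ``RunnableFlow`` is independent:

```java
import java.util.List;
import java.util.function.IntUnaryOperator;

// An immutable description of a computation: nothing executes until run().
final class Blueprint {
    private final List<IntUnaryOperator> stages;

    Blueprint(List<IntUnaryOperator> stages) {
        this.stages = List.copyOf(stages);  // defensive copy: freely shareable
    }

    // Each call is an independent "materialization" with its own state.
    int run(int input) {
        int acc = input;                    // per-run state, never shared
        for (IntUnaryOperator stage : stages) {
            acc = stage.applyAsInt(acc);
        }
        return acc;
    }
}
```

Because the blueprint holds no mutable state of its own, it is safe to share between threads (or actors) and to run it repeatedly, each run yielding a fresh, isolated execution.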
+The actual stream processing is handled by :ref:`Actors <actor-java>` started up during the stream's materialization, +which will be running on the thread pools they have been configured to run on - which defaults to the dispatcher set in +:class:`MaterializationSettings` while constructing the :class:`ActorFlowMaterializer`. + +.. note:: + Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal, + yet will materialize that stage multiple times. + + +Stream ordering +=============== +In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs ``{IA1,IA2,...,IAn}`` +"cause" outputs ``{OA1,OA2,...,OAk}`` and inputs ``{IB1,IB2,...,IBm}`` "cause" outputs ``{OB1,OB2,...,OBl}`` and all of +``IAi`` happened before all ``IBi`` then ``OAi`` happens before ``OBi``. + +This property is even upheld by async operations such as ``mapAsync``; however, an unordered version, ``mapAsyncUnordered``, exists which does not preserve this ordering. + +However, in the case of Junctions which handle multiple input streams (e.g. :class:`Merge`) the output order is, +in general, *not defined* for elements arriving on different input ports. That is, a merge-like operation may emit ``Ai`` +before emitting ``Bi``, and it is up to its internal logic to decide the order of emitted elements. Specialized elements +such as ``Zip`` however *do guarantee* their output order, as each output element depends on all upstream elements having +been signalled already – thus the ordering in the case of zipping is defined by this property. + +If you find yourself in need of fine-grained control over the order of emitted elements in fan-in +scenarios, consider using :class:`MergePreferred` or :class:`FlexiMerge` – which give you full control over how the +merge is performed.
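The zip guarantee described above can be made concrete with a small stand-alone sketch (plain Java, not the Akka ``Zip`` element; the helper name is invented for illustration): because each output is built from one element of *every* input, the output order is fully determined by the positional pairing, whereas a merge is free to interleave arrivals:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of zip-style fan-in (illustration only): the i-th output pairs
// the i-th element of each input, so output order is completely defined.
// A merge, by contrast, may emit whichever input's element arrives first,
// preserving only the relative order within each individual input.
class ToyZip {
    static List<String> zip(List<Integer> left, List<String> right) {
        int n = Math.min(left.size(), right.size()); // stop at the shorter input
        List<String> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(left.get(i) + right.get(i));
        }
        return out;
    }
}
```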
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index f312b9da72..2f64e89fc2 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -86,7 +86,7 @@ class FlowDocSpec extends AkkaSpec { //#compound-source-is-not-keyed-runWith //#compound-source-is-not-keyed-run - // retain the materialized map, in order to retrieve the timers Cancellable + // retain the materialized map, in order to retrieve the timer's Cancellable val materialized = timerMap.to(Sink.ignore).run() val timerCancellable = materialized.get(timer) timerCancellable.cancel() diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index 0ae2037497..e72ff1d787 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -109,7 +109,7 @@ emit additional elementss and call ``ctx.finish()`` or ``ctx.pushAndFinish()`` e The reason for this slightly complex termination sequence is that the underlying ``onComplete`` signal of Reactive Streams may arrive without any pending demand, i.e. without respecting backpressure. This means that our push/pull structure that was illustrated in the figure of our custom processing chain does not - apply to termination. Unlike our neat model that is analogous to a ball that bounces back-and-forth in a + apply to termination. Our neat model that is analogous to a ball that bounces back-and-forth in a pipe (it bounces back on ``Filter``, ``Duplicator`` for example) cannot describe the termination signals. 
By calling ``absorbTermination()`` the execution environment checks if the conceptual token was *above* the current stage at that time (which means that it will never come back, so the environment immediately calls ``onPull``) or it was diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index 2994bd1348..fe1fb281a4 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -177,7 +177,6 @@ this mode of operation is referred to as pull-based back-pressure. Stream Materialization ---------------------- -**TODO - write me (feel free to move around as well)** When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan. Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources