diff --git a/akka-docs-dev/rst/scala/cookbook.rst b/akka-docs-dev/rst/scala/cookbook.rst index 090d581731..a5d2f5a97a 100644 --- a/akka-docs-dev/rst/scala/cookbook.rst +++ b/akka-docs-dev/rst/scala/cookbook.rst @@ -1,4 +1,4 @@ -.. _stream-cookbook-scala +.. _stream-cookbook-scala: ################ Streams Cookbook diff --git a/akka-docs-dev/rst/scala/graphs-cycles.rst b/akka-docs-dev/rst/scala/graphs-cycles.rst deleted file mode 100644 index 5e464d1267..0000000000 --- a/akka-docs-dev/rst/scala/graphs-cycles.rst +++ /dev/null @@ -1,81 +0,0 @@ -Graph cycles, liveness and deadlocks ------------------------------------- - -By default :class:`FlowGraph` does not allow (or to be precise, its builder does not allow) the creation of cycles. -The reason for this is that cycles need special considerations to avoid potential deadlocks and other liveness issues. -This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing -graphs. - -The first example demonstrates a graph that contains a naive cycle (the presence of cycles is enabled by calling -``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements -to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via -a ``Merge`` junction. - -.. includecode:: code/docs/stream/GraphCyclesSpec.scala#deadlocked - -Running this we observe that after a few numbers have been printed, no more elements are logged to the console - -all processing stops after some time. 
After some investigation we observe that: - -* through merging from ``source`` we increase the number of elements flowing in the cycle -* by broadcasting back to the cycle we do not decrease the number of elements in the cycle - -Since Akka Streams (and Reactive Streams in general) guarantee bounded processing (see the "Buffering" section for more -details) it means that only a bounded number of elements are buffered over any time span. Since our cycle gains more and -more elements, eventually all of its internal buffers become full, backpressuring ``source`` forever. To be able -to process more elements from ``source`` elements would need to leave the cycle somehow. - -If we modify our feedback loop by replacing the ``Merge`` junction with a ``MergePreferred`` we can avoid the deadlock. -``MergePreferred`` is unfair as it always tries to consume from a preferred input port if there are elements available -before trying the other lower priority input ports. Since we feed back through the preferred port it is always guaranteed -that the elements in the cycles can flow. - -.. includecode:: code/docs/stream/GraphCyclesSpec.scala#unfair - -If we run the example we see that the same sequence of numbers are printed -over and over again, but the processing does not stop. Hence, we avoided the deadlock, but ``source`` is still -backpressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple -of initial elements from ``source``. - -.. note:: - What we see here is that in certain cases we need to choose between boundedness and liveness. Our first example would - not deadlock if there would be an infinite buffer in the loop, or vice versa, if the elements in the cycle would - be balanced (as many elements are removed as many are injected) then there would be no deadlock. - -To make our cycle both live (not deadlocking) and fair we can introduce a dropping element on the feedback arc. 
In this -case we chose the ``buffer()`` operation giving it a dropping strategy ``OverflowStrategy.dropHead``. - -.. includecode:: code/docs/stream/GraphCyclesSpec.scala#dropping - -If we run this example we see that - -* The flow of elements does not stop, there are always elements printed -* We see that some of the numbers are printed several times over time (due to the feedback loop) but on average -the numbers are increasing in the long term - -This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles -(cycles where the number of circulating elements are unbounded) is to drop elements. An alternative would be to -define a larger buffer with ``OverflowStrategy.error`` which would fail the stream instead of deadlocking it after -all buffer space has been consumed. - -As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We -circumvented this issue by adding a dropping element, but now we want to build a cycle that is balanced from -the beginning instead. To achieve this we modify our first graph by replacing the ``Merge`` junction with a ``ZipWith``. -Since ``ZipWith`` takes one element from ``source`` *and* from the feedback arc to inject one element into the cycle, -we maintain the balance of elements. - -.. includecode:: code/docs/stream/GraphCyclesSpec.scala#zipping-dead - -Still, when we try to run the example it turns out that no element is printed at all! After some investigation we -realize that: - -* In order to get the first element from ``source`` into the cycle we need an already existing element in the cycle -* In order to get an initial element in the cycle we need an element from ``source`` - -These two conditions are a typical "chicken-and-egg" problem. The solution is to inject an initial -element into the cycle that is independent from ``source``. 
We do this by using a ``Concat`` junction on the backwards -arc that injects a single element using ``Source.single``. - -.. includecode:: code/docs/stream/GraphCyclesSpec.scala#zipping-live - -When we run the above example we see that processing starts and never stops. The important takeaway from this example -is that balanced cycles often need an initial "kick-off" element to be injected into the cycle. \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst new file mode 100644 index 0000000000..73d8caa803 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -0,0 +1,44 @@ +.. _stream-customize-scala: + +######################## +Custom stream processing +######################## + +Custom linear processing stages +=============================== + +Using PushStage +--------------- + +*TODO* + + +Using PushPullStage +------------------- + +*TODO* + +Using StatefulStage +------------------- + +*TODO* + +Using DetachedStage +------------------- + +*TODO* + +Custom graph processing junctions +================================= + +Using FlexiMerge +---------------- + +*TODO* + +Using FlexiRoute +---------------- + +*TODO* + + diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst new file mode 100644 index 0000000000..b83237bc39 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -0,0 +1,167 @@ +.. _stream-flow-scala: + +############################# +Basics and working with Flows +############################# + +Core concepts +============= + +Everything in Akka Streams revolves around a number of core concepts which we introduce in detail in this section. + +Akka Streams provide a way for executing bounded processing pipelines, where bounds are expressed as the number of stream +elements in flight and in buffers at any given time. 
Please note that while this allows one to estimate and limit memory use,
it is not a strict bound on the in-memory size of these elements.

First we define the terminology which will be used throughout the entire documentation:

Stream
  An active process that involves moving and transforming data.
Element
  An element is the unit which is passed through the stream. All operations as well as back-pressure are expressed in
  terms of elements.
Back-pressure
  A means of flow-control, most notably adjusting the speed of upstream sources to the consumption speed of their sinks.
  In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*.
Processing Stage
  The common name for all building blocks that build up a Flow or FlowGraph.
  Examples of a processing stage would be a Stage (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`,
  :class:`DetachedStage`), in terms of which operations like ``map()``, ``filter()`` and others are implemented.

Sources, Flows and Sinks
------------------------
Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions:

Source
  A processing stage with *exactly one output*, emitting data elements in response to its downstream demand.
Sink
  A processing stage with *exactly one input*, generating demand based on its internal demand management strategy.
Flow
  A processing stage with *exactly one input and one output*, which connects its upstream and downstream by (usually)
  transforming the data elements flowing through it.
RunnableFlow
  A Flow which has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.

It is important to remember that while constructing these processing pipelines by connecting the different processing
stages, no data will flow through them until the pipeline is materialized.
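To build intuition for these four shapes, here is a toy model in plain Scala. This is *not* the Akka Streams API (the names are reused only for illustration); it merely mirrors the arities described above and the fact that a connected pipeline is inert until it is run:

```scala
// Toy model of the four shapes in plain Scala -- NOT the Akka Streams API.
// Iterators stand in for streams purely to illustrate "one input / one output".
case class Flow[I, O](transform: Iterator[I] => Iterator[O])
case class Sink[I, R](consume: Iterator[I] => R)
case class RunnableFlow[R](run: () => R) // both ends attached, ready to run
case class Source[O](produce: () => Iterator[O]) {
  def via[T](flow: Flow[O, T]): Source[T] =
    Source(() => flow.transform(produce()))
  def to[R](sink: Sink[O, R]): RunnableFlow[R] =
    RunnableFlow(() => sink.consume(produce()))
}

val numbers = Source(() => Iterator(1, 2, 3))
val double  = Flow[Int, Int](_.map(_ * 2))
val sum     = Sink[Int, Int](_.sum)

// Connecting stages only builds a description; nothing flows yet.
val runnable = numbers.via(double).to(sum)
// Only "running" the fully attached pipeline makes data flow.
val result = runnable.run()
```

Note how ``to`` is only available once both ends can be attached, matching the rule that only a ``RunnableFlow`` can be ``run()``.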
Materialization is the process of allocating all resources
needed to run the computation described by a Flow (in Akka Streams this will often involve starting up Actors).
Thanks to Flows being simply a description of the processing pipeline they are *immutable, thread-safe, and freely shareable*,
which means that it is for example safe to share and send them between actors: one actor can prepare the work, which
is then materialized at some completely different place in the code.

In order to be able to run a ``Flow[In,Out]`` it must be connected to a ``Source[In]`` *and* a ``Sink[Out]`` of matching types.
It is also possible to directly connect a :class:`Sink` to a :class:`Source`.

.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps

The :class:`MaterializedMap` can be used to get materialized values of both sinks and sources out from the running
stream. In general, a stream can expose multiple materialized values, however for the very common case of only wanting
to get back a Sink's materialized value (in order to read a result) or a Source's (in order to cancel or influence it
in some way) there is a small convenience method called ``runWith()``. It is available on ``Sink``, ``Source`` and ``Flow``,
respectively requiring the user to supply a ``Source`` (in order to run a ``Sink``), a ``Sink`` (in order to run a ``Source``),
or both a ``Source`` and a ``Sink`` (in order to run a ``Flow``, since it has neither attached yet).

.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-runWith

It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage
instead of modifying the existing instance, so while constructing long flows, remember to assign the new value to a variable or run it:

.. includecode:: code/docs/stream/FlowDocSpec.scala#source-immutable

.. note::
  By default Akka Streams elements support **exactly one** downstream processing stage.
  Making fan-out (supporting multiple downstream processing stages) an explicit opt-in feature allows default stream
  elements to be less complex and more efficient. It also allows for greater flexibility on *how exactly* to handle
  multicast scenarios, by providing named fan-out elements such as broadcast (signals all downstream elements) or
  balance (signals one of the available downstream elements).

In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized value
of the given sink or source.

.. _back-pressure-explained-scala:

Back-pressure explained
-----------------------
Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the Reactive Streams
specification, which Akka is a founding member of.

As a library user you do not have to write any explicit back-pressure handling code in order for it to work - it is built
in and dealt with automatically by all of the provided Akka Streams processing stages. However, it is possible to include
explicit buffers with overflow strategies that can influence the behaviour of the stream. This is especially important
in complex processing graphs which may sometimes even contain loops (which *must* be treated with very special
care, as explained in :ref:`cycles-scala`).

The back-pressure protocol is defined in terms of the number of elements a downstream ``Subscriber`` is able to receive,
referred to as ``demand``. This demand is the *number of elements* that the receiver of the data (referred to as the
``Subscriber`` in Reactive Streams, and implemented by ``Sink`` in Akka Streams) is able to safely consume at this point
in time. The source of data, referred to as the ``Publisher`` in Reactive Streams terminology and implemented as
``Source`` in Akka Streams, guarantees that it will never emit more elements than the received total demand for any
given ``Subscriber``.

.. note::
  The Reactive Streams specification defines its protocol in terms of **Publishers** and **Subscribers**.
  These types are *not* meant to be user facing API, instead they serve as the low level building blocks for
  different Reactive Streams implementations.

  Akka Streams implements these concepts as **Sources**, **Flows** (referred to as **Processor** in Reactive Streams)
  and **Sinks** without exposing the Reactive Streams interfaces directly.
  If you need to inter-op with other Reactive Streams enabled libraries, read :ref:`integration-with-Reactive-Streams-enabled-libraries`.

The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode",
since it will switch between push and pull based back-pressure models depending on whether the downstream is able to cope
with the upstream's production rate or not.

To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them:

Slow Publisher, fast Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This is the happy case of course; we do not need to slow down the Publisher in this case. However signalling rates are
rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
slower than the Publisher. In order to safeguard from these situations, the back-pressure protocol must still be enabled
during such situations, however we do not want to pay a high penalty for this safety net being enabled.

The Reactive Streams protocol solves this by asynchronously signalling ``Request(n:Int)`` signals from the Subscriber
to the Publisher. The protocol guarantees that the Publisher will never signal *more* elements than the demand it was
signalled. Since the Subscriber however is currently faster, it will be signalling these Request messages at a higher
rate (and possibly also batching together the demand - requesting multiple elements in one Request signal).
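This guarantee can be phrased as an invariant over the trace of signals exchanged between the two parties. Here is a minimal plain-Scala checker for that invariant (an illustration only, not the Reactive Streams API):

```scala
// Trace-level check of the core guarantee: the Publisher must never emit
// more elements than the total demand signalled so far.
sealed trait Signal
case class Request(n: Long) extends Signal // Subscriber -> Publisher, may batch demand
case object Emit            extends Signal // Publisher -> Subscriber (one onNext)

def obeysBackPressure(trace: List[Signal]): Boolean = {
  var outstanding = 0L // demand requested but not yet fulfilled
  trace.forall {
    case Request(n) => outstanding += n; true
    case Emit       => outstanding -= 1; outstanding >= 0
  }
}

// Batching two requests up front lets the Publisher emit without waiting:
val ok  = obeysBackPressure(List(Request(2), Emit, Emit, Request(1), Emit))
// Emitting beyond the signalled demand violates the protocol:
val bad = obeysBackPressure(List(Request(1), Emit, Emit))
```

In the fast-Subscriber case the ``Request`` signals keep ``outstanding`` positive at all times, which is exactly why the Publisher never has to wait.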
This means
that the Publisher should never have to wait (be back-pressured) before publishing its incoming elements.

As we can see, in this scenario we effectively operate in so-called push-mode, since the Publisher can continue producing
elements as fast as it can, as the pending demand will be recovered just-in-time while it is emitting elements.

Fast Publisher, slow Subscriber
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with
the rate at which its upstream would like to emit data elements.

Since the ``Publisher`` is not allowed to signal more elements than the pending demand signalled by the ``Subscriber``,
it will have to abide by this back-pressure by applying one of the following strategies:

- not generate elements, if it is able to control their production rate,
- try buffering the elements in a *bounded* manner until more demand is signalled,
- drop elements until more demand is signalled,
- tear down the stream if unable to apply any of the above strategies.

As we can see, this scenario effectively means that the ``Subscriber`` will *pull* the elements from the Publisher;
this mode of operation is referred to as pull-based back-pressure.

.. _stream-materialization-scala:

Stream Materialization
----------------------
**TODO - write me (feel free to move around as well)**

When constructing flows and graphs in Akka Streams, think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing,
but it is not restricted to that: it could also mean opening files or socket connections etc., depending on what the stream needs.

Materialization is triggered at so-called "terminal operations".
Most notably this includes the various forms of the ``run()``
and ``runWith()`` methods defined on flow elements, as well as a small number of special syntactic sugars for running with
well-known sinks, such as ``foreach(el => ...)`` (being an alias for ``runWith(Sink.foreach(el => ...))``).

Materialization is currently performed synchronously on the materializing thread.
The actual stream processing is handled by :ref:`actors <actor-scala>` started up during the stream's materialization,
which will be running on the thread pools they have been configured to run on, which defaults to the dispatcher set in
:class:`MaterializationSettings` while constructing the :class:`FlowMaterializer`.

.. note::
  Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
  yet will materialize that stage multiple times.

diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst
new file mode 100644
index 0000000000..dfb5578836
--- /dev/null
+++ b/akka-docs-dev/rst/scala/stream-graphs.rst
@@ -0,0 +1,211 @@
.. _stream-graph-scala:

###################
Working with Graphs
###################

In Akka Streams computation graphs are not expressed using a fluent DSL like linear computations are; instead they are
written in a more graph-resembling DSL which aims to make translating graph drawings (e.g. from notes taken
from design discussions, or illustrations in protocol specifications) to and from code simpler. In this section we'll
dive into the multiple ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them.

Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.
Considering linear Flows to be like roads, we can picture graph operations as junctions: multiple flows being connected at a single point.
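Before introducing the graph DSL, the two junction families can be pictured with a plain-Scala collection analogy (illustrative only; real junctions operate on asynchronous streams, not lists):

```scala
// Collection analogy for fan-out and fan-in -- NOT the streaming API.
// Broadcast copies every element to each of its outputs:
def broadcast[T](in: List[T], outputs: Int): List[List[T]] =
  List.fill(outputs)(in)

// Merge funnels several inputs into a single output; a real Merge interleaves
// nondeterministically, here we simply concatenate for illustration:
def merge[T](ins: List[List[T]]): List[T] =
  ins.flatten

val fannedOut = broadcast(List(1, 2, 3), outputs = 2) // 1 input, 2 outputs
val fannedIn  = merge(fannedOut)                      // 2 inputs, 1 output
```

The list version only mirrors the arities; the interesting properties of real junctions (back-pressure, nondeterministic interleaving, balancing) come from the streaming semantics described below.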
Some graph operations which are common enough and fit the linear style of Flows, such as ``concat`` (which concatenates two
streams, such that the second one is consumed after the first one has completed), may have shorthand methods defined on
:class:`Flow` or :class:`Source` themselves; however, you should keep in mind that those are also implemented as graph junctions.

.. _flow-graph-scala:

Constructing Flow Graphs
------------------------
Flow graphs are built from simple Flows, which serve as the linear connections within the graphs, as well as Junctions,
which serve as fan-in and fan-out points for flows. Thanks to the junctions having meaningful types based on their
behaviour, and to their being explicit elements, they should be rather straightforward to use.

Akka Streams currently provides these junctions:

* **Fan-out**

  - ``Broadcast[T]`` – (1 input, n outputs), signals each output given an input signal,
  - ``Balance[T]`` – (1 input, n outputs), signals one of its output ports given an input signal,
  - ``UnZip[A,B]`` – (1 input, 2 outputs), a specialized element able to split a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B``,
  - ``FlexiRoute[In]`` – (1 input, n outputs), which enables writing custom fan-out elements using a simple DSL,

* **Fan-in**

  - ``Merge[In]`` – (n inputs, 1 output), randomly picks elements from its inputs, pushing them one by one to its output,
  - ``MergePreferred[In]`` – like :class:`Merge`, but if elements are available on the ``preferred`` port it picks from it, otherwise randomly from the ``others``,
  - ``ZipWith[A,B,...,Out]`` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given that all inputs are signalled, transforms and emits 1 output,
  - ``Zip[A,B,Out]`` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream,
  - ``Concat[T]`` – (2 inputs, 1
    output), which enables concatenating streams (first consume one, then the second one); thus the order of which stream is ``first`` and which is ``second`` matters,
  - ``FlexiMerge[Out]`` – (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL.

One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and to be able to relate the two. Let's illustrate this by translating
the below hand-drawn graph into Akka Streams:

.. image:: ../images/simple-graph-example.png

Such a graph is simple to translate to the Graph DSL, since each linear element corresponds to a :class:`Flow`,
and each circle corresponds to either a :class:`Junction`, or a :class:`Source` or :class:`Sink` if it is beginning
or ending a :class:`Flow`. Junctions must always be created with defined type parameters, as otherwise the ``Nothing``
type will be inferred.

.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph

.. note::
  Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a FlowGraph
  refers to the same location in the resulting graph).

Notice the ``import FlowGraphImplicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to").
It is also possible to construct graphs without the ``~>`` operator, in case you prefer to use the graph builder explicitly:

.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph-no-implicits

By looking at the snippets above, it should be apparent that **the** :class:`b:FlowGraphBuilder` **object is mutable**.
It is also used (implicitly) by the ``~>`` operator, making that a mutable operation as well.
The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles.
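This mutable-builder / immutable-result split is a classic builder pattern, which can be sketched in a few lines of plain Scala (the names here are illustrative, not Akka's internals):

```scala
import scala.collection.mutable

// Sketch of the builder pattern behind graph construction: edges accumulate
// in a mutable builder, and build() freezes them into an immutable result.
final case class Graph(edges: Set[(String, String)]) // immutable, freely shareable

final class GraphBuilder {
  private val edges = mutable.Set.empty[(String, String)]
  def addEdge(from: String, to: String): this.type = {
    edges.add((from, to)) // mutation happens only inside the builder
    this
  }
  def build(): Graph = Graph(edges.toSet) // freezing produces the immutable graph
}

val graph = new GraphBuilder()
  .addEdge("source", "merge")
  .addEdge("merge", "sink")
  .build()
```

Once ``build()`` has been called, the resulting value can be shared freely, while the builder's mutability made the wiring (including cycles, here simply edges in both directions) easy to express.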
Once the FlowGraph has been constructed though, the :class:`FlowGraph` instance *is immutable, thread-safe, and freely shareable*.
Linear Flows however are always immutable, and appending an operation to a Flow always returns a new Flow instance.
This means that you can safely re-use one given Flow in multiple places in a processing graph. In the example below
we prepare a graph that consists of two parallel streams, in which we re-use the same instance of :class:`Flow`,
yet it will properly be materialized as two connections between the corresponding Sources and Sinks:

.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-reusing-a-flow

.. _partial-flow-graph-scala:

Constructing and combining Partial Flow Graphs
----------------------------------------------
Sometimes it is not possible (or needed) to construct the entire computation graph in one place; instead all of its
different phases can be constructed in different places and in the end connected into a complete graph and run.

This can be achieved using :class:`PartialFlowGraph`. The reason for representing it as a different type is that a
:class:`FlowGraph` requires all ports to be connected, and if they are not it will throw an exception at construction
time, which helps to avoid simple wiring errors while working with graphs. A partial flow graph however does not perform
this validation, and allows graphs that are not yet fully connected.

A :class:`PartialFlowGraph` is defined as a :class:`FlowGraph` which contains so-called "undefined elements",
such as ``UndefinedSink[T]`` or ``UndefinedSource[T]``, which can be reused and plugged into by consumers of that
partial flow graph. Let's imagine we want to provide users with a specialized element that given 3 inputs will pick
the greatest int value of each zipped triple. We'll want to expose 3 input ports (undefined sources) and one output port
(undefined sink).

.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#simple-partial-flow-graph

As you can see, first we construct the partial graph that contains all the zipping and comparing of stream
elements, then we import it (all of its nodes and connections) explicitly into the :class:`FlowGraph` instance in which all
the undefined elements are rewired to real sources and sinks. The graph can then be run and yields the expected result.

.. warning::
  Please note that a :class:`FlowGraph` is not able to provide compile time type-safety about whether or not all
  elements have been properly connected - this validation is performed as a runtime check during the graph's instantiation.

.. _constructing-sources-sinks-flows-from-partial-graphs-scala:

Constructing Sources, Sinks and Flows from Partial Graphs
---------------------------------------------------------
Instead of treating a :class:`PartialFlowGraph` as simply a collection of flows and junctions which may not yet all be
connected, it is sometimes useful to expose such a complex graph as a simpler structure,
such as a :class:`Source`, :class:`Sink` or :class:`Flow`.

In fact, these concepts can be easily expressed as special cases of a partially connected graph:

* :class:`Source` is a partial flow graph with *exactly one* :class:`UndefinedSink`,
* :class:`Sink` is a partial flow graph with *exactly one* :class:`UndefinedSource`,
* :class:`Flow` is a partial flow graph with *exactly one* :class:`UndefinedSource` and *exactly one* :class:`UndefinedSink`.

Being able to hide complex graphs inside of simple elements such as Sink / Source / Flow enables you to easily create one
complex element and from there on treat it as a simple compound stage for linear computations.

In order to create a Source from a partial flow graph, ``Source[T]`` provides a special apply method that takes a function
that must return an ``UndefinedSink[T]``.
This undefined sink will become "the sink that must be attached before this Source +can run". Refer to the example below, in which we create a Source that zips together two numbers, to see this graph +construction in action: + +.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-from-partial-flow-graph + +Similarly the same can be done for a ``Sink[T]``, in which case the returned value must be an ``UndefinedSource[T]``. +For defining a ``Flow[T]`` we need to expose both an undefined source and sink: + +.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph + +Graph cycles, liveness and deadlocks +------------------------------------ + +By default :class:`FlowGraph` does not allow (or to be precise, its builder does not allow) the creation of cycles. +The reason for this is that cycles need special considerations to avoid potential deadlocks and other liveness issues. +This section shows several examples of problems that can arise from the presence of feedback arcs in stream processing +graphs. + +The first example demonstrates a graph that contains a naive cycle (the presence of cycles is enabled by calling +``allowCycles()`` on the builder). The graph takes elements from the source, prints them, then broadcasts those elements +to a consumer (we just used ``Sink.ignore`` for now) and to a feedback arc that is merged back into the main stream via +a ``Merge`` junction. + +.. includecode:: code/docs/stream/GraphCyclesSpec.scala#deadlocked + +Running this we observe that after a few numbers have been printed, no more elements are logged to the console - +all processing stops after some time. 
After some investigation we observe that:

* through merging from ``source`` we increase the number of elements flowing in the cycle
* by broadcasting back to the cycle we do not decrease the number of elements in the cycle

Since Akka Streams (and Reactive Streams in general) guarantee bounded processing (see the "Buffering" section for more
details), only a bounded number of elements are buffered over any time span. Since our cycle gains more and
more elements, eventually all of its internal buffers become full, backpressuring ``source`` forever. To be able
to process more elements from ``source``, elements would need to leave the cycle somehow.

If we modify our feedback loop by replacing the ``Merge`` junction with a ``MergePreferred`` we can avoid the deadlock.
``MergePreferred`` is unfair, as it always tries to consume from its preferred input port if there are elements available,
before trying the other lower priority input ports. Since we feed back through the preferred port, it is always guaranteed
that the elements in the cycle can flow.

.. includecode:: code/docs/stream/GraphCyclesSpec.scala#unfair

If we run the example we see that the same sequence of numbers is printed
over and over again, but the processing does not stop. Hence, we avoided the deadlock, but ``source`` is still
backpressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple
of initial elements from ``source``.

.. note::
  What we see here is that in certain cases we need to choose between boundedness and liveness. Our first example would
  not deadlock if there were an infinite buffer in the loop; or, vice versa, if the elements in the cycle were
  balanced (as many elements removed as are injected) then there would be no deadlock.

To make our cycle both live (not deadlocking) and fair we can introduce a dropping element on the feedback arc.
In this
case we chose the ``buffer()`` operation, giving it a dropping strategy of ``OverflowStrategy.dropHead``.

.. includecode:: code/docs/stream/GraphCyclesSpec.scala#dropping

If we run this example we see that:

* the flow of elements does not stop, there are always elements printed
* some of the numbers are printed several times over time (due to the feedback loop), but on average
  the numbers are increasing in the long term

This example highlights that one solution to avoid deadlocks in the presence of potentially unbalanced cycles
(cycles where the number of circulating elements is unbounded) is to drop elements. An alternative would be to
define a larger buffer with ``OverflowStrategy.error``, which would fail the stream instead of deadlocking it once
all buffer space has been consumed.

As we discovered in the previous examples, the core problem was the unbalanced nature of the feedback loop. We
circumvented this issue by adding a dropping element, but now we want to build a cycle that is balanced from
the beginning instead. To achieve this we modify our first graph by replacing the ``Merge`` junction with a ``ZipWith``.
Since ``ZipWith`` takes one element from ``source`` *and* one from the feedback arc to inject one element into the cycle,
we maintain the balance of elements.

.. includecode:: code/docs/stream/GraphCyclesSpec.scala#zipping-dead

Still, when we try to run the example it turns out that no element is printed at all! After some investigation we
realize that:

* in order to get the first element from ``source`` into the cycle, we need an already existing element in the cycle
* in order to get an initial element into the cycle, we need an element from ``source``

These two conditions are a typical "chicken-and-egg" problem. The solution is to inject an initial
element into the cycle that is independent from ``source``.
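Why a single independent seed is enough can be sketched in plain Scala, without the streaming machinery: with a seed prepended on the feedback arc, zipping each ``source`` element with the previous output simply computes a running fold over the source (a simplifying assumption here is that the cycle proceeds in strict lockstep):

```scala
// Plain-Scala sketch of the balanced cycle -- NOT the Akka API.
// feedback(0) is the injected seed; feedback(n) is the previous output.
def simulateBalancedCycle[A](source: List[A], seed: A, zip: (A, A) => A): List[A] =
  // output(0) = zip(seed, source(0)); output(n) = zip(output(n-1), source(n))
  source.scanLeft(seed)(zip).tail

val outputs = simulateBalancedCycle(List(1, 2, 3), seed = 0, zip = _ + _)
```

Without the seed, ``scanLeft`` would have no starting value, which is exactly the chicken-and-egg situation above.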
We do this by using a ``Concat`` junction on the backwards +arc that injects a single element using ``Source.single``. + +.. includecode:: code/docs/stream/GraphCyclesSpec.scala#zipping-live + +When we run the above example we see that processing starts and never stops. The important takeaway from this example +is that balanced cycles often need an initial "kick-off" element to be injected into the cycle. \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream-integrations.rst b/akka-docs-dev/rst/scala/stream-integrations.rst new file mode 100644 index 0000000000..6f178dfdf1 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-integrations.rst @@ -0,0 +1,379 @@ +.. _stream-integrations-scala: + +########### +Integration +########### + +Integrating with Actors +======================= + +:class:`ActorPublisher` and :class:`ActorSubscriber` are two traits that provide support for +implementing Reactive Streams :class:`Publisher` and :class:`Subscriber` with an :class:`Actor`. + +These can be consumed by other Reactive Streams libraries or used as an +Akka Streams :class:`Source` or :class:`Sink`. + +.. warning:: + +   :class:`ActorPublisher` and :class:`ActorSubscriber` cannot be used with remote actors, +   because if signals of the Reactive Streams protocol (e.g. ``request``) are lost +   the stream may deadlock. + +ActorPublisher +^^^^^^^^^^^^^^ + +Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a +stream publisher that keeps track of the subscription life cycle and requested elements. + +Here is an example of such an actor. It dispatches incoming jobs to the attached subscriber: + +.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#job-manager + +You send elements to the stream by calling ``onNext``. You are allowed to send as many +elements as have been requested by the stream subscriber. This amount can be inquired with +``totalDemand``. 
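The demand bookkeeping can be sketched as a simple counter (a simplified model of what ``ActorPublisher`` tracks for you, not its actual implementation):

```scala
// Simplified model of ActorPublisher's demand accounting (not the real code):
// Request(n) increases totalDemand, each onNext consumes one unit of demand,
// and onNext without demand is an error.
class DemandTracker {
  private var demand: Long = 0L
  def request(n: Long): Unit = demand += n   // subscriber requested n more
  def totalDemand: Long = demand
  def onNext(): Unit = {
    if (demand <= 0)
      throw new IllegalStateException("onNext called without demand")
    demand -= 1                              // one element delivered
  }
}

val tracker = new DemandTracker
tracker.request(2)  // subscriber signals demand for two elements
tracker.onNext()    // deliver one of them
assert(tracker.totalDemand == 1) // one more element may still be sent
```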
It is only allowed to call ``onNext`` when ``isActive`` is ``true`` and ``totalDemand > 0``; +otherwise ``onNext`` will throw an ``IllegalStateException``. + +When the stream subscriber requests more elements the ``ActorPublisher.Request`` message +is delivered to this actor, and you can act on that event. The ``totalDemand`` +is updated automatically. + +When the stream subscriber cancels the subscription the ``ActorPublisher.Cancel`` message +is delivered to this actor. After that, subsequent calls to ``onNext`` will be ignored. + +You can complete the stream by calling ``onComplete``. After that you are not allowed to +call ``onNext``, ``onError`` or ``onComplete``. + +You can terminate the stream with failure by calling ``onError``. After that you are not allowed to +call ``onNext``, ``onError`` or ``onComplete``. + +If you suspect that this ``ActorPublisher`` may never get subscribed to, you can override the ``subscriptionTimeout`` +method to provide a timeout after which this Publisher should be considered canceled. The actor will be notified when +the timeout triggers via an ``ActorPublisherMessage.SubscriptionTimeoutExceeded`` message and MUST then perform +cleanup and stop itself. + +If the actor is stopped the stream will be completed, unless it has already been terminated with +failure, completed or canceled. + +More detailed information can be found in the API documentation. + +This is how it can be used as an input :class:`Source` to a :class:`Flow`: + +.. includecode:: code/docs/stream/ActorPublisherDocSpec.scala#actor-publisher-usage + +You can only attach one subscriber to this publisher. Use ``Sink.fanoutPublisher`` to enable +multiple subscribers. + +ActorSubscriber +^^^^^^^^^^^^^^^ + +Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a +stream subscriber with full control of stream back pressure. 
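Controlling back pressure boils down to deciding how many more elements to request from upstream. A watermark-style strategy (modelled on the semantics of Akka's ``WatermarkRequestStrategy``, sketched here in plain Scala rather than the actual trait) requests a fresh batch whenever remaining demand falls below a low watermark:

```scala
// Sketch of a watermark request strategy (modelled on Akka's
// WatermarkRequestStrategy semantics; not the actual trait).
class Watermark(highWatermark: Int) {
  val lowWatermark: Int = math.max(1, highWatermark / 2)
  // given how many requested-but-undelivered elements remain,
  // return how many more to request from upstream
  def requestDemand(remainingRequested: Int): Int =
    if (remainingRequested < lowWatermark) highWatermark - remainingRequested
    else 0
}

val strategy = new Watermark(10)
assert(strategy.requestDemand(3) == 7)  // below low watermark: top up to 10
assert(strategy.requestDemand(6) == 0)  // enough outstanding demand already
```

Batching requests this way avoids signalling demand one element at a time while keeping the number of in-flight elements bounded by the high watermark.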
It will receive +``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError`` +messages from the stream. It can also receive other, non-stream messages, in the same way as any actor. + +Here is an example of such an actor. It dispatches incoming jobs to child worker actors: + +.. includecode:: code/docs/stream/ActorSubscriberDocSpec.scala#worker-pool + +The subclass must define a ``RequestStrategy`` to control stream back pressure. +After each incoming message the ``ActorSubscriber`` will automatically invoke +``RequestStrategy.requestDemand`` and propagate the returned demand to the stream. + +* The provided ``WatermarkRequestStrategy`` is a good strategy if the actor performs work itself. +* The provided ``MaxInFlightRequestStrategy`` is useful if messages are queued internally or +  delegated to other actors. +* You can also implement a custom ``RequestStrategy`` or call ``request`` manually together with +  ``ZeroRequestStrategy`` or some other strategy. In that case +  you must also call ``request`` when the actor is started or when it is ready, otherwise +  it will not receive any elements. + +More detailed information can be found in the API documentation. + +This is how it can be used as an output :class:`Sink` to a :class:`Flow`: + +.. includecode:: code/docs/stream/ActorSubscriberDocSpec.scala#actor-subscriber-usage + +Integrating with External Services +================================== + +Stream transformations and side effects involving external non-stream based services can be +performed with ``mapAsync`` or ``mapAsyncUnordered``. + +For example, sending emails to the authors of selected tweets using an external +email service: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-server-send + +We start with the tweet stream of authors: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#tweet-authors + +Assume that we can look up their email address using: + +.. 
includecode:: code/docs/stream/IntegrationDocSpec.scala#email-address-lookup + +Transforming the stream of authors to a stream of email addresses by using the ``lookupEmail`` +service can be done with ``mapAsync``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-addresses-mapAsync + +Finally, sending the emails: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#send-emails + +``mapAsync`` applies the given function, which calls out to the external service, to +each of the elements as they pass through this processing step. The function returns a :class:`Future` +and the value of that future will be emitted downstream. As many futures as there are elements requested by +downstream may run in parallel and may complete in any order, but the elements that +are emitted downstream are in the same order as received from upstream. + +That means that back-pressure works as expected. For example, if ``emailServer.send`` +is the bottleneck it will limit the rate at which incoming tweets are retrieved and +email addresses looked up. + +Note that ``mapAsync`` preserves the order of the stream elements. In this example the order +is not important, so we can use the more efficient ``mapAsyncUnordered``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#external-service-mapAsyncUnordered + +In the above example the services conveniently returned a :class:`Future` of the result. +If that is not the case you need to wrap the call in a :class:`Future`. If the service call +involves blocking you must also make sure that you run it on a dedicated execution context, to +avoid starvation and disturbance of other tasks in the system. + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-mapAsync + +The configuration of the ``"blocking-dispatcher"`` may look something like: + +.. 
includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-dispatcher-config + +An alternative for blocking calls is to perform them in a ``map`` operation, still using a +dedicated dispatcher for that operation. + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-map + +However, that is not exactly the same as ``mapAsync``, since ``mapAsync`` may run +several calls concurrently, but ``map`` performs them one at a time. + +For a service that is exposed as an actor, or if an actor is used as a gateway in front of an +external service, you can use ``ask``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#save-tweets + +Note that if the ``ask`` is not completed within the given timeout the stream is completed with failure. +If that is not the desired outcome you can use ``recover`` on the ``ask`` :class:`Future`. + +Illustrating ordering and parallelism +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Let us look at another example to get a better understanding of the ordering +and parallelism characteristics of ``mapAsync`` and ``mapAsyncUnordered``. + +Several ``mapAsync`` and ``mapAsyncUnordered`` futures may run concurrently. +The number of concurrent futures is limited by the downstream demand. +For example, if 5 elements have been requested by downstream there will be at most 5 +futures in progress. + +``mapAsync`` emits the future results in the same order as the input elements +were received. That means that completed results are only emitted downstream +when earlier results have been completed and emitted. One slow call will thereby +delay the results of all successive calls, even though they are completed before +the slow call. + +``mapAsyncUnordered`` emits the future results as soon as they are completed, i.e. +it is possible that the elements are not emitted downstream in the same order as +received from upstream. 
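The two behaviours can be sketched with plain Scala futures (a simplified model with a hypothetical slow call; not the Akka operators, which additionally bound parallelism by downstream demand):

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

// A hypothetical service call where element 1 is slow.
def call(x: Int): Future[Int] = Future {
  Thread.sleep(if (x == 1) 200 else 10)
  x
}

val inputs = List(1, 2, 3)

// mapAsync-like: all calls run concurrently, but results are collected in
// input order, so the slow first element delays emission of the others.
val ordered = Await.result(Future.sequence(inputs.map(call)), 5.seconds)
assert(ordered == List(1, 2, 3))

// mapAsyncUnordered-like: record results in completion order instead.
val completionOrder = new ConcurrentLinkedQueue[Int]()
val all = inputs.map(x => call(x).map(r => completionOrder.add(r)))
Await.result(Future.sequence(all), 5.seconds)
assert(completionOrder.asScala.toList.last == 1) // the slow call finishes last
```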
One slow call will thereby not delay the results of faster +successive calls as long as there is downstream demand for several elements. + +Here is a fictitious service that we can use to illustrate these aspects. + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-service + +Elements starting with a lower case character are simulated to take a longer time +to process. + +Here is how we can use it with ``mapAsync``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-mapAsync + +The output may look like this: + +:: + + before: a + before: B + before: C + before: D + running: a (1) + running: B (2) + before: e + running: C (3) + before: F + running: D (4) + before: g + before: H + completed: C (3) + completed: B (2) + completed: D (1) + completed: a (0) + after: A + after: B + running: e (1) + after: C + after: D + running: F (2) + before: i + before: J + running: g (3) + running: H (4) + completed: H (2) + completed: F (3) + completed: e (1) + completed: g (0) + after: E + after: F + running: i (1) + after: G + after: H + running: J (2) + completed: J (1) + completed: i (0) + after: I + after: J + +Note that ``after`` lines are in the same order as the ``before`` lines even +though elements are ``completed`` in a different order. For example ``H`` +is ``completed`` before ``g``, but still emitted afterwards. + +The numbers in parentheses illustrate how many calls are in progress at +the same time. Here the downstream demand and thereby the number of concurrent +calls are limited by the buffer size (4) of the :class:`MaterializerSettings`. + +Here is how we can use the same service with ``mapAsyncUnordered``: + +.. 
includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-mapAsyncUnordered + +The output may look like this: + +:: + + before: a + before: B + before: C + before: D + running: a (1) + running: B (2) + before: e + running: C (3) + before: F + running: D (4) + before: g + before: H + completed: B (3) + completed: C (1) + completed: D (2) + after: B + after: D + running: e (2) + after: C + running: F (3) + before: i + before: J + completed: F (2) + after: F + running: g (3) + running: H (4) + completed: H (3) + after: H + completed: a (2) + after: A + running: i (3) + running: J (4) + completed: J (3) + after: J + completed: e (2) + after: E + completed: g (1) + after: G + completed: i (0) + after: I + +Note that ``after`` lines are not in the same order as the ``before`` lines. For example +``H`` overtakes the slow ``G``. + +The numbers in parentheses illustrate how many calls are in progress at +the same time. Here the downstream demand and thereby the number of concurrent +calls are limited by the buffer size (4) of the :class:`MaterializerSettings`. + +Integrating with Reactive Streams +================================= + +`Reactive Streams`_ defines a standard for asynchronous stream processing with non-blocking +back pressure. It makes it possible to plug together stream libraries that adhere to the standard. +Akka Streams is one such library. + +An incomplete list of other implementations: + +* `Reactor (1.1+)`_ +* `RxJava`_ +* `Ratpack`_ +* `Slick`_ + +.. _Reactive Streams: http://reactive-streams.org/ +.. _Reactor (1.1+): http://github.com/reactor/reactor +.. _RxJava: https://github.com/ReactiveX/RxJavaReactiveStreams +.. _Ratpack: http://www.ratpack.io/manual/current/streams.html +.. _Slick: http://slick.typesafe.com + +The two most important interfaces in Reactive Streams are the :class:`Publisher` and :class:`Subscriber`. + +.. 
includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#imports + +Let us assume that a library provides a publisher of tweets: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#tweets-publisher + +and another library knows how to store author handles in a database: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#author-storage-subscriber + +Using an Akka Streams :class:`Flow` we can transform the stream and connect those: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala +   :include: authors,connect-all + +The :class:`Publisher` is used as an input :class:`Source` to the flow and the +:class:`Subscriber` is used as an output :class:`Sink`. + +A :class:`Flow` can also be materialized to a :class:`Subscriber`, :class:`Publisher` pair: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#flow-publisher-subscriber + +A publisher can be connected to a subscriber with the ``subscribe`` method. + +It is also possible to expose a :class:`Source` as a :class:`Publisher` +by using the ``publisher`` :class:`Sink`: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-publisher + +A publisher that is created with ``Sink.publisher`` only supports one subscriber. A second +subscription attempt will be rejected with an :class:`IllegalStateException`. + +A publisher that supports multiple subscribers can be created with ``Sink.fanoutPublisher`` +instead: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala +   :include: author-alert-subscriber,author-storage-subscriber + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#source-fanoutPublisher + +The buffer size controls how far apart the slowest subscriber can be from the fastest subscriber +before slowing down the stream. + +To make the picture complete, it is also possible to expose a :class:`Sink` as a :class:`Subscriber` +by using the ``subscriber`` :class:`Source`: + +.. 
includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#sink-subscriber + + diff --git a/akka-docs-dev/rst/scala/stream-introduction.rst b/akka-docs-dev/rst/scala/stream-introduction.rst new file mode 100644 index 0000000000..3ca729d719 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-introduction.rst @@ -0,0 +1,62 @@ +.. _stream-introduction-scala: + +############ +Introduction +############ + +Motivation +========== + +The way we consume services from the internet today includes many instances of +streaming data, both downloading from a service as well as uploading to it or +peer-to-peer data transfers. Regarding data as a stream of elements instead of +in its entirety is very useful because it matches the way computers send and +receive them (for example via TCP), but it is often also a necessity because +data sets frequently become too large to be handled as a whole. We spread +computations or analyses over large clusters and call it “big data”, where the +whole principle of processing them is by feeding those data sequentially—as a +stream—through some CPUs. + +Actors can be seen as dealing with streams as well: they send and receive +series of messages in order to transfer knowledge (or data) from one place to +another. We have found it tedious and error-prone to implement all the proper +measures in order to achieve stable streaming between actors, since in addition +to sending and receiving we also need to take care to not overflow any buffers +or mailboxes in the process. Another pitfall is that Actor messages can be lost +and must be retransmitted in that case lest the stream have holes on the +receiving side. When dealing with streams of elements of a fixed given type, +Actors also do not currently offer good static guarantees that no wiring errors +are made: type-safety could be improved in this case. + +For these reasons we decided to bundle up a solution to these problems as an +Akka Streams API. 
The purpose is to offer an intuitive and safe way to +formulate stream processing setups such that we can then execute them +efficiently and with bounded resource usage—no more OutOfMemoryErrors. In order +to achieve this our streams need to be able to limit the buffering that they +employ, and they need to be able to slow down producers if the consumers cannot +keep up. This feature is called back-pressure and is at the core of the +`Reactive Streams <http://reactive-streams.org/>`_ initiative of which Akka is a +founding member. For you this means that the hard problem of propagating and +reacting to back-pressure has been incorporated in the design of Akka Streams +already, so you have one less thing to worry about; it also means that Akka +Streams interoperate seamlessly with all other Reactive Streams implementations +(where the Reactive Streams interfaces define the interoperability SPI while +implementations like Akka Streams offer a nice user API). + +How to read these docs +====================== + +Stream processing is a different paradigm from the Actor Model or from Future +composition, so it may take some careful study of this subject until you +feel familiar with the tools and techniques. The documentation is here to help +and for best results we recommend the following approach: + +* Read the :ref:`quickstart-scala` to get a feel for what streams +  look like and what they can do. +* The top-down learners may want to peruse the :ref:`stream-design` at this +  point. +* The bottom-up learners may feel more at home rummaging through the +  :ref:`stream-cookbook-scala`. +* The other sections can be read sequentially or as needed during the previous +  steps, each digging deeper into specific topics. + diff --git a/akka-docs-dev/rst/scala/stream-io.rst b/akka-docs-dev/rst/scala/stream-io.rst new file mode 100644 index 0000000000..d3c616cdc3 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-io.rst @@ -0,0 +1,7 @@ +.. 
_stream-io-scala: + +######################### +Working with streaming IO +######################### + +*TODO* \ No newline at end of file diff --git a/akka-docs-dev/rst/scala/stream-quickstart.rst b/akka-docs-dev/rst/scala/stream-quickstart.rst index 3751395b62..2908b299a8 100644 --- a/akka-docs-dev/rst/scala/stream-quickstart.rst +++ b/akka-docs-dev/rst/scala/stream-quickstart.rst @@ -1,166 +1,167 @@ -.. _quickstart-scala: -Quick Start: Reactive Tweets -============================ - -A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some -other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them. - -We will also consider the problem inherent to all non-blocking streaming solutions – "*What if the subscriber is slower -to consume the live stream of data?*" i.e. it is unable to keep up with processing the live data. Traditionally the solution -is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems. -Instead Akka Streams depend on internal backpressure signals that allow to control what should happen in such scenarios. - -Here's the data model we'll be working with throughout the quickstart examples: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model - -Transforming and consuming simple streams ------------------------------------------ -In order to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`, -which will be responsible for materializing and running the streams we are about to create: - -.. 
includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup - -The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define -materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to -be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this -will be discussed in depth in :ref:`stream-section-configuration`. - -Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source - -Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or -more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations -that can be used to transform the flowing data, a :class:`Sink` however does not since its the "end of stream" and its -behavior depends on the type of :class:`Sink` used. - -In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look -familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map - -Finally in order to :ref:`materialize ` and run the stream computation we need to attach -the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call -``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on -the :class:``Sink`` `companion object `_. -For now let's simply print each author: - -.. 
includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println - -or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`): - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println - -Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly, -like this: ``.run(mat)``). - -Flattening sequences in streams -------------------------------- -In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes -we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like ``flatMap`` -works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat`` -combinator: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat - -.. note:: - The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition. - It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing - due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for - our implementation of flatMap (due to the liveness issues). - - Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``), - whereas ``flatMap`` would have to operate on streams all the way through. - - -Broadcasting a stream ---------------------- -Now let's say we want to persist all hashtags, as well as all author names from this one live stream. -For example we'd like to write all author handles into one file, and all hashtags into another file on disk. 
-This means we have to split the source stream into 2 streams which will handle the writing to these different files. - -Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams. -One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its -input port to all of its output ports. - -Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) -in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups -at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation -graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`. -FlowGraphs are constructed like this: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast - -.. note:: - The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported. - Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method. - -As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph -using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g`` -*it is immutable, thread-safe, and freely shareable*. A graph can can be ``run()`` directly - assuming all -ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s -where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`. - -As all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element. 
- -Back-pressure in action ------------------------ - -One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks -(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more -about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read -:ref:`back-pressure-explained-scala`. - -A typical problem applications (not using Akka Streams) like this often face is that they are unable to process the incoming data fast enough, -either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting -in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka Streams buffering can -and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10 -elements*" this can be expressed using the ``buffer`` element: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead - -The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react -when it receives another element element while it is full. Strategies provided include dropping the oldest element (``dropHead``), -dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best. - -Materialized values -------------------- -So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing -values or storing them in some external system. However sometimes we may be interested in some value that can be -obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed. 
-While this question is not as obvious to give an answer to in case of an infinite stream of tweets (one way to answer -this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"), -but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements. - -First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized -values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple -lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType`` - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count - -First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream. -Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, -finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the -processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can -be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`FlowMaterializer` -to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``, -which can be used to retrieve materialized values from the running stream. - -In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map -obtained from materializing a flow or graph. 
Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType`` -as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. -In case of the stream failing, this future would complete with a Failure. - -The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused -and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, -for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations -will be different, as illustrated by this example: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice - -Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or -steering these elements which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know -what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline +.. _quickstart-scala: + +Quick Start: Reactive Tweets +============================ + +A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some +other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them. + +We will also consider the problem inherent to all non-blocking streaming solutions – "*What if the subscriber is slower +to consume the live stream of data?*" i.e. it is unable to keep up with processing the live data. 
Traditionally the solution
+is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems.
+Instead, Akka Streams rely on internal backpressure signals that allow controlling what should happen in such scenarios.
+
+Here's the data model we'll be working with throughout the quickstart examples:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model
+
+Transforming and consuming simple streams
+-----------------------------------------
+First we need to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`,
+which will be responsible for materializing and running the streams we are about to create:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup
+
+The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define
+materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to
+be used by the pipeline, etc. These can be overridden on an element-by-element basis or for an entire section, but this
+will be discussed in depth in :ref:`stream-section-configuration`.
+
+Let's assume we have a stream of tweets readily available; in Akka this is expressed as a :class:`Source[Out]`:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source
+
+Streams always start flowing from a :class:`Source[Out]`, then can continue through :class:`Flow[In,Out]` elements or
+more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations
+that can be used to transform the flowing data; a :class:`Sink` however does not, since it is the "end of stream" and its
+behavior depends on the type of :class:`Sink` used. 
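The Source, Flow and Sink roles described above can be sketched in plain Scala as a loose analogy, with ordinary iterators and functions standing in for the Akka Streams classes (this is only an illustration, not the Akka API itself):

```scala
// Plain-Scala analogy of the Source ~> Flow ~> Sink shape (NOT the Akka Streams API):
val source: Iterator[Int] = Iterator(1, 2, 3, 4, 5)      // plays the role of Source[Int]
val flow: Iterator[Int] => Iterator[Int] = _.map(_ * 2)  // plays the role of Flow[Int, Int]
val sink: Iterator[Int] => Int = _.sum                   // plays the role of a Sink[Int]

// Nothing happens until the terminal operation pulls elements through the chain,
// just as an Akka stream only starts flowing once it is materialized and run:
val result: Int = sink(flow(source))
```

Just as with the iterator chain above, attaching a :class:`Sink` is what finally makes the whole description executable.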
+
+In our case let's say we want to find all Twitter handles of users who tweet about ``#akka``. The operations should look
+familiar to anyone who has used the Scala Collections library; however, they operate on streams and not collections of data:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
+
+Finally, in order to :ref:`materialize <stream-materialization-scala>` and run the stream computation we need to attach
+the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call
+``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on
+the :class:`Sink` companion object.
+For now let's simply print each author:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println
+
+or by using the shorthand version (which is defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`):
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println
+
+Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly,
+like this: ``.run(mat)``).
+
+Flattening sequences in streams
+-------------------------------
+In the previous section we were working on 1:1 relationships of elements, which is the most common case, but sometimes
+we might want to map from one element to a number of elements and receive a "flattened" stream, similar to how ``flatMap``
+works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat``
+combinator:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat
+
+.. note::
+  The name ``flatMap`` was consciously avoided due to its proximity to for-comprehensions and monadic composition. 
+
+  It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing
+  due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
+  our implementation of flatMap (due to the liveness issues).
+
+  Please note that ``mapConcat`` requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``),
+  whereas ``flatMap`` would have to operate on streams all the way through.
+
+
+Broadcasting a stream
+---------------------
+Now let's say we want to persist all hashtags, as well as all author names from this one live stream.
+For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
+This means we have to split the source stream into two streams which will handle the writing to these different files.
+
+Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
+One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its
+input port to all of its output ports.
+
+Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs)
+in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
+at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation
+graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.
+FlowGraphs are constructed like this:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast
+
+.. note::
+  The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported. 
+
+  Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method.
+
+As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph
+using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g``
+*it is immutable, thread-safe, and freely shareable*. A graph can be ``run()`` directly - assuming all
+ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s
+where this is not required, but this will be covered in detail in :ref:`partial-flow-graph-scala`.
+
+Like all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element.
+
+Back-pressure in action
+-----------------------
+
+One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
+(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
+about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
+:ref:`back-pressure-explained-scala`.
+
+A typical problem that applications not using Akka Streams often face is that they are unable to process the incoming data fast enough,
+either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
+in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka Streams buffering can
+and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
+elements*" this can be expressed using the ``buffer`` element:
+
+.. 
includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead
+
+The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
+when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
+dropping the entire buffer, signalling errors, etc. Be sure to pick the strategy that fits your use case best.
+
+Materialized values
+-------------------
+So far we've only been processing data using Flows and consuming it into some kind of external Sink - be it by printing
+values or storing them in some external system. However sometimes we may be interested in some value that can be
+obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed.
+While this question is not as obvious to answer in the case of an infinite stream of tweets (one way to answer
+it in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
+in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
+
+First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized
+values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple
+lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType``:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count
+
+First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream.
+Next we connect the ``tweets`` stream through a ``map`` step which converts each tweet into the number ``1``;
+finally we connect the flow ``to`` the previously prepared Sink. 
Notice that this step does *not* yet materialize the
+processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
+be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`FlowMaterializer`
+to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``,
+which can be used to retrieve materialized values from the running stream.
+
+In order to extract a materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map
+obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and defines its ``MaterializedType``
+as ``Future[Int]`` we can use it to obtain the :class:`Future` which, when completed, will contain the total length of our tweets stream.
+In case of the stream failing, this future would complete with a Failure.
+
+The reason we have to ``get`` the value out of the materialized map is that a :class:`RunnableFlow` may be reused
+and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream twice,
+for example one that consumes a live stream of tweets within a minute, the materialized values of those two materializations
+will be different, as illustrated by this example:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice
+
+Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or
+means of steering these elements, which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know
+what happens behind the scenes when we run this one-liner, which is equivalent to the multi-line version above:
+
+.. 
includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline diff --git a/akka-docs-dev/rst/scala/stream-rate.rst b/akka-docs-dev/rst/scala/stream-rate.rst new file mode 100644 index 0000000000..0ca5b14e31 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-rate.rst @@ -0,0 +1,31 @@ +.. _stream-rate-scala: + +############################# +Buffers and working with rate +############################# + +Buffers in Akka Streams +======================= + +Internal buffers and their effect +--------------------------------- + +*TODO* + +Explicit user defined buffers +----------------------------- + +*TODO* + +Rate transformation +=================== + +Understanding conflate +---------------------- + +*TODO* + +Understanding expand +-------------------- + +*TODO* diff --git a/akka-docs-dev/rst/scala/stream-design.rst b/akka-docs-dev/rst/stream-design.rst similarity index 100% rename from akka-docs-dev/rst/scala/stream-design.rst rename to akka-docs-dev/rst/stream-design.rst