From 1c722b8ae132ab65a88ec210bf8b61adf466395d Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Thu, 11 Dec 2014 14:57:48 +0100 Subject: [PATCH 1/2] +doc flow / flowgraph docs, moved quickstart + moved quickstart + more info about simple materialization --- .../scala/code/docs/stream/FlowDocSpec.scala | 88 +++++ .../code/docs/stream/FlowGraphDocSpec.scala | 30 ++ .../code/docs/stream/StreamDocSpec.scala | 25 -- akka-docs-dev/rst/scala/stream-quickstart.rst | 166 +++++++++ akka-docs-dev/rst/scala/stream.rst | 334 +++++++----------- 5 files changed, 405 insertions(+), 238 deletions(-) create mode 100644 akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala delete mode 100644 akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala create mode 100644 akka-docs-dev/rst/scala/stream-quickstart.rst diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala new file mode 100644 index 0000000000..546086a535 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +import akka.actor.Cancellable +import akka.stream.scaladsl.MaterializedMap +import akka.stream.scaladsl.RunnableFlow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.testkit.AkkaSpec + +import concurrent.Future + +// TODO replace ⇒ with => and disable this intellij setting +class FlowDocSpec extends AkkaSpec { + + implicit val ec = system.dispatcher + + //#imports + import akka.stream.FlowMaterializer + //#imports + + implicit val mat = FlowMaterializer() + + "source is immutable" in { + //#source-immutable + val source = Source(1 to 10) + source.map(_ ⇒ 0) // has no effect on source, since it's immutable + source.runWith(Sink.fold(0)(_ + _)) // 55 + + val zeroes = source.map(_ ⇒ 0) // returns new Source[Int], with `map()` appended + zeroes.runWith(Sink.fold(0)(_ + _)) // 0 + //#source-immutable + } + + "materialization in steps" in { + //#materialization-in-steps + val source = Source(1 to 10) + val sink = Sink.fold[Int, Int](0)(_ + _) + + // connect the Source to the Sink, obtaining a RunnableFlow + val runnable: RunnableFlow = source.to(sink) + + // materialize the flow + val materialized: MaterializedMap = runnable.run() + + // get the materialized value of the FoldSink + val sum: Future[Int] = materialized.get(sink) + + //#materialization-in-steps + } + + "materialization runWith" in { + //#materialization-runWith + val source = Source(1 to 10) + val sink = Sink.fold[Int, Int](0)(_ + _) + + // materialize the flow + val materialized: MaterializedMap = source.to(sink).run() + + // get the materialized value from the running streams MaterializedMap + val sum: Future[Int] = materialized.get(sink) + //#materialization-runWith + } + + "compound source cannot be used as key" in { + //#compound-source-is-not-keyed-runWith + import scala.concurrent.duration._ + case object Tick + + val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () ⇒ Tick) + + val timerCancel: Cancellable = Sink.ignore.runWith(timer) + timerCancel.cancel() + + val timerMap = timer.map(tick ⇒ "tick") + val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable! 
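+    // `timerMap` is a new compound Source whose key is the appended `map` stage,
+    // so the timer's Cancellable can no longer be obtained from runWith here;
+    // it must be retrieved via the MaterializedMap instead, as shown below.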
+ //#compound-source-is-not-keyed-runWith + + //#compound-source-is-not-keyed-run + // retain the materialized map, in order to retrieve the timers Cancellable + val materialized = timerMap.to(Sink.ignore).run() + val timerCancellable = materialized.get(timer) + timerCancellable.cancel() + //#compound-source-is-not-keyed-run + } +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 847eb50666..67b5d684a7 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -15,6 +15,9 @@ import akka.stream.scaladsl.Source import akka.stream.scaladsl.Zip import akka.stream.testkit.AkkaSpec +import scala.concurrent.Await +import scala.concurrent.duration._ + // TODO replace ⇒ with => and disable this intellij setting class FlowGraphDocSpec extends AkkaSpec { @@ -89,4 +92,31 @@ class FlowGraphDocSpec extends AkkaSpec { }.getMessage should include("must have at least 1 outgoing edge") } + "reusing a flow in a graph" in { + //#flow-graph-reusing-a-flow + + val topHeadSink = Sink.head[Int] + val bottomHeadSink = Sink.head[Int] + val sharedDoubler = Flow[Int].map(_ * 2) + + //#flow-graph-reusing-a-flow + + // format: OFF + val g = + //#flow-graph-reusing-a-flow + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val broadcast = Broadcast[Int] + Source.single(1) ~> broadcast + + broadcast ~> sharedDoubler ~> topHeadSink + broadcast ~> sharedDoubler ~> bottomHeadSink + } + //#flow-graph-reusing-a-flow + // format: ON + val map = g.run() + Await.result(map.get(topHeadSink), 300.millis) shouldEqual 2 + Await.result(map.get(bottomHeadSink), 300.millis) shouldEqual 2 + } + } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala deleted file mode 100644 index 91b86ffd42..0000000000 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package docs.stream - -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.FlowGraph -import akka.stream.scaladsl.FlowGraphImplicits -import akka.stream.scaladsl.Source -import akka.stream.scaladsl.Zip -import akka.stream.testkit.AkkaSpec - -// TODO replace ⇒ with => and disable this intellij setting -class StreamDocSpec extends AkkaSpec { - - implicit val ec = system.dispatcher - - //#imports - import akka.stream.FlowMaterializer - import akka.stream.scaladsl.Broadcast - //#imports - - implicit val mat = FlowMaterializer() - -} diff --git a/akka-docs-dev/rst/scala/stream-quickstart.rst b/akka-docs-dev/rst/scala/stream-quickstart.rst new file mode 100644 index 0000000000..3751395b62 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-quickstart.rst @@ -0,0 +1,166 @@ +.. _quickstart-scala: +Quick Start: Reactive Tweets +============================ + +A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some +other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them. + +We will also consider the problem inherent to all non-blocking streaming solutions – "*What if the subscriber is slower +to consume the live stream of data?*" i.e. it is unable to keep up with processing the live data. 
Traditionally the solution
is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems.
Akka Streams instead depend on internal backpressure signals that allow control over what should happen in such scenarios.

Here's the data model we'll be working with throughout the quickstart examples:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model

Transforming and consuming simple streams
-----------------------------------------
First we prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`,
which will be responsible for materializing and running the streams we are about to create:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup

The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to
be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this
will be discussed in depth in :ref:`stream-section-configuration`.

Let's assume we have a stream of tweets readily available; in Akka this is expressed as a :class:`Source[Out]`:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source

Streams always start flowing from a :class:`Source[Out]`, can continue through :class:`Flow[In,Out]` elements or
more advanced graph elements, and are finally consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations
that can be used to transform the flowing data; a :class:`Sink` however does not, since it is the "end of stream" and its
behavior depends on the type of :class:`Sink` used.

In our case let's say we want to find all twitter handles of users who tweet about ``#akka``. The operations should look
familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map

Finally, in order to :ref:`materialize ` and run the stream computation we need to attach
the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on
the :class:`Sink` `companion object `_.
For now let's simply print each author:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println

or by using the shorthand version (such shorthands are defined only for the most popular sinks, such as :class:`FoldSink` and :class:`ForeachSink`):

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println

Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly,
like this: ``.run(mat)``).
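If there is no materializer in implicit scope, it can be supplied in an explicit second argument list when running the
flow. A minimal sketch, assuming the ``authors`` source from above and the ``mat`` materializer created during setup::

    // run the stream, passing the FlowMaterializer explicitly
    val materialized = authors
      .to(Sink.foreach(author => println(author)))
      .run()(mat)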
Flattening sequences in streams
-------------------------------
In the previous section we were working on 1:1 relationships of elements, which is the most common case, but sometimes
we might want to map from one element to a number of elements and receive a "flattened" stream, similar to how ``flatMap``
works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat``
combinator:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat

.. note::
  The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition.
  It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing
  due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
  our implementation of flatMap (due to the liveness issues).

  Please note that ``mapConcat`` requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``),
  whereas ``flatMap`` would have to operate on streams all the way through.


Broadcasting a stream
---------------------
Now let's say we want to persist all hashtags, as well as all author names from this one live stream.
For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
This means we have to split the source stream into two streams which will handle the writing to these different files.

Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its
input port to all of its output ports.

Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation
graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.
FlowGraphs are constructed like this:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast

.. note::
  The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` is imported.
  Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method.

As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph
using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g``
*it is immutable, thread-safe, and freely shareable*. A graph can be ``run()`` directly - assuming all
ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s
where this is not required, but this will be covered in detail in :ref:`partial-flow-graph-scala`.

Like all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element.

Back-pressure in action
-----------------------

One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
:ref:`back-pressure-explained-scala`.
A typical problem that applications (not using Akka Streams) face in this situation is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka Streams buffering can
and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
elements*" this can be expressed using the ``buffer`` element:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead

The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling errors etc. Be sure to pick the strategy that fits your use case best.

Materialized values
-------------------
So far we've only been processing data using Flows and consuming it into some kind of external Sink - be it by printing
values or storing them in some external system. However sometimes we may be interested in some value that can be
obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed.
This question is not as obvious to answer in the case of an infinite stream of tweets (one way to answer it in a
streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.

First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized
values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple
lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType``:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count

First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream.
Next we connect the ``tweets`` stream through a ``map`` step which converts each tweet into the number ``1``;
finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`FlowMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is a ``MaterializedMap``,
which can be used to retrieve materialized values from the running stream.

In order to extract a materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map
obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and defines its ``MaterializedType``
as ``Future[Int]`` we can use it to obtain the :class:`Future` which, when completed, will contain the total length of our tweets stream.
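A minimal sketch of reading that value (the names ``materialized`` and ``sumSink`` are assumed to match the snippet above)::

    // retrieve the FoldSink's materialized value from the MaterializedMap
    val count: Future[Int] = materialized.get(sumSink)
    count.foreach(c => println(s"Total tweets processed: $c")) // an implicit ExecutionContext is assumed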
+In case of the stream failing, this future would complete with a Failure. + +The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused +and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, +for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations +will be different, as illustrated by this example: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice + +Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or +steering these elements which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know +what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline diff --git a/akka-docs-dev/rst/scala/stream.rst b/akka-docs-dev/rst/scala/stream.rst index f7d8f98b5d..bbb049181b 100644 --- a/akka-docs-dev/rst/scala/stream.rst +++ b/akka-docs-dev/rst/scala/stream.rst @@ -30,203 +30,90 @@ Motivation **TODO - write me** -Quick Start: Reactive Tweets -============================ - -A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some -other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them. - -We will also consider the problem inherent to all non-blocking streaming solutions – "*What if the subscriber is slower -to consume the live stream of data?*" i.e. it is unable to keep up with processing the live data. Traditionally the solution -is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems. -Instead Akka Streams depend on internal backpressure signals that allow to control what should happen in such scenarios. - -Here's the data model we'll be working with throughout the quickstart examples: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model - -Transforming and consuming simple streams ------------------------------------------ -In order to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`, -which will be responsible for materializing and running the streams we are about to create: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup - -The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define -materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to -be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this -will be discussed in depth in :ref:`stream-section-configuration`. - -Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source - -Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or -more advanced graph elements to finally be consumed by a :class:`Sink[In]`. 
Both Sources and Flows provide stream operations -that can be used to transform the flowing data, a :class:`Sink` however does not since its the "end of stream" and its -behavior depends on the type of :class:`Sink` used. - -In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look -familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map - -Finally in order to :ref:`materialize ` and run the stream computation we need to attach -the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call -``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on -the :class:``Sink`` `companion object `_. -For now let's simply print each author: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println - -or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`): - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println - -Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly, -like this: ``.run(mat)``). - -Flattening sequences in streams -------------------------------- -In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes -we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like ``flatMap`` -works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat`` -combinator: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat - -.. note:: - The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition. - It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing - due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for - our implementation of flatMap (due to the liveness issues). - - Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``), - whereas ``flatMap`` would have to operate on streams all the way through. - - -Broadcasting a stream ---------------------- -Now let's say we want to persist all hashtags, as well as all author names from this one live stream. -For example we'd like to write all author handles into one file, and all hashtags into another file on disk. -This means we have to split the source stream into 2 streams which will handle the writing to these different files. - -Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams. -One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its -input port to all of its output ports. - -Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) -in order to offer the most convenient API for both of these cases. 
Graphs can express arbitrarily complex stream setups -at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation -graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`. -FlowGraphs are constructed like this: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast - -.. note:: - The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported. - Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method. - -As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph -using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g`` -*it is immutable, thread-safe, and freely shareable*. A graph can can be ``run()`` directly - assuming all -ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s -where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`. - -As all Akka streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element. - -Back-pressure in action ------------------------ - -One of the main advantages of Akka streams is that they *always* propagate back-pressure information from stream Sinks -(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more -about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read -:ref:`back-pressure-explained-scala`. - -A typical problem applications (not using Akka streams) like this often face is that they are unable to process the incoming data fast enough, -either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting -in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka streams buffering can -and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10 -elements*" this can be expressed using the ``buffer`` element: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead - -The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react -when it receives another element element while it is full. Strategies provided include dropping the oldest element (``dropHead``), -dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best. - -Materialized values -------------------- -So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing -values or storing them in some external system. However sometimes we may be interested in some value that can be -obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed. 
-While this question is not as obvious to give an answer to in case of an infinite stream of tweets (one way to answer -this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"), -but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements. - -First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized -values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple -lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType`` - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count - -First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream. -Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, -finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the -processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can -be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`FlowMaterializer` -to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``, -which can be used to retrieve materialized values from the running stream. - -In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map -obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType`` -as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. -In case of the stream failing, this future would complete with a Failure. - -The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused -and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream, -for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations -will be different, as illustrated by this example: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice - -Many elements in Akka streams provide materialized values which can be used for obtaining either results of computation or -steering these elements which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know -what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline - - Core concepts ============= -// TODO REWORD? This section explains the core types and concepts used in Akka Streams, from a more day-to-day use angle. -If we would like to get the big picture overview you may be interested in reading :ref:`stream-design`. +Everything in Akka Streams revolves around a number of core concepts which we introduce in detail in this section. 
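+As a quick preview of these concepts in action, a minimal linear pipeline looks like this (a sketch using the
+abstractions introduced below)::
+
+    // Source ~> Flow ~> Sink: a complete blueprint, ready to be materialized
+    val runnable: RunnableFlow = Source(1 to 10).via(Flow[Int].map(_ * 2)).to(Sink.ignore)
+    runnable.run()
+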
+
+Akka Streams provide a way for executing bounded processing pipelines, where bounds are expressed as the number of stream
+elements in flight and in buffers at any given time. Please note that while this allows estimating and limiting memory use,
+it is not strictly bound to the size in memory of these elements.
+
+We define a few key words which will be used throughout the entire documentation:
+
+Stream
+  An active process that involves moving and transforming data.
+Element
+  An element is the unit which is passed through the stream. All operations as well as back-pressure are expressed in
+  terms of elements.
+Back-pressure
+  A means of flow-control, most notably adjusting the speed of upstream sources to the consumption speed of their sinks.
+  In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*.
+Processing Stage
+  The common name for all building blocks that build up a Flow or FlowGraph.
+  Examples of a processing stage would be a Stage (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`,
+  :class:`DetachedStage`), in terms of which operations like ``map()``, ``filter()`` and others are implemented.

Sources, Flows and Sinks
------------------------
+Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions:
-// TODO: runnable flow, types - runWith
+Source
+  A processing stage with *exactly one output*, emitting data elements in response to its downstream demand.
+Sink
+  A processing stage with *exactly one input*, generating demand based on its internal demand management strategy.
+Flow
+  A processing stage which has *exactly one input and output*, and which connects its upstream and downstream by (usually)
+  transforming the data elements flowing through it.
+RunnableFlow
+  A Flow which has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.
-// TODO: talk about how creating and sharing a ``Flow.of[String]`` is useful etc.
+It is important to remember that even when constructing these processing pipelines by connecting their different processing
+stages, no data will flow through them until they are materialized. Materialization is the process of allocating all resources
+needed to run the computation described by a Flow (in Akka Streams this will often involve starting up Actors).
+Thanks to Flows being simply a description of the processing pipeline they are *immutable, thread-safe, and freely shareable*,
+which means that it is for example safe to share and send them between actors, e.g. to have one actor prepare the work, and then have it
+be materialized at some completely different place in the code.
+
+In order to be able to run a ``Flow[In,Out]`` it must be connected to a ``Source[In]`` *and* a ``Sink[Out]`` of matching types.
+It is also possible to directly connect a :class:`Sink` to a :class:`Source`.
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps
+
+The :class:`MaterializedMap` can be used to get materialized values of both sinks and sources out from the running
+stream. In general, a stream can expose multiple materialized values; however, the very common case of only wanting the
+materialized value of a Sink (in order to read a result) or of a Source (in order to cancel or influence it in some way)
+is covered by a small convenience method called ``runWith()``. It is available on ``Sink``, ``Source`` and ``Flow``, respectively
+requiring the user to supply a ``Source`` (in order to run a ``Sink``), a ``Sink`` (in order to run a ``Source``), or
+both a ``Source`` and a ``Sink`` (in order to run a ``Flow``, since it has neither attached yet).
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-runWith
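+As a quick illustration, a minimal sketch of both directions (``ticks`` is a hypothetical timer source, mirroring the
+timer example used elsewhere in this documentation; ``import scala.concurrent.duration._`` is assumed)::
+
+    // running a Source: attach a Sink and obtain the Sink's materialized value
+    val sum: Future[Int] = Source(1 to 10).runWith(Sink.fold[Int, Int](0)(_ + _))
+
+    // running a Sink: attach a Source and obtain the Source's materialized value
+    val ticks = Source(initialDelay = 1.second, interval = 1.second, tick = () => "tick")
+    val cancellable: Cancellable = Sink.ignore.runWith(ticks)
+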
+
+It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage
+instead of modifying the existing instance, so while constructing long flows remember to assign the new value to a variable or run it:
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#source-immutable

.. note::
-  By default Akka streams elements support **exactly one** down-stream element.
-  Making fan-out (supporting multiple downstream elements) an explicit opt-in feature allows default stream elements to
+  By default Akka Streams elements support **exactly one** downstream processing stage.
+  Making fan-out (supporting multiple downstream processing stages) an explicit opt-in feature allows default stream elements to
   be less complex and more efficient. Also it allows for greater flexibility on *how exactly* to handle the multicast scenarios,
-  by providing named fan-out elements such as broadcast (signalls all down-stream elements) or balance (signals one of available down-stream elements).
+  by providing named fan-out elements such as broadcast (signals all down-stream elements) or balance (signals one of the available down-stream elements).
+
+In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized value
+of the given sink or source.

.. _back-pressure-explained-scala:

Back-pressure explained
-----------------------
+Back-pressure in Akka Streams is always enabled and all stream processing stages adhere the same back-pressure protocol.
+While these back-pressure signals are in fact explicit in terms of protocol, they are hidden from users of the library,
+such that in normal usage one does *not* have to explicitly think about handling back-pressure, unless working with rate detached stages.

-// TODO: explain the protocol and how it performs in slow-pub/fast-sub and fast-pub/slow-sub scenarios
+Back-pressure is defined in terms of element count which referred to as ``demand``.

-Backpressure when Fast Publisher and Slow Subscriber
-----------------------------------------------------
-// TODO: Write me
+Akka Streams implement the Reactive Streams back-pressure protocol, which can be described as a dynamic push/pull model.

In depth
========
@@ -258,24 +145,29 @@ Stream Materialization
----------------------
**TODO - write me (feel free to move around as well)**
-When constructing flows and graphs in Akka streams think of them as preparing a blueprint, an execution plan.
+When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
-it needs in order to run. In the case of Akka streams this often means starting up Actors which power the processing,
+it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing,
but is not restricted to that - it could also mean opening files or socket connections etc. – depending on what the stream needs.

Materialization is triggered at so-called "terminal operations". Most notably this includes the various forms of the ``run()``
and ``runWith()`` methods defined on flow elements, as well as a small number of special syntactic sugars for running with
well-known sinks, such as ``foreach(el => ...)`` (being an alias to ``runWith(Sink.foreach(el => ...))``).
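For example, the following two ways of running a simple stream are equivalent (a minimal sketch)::

    // the `foreach` sugar attaches a ForeachSink and runs the stream
    Source(1 to 3).foreach(el => println(el))

    // the explicit equivalent using runWith
    Source(1 to 3).runWith(Sink.foreach(el => println(el)))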
+
+Materialization is currently performed synchronously on the materializing thread.
+The actual stream processing is handled by :ref:`Actors <actor-scala>` started up during the stream's materialization,
+which will be running on the thread pools they have been configured to run on - which defaults to the dispatcher set in
+:class:`MaterializerSettings` while constructing the :class:`FlowMaterializer`.
+
+.. note::
+  Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
+  yet will materialize that stage multiple times.
+

MaterializedMap
^^^^^^^^^^^^^^^
**TODO - write me (feel free to move around as well)**

-Working with rates
-------------------
-**TODO - write me (feel free to move around as well)**

Optimizations
^^^^^^^^^^^^^
// TODO: not really to be covered right now, right?

@@ -291,10 +183,10 @@ Section configuration
---------------------
// TODO: it is possible to configure sections of a graph

-
+.. _working-with-graphs-scala:
Working with Graphs
===================
-Akka streams are unique in the way they handle and expose computation graphs - instead of hiding the fact that the
+Akka Streams are unique in the way they handle and expose computation graphs - instead of hiding the fact that the
processing pipeline is in fact a graph in a purely "fluent" DSL, graph operations are written in a DSL that graphically
resembles and embraces the fact that the built pipeline is in fact a Graph. In this section we'll dive into the multiple
ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them.

@@ -313,44 +205,60 @@ Flow graphs are built from simple Flows which serve as the linear connections wi
which serve as fan-in and fan-out points for flows. Thanks to the junctions having meaningful types based on their
behaviour, and to them being explicit elements, they should be rather straightforward to use.
-Akka streams currently provides these junctions:
+Akka Streams currently provides these junctions:

* **Fan-out**

-  - :class:`Broadcast` – (1 input, n outputs) signals each output given an input signal,
-  - :class:`Balance` – (1 input => n outputs), signals one of its output ports given an input signal,
-  - :class:`UnZip` – (1 input => 2 outputs), which is a specialized element which is able to split a stream of ``(A,B)`` into two streams one type ``A`` and one of type ``B``,
-  - :class:`FlexiRoute` – (1 input, n outputs), which enables writing custom fan out elements using a simple DSL,
+  - ``Broadcast[T]`` – (1 input, n outputs) signals each output given an input signal,
+  - ``Balance[T]`` – (1 input => n outputs), signals one of its output ports given an input signal,
+  - ``UnZip[A,B]`` – (1 input => 2 outputs), a specialized element able to split a stream of ``(A,B)`` into two streams, one of type ``A`` and one of type ``B``,
+  - ``FlexiRoute[In]`` – (1 input, n outputs), which enables writing custom fan-out elements using a simple DSL,

* **Fan-in**

-  - :class:`Merge` – (n inputs , 1 output), picks signals randomly from inputs pushing them one by one to its output,
-  - :class:`MergePreferred` – like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``,
-  - :class:`ZipWith` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
-    + :class:`Zip` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` stream,
-  - :class:`Concat` – (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters,
-  - :class:`FlexiMerge` – (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL.
+  - ``Merge[In]`` – (n inputs, 1 output), picks signals randomly from inputs pushing them one by one to its output,
+  - ``MergePreferred[In]`` – like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``,
+  - ``ZipWith[A,B,...,Out]`` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
+    + ``Zip[A,B,Out]`` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` stream,
+  - ``Concat[T]`` – (2 inputs, 1 output), which concatenates streams (first consuming one, then the second), so the order of which stream is ``first`` and which is ``second`` matters,
+  - ``FlexiMerge[Out]`` – (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL.

One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating
-the below hand drawn graph into Akka streams:
+the below hand drawn graph into Akka Streams:

.. image:: ../images/simple-graph-example.png

Such a graph is simple to translate to the Graph DSL since each linear element corresponds to a :class:`Flow`,
and each circle corresponds to either a :class:`Junction` or a :class:`Source` or :class:`Sink` if it is beginning
-or ending a :class:`Flow`.
+or ending a :class:`Flow`. Junctions must always be created with defined type parameters, as otherwise the ``Nothing`` type
+will be inferred, making the junction unusable for any concrete element type.

.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph

+.. note::
+  Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a FlowGraph
+  refers to the same location in the resulting graph).
+
Notice the ``import FlowGraphImplicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to").
It is also possible to construct graphs without the ``~>`` operator in case you prefer to use the graph builder explicitly:

.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph-no-implicits

+By looking at the snippets above, it should be apparent that the ``b: FlowGraphBuilder`` object **is mutable**.
+It is also used (implicitly) by the ``~>`` operator, making it a mutating operation as well.
+The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles.
+Once the FlowGraph has been constructed though, the :class:`FlowGraph` instance *is immutable, thread-safe, and freely shareable*.
+Linear Flows however are always immutable, and appending an operation to a Flow always returns a new Flow instance.
+This means that you can safely re-use one given Flow in multiple places in a processing graph. In the example below
+we prepare a graph that consists of two parallel streams, in which we re-use the same instance of :class:`Flow`,
+yet it will properly be materialized as two connections between the corresponding Sources and Sinks:
+
+.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-reusing-a-flow
+
.. _partial-flow-graph-scala:

Constructing and combining Partial Flow Graphs
----------------------------------------------
Sometimes it is not possible (or needed) to construct the entire computation graph in one place, but instead construct
-all of it is different phases in different places and in the end connect them all into a complete graph and run it.
+all of its different phases in different places and in the end connect them all into a complete graph and run it.

This can be achieved using :class:`PartialFlowGraph`. The reason for representing it as a different type is that a
:class:`FlowGraph` requires all ports to be connected, and if they are not it will throw an exception at construction
@@ -402,17 +310,18 @@ For defining a ``Flow[T]`` we need to expose both an undefined source and sink:

.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph

-Dealing with cycles, deadlocks
-------------------------------
-// TODO: why to avoid cycles, how to enable if you really need to
+Stream ordering
+===============
+In Akka Streams almost all computation stages *preserve input order* of elements, this means that each output element
+``O`` is the result of some sequence of incoming ``I1,I2,I3`` elements. This property is even adhered by async operations
+such as ``mapAsync``, however an unordered version exists called ``mapAsyncUnordered`` which does not preserve this ordering.

-// TODO: problem cases, expand-conflate, expand-filter
-
-// TODO: working with rate
-
-// TODO: custom processing
-
-// TODO: stages and flexi stuff
+However, in the case of Junctions which handle multiple input streams (e.g.
:class:`Merge`) the output order is **not defined**, +as different junctions may choose to implement consuming their upstreams in a multitude of ways, each being valid under +certain circumstances. If you find yourself in need of fine grained control over order of emitted elements in fan-in +scenarios consider using :class:`MergePreferred` or :class:`FlexiMerge` - which gives you full control over how the +merge is performed. One notable exception from that rule is :class:`Zip` as is only ever emits an element once all of +its upstreams have one available, thus no reordering can occur. Streaming IO ============ @@ -451,7 +360,6 @@ Integrating with Actors ActorPublisher ^^^^^^^^^^^^^^ - ActorSubscriber ^^^^^^^^^^^^^^^ From 98143e3c93470c9612dd7667c47547d800e72c6f Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Thu, 18 Dec 2014 18:11:32 +0100 Subject: [PATCH 2/2] +doc explain backpressure / reactive streams a bit --- .../scala/code/docs/stream/FlowDocSpec.scala | 7 +- .../code/docs/stream/FlowGraphDocSpec.scala | 10 +- .../StreamPartialFlowGraphDocSpec.scala | 2 + akka-docs-dev/rst/scala/stream.rst | 100 ++++++++++++++---- 4 files changed, 88 insertions(+), 31 deletions(-) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala index 546086a535..e30e2c7c11 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala @@ -56,11 +56,8 @@ class FlowDocSpec extends AkkaSpec { val source = Source(1 to 10) val sink = Sink.fold[Int, Int](0)(_ + _) - // materialize the flow - val materialized: MaterializedMap = source.to(sink).run() - - // get the materialized value from the running streams MaterializedMap - val sum: Future[Int] = materialized.get(sink) + // materialize the flow, getting the Sinks materialized value + val sum: Future[Int] = source.runWith(sink) //#materialization-runWith } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 67b5d684a7..e75e1e9d95 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -33,15 +33,13 @@ class FlowGraphDocSpec extends AkkaSpec { val in = Source(1 to 10) val out = Sink.ignore - val broadcast = Broadcast[Int] + val bcast = Broadcast[Int] val merge = Merge[Int] - val f1 = Flow[Int].map(_ + 10) - val f3 = Flow[Int].map(_.toString) - val f2 = Flow[Int].map(_ + 20) + val f1, f2, f3, f4 = Flow[Int].map(_ + 10) - in ~> broadcast ~> f1 ~> merge - broadcast ~> f2 ~> merge ~> f3 ~> out + in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out + bcast ~> f4 ~> merge } //#simple-flow-graph //format: ON diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala index 3c4de39310..231f494fa4 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -51,7 +51,9 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { in3 ~> zip2.right zip2.out ~> out } + //#simple-partial-flow-graph // format: ON + //#simple-partial-flow-graph val resultSink = Sink.head[Int] diff --git a/akka-docs-dev/rst/scala/stream.rst b/akka-docs-dev/rst/scala/stream.rst index bbb049181b..29f518924a 100644 --- 
a/akka-docs-dev/rst/scala/stream.rst
+++ b/akka-docs-dev/rst/scala/stream.rst
@@ -39,7 +39,7 @@ Akka Streams provide a way for executing bounded processing pipelines, where bou
 elements in flight and in buffers at any given time. Please note that while this allows estimating and limiting memory use,
 it is not strictly bound to the size in memory of these elements.

-We define a few key words which will be used throughout the entire documentation:
+First we define the terminology which will be used throughout the entire documentation:

 Stream
   An active process that involves moving and transforming data.
@@ -107,13 +107,67 @@ of the given sink or source.

 Back-pressure explained
 -----------------------
-Back-pressure in Akka Streams is always enabled and all stream processing stages adhere the same back-pressure protocol.
-While these back-pressure signals are in fact explicit in terms of protocol, they are hidden from users of the library,
-such that in normal usage one does *not* have to explicitly think about handling back-pressure, unless working with rate detached stages.
+Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the Reactive Streams
+specification, of which Akka is a founding member.

-Back-pressure is defined in terms of element count which referred to as ``demand``.
+As a library user you do not have to write any explicit back-pressure handling code - it is built in and dealt with
+automatically by all of the provided Akka Streams processing stages. It is however possible to include
+explicit buffers with overflow strategies that can influence the behaviour of the stream. This is especially important
+in complex processing graphs, which may sometimes even contain loops (which *must* be treated with very special
+care, as explained in :ref:`cycles-scala`).

-Akka Streams implement the Reactive Streams back-pressure protocol, which can be described as a dynamic push/pull model.
+The back-pressure protocol is defined in terms of the number of elements a downstream ``Subscriber`` is able to receive,
+referred to as ``demand``. This demand is the *number of elements* that the receiver of the data (referred to as ``Subscriber``
+in Reactive Streams, and implemented by ``Sink`` in Akka Streams) is able to safely consume at this point in time.
+The source of data (referred to as ``Publisher`` in Reactive Streams terminology, and implemented as ``Source`` in Akka
+Streams) guarantees that it will never emit more elements than the received total demand for any given ``Subscriber``.
+
+.. note::
+  The Reactive Streams specification defines its protocol in terms of **Publishers** and **Subscribers**.
+  These types are *not* meant to be user-facing API; instead they serve as the low-level building blocks for
+  different Reactive Streams implementations.
+
+  Akka Streams implements these concepts as **Sources**, **Flows** (referred to as **Processor** in Reactive Streams)
+  and **Sinks** without exposing the Reactive Streams interfaces directly.
+  If you need to inter-operate with other Reactive Streams enabled libraries read :ref:`integration-with-Reactive-Streams-enabled-libraries`.
+
+The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode",
+since it will switch between push and pull based back-pressure models depending on whether the downstream is able to cope
+with the upstream's production rate or not.
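+
+For reference, a simplified sketch of the Reactive Streams interfaces carrying these signals (for illustration only;
+as the note above explains, these types are not meant to be used directly by users of Akka Streams)::
+
+    trait Publisher[T] {
+      def subscribe(s: Subscriber[T]): Unit
+    }
+
+    trait Subscriber[T] {
+      def onSubscribe(s: Subscription): Unit
+      def onNext(elem: T): Unit        // called at most as many times as the total requested demand
+      def onError(t: Throwable): Unit
+      def onComplete(): Unit
+    }
+
+    trait Subscription {
+      def request(n: Int): Unit        // signal demand: ready to receive up to n more elements
+      def cancel(): Unit
+    }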
+
+To illustrate this further let us consider both problem situations and how the back-pressure protocol handles them:
+
+Slow Publisher, fast Subscriber
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+This is the happy case of course - we do not need to slow down the Publisher in this case. However signalling rates are
+rarely constant and could change at any point in time, suddenly ending up in a situation where the Subscriber is now
+slower than the Publisher. In order to safeguard against these situations, the back-pressure protocol must still be enabled,
+however we do not want to pay a high penalty for this safety net being enabled.
+
+The Reactive Streams protocol solves this by asynchronously signalling ``Request(n:Int)`` messages from the Subscriber
+to the Publisher. The protocol guarantees that the Publisher will never emit *more* elements than the total signalled
+demand. Since the Subscriber however is currently faster, it will be signalling these Request messages at a higher
+rate (and possibly also batching together the demand - requesting multiple elements in one Request signal). This means
+that the Publisher should never have to wait (be back-pressured) before publishing its incoming elements.
+
+As we can see, in this scenario we effectively operate in so-called push mode, since the Publisher can continue producing
+elements as fast as it can - the pending demand will be recovered just-in-time while it is emitting elements.
+
+Fast Publisher, slow Subscriber
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with
+the rate at which its upstream would like to emit data elements.
+
+Since the ``Publisher`` is not allowed to signal more elements than the pending demand signalled by the ``Subscriber``,
+it will have to abide by this back-pressure by applying one of the following strategies:
+
+- not generate elements, if it is able to control their production rate,
+- try buffering the elements in a *bounded* manner until more demand is signalled,
+- drop elements until more demand is signalled,
+- tear down the stream if unable to apply any of the above strategies.
+
+As we can see, this scenario effectively means that the ``Subscriber`` will *pull* the elements from the Publisher -
+this mode of operation is referred to as pull-based back-pressure.

In depth
========
@@ -186,10 +240,10 @@ Section configuration
 .. _working-with-graphs-scala:
 Working with Graphs
 ===================
-Akka Streams are unique in the way they handle and expose computation graphs - instead of hiding the fact that the
-processing pipeline is in fact a graph in a purely "fluent" DSL, graph operations are written in a DSL that graphically
-resembles and embraces the fact that the built pipeline is in fact a Graph. In this section we'll dive into the multiple
-ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them.
+In Akka Streams computation graphs are not expressed using a fluent DSL like linear computations are; instead they are
+written in a more graph-resembling DSL which aims to make translating graph drawings (e.g. from notes taken
+during design discussions, or illustrations in protocol specifications) to and from code simpler. In this section we'll
+dive into the multiple ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them.
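+For a first taste of the DSL before diving into details, here is a minimal sketch of a graph (mirroring the broadcast
+example from the quickstart; the stages used are introduced in the sections below)::
+
+    FlowGraph { implicit b =>
+      import FlowGraphImplicits._
+      val bcast = Broadcast[Int]
+      Source(1 to 3) ~> bcast
+      bcast ~> Flow[Int].map(_ * 2)  ~> Sink.ignore
+      bcast ~> Flow[Int].map(_ + 10) ~> Sink.ignore
+    }.run()
+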
Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.
Considering linear Flows to be like roads, we can picture graph operations as junctions: multiple flows being connected at a single point.
@@ -210,13 +264,13 @@ Akka Streams currently provides these junctions:

 * **Fan-out**

   - ``Broadcast[T]`` – (1 input, n outputs) signals each output given an input signal,
   - ``Balance[T]`` – (1 input => n outputs), signals one of its output ports given an input signal,
-  - ``UnZip[A,B]`` – (1 input => 2 outputs), a specialized element able to split a stream of ``(A,B)`` into two streams, one of type ``A`` and one of type ``B``,
+  - ``UnZip[A,B]`` – (1 input => 2 outputs), a specialized element able to split a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B``,
   - ``FlexiRoute[In]`` – (1 input, n outputs), which enables writing custom fan-out elements using a simple DSL,

 * **Fan-in**

   - ``Merge[In]`` – (n inputs, 1 output), picks signals randomly from inputs pushing them one by one to its output,
   - ``MergePreferred[In]`` – like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``,
   - ``ZipWith[A,B,...,Out]`` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
-    + ``Zip[A,B,Out]`` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` stream,
+    + ``Zip[A,B,Out]`` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream,
   - ``Concat[T]`` – (2 inputs, 1 output), which concatenates streams (first consuming one, then the second), so the order of which stream is ``first`` and which is ``second`` matters,
   - ``FlexiMerge[Out]`` – (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL.
@@ -312,16 +366,22 @@ Stream ordering
 ===============
-In Akka Streams almost all computation stages *preserve input order* of elements, this means that each output element
-``O`` is the result of some sequence of incoming ``I1,I2,I3`` elements. This property is even adhered by async operations
-such as ``mapAsync``, however an unordered version exists called ``mapAsyncUnordered`` which does not preserve this ordering.
+In Akka Streams almost all computation stages *preserve input order* of elements. This means that if inputs ``{IA1,IA2,...,IAn}``
+"cause" outputs ``{OA1,OA2,...,OAk}`` and inputs ``{IB1,IB2,...,IBm}`` "cause" outputs ``{OB1,OB2,...,OBl}``, and all of
+``IAi`` happened before all ``IBi``, then ``OAi`` happens before ``OBi``.
+
+This property is even upheld by async operations such as ``mapAsync``; however, an unordered version exists,
+called ``mapAsyncUnordered``, which does not preserve this ordering.
+
+However, in the case of Junctions which handle multiple input streams (e.g.
:class:`Merge`) the output order is, in general, *not defined* for elements arriving on different input ports; that is,
+a merge-like operation may emit ``Ai`` before emitting ``Bi``, and it is up to its internal logic to decide the order
+of emitted elements. Specialized elements such as ``Zip`` however *do guarantee* their output order, as each output
+element depends on all upstream elements having been signalled already; thus the ordering in the case of zipping is
+defined by this property.
+
+If you find yourself in need of fine grained control over order of emitted elements in fan-in
 scenarios consider using :class:`MergePreferred` or :class:`FlexiMerge` - which gives you full control over how the
-merge is performed. One notable exception from that rule is :class:`Zip` as is only ever emits an element once all of
-its upstreams have one available, thus no reordering can occur.
+merge is performed.

 Streaming IO
 ============