diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala
new file mode 100644
index 0000000000..e30e2c7c11
--- /dev/null
+++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2014 Typesafe Inc.
+ */
+package docs.stream
+
+import akka.actor.Cancellable
+import akka.stream.scaladsl.MaterializedMap
+import akka.stream.scaladsl.RunnableFlow
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+import akka.stream.testkit.AkkaSpec
+
+import scala.concurrent.Future
+
+// TODO replace ⇒ with => and disable this intellij setting
+class FlowDocSpec extends AkkaSpec {
+
+  implicit val ec = system.dispatcher
+
+  //#imports
+  import akka.stream.FlowMaterializer
+  //#imports
+
+  implicit val mat = FlowMaterializer()
+
+  "source is immutable" in {
+    //#source-immutable
+    val source = Source(1 to 10)
+    source.map(_ ⇒ 0) // has no effect on source, since it's immutable
+    source.runWith(Sink.fold(0)(_ + _)) // 55
+
+    val zeroes = source.map(_ ⇒ 0) // returns new Source[Int], with `map()` appended
+    zeroes.runWith(Sink.fold(0)(_ + _)) // 0
+    //#source-immutable
+  }
+
+  "materialization in steps" in {
+    //#materialization-in-steps
+    val source = Source(1 to 10)
+    val sink = Sink.fold[Int, Int](0)(_ + _)
+
+    // connect the Source to the Sink, obtaining a RunnableFlow
+    val runnable: RunnableFlow = source.to(sink)
+
+    // materialize the flow
+    val materialized: MaterializedMap = runnable.run()
+
+    // get the materialized value of the FoldSink
+    val sum: Future[Int] = materialized.get(sink)
+
+    //#materialization-in-steps
+  }
+
+  "materialization runWith" in {
+    //#materialization-runWith
+    val source = Source(1 to 10)
+    val sink = Sink.fold[Int, Int](0)(_ + _)
+
+    // materialize the flow, getting the Sink's materialized value
+    val sum: Future[Int] = source.runWith(sink)
+    //#materialization-runWith
+  }
+
+  "compound source cannot be used as key" in {
+    //#compound-source-is-not-keyed-runWith
+    import scala.concurrent.duration._
+    case object Tick
+
+    val timer = Source(initialDelay = 1.second, interval = 1.second, tick = () ⇒ Tick)
+
+    val timerCancel: Cancellable = Sink.ignore.runWith(timer)
+    timerCancel.cancel()
+
+    val timerMap = timer.map(tick ⇒ "tick")
+    val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timer's Cancellable!
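+    // a compound source (the mapped timer) is not a keyed element,
+    // so the value returned above is not the underlying timer's Cancellable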
+    //#compound-source-is-not-keyed-runWith
+
+    //#compound-source-is-not-keyed-run
+    // retain the materialized map, in order to retrieve the timer's Cancellable
+    val materialized = timerMap.to(Sink.ignore).run()
+    val timerCancellable = materialized.get(timer)
+    timerCancellable.cancel()
+    //#compound-source-is-not-keyed-run
+  }
+}
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala
index 847eb50666..e75e1e9d95 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala
@@ -15,6 +15,9 @@ import akka.stream.scaladsl.Source
 import akka.stream.scaladsl.Zip
 import akka.stream.testkit.AkkaSpec
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 // TODO replace ⇒ with => and disable this intellij setting
 class FlowGraphDocSpec extends AkkaSpec {
 
@@ -30,15 +33,13 @@ class FlowGraphDocSpec extends AkkaSpec {
     val in = Source(1 to 10)
     val out = Sink.ignore
 
-    val broadcast = Broadcast[Int]
+    val bcast = Broadcast[Int]
     val merge = Merge[Int]
 
-    val f1 = Flow[Int].map(_ + 10)
-    val f3 = Flow[Int].map(_.toString)
-    val f2 = Flow[Int].map(_ + 20)
+    val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
 
-    in ~> broadcast ~> f1 ~> merge
-    broadcast ~> f2 ~> merge ~> f3 ~> out
+    in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
+    bcast ~> f4 ~> merge
   }
   //#simple-flow-graph
   //format: ON
@@ -89,4 +90,31 @@ class FlowGraphDocSpec extends AkkaSpec {
     }.getMessage should include("must have at least 1 outgoing edge")
   }
 
+  "reusing a flow in a graph" in {
+    //#flow-graph-reusing-a-flow
+
+    val topHeadSink = Sink.head[Int]
+    val bottomHeadSink = Sink.head[Int]
+    val sharedDoubler = Flow[Int].map(_ * 2)
+
+    //#flow-graph-reusing-a-flow
+
+    // format: OFF
+    val g =
+    //#flow-graph-reusing-a-flow
+    FlowGraph { implicit b ⇒
+      import FlowGraphImplicits._
+      val broadcast = Broadcast[Int]
+      Source.single(1) ~> broadcast
+
+      broadcast ~> sharedDoubler ~> topHeadSink
+      broadcast ~> sharedDoubler ~> bottomHeadSink
+    }
+    //#flow-graph-reusing-a-flow
+    // format: ON
+    val map = g.run()
+    Await.result(map.get(topHeadSink), 300.millis) shouldEqual 2
+    Await.result(map.get(bottomHeadSink), 300.millis) shouldEqual 2
+  }
+
 }
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala
deleted file mode 100644
index 91b86ffd42..0000000000
--- a/akka-docs-dev/rst/scala/code/docs/stream/StreamDocSpec.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Copyright (C) 2014 Typesafe Inc.
- */
-package docs.stream
-
-import akka.stream.scaladsl.Flow
-import akka.stream.scaladsl.FlowGraph
-import akka.stream.scaladsl.FlowGraphImplicits
-import akka.stream.scaladsl.Source
-import akka.stream.scaladsl.Zip
-import akka.stream.testkit.AkkaSpec
-
-// TODO replace ⇒ with => and disable this intellij setting
-class StreamDocSpec extends AkkaSpec {
-
-  implicit val ec = system.dispatcher
-
-  //#imports
-  import akka.stream.FlowMaterializer
-  import akka.stream.scaladsl.Broadcast
-  //#imports
-
-  implicit val mat = FlowMaterializer()
-
-}
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala
index 3c4de39310..231f494fa4 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala
@@ -51,7 +51,9 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
     in3 ~> zip2.right
     zip2.out ~> out
   }
+  //#simple-partial-flow-graph
   // format: ON
+  //#simple-partial-flow-graph
 
   val resultSink = Sink.head[Int]
diff --git a/akka-docs-dev/rst/scala/stream-quickstart.rst b/akka-docs-dev/rst/scala/stream-quickstart.rst
new file mode 100644
index 0000000000..3751395b62
--- /dev/null
+++ b/akka-docs-dev/rst/scala/stream-quickstart.rst
@@ -0,0 +1,166 @@
+.. _quickstart-scala:
+
+Quick Start: Reactive Tweets
+============================
+
+A typical use case for stream processing is consuming a live stream of data from which we want to extract or aggregate
+some other data. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.
+
+We will also consider the problem inherent to all non-blocking streaming solutions: "*What if the subscriber is too slow
+to consume the live stream of data?*", i.e. it is unable to keep up with processing the live data. Traditionally the solution
+is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems.
+Akka Streams instead depend on internal back-pressure signals that allow controlling what should happen in such scenarios.
+
+Here's the data model we'll be working with throughout the quickstart examples:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model
+
+Transforming and consuming simple streams
+-----------------------------------------
+We start by preparing our environment, creating an :class:`ActorSystem` and a :class:`FlowMaterializer`,
+which will be responsible for materializing and running the streams we are about to create:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup
+
+The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings`, which can be used to define
+materialization properties such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to
+be used by the pipeline, etc. These can be overridden on an element-by-element basis or for an entire section, but this
+will be discussed in depth in :ref:`stream-section-configuration`.
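+
+For illustration, a materializer with tweaked settings might be created like this (a minimal sketch; the
+``withInputBuffer`` call and its arguments are assumptions for illustration, not taken from the examples on this page):
+
+.. code-block:: scala
+
+   import akka.actor.ActorSystem
+   import akka.stream.{ FlowMaterializer, MaterializerSettings }
+
+   implicit val system = ActorSystem("reactive-tweets")
+   // hypothetical buffer bounds, picked only for illustration
+   implicit val mat = FlowMaterializer(
+     MaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 16))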
+
+Let's assume we have a stream of tweets readily available; in Akka this is expressed as a :class:`Source[Out]`:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source
+
+Streams always start flowing from a :class:`Source[Out]`, then can continue through :class:`Flow[In,Out]` elements or
+more advanced graph elements, to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations
+that can be used to transform the flowing data; a :class:`Sink` however does not, since it is the "end of stream" and its
+behavior depends on the type of :class:`Sink` used.
+
+In our case let's say we want to find all Twitter handles of users who tweet about ``#akka``. The operations should look
+familiar to anyone who has used the Scala Collections library; however, they operate on streams and not on collections of data:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
+
+Finally, in order to :ref:`materialize ` and run the stream computation, we need to attach
+the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call
+``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on
+the :class:`Sink` `companion object `_.
+For now let's simply print each author:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println
+
+or by using the shorthand version (which is defined only for the most popular sinks, such as :class:`FoldSink` and :class:`ForeachSink`):
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println
+
+Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly,
+like this: ``.run(mat)``).
+
+Flattening sequences in streams
+-------------------------------
+In the previous section we were working with 1:1 relationships between elements, which is the most common case, but sometimes
+we may want to map from one element to a number of elements and receive a "flattened" stream, similar to how ``flatMap``
+works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat``
+combinator:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat
+
+.. note::
+  The name ``flatMap`` was consciously avoided due to its proximity to for-comprehensions and monadic composition.
+  It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing
+  due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
+  our implementation of flatMap (due to liveness issues).
+
+  Please note that ``mapConcat`` requires the supplied function to return a strict collection (``f: Out ⇒ immutable.Seq[T]``),
+  whereas ``flatMap`` would have to operate on streams all the way through.
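+
+For instance, flattening the hashtags could look like this (a sketch which assumes the ``Tweet`` model above
+exposes a ``hashtags`` collection - the exact field name is an assumption):
+
+.. code-block:: scala
+
+   // mapConcat expects a function returning a strict collection (immutable.Seq),
+   // so we turn each tweet's set of hashtags into a List
+   val hashtags = tweets.mapConcat(_.hashtags.toList)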
+
+Broadcasting a stream
+---------------------
+Now let's say we want to persist all hashtags, as well as all author names, from this one live stream.
+For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
+This means we have to split the source stream into two streams, which will handle the writing to these different files.
+
+Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
+One of these that we'll be using in this example is called :class:`Broadcast`; it simply emits elements from its
+input port to all of its output ports.
+
+Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs)
+in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups,
+at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation
+graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.
+FlowGraphs are constructed like this:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast
+
+.. note::
+  The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` is imported.
+  Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method.
+
+As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph
+using the ``~>`` "edge operator" (also read as "connect", "via" or "to"). Once we have the FlowGraph in the value ``g``
+*it is immutable, thread-safe, and freely shareable*. A graph can be ``run()`` directly, assuming all
+ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph`\ s
+where this is not required, but this will be covered in detail in :ref:`partial-flow-graph-scala`.
+
+Like all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element.
+
+Back-pressure in action
+-----------------------
+
+One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
+(Subscribers) to their Sources (Publishers). It is not an optional feature, and it is enabled at all times. To learn more
+about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
+:ref:`back-pressure-explained-scala`.
+
+A typical problem that applications not using Akka Streams often face is that they are unable to process the incoming data fast enough,
+either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
+in either ``OutOfMemoryError``\ s or other severe degradations of service responsiveness. With Akka Streams buffering can
+and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
+elements*", this can be expressed using the ``buffer`` element:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead
+
+The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
+when it receives another element while it is full. The provided strategies include dropping the oldest element (``dropHead``),
+dropping the entire buffer, signalling errors etc. Be sure to pick the strategy that fits your use case best.
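+
+Applied to our tweets, such an explicitly buffered pipeline could look like the following sketch
+(``slowComputation`` is a hypothetical stand-in for whatever expensive per-element work the consumer performs):
+
+.. code-block:: scala
+
+   tweets
+     .buffer(10, OverflowStrategy.dropHead) // keep only the 10 most recent elements
+     .map(slowComputation)                  // hypothetical slow processing step
+     .runWith(Sink.ignore)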
+
+Materialized values
+-------------------
+So far we have only been processing data using Flows and consuming it with some kind of external Sink - be it by printing
+values or storing them in some external system. However, sometimes we may be interested in some value that can be
+obtained from the materialized processing pipeline. For example, we may want to know how many tweets we have processed.
+While this question is not obvious to answer in the case of an infinite stream of tweets (one way to answer it
+in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
+in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
+
+First, let's write such an element counter using :class:`FoldSink`, and then we'll see how it is possible to obtain materialized
+values from a :class:`MaterializedMap`, which is returned by materializing an Akka stream. We'll split execution into multiple
+lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType``:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count
+
+First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream.
+Next we connect the ``tweets`` stream through a ``map`` step which converts each tweet into the number ``1``;
+finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the
+processing pipeline; it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
+be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()``, which uses the implicit :class:`FlowMaterializer`
+to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is a ``MaterializedMap``,
+which can be used to retrieve materialized values from the running stream.
+
+In order to extract a materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map
+obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and defines its ``MaterializedType``
+as ``Future[Int]``, we can use it to obtain the :class:`Future` which, when completed, will contain the total length of our tweets stream.
+In case of the stream failing, this future would complete with a Failure.
+
+The reason we have to ``get`` the value out of the materialized map is that a :class:`RunnableFlow` may be reused
+and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream twice,
+for example one that consumes a live stream of tweets within a minute, the materialized values of those two materializations
+will be different, as illustrated by this example:
+
+.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice
+
+Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or
+for steering these elements, which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, we now know
+what happens behind the scenes when we run this one-liner, which is equivalent to the multi-line version above:
+
+.. 
includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline diff --git a/akka-docs-dev/rst/scala/stream.rst b/akka-docs-dev/rst/scala/stream.rst index f7d8f98b5d..29f518924a 100644 --- a/akka-docs-dev/rst/scala/stream.rst +++ b/akka-docs-dev/rst/scala/stream.rst @@ -30,203 +30,144 @@ Motivation **TODO - write me** -Quick Start: Reactive Tweets -============================ - -A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some -other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them. - -We will also consider the problem inherent to all non-blocking streaming solutions – "*What if the subscriber is slower -to consume the live stream of data?*" i.e. it is unable to keep up with processing the live data. Traditionally the solution -is often to buffer the elements, but this can (and usually *will*) cause eventual buffer overflows and instability of such systems. -Instead Akka Streams depend on internal backpressure signals that allow to control what should happen in such scenarios. - -Here's the data model we'll be working with throughout the quickstart examples: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model - -Transforming and consuming simple streams ------------------------------------------ -In order to prepare our environment by creating an :class:`ActorSystem` and :class:`FlowMaterializer`, -which will be responsible for materializing and running the streams we are about to create: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup - -The :class:`FlowMaterializer` can optionally take :class:`MaterializerSettings` which can be used to define -materialization properties, such as default buffer sizes (see also :ref:`stream-buffering-explained-scala`), the dispatcher to -be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this -will be discussed in depth in :ref:`stream-section-configuration`. - -Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source[Out]`: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source - -Streams always start flowing from a :class:`Source[Out]` then can continue through :class:`Flow[In,Out]` elements or -more advanced graph elements to finally be consumed by a :class:`Sink[In]`. Both Sources and Flows provide stream operations -that can be used to transform the flowing data, a :class:`Sink` however does not since its the "end of stream" and its -behavior depends on the type of :class:`Sink` used. - -In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look -familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map - -Finally in order to :ref:`materialize ` and run the stream computation we need to attach -the Flow to a :class:`Sink[T]` that will get the flow running. The simplest way to do this is to call -``runWith(sink)`` on a ``Source[Out]``. For convenience a number of common Sinks are predefined and collected as methods on -the :class:``Sink`` `companion object `_. -For now let's simply print each author: - -.. 
includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println - -or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`): - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println - -Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly, -like this: ``.run(mat)``). - -Flattening sequences in streams -------------------------------- -In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes -we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like ``flatMap`` -works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat`` -combinator: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat - -.. note:: - The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition. - It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing - due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for - our implementation of flatMap (due to the liveness issues). - - Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``), - whereas ``flatMap`` would have to operate on streams all the way through. - - -Broadcasting a stream ---------------------- -Now let's say we want to persist all hashtags, as well as all author names from this one live stream. -For example we'd like to write all author handles into one file, and all hashtags into another file on disk. -This means we have to split the source stream into 2 streams which will handle the writing to these different files. - -Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams. -One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its -input port to all of its output ports. - -Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) -in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups -at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation -graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`. -FlowGraphs are constructed like this: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast - -.. note:: - The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraphImplicits._`` are imported. - Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method. - -As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph -using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g`` -*it is immutable, thread-safe, and freely shareable*. 
A graph can can be ``run()`` directly - assuming all -ports (sinks/sources) within a flow have been connected properly. It is possible to construct :class:`PartialFlowGraph` s -where this is not required but this will be covered in detail in :ref:`partial-flow-graph-scala`. - -As all Akka streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element. - -Back-pressure in action ------------------------ - -One of the main advantages of Akka streams is that they *always* propagate back-pressure information from stream Sinks -(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more -about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read -:ref:`back-pressure-explained-scala`. - -A typical problem applications (not using Akka streams) like this often face is that they are unable to process the incoming data fast enough, -either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting -in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka streams buffering can -and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10 -elements*" this can be expressed using the ``buffer`` element: - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead - -The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react -when it receives another element element while it is full. Strategies provided include dropping the oldest element (``dropHead``), -dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best. - -Materialized values -------------------- -So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing -values or storing them in some external system. However sometimes we may be interested in some value that can be -obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed. -While this question is not as obvious to give an answer to in case of an infinite stream of tweets (one way to answer -this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"), -but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements. - -First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized -values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. We'll split execution into multiple -lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType`` - -.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count - -First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream. -Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, -finally we connect the flow ``to`` the previously prepared Sink. 
Notice that this step does *not* yet materialize the
-processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
-be ``run()``, as indicated by its type: :class:`RunnableFlow`. Next we call ``run()`` which uses the implicit :class:`FlowMaterializer`
-to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow`` or ``FlowGraph`` is ``MaterializedMap``,
-which can be used to retrieve materialized values from the running stream.
-
-In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map
-obtained from materializing a flow or graph. Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType``
-as ``Future[Int]`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream.
-In case of the stream failing, this future would complete with a Failure.
-
-The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableFlow` may be reused
-and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream,
-for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
-will be different, as illustrated by this example:
-
-.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice
-
-Many elements in Akka streams provide materialized values which can be used for obtaining either results of computation or
-steering these elements which will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know
-what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above:
-
-.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline
-
-
 Core concepts
 =============
-// TODO REWORD? This section explains the core types and concepts used in Akka Streams, from a more day-to-day use angle.
-If we would like to get the big picture overview you may be interested in reading :ref:`stream-design`.
+Everything in Akka Streams revolves around a number of core concepts, which we introduce in detail in this section.
+
+Akka Streams provide a way of executing bounded processing pipelines, where bounds are expressed as the number of stream
+elements in flight and in buffers at any given time. Please note that while this allows one to estimate and limit memory use,
+it is not a strict bound on the in-memory size of these elements.
+
+First we define the terminology which will be used throughout the entire documentation:
+
+Stream
+  An active process that involves moving and transforming data.
+Element
+  An element is the unit which is passed through the stream. All operations as well as back-pressure are expressed in
+  terms of elements.
+Back-pressure
+  A means of flow-control, most notably adjusting the speed of upstream sources to the consumption speed of their sinks.
+  In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*.
+Processing Stage
+  The common name for all building blocks that make up a Flow or FlowGraph.
+  Examples of processing stages are the basic stages (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`,
+  :class:`DetachedStage`), in terms of which operations like ``map()``, ``filter()`` and others are implemented.
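+
+As an illustration, a ``map``-like operation could be written in terms of such a stage roughly like this (a minimal
+sketch; the exact :class:`PushStage` method signatures are assumed and may differ in this version of the API):
+
+.. code-block:: scala
+
+   import akka.stream.stage._
+
+   // pushes the transformed element downstream whenever one is pushed from upstream
+   class MyMap[A, B](f: A => B) extends PushStage[A, B] {
+     override def onPush(elem: A, ctx: Context[B]): Directive =
+       ctx.push(f(elem))
+   }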
 
 Sources, Flows and Sinks
 ------------------------
+Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions:
 
-// TODO: runnable flow, types - runWith
+Source
+  A processing stage with *exactly one output*, emitting data elements in response to its downstream demand.
+Sink
+  A processing stage with *exactly one input*, generating demand based on its internal demand management strategy.
+Flow
+  A processing stage which has *exactly one input and output*, and which connects its upstream and downstream by (usually)
+  transforming the data elements flowing through it.
+RunnableFlow
+  A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.
 
-// TODO: talk about how creating and sharing a ``Flow.of[String]`` is useful etc.
+It is important to remember that while constructing a processing pipeline by connecting its processing stages,
+no data will flow through it until it is materialized. Materialization is the process of allocating all resources
+needed to run the computation described by a Flow (in Akka Streams this will often involve starting up Actors).
+Thanks to Flows being simply a description of the processing pipeline, they are *immutable, thread-safe, and freely shareable*,
+which means that it is, for example, safe to share and send them between actors - having one actor prepare the work, and then having it
+be materialized at some completely different place in the code.
+
+In order to be able to run a ``Flow[In,Out]`` it must be connected to a ``Source[In]`` *and* a ``Sink[Out]`` of matching types.
+It is also possible to directly connect a :class:`Sink` to a :class:`Source`.
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps
+
+The :class:`MaterializedMap` can be used to get materialized values of both sinks and sources out of the running
+stream. In general, a stream can expose multiple materialized values; however, the very common case of only wanting to
+get back a Sink's materialized value (in order to read a result), or a Source's (in order to cancel or influence it in some way),
+is covered by a small convenience method called ``runWith()``. It is available on ``Sink``, ``Source`` and ``Flow``,
+requiring the user to supply a ``Source`` (in order to run a ``Sink``), a ``Sink`` (in order to run a ``Source``), or
+both a ``Source`` and a ``Sink`` (in order to run a ``Flow``, since it has neither attached yet), respectively.
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-runWith
+
+It is worth pointing out that since processing stages are *immutable*, connecting them returns a new processing stage
+instead of modifying the existing instance, so while constructing long flows remember to assign the new value to a variable or run it:
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#source-immutable
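+
+Since a :class:`RunnableFlow` is just a "blueprint", it can also be materialized multiple times, each run yielding
+its own materialized values (a small sketch composed only of elements already shown above):
+
+.. code-block:: scala
+
+   val sink = Sink.fold[Int, Int](0)(_ + _)
+   val runnable: RunnableFlow = Source(1 to 10).to(sink)
+
+   // two materializations - two independent materialized values
+   val sum1: Future[Int] = runnable.run().get(sink)
+   val sum2: Future[Int] = runnable.run().get(sink)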
 
 .. note::
-  By default Akka streams elements support **exactly one** down-stream element.
-  Making fan-out (supporting multiple downstream elements) an explicit opt-in feature allows default stream elements to
+  By default Akka Streams elements support **exactly one** downstream processing stage.
+  Making fan-out (supporting multiple downstream processing stages) an explicit opt-in feature allows default stream elements to
   be less complex and more efficient. Also it allows for greater flexibility on *how exactly* to handle the multicast scenarios,
-  by providing named fan-out elements such as broadcast (signalls all down-stream elements) or balance (signals one of available down-stream elements).
+  by providing named fan-out elements such as broadcast (signals all downstream elements) or balance (signals one of the available downstream elements).
+
+In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized value
+of the given sink or source.
 
 .. _back-pressure-explained-scala:
 
 Back-pressure explained
 -----------------------
+Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the Reactive Streams
+specification, of which Akka is a founding member.
 
-// TODO: explain the protocol and how it performs in slow-pub/fast-sub and fast-pub/slow-sub scenarios
+As a library user you do not have to write any explicit back-pressure handling code - it is built in
+and dealt with automatically by all of the provided Akka Streams processing stages. It is, however, possible to add
+explicit buffers with overflow strategies that can influence the behaviour of the stream. This is especially important
+in complex processing graphs, which may sometimes even contain loops (which *must* be treated with very special
+care, as explained in :ref:`cycles-scala`).
 
-Backpressure when Fast Publisher and Slow Subscriber
-----------------------------------------------------
+The back-pressure protocol is defined in terms of the number of elements a downstream ``Subscriber`` is able to receive,
+referred to as ``demand``. This demand is the *number of elements* that the receiver of the data (referred to as ``Subscriber``
+in Reactive Streams, and implemented by ``Sink`` in Akka Streams) is able to safely consume at this point in time.
+The source of data (referred to as ``Publisher`` in Reactive Streams terminology, and implemented as ``Source`` in Akka
+Streams) guarantees that it will never emit more elements than the received total demand for any given ``Subscriber``.
 
-// TODO: Write me
+.. note::
+  The Reactive Streams specification defines its protocol in terms of **Publishers** and **Subscribers**.
+  These types are *not* meant to be user facing API; instead they serve as the low level building blocks for
+  different Reactive Streams implementations.
+
+  Akka Streams implements these concepts as **Sources**, **Flows** (referred to as **Processor** in Reactive Streams)
+  and **Sinks** without exposing the Reactive Streams interfaces directly.
+  If you need to inter-op with other Reactive Streams implementations, read :ref:`integration-with-Reactive-Streams-enabled-libraries`.
+
+The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode",
+since it will switch between push-based and pull-based back-pressure models depending on whether the downstream is able
+to cope with the upstream's production rate or not.
+
+To illustrate this further, let us consider both problem situations and how the back-pressure protocol handles them:
+
+Slow Publisher, fast Subscriber
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+This is the happy case - we do not need to slow down the Publisher in this case.
+However, signalling rates are rarely constant and could change at any point in time, suddenly ending up in a
+situation where the Subscriber is now slower than the Publisher. In order to safeguard against such situations,
+the back-pressure protocol must still be enabled, yet we do not want to pay a high penalty for this safety net.
+
+The Reactive Streams protocol solves this by asynchronously signalling ``Request(n: Int)`` from the Subscriber to the Publisher.
+The protocol guarantees that the Publisher will never signal *more* elements than the demand it was signalled. Since the
+Subscriber is currently faster, it will be signalling these Request messages at a higher
+rate (and possibly also batching together the demand - requesting multiple elements in one Request signal). This means
+that the Publisher should never have to wait (be back-pressured) before publishing its incoming elements.
+
+As we can see, in this scenario we effectively operate in so-called push mode, since the Publisher can continue producing
+elements as fast as it can - the pending demand will be recovered just-in-time while it is emitting elements.
+
+Fast Publisher, slow Subscriber
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+This is the case when back-pressuring the ``Publisher`` is required, because the ``Subscriber`` is not able to cope with
+the rate at which its upstream would like to emit data elements.
+
+Since the ``Publisher`` is not allowed to signal more elements than the pending demand signalled by the ``Subscriber``,
+it will have to abide by this back-pressure by applying one of the following strategies:
+
+- not generate elements, if it is able to control their production rate,
+- try buffering the elements in a *bounded* manner until more demand is signalled,
+- drop elements until more demand is signalled,
+- tear down the stream if unable to apply any of the above strategies.
+
+As we can see, this scenario effectively means that the ``Subscriber`` will *pull* the elements from the Publisher -
+this mode of operation is referred to as pull-based back-pressure.
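+
+For example, a fast producer feeding a deliberately slow consumer is safe to write like this (a sketch using only
+operations shown on this page; the ``Thread.sleep`` merely simulates slow per-element work):
+
+.. code-block:: scala
+
+   // the sink consumes at most one element per millisecond;
+   // the source is automatically back-pressured down to that rate
+   Source(1 to 1000000).runWith(Sink.foreach { i =>
+     Thread.sleep(1)
+     println(i)
+   })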
 
 In depth
 ========
@@ -258,24 +199,29 @@ Stream Materialization
 ----------------------
 **TODO - write me (feel free to move around as well)**
 
-When constructing flows and graphs in Akka streams think of them as preparing a blueprint, an execution plan.
+When constructing flows and graphs in Akka Streams think of them as preparing a blueprint, an execution plan.
 Stream materialization is the process of taking a stream description (the graph) and allocating all the necessary resources
-it needs in order to run. In the case of Akka streams this often means starting up Actors which power the processing,
+it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing,
 but is not restricted to that - it could also mean opening files or socket connections etc. – depending on what the stream needs.
 
 Materialization is triggered at so called "terminal operations". Most notably this includes the various forms of the ``run()``
 and ``runWith()`` methods defined on flow elements as well as a small number of special syntactic sugars for running with
 well-known sinks, such as ``foreach(el => )`` (being an alias to ``runWith(Sink.foreach(el => ))``.
 
+Materialization is currently performed synchronously on the materializing thread.
+The actual stream processing is handled by :ref:`Actors <actor-scala>` started up during the stream's materialization,
+which will be running on the thread pools they have been configured to run on - which defaults to the dispatcher set in
+:class:`MaterializerSettings` while constructing the :class:`FlowMaterializer`.
+
+.. note::
+  Reusing *instances* of linear computation stages (Source, Sink, Flow) inside FlowGraphs is legal,
+  yet will materialize that stage multiple times.
+
+
 MaterializedMap
 ^^^^^^^^^^^^^^^
 **TODO - write me (feel free to move around as well)**
 
-Working with rates
-------------------
-
-**TODO - write me (feel free to move around as well)**
-
 Optimizations
 ^^^^^^^^^^^^^
 // TODO: not really to be covered right now, right?
@@ -291,13 +237,13 @@ Section configuration
 ---------------------
 // TODO: it is possible to configure sections of a graph
-
+
+.. _working-with-graphs-scala:
+
 Working with Graphs
 ===================
-Akka streams are unique in the way they handle and expose computation graphs - instead of hiding the fact that the
-processing pipeline is in fact a graph in a purely "fluent" DSL, graph operations are written in a DSL that graphically
-resembles and embraces the fact that the built pipeline is in fact a Graph. In this section we'll dive into the multiple
-ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them.
+In Akka Streams computation graphs are not expressed using a fluent DSL like linear computations are; instead they are
+written in a more graph-resembling DSL which aims to make translating graph drawings (e.g. from notes taken
+during design discussions, or from illustrations in protocol specifications) to and from code simpler. In this section we'll
+dive into the multiple ways of constructing and re-using graphs, as well as explain common pitfalls and how to avoid them.
 
 Graphs are needed whenever you want to perform any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operations.
 Considering linear Flows to be like roads, we can picture graph operations as junctions: multiple flows being connected at a single point.
@@ -313,44 +259,60 @@ Flow graphs are built from simple Flows which serve as the linear connections wi
 which serve as fan-in and fan-out points for flows. Thanks to the junctions having meaningful types based on their
 behaviour and making them explicit elements these elements should be rather straight forward to use.
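+
+For example, junctions are always created with explicit type parameters (a quick sketch; the full list of available
+junctions follows right below):
+
+.. code-block:: scala
+
+   val bcast = Broadcast[Int]    // 1 input, n outputs
+   val merge = Merge[Int]        // n inputs, 1 output
+   val zip   = Zip[Int, String]  // 2 inputs, 1 output of (Int, String) pairs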
 
-Akka streams currently provides these junctions:
+Akka Streams currently provides these junctions:
 
 * **Fan-out**
 
-  - :class:`Broadcast` – (1 input, n outputs) signals each output given an input signal,
-  - :class:`Balance` – (1 input => n outputs), signals one of its output ports given an input signal,
-  - :class:`UnZip` – (1 input => 2 outputs), which is a specialized element which is able to split a stream of ``(A,B)`` into two streams one type ``A`` and one of type ``B``,
-  - :class:`FlexiRoute` – (1 input, n outputs), which enables writing custom fan out elements using a simple DSL,
+  - ``Broadcast[T]`` – (1 input, n outputs), signals each output given an input signal,
+  - ``Balance[T]`` – (1 input => n outputs), signals one of its output ports given an input signal,
+  - ``UnZip[A,B]`` – (1 input => 2 outputs), a specialized element able to split a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B``,
+  - ``FlexiRoute[In]`` – (1 input, n outputs), which enables writing custom fan-out elements using a simple DSL,
 
 * **Fan-in**
 
-  - :class:`Merge` – (n inputs , 1 output), picks signals randomly from inputs pushing them one by one to its output,
-  - :class:`MergePreferred` – like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``,
-  - :class:`ZipWith` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, given all inputs are signalled, transforms and emits 1 output,
-    + :class:`Zip` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` stream,
-  - :class:`Concat` – (2 inputs, 1 output), which enables to concatenate streams (first consume one, then the second one), thus the order of which stream is ``first`` and which ``second`` matters,
-  - :class:`FlexiMerge` – (n inputs, 1 output), which enables writing custom fan out elements using a simple DSL.
+  - ``Merge[In]`` – (n inputs, 1 output), picks elements randomly from its inputs, pushing them one by one to its output,
+  - ``MergePreferred[In]`` – like :class:`Merge`, but if elements are available on the ``preferred`` port it picks from it, otherwise randomly from the ``others``,
+  - ``ZipWith[A,B,...,Out]`` – (n inputs (defined upfront), 1 output), which takes a function of n inputs that, once all inputs have signalled an element, transforms them and emits 1 output element,
+    + ``Zip[A,B,Out]`` – (2 inputs, 1 output), which is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream,
+  - ``Concat[T]`` – (2 inputs, 1 output), which enables concatenating streams (first consume one, then the second one), so the order of which stream is ``first`` and which is ``second`` matters,
+  - ``FlexiMerge[Out]`` – (n inputs, 1 output), which enables writing custom fan-in elements using a simple DSL.
 
 One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
 simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating
-the below hand drawn graph into Akka streams:
+the below hand drawn graph into Akka Streams:
 
 .. image:: ../images/simple-graph-example.png
 
 Such graph is simple to translate to the Graph DSL since each linear element corresponds to a :class:`Flow`,
 and each circle corresponds to either a :class:`Junction` or a :class:`Source` or :class:`Sink` if it is beginning
-or ending a :class:`Flow`.
+or ending a :class:`Flow`. Junctions must always be created with defined type parameters, as otherwise the ``Nothing``
+type will be inferred, which will not match the actual element types when connecting the graph.
 
 .. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph
 
+.. note::
+  Junction *reference equality* defines *graph node equality* (i.e. the same merge *instance* used in a FlowGraph
+  refers to the same location in the resulting graph).
+
 Notice the ``import FlowGraphImplicits._`` which brings into scope the ``~>`` operator (read as "edge", "via" or "to").
 It is also possible to construct graphs without the ``~>`` operator in case you prefer to use the graph builder explicitly:
 
 .. includecode:: code/docs/stream/FlowGraphDocSpec.scala#simple-flow-graph-no-implicits
 
+By looking at the snippets above, it should be apparent that the :class:`FlowGraphBuilder` object (``b``) **is mutable**.
+It is also used (implicitly) by the ``~>`` operator, which makes ``~>`` a mutating operation as well.
+The reason for this design choice is to enable simpler creation of complex graphs, which may even contain cycles.
+Once the FlowGraph has been constructed, though, the :class:`FlowGraph` instance *is immutable, thread-safe, and freely shareable*.
+Linear Flows, however, are always immutable, and appending an operation to a Flow always returns a new Flow instance.
+This means that you can safely re-use one given Flow in multiple places in a processing graph. In the example below
+we prepare a graph that consists of two parallel streams, in which we re-use the same instance of :class:`Flow`,
+yet it will properly be materialized as two connections between the corresponding Sources and Sinks:
+
+.. includecode:: code/docs/stream/FlowGraphDocSpec.scala#flow-graph-reusing-a-flow
+
 .. _partial-flow-graph-scala:
 
 Constructing and combining Partial Flow Graphs
 ----------------------------------------------
 Sometimes it is not possible (or needed) to construct the entire computation graph in one place, but instead construct
-all of it is different phases in different places and in the end connect them all into a complete graph and run it.
+all of its different phases in different places and in the end connect them all into a complete graph and run it.
 
 This can be achieved using :class:`PartialFlowGraph`. The reason of representing it as a different type is that a
 :class:`FlowGraph` requires all ports to be connected, and if they are not it will throw an exception at construction
@@ -402,17 +364,24 @@ For defining a ``Flow[T]`` we need to expose both an undefined source and sink:
 
 .. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph
 
-Dealing with cycles, deadlocks
-------------------------------
-// TODO: why to avoid cycles, how to enable if you really need to
+Stream ordering
+===============
+In Akka Streams almost all computation stages *preserve the input order* of elements. This means that if inputs ``{IA1,IA2,...,IAn}``
+"cause" outputs ``{OA1,OA2,...,OAk}`` and inputs ``{IB1,IB2,...,IBm}`` "cause" outputs ``{OB1,OB2,...,OBl}``, and all of
+``IAi`` happened before all ``IBi``, then all ``OAi`` happen before all ``OBi``.
 
-// TODO: problem cases, expand-conflate, expand-filter
+This property is even upheld by asynchronous operations such as ``mapAsync``; however, an unordered version called
+``mapAsyncUnordered`` exists, which does not preserve this ordering.
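+
+For example (a sketch; the exact ``mapAsync`` signature is assumed here, since it is only mentioned by name above):
+
+.. code-block:: scala
+
+   // mapAsync runs the futures concurrently but emits their results in input order,
+   // while mapAsyncUnordered emits results as soon as each future completes
+   Source(1 to 5)
+     .mapAsync(i => Future(lookupAuthor(i))) // lookupAuthor is a hypothetical function
+     .runWith(Sink.foreach(println))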
 
-// TODO: working with rate
+However, in the case of Junctions which handle multiple input streams (e.g. :class:`Merge`) the output order is,
+in general, *not defined* for elements arriving on different input ports. That is, a merge-like operation may emit ``Ai``
+before emitting ``Bi``, and it is up to its internal logic to decide the order of emitted elements. Specialized elements
+such as ``Zip`` however *do guarantee* the order of their outputs, as each output element depends on all of its upstream
+elements having been signalled already - thus the ordering in the case of zipping is defined by this property.
 
-// TODO: custom processing
-
-// TODO: stages and flexi stuff
+If you find yourself in need of fine-grained control over the order of emitted elements in fan-in
+scenarios, consider using :class:`MergePreferred` or :class:`FlexiMerge`, which give you full control over how the
+merge is performed.
 
 Streaming IO
 ============
@@ -451,7 +420,6 @@ Integrating with Actors
 
 ActorPublisher
 ^^^^^^^^^^^^^^
-
 ActorSubscriber
 ^^^^^^^^^^^^^^^