diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala
index e30e2c7c11..a5108e38ef 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala
@@ -4,10 +4,7 @@ package docs.stream
 
 import akka.actor.Cancellable
-import akka.stream.scaladsl.MaterializedMap
-import akka.stream.scaladsl.RunnableFlow
-import akka.stream.scaladsl.Sink
-import akka.stream.scaladsl.Source
+import akka.stream.scaladsl._
 import akka.stream.testkit.AkkaSpec
 
 import concurrent.Future
@@ -61,6 +58,20 @@ class FlowDocSpec extends AkkaSpec {
     //#materialization-runWith
   }
 
+  "materializedMap is unique" in {
+    //#stream-reuse
+    // connect the Source to the Sink, obtaining a RunnableFlow
+    val sink = Sink.fold[Int, Int](0)(_ + _)
+    val runnable: RunnableFlow = Source(1 to 10).to(sink)
+
+    // get the materialized value of the FoldSink
+    val sum1: Future[Int] = runnable.run().get(sink)
+    val sum2: Future[Int] = runnable.run().get(sink)
+
+    // sum1 and sum2 are different Futures!
+    //#stream-reuse
+  }
+
   "compound source cannot be used as key" in {
     //#compound-source-is-not-keyed-runWith
     import scala.concurrent.duration._
@@ -82,4 +93,50 @@ class FlowDocSpec extends AkkaSpec {
     timerCancellable.cancel()
     //#compound-source-is-not-keyed-run
   }
+
+  "creating sources, sinks" in {
+    //#source-sink
+    // Create a source from an Iterable
+    Source(List(1, 2, 3))
+
+    // Create a source from a Future
+    Source(Future.successful("Hello Streams!"))
+
+    // Create a source from a single element
+    Source.single("only one element")
+
+    // an empty source
+    Source.empty
+
+    // Sink that folds over the stream and returns a Future
+    // of the final result in the MaterializedMap
+    Sink.fold[Int, Int](0)(_ + _)
+
+    // Sink that returns a Future in the MaterializedMap,
+    // containing the first element of the stream
+    Sink.head
+
+    // A Sink that consumes a stream without doing anything with the elements
+    Sink.ignore
+
+    // A Sink that executes a side-effecting call for every element of the stream
+    Sink.foreach[String](println(_))
+    //#source-sink
+  }
+
+  "various ways of connecting source, sink, flow" in {
+    //#flow-connecting
+    // Explicitly creating and wiring up a Source, Sink and Flow
+    Source(1 to 6).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println(_)))
+
+    // Starting from a Source
+    val source = Source(1 to 6).map(_ * 2)
+    source.to(Sink.foreach(println(_)))
+
+    // Starting from a Sink
+    val sink: Sink[Int] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_)))
+    Source(1 to 6).to(sink)
+
+    //#flow-connecting
+  }
 }
diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst
index 4c8750cdea..f767bc93d7 100644
--- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst
+++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst
@@ -7,58 +7,66 @@ Basics and working with Flows
 Core concepts
 =============
 
-Everything in Akka Streams revolves around a number of core concepts which we introduce in detail in
-this section.
+Akka Streams is a library to process and transfer a sequence of elements using bounded buffer space. This
+latter property is what we refer to as *boundedness* and it is the defining feature of Akka Streams. Translated to
+everyday terms, it is possible to express a chain (or as we see later, graphs) of processing entities, each executing
+independently (and possibly concurrently) from the others while only buffering a limited number of elements at any given
+time. This property of bounded buffers is one of the differences from the actor model, where each actor usually has
+an unbounded, or a bounded, but dropping mailbox. Akka Stream processing entities have bounded "mailboxes" that
+do not drop.
 
-Akka Streams provide a way for executing bounded processing pipelines, where bounds are expressed as the number of stream
-elements in flight and in buffers at any given time. Please note that while this allows to estimate an limit memory use
-it is not strictly bound to the size in memory of these elements.
-
-First we define the terminology which will be used though out the entire documentation:
+Before we move on, let's define some basic terminology which will be used throughout the entire documentation:
 
 Stream
   An active process that involves moving and transforming data.
 
 Element
-  An element is the unit which is passed through the stream. All operations as well as back-pressure are expressed in
-  terms of elements.
+  An element is the processing unit of streams. All operations transform and transfer elements from upstream to
+  downstream. Buffer sizes are always expressed as number of elements independently from the actual size of the elements.
 
 Back-pressure
-  A means of flow-control, and most notably adjusting the speed of upstream sources to the consumption speeds of their sinks.
+  A means of flow-control, a way for consumers of data to notify a producer about their current availability, effectively
+  slowing down the upstream source to match their consumption speeds.
 
   In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*
 
 Processing Stage
   The common name for all building blocks that build up a Flow or FlowGraph.
-  Examples of a processing stage would be Stage (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`,
-  :class:`DetachedStage`), in terms of which operations like ``map()``, ``filter()`` and others are implemented.
+  Examples of a processing stage would be operations like ``map()``, ``filter()``, stages added by ``transform()``
+  (:class:`PushStage`, :class:`PushPullStage`, :class:`StatefulStage`) and graph junctions like ``Merge`` or ``Broadcast``.
 
-Sources, Flows and Sinks
-------------------------
+Defining and running streams
+----------------------------
 
 Linear processing pipelines can be expressed in Akka Streams using the following three core abstractions:
 
 Source
-  A processing stage with *exactly one output*, emitting data elements in response to it's down-stream demand.
+  A processing stage with *exactly one output*, emitting data elements whenever downstream processing stages are
+  ready to receive them.
 
 Sink
-  A processing stage with *exactly one input*, generating demand based on it's internal demand management strategy.
+  A processing stage with *exactly one input*, requesting and accepting data elements, possibly slowing down the upstream
+  producer of elements.
 
 Flow
-  A processing stage which has *exactly one input and output*, which connects it's up and downstreams by (usually)
+  A processing stage which has *exactly one input and output*, which connects its up- and downstreams by
   transforming the data elements flowing through it.
 
 RunnableFlow
-  A Flow with has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.
+  A Flow that has both ends "attached" to a Source and Sink respectively, and is ready to be ``run()``.
 
-It is important to remember that while constructing these processing pipelines by connecting their different processing
-stages no data will flow through it until it is materialized. Materialization is the process of allocating all resources
-needed to run the computation described by a Flow (in Akka Streams this will often involve starting up Actors).
-Thanks to Flows being simply a description of the processing pipeline they are *immutable, thread-safe, and freely shareable*,
-which means that it is for example safe to share send between actors–to have one actor prepare the work, and then have it
-be materialized at some completely different place in the code.
+It is possible to attach a ``Flow`` to a ``Source`` resulting in a composite source, and it is also possible to prepend
+a ``Flow`` to a ``Sink`` to get a new sink. After a stream is properly terminated by having both a source and a sink,
+it will be represented by the ``RunnableFlow`` type, indicating that it is ready to be executed.
 
-In order to be able to run a ``Flow[In,Out]`` it must be connected to a ``Sink[In]`` *and* ``Source[Out]`` of matching types.
-It is also possible to directly connect a :class:`Sink` to a :class:`Source`.
+It is important to remember that even after constructing the ``RunnableFlow`` by connecting the source, the sink and
+the different processing stages, no data will flow through it until it is materialized. Materialization is the process of
+allocating all resources needed to run the computation described by a Flow (in Akka Streams this will often involve
+starting up Actors). Thanks to Flows being simply a description of the processing pipeline they are *immutable,
+thread-safe, and freely shareable*, which means that it is for example safe to share and send them between actors, to have
+one actor prepare the work, and then have it be materialized at some completely different place in the code.
 
 .. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-in-steps
 
-The :class:`MaterializedMap` can be used to get materialized values of both sinks and sources out from the running
-stream. In general, a stream can expose multiple materialized values, however the very common case of only wanting to
-get back a Sinks (in order to read a result) or Sources (in order to cancel or influence it in some way) materialized
-values has a small convenience method called ``runWith()``. It is available for ``Sink`` or ``Source`` and ``Flow``, with respectively,
-requiring the user to supply a ``Source`` (in order to run a ``Sink``), a ``Sink`` (in order to run a ``Source``) and
+After running (materializing) the ``RunnableFlow`` we get a special container object, the ``MaterializedMap``. Both
+sources and sinks are able to put specific objects into this map. Whether they put something in or not is implementation
+dependent. For example a ``FoldSink`` will make a ``Future`` available in this map which will represent the result
+of the folding process over the stream. In general, a stream can expose multiple materialized values,
+but it is quite common to be interested in only the value of the Source or the Sink in the stream. For this reason
+there is a convenience method called ``runWith()`` available for ``Sink``, ``Source`` or ``Flow`` requiring, respectively,
+a supplied ``Source`` (in order to run a ``Sink``), a ``Sink`` (in order to run a ``Source``) or both a ``Source`` and
 a ``Sink`` (in order to run a ``Flow``, since it has neither attached yet).
 
 .. includecode:: code/docs/stream/FlowDocSpec.scala#materialization-runWith
@@ -77,22 +85,43 @@ instead of modifying the existing instance, so while construction long flows, re
 In the above example we used the ``runWith`` method, which both materializes the stream and returns the materialized
 value of the given sink or source.
 
+Since a stream can be materialized multiple times, the ``MaterializedMap`` returned is different for each materialization.
+In the example below we create two running materialized instances of the stream that we described in the ``runnable``
+variable, and both materializations give us a different ``Future`` from the map even though we used the same ``sink``
+to refer to the future:
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#stream-reuse
+
+Defining sources, sinks and flows
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The objects :class:`Source` and :class:`Sink` define various ways to create sources and sinks of elements. The following
+examples show some of the useful constructs (refer to the API documentation for more details):
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#source-sink
+
+There are various ways to wire up different parts of a stream; the following examples show some of the available options.
+
+.. includecode:: code/docs/stream/FlowDocSpec.scala#flow-connecting
+
+
 .. _back-pressure-explained-scala:
 
 Back-pressure explained
 -----------------------
 
-Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the Reactive Streams
+Akka Streams implements an asynchronous non-blocking back-pressure protocol standardised by the `Reactive Streams`_
 specification, which Akka is a founding member of.
 
-As library user you do not have to write any explicit back-pressure handling code in order for it to work - it is built
-and dealt with automatically by all of the provided Akka Streams processing stages. However is possible to include
-explicit buffers with overflow strategies that can influence the behaviour of the stream. This is especially important
-in complex processing graphs which may even sometimes even contain loops (which *must* be treated with very special
-care, as explained in :ref:`cycles-scala`).
+.. _Reactive Streams: http://reactive-streams.org/
 
-The back pressure protocol is defined in terms of the number of elements a downstream ``Subscriber`` is able to receive,
-referred to as ``demand``. This demand is the *number of elements* receiver of the data, referred to as ``Subscriber``
-in Reactive Streams, and implemented by ``Sink`` in Akka Streams is able to safely consume at this point in time.
+The user of the library does not have to write any explicit back-pressure handling code — it is built in
+and dealt with automatically by all of the provided Akka Streams processing stages. It is possible however to add
+explicit buffer stages with overflow strategies that can influence the behaviour of the stream. This is especially important
+in complex processing graphs which may sometimes even contain loops (which *must* be treated with very special
+care, as explained in :ref:`graph-cycles-scala`).
+
+The back pressure protocol is defined in terms of the number of elements a downstream ``Subscriber`` is able to receive
+and buffer, referred to as ``demand``.
 The source of data referred to as ``Publisher`` in Reactive Streams terminology and implemented as ``Source`` in Akka
 Streams guarantees that it will never emit more elements than the received total demand for any given ``Subscriber``.
@@ -103,7 +132,7 @@ Streams guarantees that it will never emit more elements than the received total
 
    Akka Streams implements these concepts as **Sources**, **Flows** (referred to as **Processor** in Reactive Streams)
   and **Sinks** without exposing the Reactive Streams interfaces directly.
-  If you need to inter-op between different read :ref:`integration-with-Reactive-Streams-enabled-libraries`.
+  If you need to integrate with other Reactive Streams libraries, read :ref:`reactive-streams-integration-scala`.
 
 The mode in which Reactive Streams back-pressure works can be colloquially described as "dynamic push / pull mode",
 since it will switch between push or pull based back-pressure models depending on if the downstream is able to cope
@@ -144,6 +173,7 @@ As we can see, this scenario effectively means that the ``Subscriber`` will *pull*
 this mode of operation is referred to as pull-based back-pressure.
 
 .. _stream-materialization-scala:
+
 Stream Materialization
 ----------------------
 
 **TODO - write me (feel free to move around as well)**
diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst
index 7f75a05251..6587bfc9a7 100644
--- a/akka-docs-dev/rst/scala/stream-graphs.rst
+++ b/akka-docs-dev/rst/scala/stream-graphs.rst
@@ -131,6 +131,8 @@ For defining a ``Flow[T]`` we need to expose both an undefined source and sink:
 
 .. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph
 
+.. _graph-cycles-scala:
+
 Graph cycles, liveness and deadlocks
 ------------------------------------
 
diff --git a/akka-docs-dev/rst/scala/stream-integrations.rst b/akka-docs-dev/rst/scala/stream-integrations.rst
index c5c75208f3..e6af709ced 100644
--- a/akka-docs-dev/rst/scala/stream-integrations.rst
+++ b/akka-docs-dev/rst/scala/stream-integrations.rst
@@ -312,6 +312,8 @@ The numbers in parenthesis illustrates how many calls that are in progress
 at the same time. Here the downstream demand and thereby the number of
 concurrent calls are limited by the buffer size (4) of the
 :class:`MaterializerSettings`.
 
+.. _reactive-streams-integration-scala:
+
 Integrating with Reactive Streams
 =================================
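The materialization behaviour this patch documents (every `run()` of the same `RunnableFlow` yields a fresh `MaterializedMap`, so `sum1` and `sum2` are different Futures) can be sketched in plain Scala with no Akka dependency. `RunnableSketch` below is a hypothetical stand-in for `RunnableFlow`, not the real Akka Streams API; only the fold computation and the "blueprint runs lazily, each run is independent" idea are taken from the diff:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for RunnableFlow: a blueprint that performs no
// work until run() (materialization) is invoked.
final case class RunnableSketch[A](run: () => Future[A])

object StreamReuse {
  // Stand-in for Source(1 to 10).to(Sink.fold[Int, Int](0)(_ + _)):
  // folds the elements 1..10 with addition when materialized.
  val runnable: RunnableSketch[Int] =
    RunnableSketch(() => Future((1 to 10).fold(0)(_ + _)))

  def main(args: Array[String]): Unit = {
    // each run() materializes a fresh, independent computation
    val sum1 = runnable.run()
    val sum2 = runnable.run()
    println(Await.result(sum1, 1.second)) // 55
    println(Await.result(sum2, 1.second)) // 55
    println(sum1 eq sum2) // false: two distinct Futures, as in the docs example
  }
}
```

The key design point mirrored here is that the blueprint is immutable and freely shareable, while all mutable state lives in what `run()` creates.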