diff --git a/akka-docs-dev/rst/java/stream-flows-and-basics.rst b/akka-docs-dev/rst/java/stream-flows-and-basics.rst index c41bf266e1..ea462544d5 100644 --- a/akka-docs-dev/rst/java/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/java/stream-flows-and-basics.rst @@ -4,6 +4,8 @@ Basics and working with Flows ############################# +.. _core-concepts-java: + Core concepts ============= @@ -41,6 +43,8 @@ will use asynchronous means to slow down a fast producer, without blocking its t design, since entities that need to wait (a fast producer waiting on a slow consumer) will not block the thread but can hand it back for further use to an underlying thread-pool. +.. _defining-and-running-streams-java: + Defining and running streams ---------------------------- diff --git a/akka-docs-dev/rst/java/stream-quickstart.rst b/akka-docs-dev/rst/java/stream-quickstart.rst index dce68d5817..4ecad5d158 100644 --- a/akka-docs-dev/rst/java/stream-quickstart.rst +++ b/akka-docs-dev/rst/java/stream-quickstart.rst @@ -17,8 +17,17 @@ Here's the data model we'll be working with throughout the quickstart examples: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#model + +.. note:: + If you would like to get an overview of the used vocabulary first instead of diving head-first + into an actual example you can have a look at the :ref:`core-concepts-java` and :ref:`defining-and-running-streams-java` + sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example application. + Transforming and consuming simple streams ----------------------------------------- +The example application we will be looking at is a simple Twitter fed stream from which we'll want to extract certain information, +like for example finding all twitter handles of users who tweet about ``#akka``. 
+ In order to prepare our environment by creating an :class:`ActorSystem` and :class:`ActorMaterializer`, which will be responsible for materializing and running the streams we are about to create: @@ -28,41 +37,43 @@ The :class:`ActorMaterializer` can optionally take :class:`ActorMaterializerSett materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-java`), the dispatcher to be used by the pipeline etc. These can be overridden ``withAttributes`` on :class:`Flow`, :class:`Source`, :class:`Sink` and :class:`Graph`. -Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source`: +Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source`: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweet-source -Streams always start flowing from a :class:`Source` then can continue -through :class:`Flow` elements or more advanced graph elements to -finally be consumed by a :class:`Sink`. The first type -parameter—:class:`Tweet` in this case—designates the kind of elements produced -by the source while the second one describes the object that is created during -materialization (see below)—:class:`BoxedUnit` (from the ``scala.runtime`` -package) means that no value is produced, it is the generic equivalent of -``void``. Both Sources and Flows provide stream operations that can be used to -transform the flowing data, a :class:`Sink` however does not since its the "end -of stream" and its behavior depends on the type of :class:`Sink` used. +Streams always start flowing from a ``Source`` then can continue through ``Flow`` elements or +more advanced graph elements to finally be consumed by a ``Sink``. 
-In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look -familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data: +The first type parameter—:class:`Tweet` in this case—designates the kind of elements produced +by the source while the ``M`` type parameters describe the object that is created during +materialization (:ref:`see below `)—:class:`BoxedUnit` (from the ``scala.runtime`` +package) means that no value is produced, it is the generic equivalent of ``void``. + +The operations should look familiar to anyone who has used the Scala Collections library, +however they operate on streams and not collections of data (which is a very important distinction, as some operations +only make sense in streaming and vice versa): .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-filter-map Finally in order to :ref:`materialize ` and run the stream computation we need to attach -the Flow to a ``Sink`` that will get the flow running. The simplest way to do this is to call -``runWith(sink)`` on a ``Source``. For convenience a number of common Sinks are predefined and collected as static methods on +the Flow to a ``Sink`` that will get the flow running. The simplest way to do this is to call +``runWith(sink)`` on a ``Source``. For convenience a number of common Sinks are predefined and collected as static methods on the `Sink class `_. For now let's simply print each author: .. 
includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-foreachsink-println -or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`): +or by using the shorthand version (which are defined only for the most popular sinks such as :class:`Sink.fold` and :class:`Sink.foreach`): .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-foreach-println Materializing and running a stream always requires a :class:`Materializer` to be passed in explicitly, like this: ``.run(mat)``. +The complete snippet looks like this: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#first-sample + Flattening sequences in streams ------------------------------- In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes @@ -94,9 +105,16 @@ input port to all of its output ports. Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups -at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation -graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-java`. -FlowGraphs are constructed like this: +at the expense of not reading as familiarly as collection transformations. 
+ + +A graph can be either ``closed``, which is also known as a "*fully connected graph*", or ``partial``, which is +a graph with some unconnected ports. A partial graph is thus a generalisation of the Flow concept, where ``Flow`` +is simply a partial graph with one unconnected input and one unconnected output. Concepts around composing and nesting +graphs in large structures are explained in detail in :ref:`composition-java`. + +It is also possible to wrap complex computation graphs as Flows, Sinks or Sources, which will be explained in +detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-java`. FlowGraphs are constructed like this: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#flow-graph-broadcast @@ -127,6 +145,8 @@ The ``buffer`` element takes an explicit and required ``OverflowStrategy``, whic when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``), dropping the entire buffer, signalling failures etc. Be sure to pick and choose the strategy that fits your use case best. +.. _materialized-values-quick-java: + Materialized values ------------------- So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing @@ -136,26 +156,30 @@ While this question is not as obvious to give an answer to in case of an infinit this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"), but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements. -First, let's write such an element counter using :class:`FoldSink` and then we'll see how it is possible to obtain materialized -values from a :class:`MaterializedMap` which is returned by materializing an Akka stream. 
We'll split execution into multiple -lines for the sake of explaining the concepts of ``Materializable`` elements and ``MaterializedType`` +First, let's write such an element counter using ``Flow.of(Class)`` and ``Sink.fold`` to see what the types look like: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-fold-count -First, we prepare the :class:`FoldSink` which will be used to sum all ``Integer`` elements of the stream. -Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, -finally we connect the flow ``to`` the previously prepared Sink. Notice that this step does *not* yet materialize the -processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can -be ``run()``, as indicated by its type: :class:`RunnableGraph`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` -to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph`` or ``FlowGraph`` is ``MaterializedMap``, -which can be used to retrieve materialized values from the running stream. +First we prepare a reusable ``Flow`` that will change each incoming tweet into an integer of value ``1``. +We then combine all values of the transformed stream using ``Sink.fold``, which will sum all ``Integer`` elements of the stream +and make its result available as a ``Future<Integer>``. Next we connect the ``tweets`` stream through a ``map`` step which +converts each tweet into the number ``1``, and finally we connect the flow to the previously prepared Sink using ``toMat``. -In order to extract an materialized value from a running stream it is possible to call ``get(Materializable)`` on a materialized map -obtained from materializing a flow or graph. 
Since ``FoldSink`` implements ``Materializable`` and implements the ``MaterializedType`` -as ``Future`` we can use it to obtain the :class:`Future` which when completed will contain the total length of our tweets stream. +Remember those mysterious ``Mat`` type parameters on ``Source``, ``Flow`` and ``Sink``? +They represent the type of values these processing parts return when materialized. When you chain these together, +you can explicitly combine their materialized values: in our example we used the ``Keep.right`` predefined function, +which tells the implementation to only care about the materialized type of the stage currently appended to the right. +As you can see, the materialized type of ``sumSink`` is ``Future<Integer>`` and, because we used ``Keep.right``, +the resulting :class:`RunnableGraph` also has a type parameter of ``Future<Integer>``. + +This step does *not* yet materialize the +processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can +be ``run()``, as indicated by its type: ``RunnableGraph<Future<Integer>>``. Next we call ``run()`` which uses the :class:`ActorMaterializer` +to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph<T>`` is of type ``T``. +In our case this type is ``Future<Integer>`` which, when completed, will contain the total length of our tweets stream. In case of the stream failing, this future would complete with a Failure. -The reason we have to ``get`` the value out from the materialized map, is because a :class:`RunnableGraph` may be reused +A :class:`RunnableGraph` may be reused and materialized multiple times, because it is just the "blueprint" of the stream. 
This means that if we materialize a stream, for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations will be different, as illustrated by this example: @@ -167,3 +191,8 @@ steering these elements which will be discussed in detail in :ref:`stream-materi what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-fold-count-oneline + +.. note:: + ``runWith()`` is a convenience method that automatically ignores the materialized value of any other stages except + those appended by the ``runWith()`` itself. In the above example it translates to using ``Keep.right`` as the combiner + for materialized values. diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 4ebc73ca0d..b69ebd93fa 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -60,20 +60,25 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { } trait Example1 { + //#first-sample //#materializer-setup implicit val system = ActorSystem("reactive-tweets") implicit val materializer = ActorMaterializer() //#materializer-setup + //#first-sample } implicit val mat = ActorMaterializer() "filter and map" in { + //#first-sample + //#authors-filter-map val authors: Source[Author, Unit] = tweets .filter(_.hashtags.contains(akka)) .map(_.author) + //#first-sample //#authors-filter-map trait Example3 { @@ -83,9 +88,12 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { //#authors-collect } + //#first-sample + //#authors-foreachsink-println authors.runWith(Sink.foreach(println)) //#authors-foreachsink-println + 
//#first-sample //#authors-foreach-println authors.runForeach(println) @@ -155,11 +163,16 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { "count elements on finite stream" in { //#tweets-fold-count + val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1) + val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) - val counter: RunnableGraph[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right) + val counterGraph: RunnableGraph[Future[Int]] = + tweets + .via(count) + .toMat(sumSink)(Keep.right) - val sum: Future[Int] = counter.run() + val sum: Future[Int] = counterGraph.run() sum.foreach(c => println(s"Total tweets processed: $c")) //#tweets-fold-count diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index 9ab8de8daa..6626d7b3fc 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -4,6 +4,8 @@ Basics and working with Flows ############################# +.. _core-concepts-scala: + Core concepts ============= @@ -41,6 +43,8 @@ will use asynchronous means to slow down a fast producer, without blocking its t design, since entities that need to wait (a fast producer waiting on a slow consumer) will not block the thread but can hand it back for further use to an underlying thread-pool. +.. _defining-and-running-streams-scala: + Defining and running streams ---------------------------- Linear processing pipelines can be expressed in Akka Streams using the following core abstractions: diff --git a/akka-docs-dev/rst/scala/stream-quickstart.rst b/akka-docs-dev/rst/scala/stream-quickstart.rst index a04442ea2d..57dc8e44a4 100644 --- a/akka-docs-dev/rst/scala/stream-quickstart.rst +++ b/akka-docs-dev/rst/scala/stream-quickstart.rst @@ -17,8 +17,16 @@ Here's the data model we'll be working with throughout the quickstart examples: .. 
includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model +.. note:: + If you would like to get an overview of the used vocabulary first instead of diving head-first + into an actual example you can have a look at the :ref:`core-concepts-scala` and :ref:`defining-and-running-streams-scala` + sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example application. + Transforming and consuming simple streams ----------------------------------------- +The example application we will be looking at is a simple Twitter fed stream from which we'll want to extract certain information, +like for example finding all twitter handles of users who tweet about ``#akka``. + In order to prepare our environment by creating an :class:`ActorSystem` and :class:`ActorMaterializer`, which will be responsible for materializing and running the streams we are about to create: @@ -34,12 +42,12 @@ Let's assume we have a stream of tweets readily available, in Akka this is expre Streams always start flowing from a :class:`Source[Out,M1]` then can continue through :class:`Flow[In,Out,M2]` elements or more advanced graph elements to finally be consumed by a :class:`Sink[In,M3]` (ignore the type parameters ``M1``, ``M2`` -and ``M3`` for now, they are not relevant to the types of the elements produced/consumed by these classes). Both Sources and -Flows provide stream operations that can be used to transform the flowing data, a :class:`Sink` however does not since -its the "end of stream" and its behavior depends on the type of :class:`Sink` used. +and ``M3`` for now, they are not relevant to the types of the elements produced/consumed by these classes – they are +"materialized types", which we'll talk about :ref:`below `). 
-In our case let's say we want to find all twitter handles of users which tweet about ``#akka``, the operations should look -familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data: +The operations should look familiar to anyone who has used the Scala Collections library, +however they operate on streams and not collections of data (which is a very important distinction, as some operations +only make sense in streaming and vice versa): .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map @@ -51,13 +59,17 @@ For now let's simply print each author: .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println -or by using the shorthand version (which are defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`): +or by using the shorthand version (which are defined only for the most popular sinks such as ``Sink.fold`` and ``Sink.foreach``): .. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println Materializing and running a stream always requires a :class:`Materializer` to be in implicit scope (or passed in explicitly, like this: ``.run(materializer)``). +The complete snippet looks like this: + +.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#first-sample + Flattening sequences in streams ------------------------------- In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes @@ -89,7 +101,14 @@ input port to all of its output ports. Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups -at the expense of not reading as familiarly as collection transformations. 
It is also possible to wrap complex computation +at the expense of not reading as familiarly as collection transformations. + +A graph can be either ``closed``, which is also known as a "*fully connected graph*", or ``partial``, which is +a graph with some unconnected ports. A partial graph is thus a generalisation of the Flow concept, where ``Flow`` +is simply a partial graph with one unconnected input and one unconnected output. Concepts around composing and nesting +graphs in large structures are explained in detail in :ref:`composition-scala`. + +It is also possible to wrap complex computation graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`. FlowGraphs are constructed like this: @@ -127,6 +146,8 @@ The ``buffer`` element takes an explicit and required ``OverflowStrategy``, whic when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``), dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best. +.. _materialized-values-quick-scala: + Materialized values ------------------- So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing @@ -136,22 +157,25 @@ While this question is not as obvious to give an answer to in case of an infinit this question in a streaming setting would to create a stream of counts described as "*up until now*, we've processed N tweets"), but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements. -First, let's write such an element counter using :class:`FoldSink` and see how the types look like: +First, let's write such an element counter using ``Sink.fold`` and see what the types look like: .. 
includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count -First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream. -Next we connect the ``tweets`` stream though a ``map`` step which converts each tweet into the number ``1``, -finally we connect the flow using ``toMat`` the previously prepared Sink. Remember those mysterious type parameters on -:class:`Source` :class:`Flow` and :class:`Sink`? They represent the type of values these processing parts return when -materialized. When you chain these together, you can explicitly combine their materialized values: in our example we -used the ``Keep.right`` predefined function, which tells the implementation to only care about the materialized -type of the stage currently appended to the right. As you can notice, the materialized type of sumSink is ``Future[Int]`` -and because of using ``Keep.right``, the resulting :class:`RunnableGraph` has also a type parameter of ``Future[Int]``. +First we prepare a reusable ``Flow`` that will change each incoming tweet into an integer of value ``1``. +We'll then combine those ones using ``Sink.fold``, which will sum all ``Int`` elements of the stream +and make its result available as a ``Future[Int]``. Next we connect the ``tweets`` stream through that ``Flow`` using ``via``, +and finally we connect the flow to the previously prepared Sink using ``toMat``. + +Remember those mysterious ``Mat`` type parameters on ``Source[+Out, +Mat]``, ``Flow[-In, +Out, +Mat]`` and ``Sink[-In, +Mat]``? +They represent the type of values these processing parts return when materialized. When you chain these together, +you can explicitly combine their materialized values: in our example we used the ``Keep.right`` predefined function, +which tells the implementation to only care about the materialized type of the stage currently appended to the right. 
+As you can see, the materialized type of ``sumSink`` is ``Future[Int]`` and, because we used ``Keep.right``, +the resulting :class:`RunnableGraph` also has a type parameter of ``Future[Int]``. This step does *not* yet materialize the processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can -be ``run()``, as indicated by its type: :class:`RunnableGraph[Future[Int]]`. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` +be ``run()``, as indicated by its type: ``RunnableGraph[Future[Int]]``. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph[T]`` is of type ``T``. In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our tweets stream. In case of the stream failing, this future would complete with a Failure.
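
The counting pipeline described in the Scala quickstart text above can be put together as one small program. This is a minimal sketch, not the project's own sample. It assumes the Akka Streams 1.0-era Scala API that this diff targets (``Unit`` as the "no value" materialized type, ``ActorSystem.shutdown()`` for termination). The simplified ``Tweet`` case class, the ``CountTweets`` object and the three sample tweets are hypothetical stand-ins for the quickstart's data model.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Keep, RunnableGraph, Sink, Source }

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// Hypothetical, simplified stand-in for the quickstart's Tweet model.
final case class Tweet(author: String, body: String)

object CountTweets extends App {
  implicit val system = ActorSystem("reactive-tweets")
  implicit val materializer = ActorMaterializer()

  // Hypothetical finite source; the quickstart assumes a readily available stream.
  val tweets: Source[Tweet, Unit] = Source(List(
    Tweet("alice", "#akka streams"),
    Tweet("bob", "#akka rocks"),
    Tweet("carol", "hello world")))

  // Reusable Flow: turns every incoming tweet into the integer 1.
  val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)

  // Sink that sums its inputs and materializes the total as a Future[Int].
  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Keep.right keeps only the Sink's materialized value (the Future[Int]).
  // Nothing runs yet: this is only the blueprint of the stream.
  val counterGraph: RunnableGraph[Future[Int]] =
    tweets.via(count).toMat(sumSink)(Keep.right)

  // Each run() materializes the blueprint anew and returns its own Future.
  val firstSum: Future[Int] = counterGraph.run()
  val secondSum: Future[Int] = counterGraph.run()

  // Blocking is acceptable in a demo; real code would compose the futures.
  println(s"First run counted: ${Await.result(firstSum, 3.seconds)}")
  println(s"Second run counted: ${Await.result(secondSum, 3.seconds)}")

  // Assumption: Akka 2.3-era shutdown call; later versions use terminate().
  system.shutdown()
}
```

Running the same ``counterGraph`` twice yields two independent futures, which shows that the graph stays a reusable blueprint and only ``run()`` produces a materialized value.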