Merge pull request #17961 from ktoso/wip-quickstart-impr-ktoso

=doc #17655 improvements in streams quickstart
Konrad Malawski 2015-07-14 15:58:05 +02:00
commit c4d018d31a
5 changed files with 125 additions and 52 deletions

View file

@@ -4,6 +4,8 @@
Basics and working with Flows
#############################
.. _core-concepts-java:
Core concepts
=============
@@ -41,6 +43,7 @@ will use asynchronous means to slow down a fast producer, without blocking its t
design, since entities that need to wait (a fast producer waiting on a slow consumer) will not block the thread but
can hand it back for further use to an underlying thread-pool.
.. _defining-and-running-streams-java:
Defining and running streams
----------------------------

View file

@@ -17,8 +17,17 @@ Here's the data model we'll be working with throughout the quickstart examples:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#model
.. note::
  If you would like to get an overview of the vocabulary used first, instead of diving head-first
  into an actual example, you can have a look at the :ref:`core-concepts-java` and :ref:`defining-and-running-streams-java`
  sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example application.
Transforming and consuming simple streams
-----------------------------------------
The example application we will be looking at is a simple Twitter feed stream from which we'll want to extract certain information,
for example the twitter handles of all users who tweet about ``#akka``.
First we need to prepare our environment by creating an :class:`ActorSystem` and :class:`ActorMaterializer`,
which will be responsible for materializing and running the streams we are about to create:
@@ -28,41 +37,43 @@ The :class:`ActorMaterializer` can optionally take :class:`ActorMaterializerSett
materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-java`), the dispatcher to
be used by the pipeline etc. These can be overridden with ``withAttributes`` on :class:`Flow`, :class:`Source`, :class:`Sink` and :class:`Graph`.
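A rough sketch of such a setup, shown here in the Scala DSL and assuming the 1.0-era API (the buffer sizes below are made up purely for illustration)::

  import akka.actor.ActorSystem
  import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, Attributes }
  import akka.stream.scaladsl.Source

  implicit val system = ActorSystem("reactive-tweets")

  // tune default materialization properties, here withInputBuffer(initialSize, maxSize)
  implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(system).withInputBuffer(4, 16))

  // a single stage can override such attributes locally via withAttributes
  val tinyBufferSource = Source(1 to 100).withAttributes(Attributes.inputBuffer(1, 1))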
Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source<Out, M>`:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweet-source
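For readers following along without the sample project, here is a sketch of what such a source might look like in the Scala DSL; the ``Author``, ``Hashtag`` and ``Tweet`` classes below are simplified stand-ins for the quickstart's data model, and the tweets are just an in-memory list::

  import akka.stream.scaladsl.Source

  final case class Author(handle: String)
  final case class Hashtag(name: String)
  final case class Tweet(author: Author, timestamp: Long, body: String) {
    def hashtags: Set[Hashtag] =
      body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
  }

  // a finite, in-memory Source of tweets; a real application would wrap a live feed
  val tweets: Source[Tweet, Unit] = Source(List(
    Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!"),
    Tweet(Author("patriknw"), System.currentTimeMillis, "#akka on the JVM")))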
Streams always start flowing from a ``Source<Out,M1>`` then can continue through ``Flow<In,Out,M2>`` elements or
more advanced graph elements to finally be consumed by a ``Sink<In,M3>``.
The first type parameter—:class:`Tweet` in this case—designates the kind of elements produced
by the source while the ``M`` type parameters describe the object that is created during
materialization (:ref:`see below <materialized-values-quick-java>`)—:class:`BoxedUnit` (from the ``scala.runtime``
package) means that no value is produced, it is the generic equivalent of ``void``.
The operations should look familiar to anyone who has used the Scala Collections library,
however they operate on streams and not collections of data (which is a very important distinction, as some operations
only make sense in streaming and vice versa):
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-filter-map
Finally in order to :ref:`materialize <stream-materialization-java>` and run the stream computation we need to attach
the Flow to a ``Sink<T, M>`` that will get the flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source<Out, M>``. For convenience a number of common Sinks are predefined and collected as static methods on
the `Sink class <http://doc.akka.io/japi/akka-stream-and-http-experimental/@version@/akka/stream/javadsl/Sink.html>`_.
For now let's simply print each author:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-foreachsink-println
or by using the shorthand versions (which are defined only for the most popular sinks such as ``Sink.fold`` and ``Sink.foreach``):
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#authors-foreach-println
Materializing and running a stream always requires a :class:`Materializer` to be passed in explicitly,
like this: ``.run(mat)``.
The complete snippet looks like this:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#first-sample
Flattening sequences in streams
-------------------------------
In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes
@@ -94,9 +105,16 @@ input port to all of its output ports.
Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations.
A graph can be either ``closed``, which is also known as a "*fully connected graph*", or ``partial``, that is a graph
with some unconnected ports. A partial graph is thus a generalisation of the Flow concept, where a ``Flow``
is simply a partial graph with one unconnected input and one unconnected output. Concepts around composing and nesting
graphs in large structures are explained in detail in :ref:`composition-java`.
It is also possible to wrap complex computation graphs as Flows, Sinks or Sources, which will be explained in
detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-java`. FlowGraphs are constructed like this:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#flow-graph-broadcast
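Since the sample code lives in an external file, here is a rough sketch of such a broadcast graph using the 1.0-era Scala graph DSL (``FlowGraph.closed``), reusing the hypothetical ``tweets`` source and model sketched earlier::

  import akka.stream.scaladsl._

  val g = FlowGraph.closed() { implicit b =>
    import FlowGraph.Implicits._

    // one input, two outputs: every tweet is sent to both branches
    val bcast = b.add(Broadcast[Tweet](2))

    tweets ~> bcast.in
    bcast.out(0) ~> Flow[Tweet].map(_.author) ~> Sink.foreach[Author](println)
    bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> Sink.foreach[Hashtag](println)
  }
  g.run()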
@@ -127,6 +145,8 @@ The ``buffer`` element takes an explicit and required ``OverflowStrategy``, whic
when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling failures etc. Be sure to pick and choose the strategy that fits your use case best.
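For illustration, a minimal sketch of an explicit buffer with a drop-oldest policy; ``slowComputation`` is a hypothetical expensive step and ``tweets`` is the source sketched earlier::

  import akka.stream.OverflowStrategy
  import akka.stream.scaladsl.Sink

  def slowComputation(t: Tweet): Tweet = { Thread.sleep(100); t } // stand-in for real work

  tweets
    .buffer(10, OverflowStrategy.dropHead) // keep the 10 freshest tweets, drop the oldest
    .map(slowComputation)
    .runWith(Sink.ignore)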
.. _materialized-values-quick-java:
Materialized values
-------------------
So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing
@@ -136,26 +156,30 @@ While this question is not as obvious to give an answer to in case of an infinit
this question in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
First, let's write such an element counter using ``Flow.of(Class)`` and ``Sink.fold`` to see what the types look like:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-fold-count
First we prepare a reusable ``Flow`` that will change each incoming tweet into an integer of value ``1``.
We then combine these ones using ``Sink.fold``, which will sum all ``Integer`` elements of the stream
and make its result available as a ``Future<Integer>``. Next we connect the ``tweets`` stream through this
counting flow and finally connect it to the previously prepared Sink using ``toMat``.
Remember those mysterious ``Mat`` type parameters on ``Source<Out, Mat>``, ``Flow<In, Out, Mat>`` and ``Sink<In, Mat>``?
They represent the type of values these processing parts return when materialized. When you chain these together,
you can explicitly combine their materialized values: in our example we used the ``Keep.right`` predefined function,
which tells the implementation to only care about the materialized type of the stage currently appended to the right.
As you can notice, the materialized type of ``sumSink`` is ``Future<Integer>`` and because of using ``Keep.right``,
the resulting :class:`RunnableGraph` also has a type parameter of ``Future<Integer>``.
This step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: ``RunnableGraph<Future<Integer>>``. Next we call ``run()`` which uses the :class:`ActorMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph<T>`` is of type ``T``.
In our case this type is ``Future<Integer>`` which, when completed, will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure.
A :class:`RunnableGraph` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream,
for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
will be different, as illustrated by this example:
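A rough sketch of this idea, assuming the ``tweets`` source from before (each ``run()`` materializes the blueprint anew and yields its own future)::

  import scala.concurrent.Future
  import akka.stream.scaladsl.{ Keep, RunnableGraph, Sink }

  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // a reusable blueprint, not a running stream
  val counter: RunnableGraph[Future[Int]] = tweets.map(_ => 1).toMat(sumSink)(Keep.right)

  val morningCount: Future[Int] = counter.run() // first materialization
  val eveningCount: Future[Int] = counter.run() // independent second materialization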
@@ -167,3 +191,8 @@ steering these elements which will be discussed in detail in :ref:`stream-materi
what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-fold-count-oneline
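A sketch of what such a one-liner boils down to, with ``sumSink`` and ``tweets`` as in the previous sketches (``runWith`` materializes the stream and returns the Sink's materialized value)::

  import system.dispatcher // ExecutionContext for the Future callback

  val sum: Future[Int] = tweets.map(_ => 1).runWith(sumSink)
  sum.foreach(c => println(s"Total tweets processed: $c"))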
.. note::
  ``runWith()`` is a convenience method that automatically ignores the materialized value of any other stages except
  those appended by ``runWith()`` itself. In the above example it translates to using ``Keep.right`` as the combiner
  for materialized values.

View file

@@ -60,20 +60,25 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
}
trait Example1 {
//#first-sample
//#materializer-setup
implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()
//#materializer-setup
//#first-sample
}
implicit val mat = ActorMaterializer()
"filter and map" in {
//#first-sample
//#authors-filter-map
val authors: Source[Author, Unit] =
tweets
.filter(_.hashtags.contains(akka))
.map(_.author)
//#first-sample
//#authors-filter-map
trait Example3 {
@@ -83,9 +88,12 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#authors-collect
}
//#first-sample
//#authors-foreachsink-println
authors.runWith(Sink.foreach(println))
//#authors-foreachsink-println
//#first-sample
//#authors-foreach-println
authors.runForeach(println)
@@ -155,11 +163,16 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
"count elements on finite stream" in {
//#tweets-fold-count
val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
tweets
.via(count)
.toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total tweets processed: $c"))
//#tweets-fold-count

View file

@@ -4,6 +4,8 @@
Basics and working with Flows
#############################
.. _core-concepts-scala:
Core concepts
=============
@@ -41,6 +43,8 @@ will use asynchronous means to slow down a fast producer, without blocking its t
design, since entities that need to wait (a fast producer waiting on a slow consumer) will not block the thread but
can hand it back for further use to an underlying thread-pool.
.. _defining-and-running-streams-scala:
Defining and running streams
----------------------------
Linear processing pipelines can be expressed in Akka Streams using the following core abstractions:
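As a reminder, here is a sketch of how the types of those core abstractions line up (as of the 1.0-era API, where a materialized type of ``Unit`` stands for "no materialized value")::

  import scala.concurrent.Future
  import akka.stream.scaladsl.{ Flow, RunnableGraph, Sink, Source }

  val source: Source[Int, Unit] = Source(1 to 10)                      // emits elements
  val flow: Flow[Int, String, Unit] = Flow[Int].map(_.toString)        // transforms them
  val sink: Sink[String, Future[Unit]] = Sink.foreach[String](println) // consumes them

  // wiring all three together yields a reusable blueprint
  val runnable: RunnableGraph[Unit] = source.via(flow).to(sink)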

View file

@@ -17,8 +17,16 @@ Here's the data model we'll be working with throughout the quickstart examples:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model
.. note::
  If you would like to get an overview of the vocabulary used first, instead of diving head-first
  into an actual example, you can have a look at the :ref:`core-concepts-scala` and :ref:`defining-and-running-streams-scala`
  sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example application.
Transforming and consuming simple streams
-----------------------------------------
The example application we will be looking at is a simple Twitter feed stream from which we'll want to extract certain information,
for example the twitter handles of all users who tweet about ``#akka``.
First we need to prepare our environment by creating an :class:`ActorSystem` and :class:`ActorMaterializer`,
which will be responsible for materializing and running the streams we are about to create:
@@ -34,12 +42,12 @@ Let's assume we have a stream of tweets readily available, in Akka this is expre
Streams always start flowing from a :class:`Source[Out,M1]` then can continue through :class:`Flow[In,Out,M2]` elements or
more advanced graph elements to finally be consumed by a :class:`Sink[In,M3]` (ignore the type parameters ``M1``, ``M2``
and ``M3`` for now, they are not relevant to the types of the elements produced/consumed by these classes; they are
"materialized types", which we'll talk about :ref:`below <materialized-values-quick-scala>`).
The operations should look familiar to anyone who has used the Scala Collections library,
however they operate on streams and not collections of data (which is a very important distinction, as some operations
only make sense in streaming and vice versa):
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
@@ -51,13 +59,17 @@ For now let's simply print each author:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println
or by using the shorthand versions (which are defined only for the most popular sinks such as ``Sink.fold`` and ``Sink.foreach``):
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println
Materializing and running a stream always requires a :class:`Materializer` to be in implicit scope (or passed in explicitly,
like this: ``.run(materializer)``).
The complete snippet looks like this:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#first-sample
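As a self-contained approximation of such a complete snippet, here is a small runnable sketch; the model is deliberately simplified compared to the quickstart's ``Tweet``/``Author``/``Hashtag`` classes::

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{ Sink, Source }

  object ReactiveTweets extends App {
    final case class Author(handle: String)
    final case class Tweet(author: Author, body: String) {
      def hashtags: Set[String] = body.split(" ").filter(_.startsWith("#")).toSet
    }

    implicit val system = ActorSystem("reactive-tweets")
    implicit val materializer = ActorMaterializer()

    val tweets: Source[Tweet, Unit] = Source(List(
      Tweet(Author("rolandkuhn"), "#akka rocks"),
      Tweet(Author("drewhk"), "streams everywhere")))

    val authors: Source[Author, Unit] =
      tweets
        .filter(_.hashtags.contains("#akka"))
        .map(_.author)

    authors.runWith(Sink.foreach(println))
  }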
Flattening sequences in streams
-------------------------------
In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes
@@ -89,7 +101,14 @@ input port to all of its output ports.
Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations.
A graph can be either ``closed``, which is also known as a "*fully connected graph*", or ``partial``, that is a graph
with some unconnected ports. A partial graph is thus a generalisation of the Flow concept, where a ``Flow``
is simply a partial graph with one unconnected input and one unconnected output. Concepts around composing and nesting
graphs in large structures are explained in detail in :ref:`composition-scala`.
It is also possible to wrap complex computation
graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.
FlowGraphs are constructed like this:
@@ -127,6 +146,8 @@ The ``buffer`` element takes an explicit and required ``OverflowStrategy``, whic
when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best.
.. _materialized-values-quick-scala:
Materialized values
-------------------
So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing
@@ -136,22 +157,25 @@ While this question is not as obvious to give an answer to in case of an infinit
this question in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
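As an aside, the "stream of counts" answer for an infinite stream can be sketched with ``scan``, which emits the running total after every element (assuming a ``tweets`` source and an implicit materializer as in the examples above)::

  import akka.stream.scaladsl.Sink

  tweets
    .scan(0)((count, _) => count + 1) // emits 0, 1, 2, ... as tweets flow by
    .runWith(Sink.foreach(n => println(s"tweets so far: $n")))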
First, let's write such an element counter using ``Sink.fold`` and see what the types look like:
.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count
First we prepare a reusable ``Flow`` that will change each incoming tweet into an integer of value ``1``.
We then combine these ones using ``Sink.fold``, which will sum all ``Int`` elements of the stream
and make its result available as a ``Future[Int]``. Next we connect the ``tweets`` stream through this
counting flow and finally connect it to the previously prepared Sink using ``toMat``.
Remember those mysterious ``Mat`` type parameters on ``Source[+Out, +Mat]``, ``Flow[-In, +Out, +Mat]`` and ``Sink[-In, +Mat]``?
They represent the type of values these processing parts return when materialized. When you chain these together,
you can explicitly combine their materialized values: in our example we used the ``Keep.right`` predefined function,
which tells the implementation to only care about the materialized type of the stage currently appended to the right.
As you can notice, the materialized type of ``sumSink`` is ``Future[Int]`` and because of using ``Keep.right``,
the resulting :class:`RunnableGraph` also has a type parameter of ``Future[Int]``.
This step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: ``RunnableGraph[Future[Int]]``. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableGraph[T]`` is of type ``T``.
In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure.