.. _stream-quickstart-scala:

Quick Start Guide: Reactive Tweets
==================================

A typical use case for stream processing is consuming a live stream of data from which we want to extract or aggregate
some other data. In this example we'll consider consuming a stream of tweets and extracting information concerning
Akka from them.

We will also consider the problem inherent to all non-blocking streaming solutions: *"What if the subscriber is too
slow to consume the live stream of data?"*. Traditionally the solution is often to buffer the elements, but this
can—and usually will—cause eventual buffer overflows and instability of such systems. Instead Akka Streams depend on
internal backpressure signals that allow us to control what should happen in such scenarios.

Here's the data model we'll be working with throughout the quickstart examples:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model
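
For readers browsing this source file, the included snippet defines roughly the following model (a sketch; the
included file is authoritative and the exact field names may differ):

```scala
// Sketch of the quickstart data model; the includecode snippet above is
// authoritative, field names here are illustrative.
final case class Author(handle: String)

final case class Hashtag(name: String)

final case class Tweet(author: Author, timestamp: Long, body: String) {
  // Extract all hashtags mentioned in the tweet body.
  def hashtags: Set[Hashtag] =
    body.split(" ").collect {
      case t if t.startsWith("#") => Hashtag(t)
    }.toSet
}

val akkaTag = Hashtag("#akka")
```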

.. note::
  If you would like to get an overview of the vocabulary used first, instead of diving head-first into an actual
  example, you can have a look at the :ref:`core-concepts-scala` and :ref:`defining-and-running-streams-scala`
  sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example
  application.


Transforming and consuming simple streams
-----------------------------------------

The example application we will be looking at is a simple Twitter feed stream from which we'll want to extract certain
information, such as finding all twitter handles of users who tweet about ``#akka``.

First we need to prepare our environment by creating an :class:`ActorSystem` and :class:`ActorMaterializer`,
which will be responsible for materializing and running the streams we are about to create:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup
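
In source form, that setup amounts to something like the following sketch (the actor system name is illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

// The ActorSystem hosts the actors that the materializer will create to
// execute our streams; the materializer turns stream blueprints into
// running streams.
implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()
```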

The :class:`ActorMaterializer` can optionally take :class:`ActorMaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-scala`), the dispatcher to
be used by the pipeline etc. These can be overridden with ``withAttributes`` on :class:`Flow`, :class:`Source`,
:class:`Sink` and :class:`Graph`.

Let's assume we have a stream of tweets readily available. In Akka this is expressed as a :class:`Source[Out, M]`:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source

Streams always start flowing from a :class:`Source[Out,M1]`, then can continue through :class:`Flow[In,Out,M2]`
elements or more advanced graph elements, to finally be consumed by a :class:`Sink[In,M3]` (ignore the type parameters
``M1``, ``M2`` and ``M3`` for now; they are not relevant to the types of the elements produced/consumed by these
classes – they are "materialized types", which we'll talk about :ref:`below <materialized-values-quick-scala>`).

The operations should look familiar to anyone who has used the Scala Collections library, however they operate on
streams and not collections of data (which is a very important distinction, as some operations only make sense in
streaming and vice versa):

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
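
As a sketch (assuming the ``tweets`` source and the ``akkaTag`` hashtag from the model above), the transformation
reads much like a collection pipeline:

```scala
// Keep only tweets that mention #akka, then project each tweet to its author.
val authors: Source[Author, Unit] =
  tweets
    .filter(_.hashtags.contains(akkaTag))
    .map(_.author)
```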

Finally, in order to :ref:`materialize <stream-materialization-scala>` and run the stream computation we need to
attach the Flow to a :class:`Sink` that will get the Flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source``. For convenience a number of common Sinks are predefined and collected as methods on
the :class:`Sink` `companion object <http://doc.akka.io/api/akka-stream-and-http-experimental/@version@/#akka.stream.scaladsl.Sink$>`_.
For now let's simply print each author:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println

or by using the shorthand versions (which are defined only for the most popular Sinks such as ``Sink.fold`` and
``Sink.foreach``):

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println

Materializing and running a stream always requires a :class:`Materializer` to be in implicit scope (or passed in
explicitly, like this: ``.run(materializer)``).

The complete snippet looks like this:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#first-sample

Flattening sequences in streams
-------------------------------

In the previous section we were working on 1:1 relationships of elements, which is the most common case, but sometimes
we might want to map from one element to a number of elements and receive a "flattened" stream, similarly to how
``flatMap`` works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we
can use the ``mapConcat`` combinator:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat
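
Sketched out, this is a one-liner (assuming the ``tweets`` source from before; ``mapConcat`` expects a strict
``immutable.Seq``, hence the ``toList``):

```scala
// Each tweet contributes zero or more hashtags to the flattened stream.
val hashtags: Source[Hashtag, Unit] =
  tweets.mapConcat(_.hashtags.toList)
```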

.. note::
  The name ``flatMap`` was consciously avoided due to its proximity to for-comprehensions and monadic composition.
  It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream
  processing due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would
  not hold for our implementation of ``flatMap`` (due to the liveness issues).


Please note that ``mapConcat`` requires the supplied function to return a strict collection
(``f: Out => immutable.Seq[T]``), whereas ``flatMap`` would have to operate on streams all the way through.

Broadcasting a stream
---------------------

Now let's say we want to persist all hashtags, as well as all author names from this one live stream.
For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
This means we have to split the source stream into two streams which will handle the writing to these different files.

Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka
Streams. One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements
from its input port to all of its output ports.

Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (Graphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream
setups at the expense of not reading as familiarly as collection transformations.

Graphs are constructed using :class:`GraphDSL` like this:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast
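
The shape of such a broadcast graph is roughly the following (a sketch; ``writeAuthors`` and ``writeHashtags`` are
placeholder Sinks assumed to be defined elsewhere):

```scala
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ Broadcast, Flow, GraphDSL, RunnableGraph, Sink }

val writeAuthors: Sink[Author, Unit] = ???   // placeholder Sink
val writeHashtags: Sink[Hashtag, Unit] = ??? // placeholder Sink

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  // One input fans out to two downstream branches.
  val bcast = b.add(Broadcast[Tweet](2))
  tweets ~> bcast.in
  bcast.out(0) ~> Flow[Tweet].map(_.author)                ~> writeAuthors
  bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
  ClosedShape
})
g.run() // would run the graph once real Sinks are supplied
```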

As you can see, inside the :class:`GraphDSL` we use an implicit graph builder ``b`` to mutably construct the graph
using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). The operator is provided implicitly
by importing ``GraphDSL.Implicits._``.

``GraphDSL.create`` returns a :class:`Graph`, in this example a :class:`Graph[ClosedShape, Unit]` where
:class:`ClosedShape` means that it is *a fully connected graph* or "closed" - there are no unconnected inputs or
outputs. Since it is closed it is possible to transform the graph into a :class:`RunnableGraph` using
``RunnableGraph.fromGraph``. The runnable graph can then be ``run()`` to materialize a stream out of it.

Both :class:`Graph` and :class:`RunnableGraph` are *immutable, thread-safe, and freely shareable*.

A graph can also have one of several other shapes, with one or more unconnected ports. Having unconnected ports
expresses a graph that is a *partial graph*. Concepts around composing and nesting graphs in large structures are
explained in detail in :ref:`composition-scala`. It is also possible to wrap complex computation graphs
as Flows, Sinks or Sources, which will be explained in detail in
:ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.

Back-pressure in action
-----------------------

One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
:ref:`back-pressure-explained-scala`.

A typical problem that applications like this (ones not using Akka Streams) often face is that they are unable to
process the incoming data fast enough, either temporarily or by design, and will start buffering incoming data until
there's no more space to buffer, resulting in either ``OutOfMemoryError``\ s or other severe degradations of service
responsiveness. With Akka Streams buffering can and must be handled explicitly. For example, if we are only interested
in the "*most recent tweets, with a buffer of 10 elements*" this can be expressed using the ``buffer`` element:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead
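
A sketch of that element in context (``slowComputation`` is a placeholder for whatever expensive consumer logic
follows):

```scala
tweets
  .buffer(10, OverflowStrategy.dropHead) // keep only the 10 most recent tweets
  .map(slowComputation)                  // placeholder downstream work
  .runWith(Sink.ignore)
```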

The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
when it receives another element while it is full. Strategies provided include dropping the oldest element
(``dropHead``), dropping the entire buffer, signalling errors etc. Be sure to pick the strategy that fits your use
case best.

.. _materialized-values-quick-scala:

Materialized values
-------------------

So far we've only been processing data using Flows and consuming it into some kind of external Sink - be it by
printing values or storing them in some external system. However sometimes we may be interested in some value that can
be obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed.
Answering this question is not obvious in the case of an infinite stream of tweets (one way to answer it in a
streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"), but
in general it is possible to deal with finite streams and come up with a nice result such as a total count of
elements.

First, let's write such an element counter using ``Sink.fold`` and see what the types look like:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count
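
In outline, the counter built in the included snippet amounts to roughly this sketch (the included file is
authoritative):

```scala
// Map every tweet to 1, then fold the 1s into a running sum.
val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// Keep.right keeps the Sink's materialized value: the Future[Int].
val counterGraph: RunnableGraph[Future[Int]] =
  tweets.via(count).toMat(sumSink)(Keep.right)

val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total tweets processed: $c"))
```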

First we prepare a reusable ``Flow`` that will change each incoming tweet into an integer of value ``1``. We'll use it
in combination with a ``Sink.fold`` that will sum all ``Int`` elements of the stream and make its result available as
a ``Future[Int]``. Next we connect the ``tweets`` stream to ``count`` with ``via``. Finally we connect the Flow to the
previously prepared Sink using ``toMat``.

Remember those mysterious ``Mat`` type parameters on ``Source[+Out, +Mat]``, ``Flow[-In, +Out, +Mat]`` and
``Sink[-In, +Mat]``? They represent the type of values these processing parts return when materialized. When you chain
these together, you can explicitly combine their materialized values. In our example we used the ``Keep.right``
predefined function, which tells the implementation to only care about the materialized type of the stage currently
appended to the right. The materialized type of ``sumSink`` is ``Future[Int]`` and because of using ``Keep.right``,
the resulting :class:`RunnableGraph` also has a type parameter of ``Future[Int]``.

This step does *not* yet materialize the processing pipeline, it merely prepares the description of the Flow, which is
now connected to a Sink, and therefore can be ``run()``, as indicated by its type: ``RunnableGraph[Future[Int]]``.
Next we call ``run()`` which uses the implicit :class:`ActorMaterializer` to materialize and run the Flow. The value
returned by calling ``run()`` on a ``RunnableGraph[T]`` is of type ``T``. In our case this type is ``Future[Int]``
which, when completed, will contain the total length of our ``tweets`` stream. In case of the stream failing, this
future would complete with a Failure.

A :class:`RunnableGraph` may be reused and materialized multiple times, because it is just the "blueprint" of the
stream. This means that if we materialize a stream twice, for example one that consumes a live stream of tweets
within a minute, the materialized values of those two materializations will be different, as illustrated by this
example:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice

Many elements in Akka Streams provide materialized values which can be used for obtaining either results of
computation or steering these elements, which will be discussed in detail in :ref:`stream-materialization-scala`.
Summing up this section, now we know what happens behind the scenes when we run this one-liner, which is equivalent to
the multi-line version above:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline
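
That one-liner is, as a sketch (assuming the ``sumSink`` defined earlier), equivalent to the explicit
``toMat``/``run()`` pair:

```scala
// runWith(sumSink) appends the Sink and keeps its materialized value,
// i.e. it is shorthand for .toMat(sumSink)(Keep.right).run().
val sum: Future[Int] = tweets.map(_ => 1).runWith(sumSink)
```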

.. note::
  ``runWith()`` is a convenience method that automatically ignores the materialized value of any other stages except
  those appended by ``runWith()`` itself. In the above example it translates to using ``Keep.right`` as the combiner
  for materialized values.