.. _stream-quickstart-scala:

Quick Start Guide: Reactive Tweets
==================================

A typical use case for stream processing is consuming a live stream of data from which we want to extract or aggregate
some other data. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.

We will also consider the problem inherent to all non-blocking streaming
solutions: *"What if the subscriber is too slow to consume the live stream of
data?"*. Traditionally the solution is often to buffer the elements, but this
can—and usually will—cause eventual buffer overflows and instability of such
systems. Instead, Akka Streams rely on internal backpressure signals that
allow us to control what should happen in such scenarios.

Here's the data model we'll be working with throughout the quickstart examples:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model
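
The included source file is authoritative; a minimal self-contained sketch of the model along those lines looks like
this:

```scala
// Minimal sketch of the quickstart data model; the included source file is authoritative.
final case class Author(handle: String)

final case class Hashtag(name: String)

final case class Tweet(author: Author, timestamp: Long, body: String) {
  // Collect every whitespace-separated token starting with '#' as a Hashtag
  def hashtags: Set[Hashtag] =
    body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}

val akka = Hashtag("#akka")
```
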

Transforming and consuming simple streams
-----------------------------------------

First we prepare our environment by creating an :class:`ActorSystem` and :class:`ActorFlowMaterializer`,
which will be responsible for materializing and running the streams we are about to create:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup

The :class:`ActorFlowMaterializer` can optionally take :class:`ActorFlowMaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-scala`), the dispatcher to
be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this
will be discussed in depth in :ref:`stream-section-configuration`.

Let's assume we have a stream of tweets readily available. In Akka this is expressed as a :class:`Source[Out, M]`:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source

Streams always start flowing from a :class:`Source[Out,M1]` then can continue through :class:`Flow[In,Out,M2]` elements or
more advanced graph elements to finally be consumed by a :class:`Sink[In,M3]` (ignore the type parameters ``M1``, ``M2``
and ``M3`` for now, they are not relevant to the types of the elements produced/consumed by these classes). Both Sources and
Flows provide stream operations that can be used to transform the flowing data; a :class:`Sink` however does not, since
it is the "end of stream" and its behavior depends on the type of :class:`Sink` used.

In our case let's say we want to find all twitter handles of users who tweet about ``#akka``. The operations should look
familiar to anyone who has used the Scala Collections library, however they operate on streams and not collections of data:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
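
Because these combinators mirror the Scala Collections API, the same transformation can be previewed on a plain
``List`` (a self-contained sketch with illustrative data; the real code operates on a ``Source``):

```scala
// Collections analogue of: tweets.filter(_.hashtags.contains(akka)).map(_.author)
final case class Author(handle: String)
final case class Hashtag(name: String)
final case class Tweet(author: Author, body: String) {
  def hashtags: Set[Hashtag] =
    body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}

val akka = Hashtag("#akka")

// Illustrative sample data
val tweets = List(
  Tweet(Author("alice"), "#akka streams are neat"),
  Tweet(Author("bob"), "just had lunch"),
  Tweet(Author("carol"), "learning #akka today"))

// keep tweets mentioning #akka, then extract the author
val authors: List[Author] =
  tweets.filter(_.hashtags.contains(akka)).map(_.author)
```

The stream version has exactly the same shape, only it produces a new ``Source`` instead of a ``List``.
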

Finally in order to :ref:`materialize <stream-materialization-scala>` and run the stream computation we need to attach
the Flow to a :class:`Sink` that will get the flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source``. For convenience a number of common Sinks are predefined and collected as methods on
the :class:`Sink` `companion object <http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M4/#akka.stream.scaladsl.Sink$>`_.

For now let's simply print each author:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println

or by using the shorthand version (which is defined only for the most popular sinks such as :class:`FoldSink` and :class:`ForeachSink`):

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println

Materializing and running a stream always requires a :class:`FlowMaterializer` to be in implicit scope (or passed in explicitly,
like this: ``.run(materializer)``).

Flattening sequences in streams
-------------------------------

In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes
we might want to map from one element to a number of elements and receive a "flattened" stream, similar to how ``flatMap``
works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat``
combinator:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat

.. note::
  The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition.
  It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream processing
  due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would not hold for
  our implementation of flatMap (due to the liveness issues).

Please note that ``mapConcat`` requires the supplied function to return a strict collection (``f: Out => immutable.Seq[T]``),
whereas ``flatMap`` would have to operate on streams all the way through.
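
On strict collections the counterpart of ``mapConcat`` is ``flatMap``; the sketch below (with illustrative data) shows
both the flattening and the required strict ``immutable.Seq`` result type:

```scala
import scala.collection.immutable

final case class Hashtag(name: String)
final case class Tweet(body: String) {
  def hashtags: Set[Hashtag] =
    body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}

val tweets = List(Tweet("#akka #streams rock"), Tweet("#scala too"))

// mapConcat's supplied function must be strict: f: Out => immutable.Seq[T]
val toTags: Tweet => immutable.Seq[Hashtag] = t => t.hashtags.toList

// Collections analogue of: tweets.mapConcat(_.hashtags.toList)
val hashtags: List[Hashtag] = tweets.flatMap(toTags)
```
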

Broadcasting a stream
---------------------

Now let's say we want to persist all hashtags, as well as all author names from this one live stream.
For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
This means we have to split the source stream into two streams which will handle the writing to these different files.

Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its
input port to all of its output ports.

Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation
graphs as Flows, Sinks or Sources, which will be explained in detail in :ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.

FlowGraphs are constructed like this:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#flow-graph-broadcast

.. note::
  The ``~>`` (read as "edge", "via" or "to") operator is only available if ``FlowGraph.Implicits._`` are imported.
  Without this import you can still construct graphs using the ``builder.addEdge(from,[through,]to)`` method.

As you can see, inside the :class:`FlowGraph` we use an implicit graph builder to mutably construct the graph
using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). Once we have the FlowGraph in the value ``g``
*it is immutable, thread-safe, and freely shareable*. A graph can be ``run()`` directly - assuming all
ports (sinks/sources) within a flow have been connected properly. It is possible to construct partial graphs
where this is not required, but this will be covered in detail in :ref:`partial-flow-graph-scala`.

Like all Akka Streams elements, :class:`Broadcast` will properly propagate back-pressure to its upstream element.
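
The semantics of :class:`Broadcast` can be pictured with a toy model in plain Scala (illustrative only - the real
junction is a graph element that also mediates back-pressure between its outputs):

```scala
// Toy model: one input port, two output "ports", each receiving every element.
final case class Author(handle: String)
final case class Hashtag(name: String)
final case class Tweet(author: Author, body: String) {
  def hashtags: Set[Hashtag] =
    body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}

val authorsOut  = scala.collection.mutable.Buffer.empty[Author]
val hashtagsOut = scala.collection.mutable.Buffer.empty[Hashtag]

def broadcast(tweet: Tweet): Unit = {
  authorsOut  += tweet.author    // first output port
  hashtagsOut ++= tweet.hashtags // second output port
}

List(Tweet(Author("alice"), "#akka"), Tweet(Author("bob"), "#streams")).foreach(broadcast)
```
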

Back-pressure in action
-----------------------

One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
:ref:`back-pressure-explained-scala`.

A typical problem for applications that do not use Akka Streams is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
in either ``OutOfMemoryError``\ s or other severe degradations of service responsiveness. With Akka Streams buffering can
and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
elements*" this can be expressed using the ``buffer`` element:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead

The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling errors etc. Be sure to pick the strategy that fits your use case best.
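
The ``dropHead`` strategy can be pictured as a tiny bounded queue that evicts its oldest element when full (a sketch of
the semantics only, not of how the real ``buffer`` stage is implemented):

```scala
import scala.collection.immutable.Queue

// Toy model of OverflowStrategy.dropHead
final case class DropHeadBuffer[T](capacity: Int, queue: Queue[T] = Queue.empty[T]) {
  def enqueue(elem: T): DropHeadBuffer[T] =
    if (queue.size < capacity) copy(queue = queue.enqueue(elem))
    else copy(queue = queue.tail.enqueue(elem)) // full: drop the oldest, keep the newest
}

// A consumer that never drains a buffer of 3 ends up with only the 3 most recent elements
val buffered = (1 to 10).foldLeft(DropHeadBuffer[Int](3))(_ enqueue _)
```
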

Materialized values
-------------------

So far we've been only processing data using Flows and consuming it into some kind of external Sink - be it by printing
values or storing them in some external system. However sometimes we may be interested in some value that can be
obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed.
This question is not as obvious to answer in the case of an infinite stream of tweets (one way to answer it
in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.

First, let's write such an element counter using :class:`FoldSink` and see what the types look like:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count

First, we prepare the :class:`FoldSink` which will be used to sum all ``Int`` elements of the stream.
Next we connect the ``tweets`` stream through a ``map`` step which converts each tweet into the number ``1``;
finally we connect the flow to the previously prepared Sink using ``toMat``. Remember those mysterious type parameters on
:class:`Source`, :class:`Flow` and :class:`Sink`? They represent the type of values these processing parts return when
materialized. When you chain these together, you can explicitly combine their materialized values: in our example we
used the ``Keep.right`` predefined function, which tells the implementation to only care about the materialized
type of the stage currently appended to the right. As you can notice, the materialized type of ``sumSink`` is ``Future[Int]``
and because of using ``Keep.right``, the resulting :class:`RunnableFlow` also has a type parameter of ``Future[Int]``.
This step does *not* yet materialize the
processing pipeline; it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: :class:`RunnableFlow[Future[Int]]`. Next we call ``run()`` which uses the implicit :class:`ActorFlowMaterializer`
to materialize and run the flow. The value returned by calling ``run()`` on a ``RunnableFlow[T]`` is of type ``T``.
In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure.
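
The counting itself is the streaming counterpart of a plain ``foldLeft``; on a strict collection the same logic
(with illustrative data) reads:

```scala
final case class Tweet(body: String)

val tweets = List(Tweet("#akka"), Tweet("#streams"), Tweet("#scala"))

// Collections analogue of: map each tweet to 1, then sum with a fold starting from 0,
// just like Flow.map(_ => 1) feeding a FoldSink[Int, Int](0)(_ + _)
val count: Int = tweets.map(_ => 1).foldLeft(0)(_ + _)
```

The stream version differs in that the result arrives asynchronously as a ``Future[Int]``.
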

A :class:`RunnableFlow` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream
twice, for example one that consumes the live stream of tweets within a minute, the materialized values of those two
materializations will be different, as illustrated by this example:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice
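
The "blueprint" nature of a :class:`RunnableFlow` can be compared to a plain function value: the description is built
once, but every invocation - every ``run()`` - yields a fresh materialized value (a toy analogy, not the actual machinery):

```scala
// Toy analogy: the blueprint is immutable; each "run" produces a new materialized value.
var materializations = 0

val blueprint: () => Int = () => { materializations += 1; materializations }

val firstMaterialization  = blueprint() // like the first run()
val secondMaterialization = blueprint() // like the second run() - a different value
```
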

Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or
for steering these elements; this will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know
what happens behind the scenes when we run this one-liner, which is equivalent to the multi-line version above:

.. includecode:: code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline

.. note::
  ``runWith()`` is a convenience method that automatically ignores the materialized value of any other stages except
  those appended by the ``runWith()`` itself. In the above example it translates to using ``Keep.right`` as the combiner
  for materialized values.