+doc Add more imports to the stream quick start guides for Java and Scala (#20797)

* Add more imports to the stream quick start guides.

This makes it easier for people to execute the code samples while they
read through the guide.

* Change line endings to be consistent with other files

For some reason these 2 files had CR+LF line endings.
Chris Birchall 2016-06-18 11:15:17 +02:00 committed by Konrad Malawski
parent a9cb8ab4b5
commit f246c56087
4 changed files with 692 additions and 677 deletions

docs/stream/QuickStartDocTest.java

@@ -3,23 +3,27 @@
  */
 package docs.stream;
 
+//#stream-imports
+import akka.stream.*;
+import akka.stream.javadsl.*;
+//#stream-imports
+
+//#other-imports
+import akka.Done;
+import akka.NotUsed;
+import akka.actor.ActorSystem;
+import akka.util.ByteString;
 import java.nio.file.Paths;
 import java.math.BigInteger;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.junit.*;
-import akka.Done;
-import akka.NotUsed;
-import akka.actor.ActorSystem;
-//#imports
-import akka.stream.*;
-import akka.stream.javadsl.*;
-//#imports
-import akka.util.ByteString;
 import scala.concurrent.duration.Duration;
+//#other-imports
+
+import org.junit.*;
 
 /**
  * This class is not meant to be run as a test in the test suite, but it

stream-quickstart.rst (Java)

@@ -1,333 +1,337 @@

.. _stream-quickstart-java:

Quick Start Guide
=================

A stream usually begins at a source, so this is also how we start an Akka
Stream. Before we create one, we import the full complement of streaming tools:

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#stream-imports

If you want to execute the code samples while you read through the quick start guide, you will also need the following imports:

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#other-imports

Now we will start with a rather simple source, emitting the integers 1 to 100:

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#create-source
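
For reference while reading, a minimal sketch of what the ``#create-source`` snippet boils down to, assuming the imports above (the variable name is illustrative)::

  // a description of a source emitting the integers 1 to 100; nothing runs yet
  final Source<Integer, NotUsed> source = Source.range(1, 100);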

The :class:`Source` type is parameterized with two types: the first one is the
type of element that this source emits and the second one may signal that
running the source produces some auxiliary value (e.g. a network source may
provide information about the bound port or the peer's address). Where no
auxiliary information is produced, the type ``akka.NotUsed`` is used—and a
simple range of integers surely falls into this category.

Having created this source means that we have a description of how to emit the
first 100 natural numbers, but this source is not yet active. In order to get
those numbers out we have to run it:

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#run-source

This line will complement the source with a consumer function—in this example
we simply print out the numbers to the console—and pass this little stream
setup to an Actor that runs it. This activation is signaled by having “run” be
part of the method name; there are other methods that run Akka Streams, and
they all follow this pattern.
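
A sketch of such a run, assuming a ``materializer`` is in scope (its creation is shown next)::

  // attach a consumer function and run the stream on the materializer;
  // the materialized CompletionStage completes when the stream does
  final CompletionStage<Done> done =
    source.runForeach(i -> System.out.println(i), materializer);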

You may wonder where the Actor gets created that runs the stream, and you are
probably also asking yourself what this ``materializer`` means. In order to get
this value we first need to create an Actor system:

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#create-materializer

There are other ways to create a materializer, e.g. from an
:class:`ActorContext` when using streams from within Actors. The
:class:`Materializer` is a factory for stream execution engines, it is the
thing that makes streams run—you don't need to worry about any of the details
just now apart from that you need one for calling any of the ``run`` methods on
a :class:`Source`.
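
A sketch of that setup, along the lines of the ``#create-materializer`` snippet (the system name is illustrative)::

  // the ActorSystem hosts the actors the materializer creates to run streams
  final ActorSystem system = ActorSystem.create("QuickStart");
  final Materializer materializer = ActorMaterializer.create(system);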

The nice thing about Akka Streams is that the :class:`Source` is just a
description of what you want to run, and like an architect's blueprint it can
be reused, incorporated into a larger design. We may choose to transform the
source of integers and write it to a file instead:

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#transform-source

First we use the ``scan`` combinator to run a computation over the whole
stream: starting with the number 1 (``BigInteger.ONE``) we multiply by each of
the incoming numbers, one after the other; the scan operation emits the initial
value and then every calculation result. This yields the series of factorial
numbers which we stash away as a :class:`Source` for later reuse—it is
important to keep in mind that nothing is actually computed yet, this is just a
description of what we want to have computed once we run the stream. Then we
convert the resulting series of numbers into a stream of :class:`ByteString`
objects describing lines in a text file. This stream is then run by attaching a
file as the receiver of the data. In the terminology of Akka Streams this is
called a :class:`Sink`. :class:`IOResult` is a type that IO operations return
in Akka Streams in order to tell you how many bytes or elements were consumed
and whether the stream terminated normally or exceptionally.
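
A sketch of this transformation (the file name is illustrative, and ``java.io.File`` is assumed to be imported)::

  // describe the factorial series; still only a blueprint
  final Source<BigInteger, NotUsed> factorials =
    source.scan(BigInteger.ONE, (acc, next) -> acc.multiply(BigInteger.valueOf(next)));

  // render each number as a line of text and stream it into a file sink
  final CompletionStage<IOResult> result =
    factorials
      .map(num -> ByteString.fromString(num.toString() + "\n"))
      .runWith(FileIO.toFile(new File("factorials.txt")), materializer);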

Reusable Pieces
---------------

One of the nice parts of Akka Streams—and something that other stream libraries
do not offer—is that not only sources can be reused like blueprints, all other
elements can be as well. We can take the file-writing :class:`Sink`, prepend
the processing steps necessary to get the :class:`ByteString` elements from
incoming strings and package that up as a reusable piece as well. Since the
language for writing these streams always flows from left to right (just like
plain English), we need a starting point that is like a source but with an
“open” input. In Akka Streams this is called a :class:`Flow`:

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#transform-sink

Starting from a flow of strings we convert each to :class:`ByteString` and then
feed to the already known file-writing :class:`Sink`. The resulting blueprint
is a :class:`Sink<String, CompletionStage<IOResult>>`, which means that it
accepts strings as its input and when materialized it will create auxiliary
information of type ``CompletionStage<IOResult>`` (when chaining operations on
a :class:`Source` or :class:`Flow` the type of the auxiliary information—called
the “materialized value”—is given by the leftmost starting point; since we want
to retain what the ``FileIO.toFile`` sink has to offer, we need to say
``Keep.right()``).
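
A sketch of such a reusable piece, assuming the same imports (the method and file names are illustrative)::

  public Sink<String, CompletionStage<IOResult>> lineSink(String filename) {
    // prepend string-to-bytes processing to the file sink and keep the
    // sink's materialized value, a CompletionStage<IOResult>
    return Flow.of(String.class)
      .map(s -> ByteString.fromString(s + "\n"))
      .toMat(FileIO.toFile(new File(filename)), Keep.right());
  }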

We can use the new and shiny :class:`Sink` we just created by
attaching it to our ``factorials`` source—after a small adaptation to turn the
numbers into strings:

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#use-transformed-sink
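
Sketched, that usage could look like this (file name again illustrative)::

  factorials
    .map(num -> num.toString())       // adapt BigInteger elements to String
    .runWith(lineSink("factorial2.txt"), materializer);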

Time-Based Processing
---------------------

Before we start looking at a more involved example we explore the streaming
nature of what Akka Streams can do. Starting from the ``factorials`` source
we transform the stream by zipping it together with another stream,
represented by a :class:`Source` that emits the numbers 0 to 100: the first
number emitted by the ``factorials`` source is the factorial of zero, the
second is the factorial of one, and so on. We combine these two by forming
strings like ``"3! = 6"``.

.. includecode:: ../code/docs/stream/QuickStartDocTest.java#add-streams

All operations so far have been time-independent and could have been performed
in the same fashion on strict collections of elements. The next line
demonstrates that we are in fact dealing with streams that can flow at a
certain speed: we use the ``throttle`` combinator to slow down the stream to 1
element per second (the second ``1`` in the argument list is the maximum size
of a burst that we want to allow—passing ``1`` means that the first element
gets through immediately and the second then has to wait for one second and so
on).
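
A sketch of the zip-and-throttle pipeline described above, assuming ``scala.concurrent.duration.Duration`` and ``java.util.concurrent.TimeUnit`` from the imports section::

  factorials
    .zipWith(Source.range(0, 100), (num, idx) -> String.format("%d! = %s", idx, num))
    // at most 1 element per second, burst size 1, shaping (i.e. delaying) mode
    .throttle(1, Duration.create(1, TimeUnit.SECONDS), 1, ThrottleMode.shaping())
    .runForeach(s -> System.out.println(s), materializer);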

If you run this program you will see one line printed per second. One aspect
that is not immediately visible deserves mention, though: if you try and set
the streams to produce a billion numbers each then you will notice that your
JVM does not crash with an OutOfMemoryError, even though you will also notice
that running the streams happens in the background, asynchronously (this is the
reason for the auxiliary information to be provided as a
:class:`CompletionStage`, in the future). The secret that makes this work is
that Akka Streams implicitly implement pervasive flow control, all combinators
respect back-pressure. This allows the throttle combinator to signal to all its
upstream sources of data that it can only accept elements at a certain
rate—when the incoming rate is higher than one per second the throttle
combinator will assert *back-pressure* upstream.

This is basically all there is to Akka Streams in a nutshell—glossing over the
fact that there are dozens of sources and sinks and many more stream
transformation combinators to choose from, see also :ref:`stages-overview_java`.

Reactive Tweets
===============

A typical use case for stream processing is consuming a live stream of data from which we want to extract or aggregate some
other data. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.

We will also consider the problem inherent to all non-blocking streaming
solutions: *"What if the subscriber is too slow to consume the live stream of
data?"*. Traditionally the solution is often to buffer the elements, but this
can—and usually will—cause eventual buffer overflows and instability of such
systems. Instead Akka Streams depend on internal backpressure signals that
allow control over what should happen in such scenarios.

Here's the data model we'll be working with throughout the quickstart examples:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#model
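
In rough outline, the model amounts to something like the following (a condensed sketch of the ``#model`` snippet, assuming the usual ``java.util`` imports; equality methods are elided)::

  public static class Author {
    public final String handle;
    public Author(String handle) { this.handle = handle; }
  }

  public static class Hashtag {
    // a real Hashtag also needs equals/hashCode for set membership checks
    public final String name;
    public Hashtag(String name) { this.name = name; }
  }

  public static class Tweet {
    public final Author author;
    public final long timestamp;
    public final String body;
    public Tweet(Author author, long timestamp, String body) {
      this.author = author;
      this.timestamp = timestamp;
      this.body = body;
    }
    // naive extraction: every whitespace-separated token starting with '#'
    public Set<Hashtag> hashtags() {
      return Arrays.asList(body.split(" ")).stream()
        .filter(a -> a.startsWith("#"))
        .map(a -> new Hashtag(a))
        .collect(Collectors.toSet());
    }
  }

  public static final Hashtag AKKA = new Hashtag("#akka");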

.. note::
  If you would like to get an overview of the used vocabulary first instead of diving head-first
  into an actual example you can have a look at the :ref:`core-concepts-java` and :ref:`defining-and-running-streams-java`
  sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example application.

Transforming and consuming simple streams
-----------------------------------------

The example application we will be looking at is a simple Twitter feed stream from which we'll want to extract certain information,
like for example finding all twitter handles of users who tweet about ``#akka``.

First we prepare our environment by creating an :class:`ActorSystem` and :class:`ActorMaterializer`,
which will be responsible for materializing and running the streams we are about to create:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#materializer-setup

The :class:`ActorMaterializer` can optionally take :class:`ActorMaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`async-stream-buffers-java`), the dispatcher to
be used by the pipeline etc. These can be overridden with ``withAttributes`` on :class:`Flow`, :class:`Source`, :class:`Sink` and :class:`Graph`.

Let's assume we have a stream of tweets readily available. In Akka this is expressed as a :class:`Source<Out, M>`:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#tweet-source

Streams always start flowing from a ``Source<Out,M1>`` then can continue through ``Flow<In,Out,M2>`` elements or
more advanced graph elements to finally be consumed by a ``Sink<In,M3>``.

The first type parameter—:class:`Tweet` in this case—designates the kind of elements produced
by the source while the ``M`` type parameters describe the object that is created during
materialization (:ref:`see below <materialized-values-quick-java>`)—:class:`BoxedUnit` (from the ``scala.runtime``
package) means that no value is produced, it is the generic equivalent of ``void``.

The operations should look familiar to anyone who has used the Scala Collections library,
however they operate on streams and not collections of data (which is a very important distinction, as some operations
only make sense in streaming and vice versa):

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#authors-filter-map
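
A sketch of that filter-and-map chain, assuming the ``tweets`` source and the ``AKKA`` hashtag from the model above::

  final Source<Author, NotUsed> authors =
    tweets
      .filter(t -> t.hashtags().contains(AKKA)) // keep only tweets about #akka
      .map(t -> t.author);                      // extract the author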

Finally in order to :ref:`materialize <stream-materialization-java>` and run the stream computation we need to attach
the Flow to a ``Sink<T, M>`` that will get the Flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source<Out, M>``. For convenience a number of common Sinks are predefined and collected as static methods on
the `Sink class <http://doc.akka.io/japi/akka-stream-and-http-experimental/@version@/akka/stream/javadsl/Sink.html>`_.
For now let's simply print each author:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#authors-foreachsink-println

or by using the shorthand version (which is defined only for the most popular Sinks such as :class:`Sink.fold` and :class:`Sink.foreach`):

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#authors-foreach-println

Materializing and running a stream always requires a :class:`Materializer` to be passed in explicitly,
like this: ``.run(mat)``.
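
Sketched with the ``authors`` source from above (``mat`` being an ``ActorMaterializer``), the two variants amount to::

  authors.runWith(Sink.foreach(a -> System.out.println(a)), mat);
  // or equivalently, using the shorthand:
  authors.runForeach(a -> System.out.println(a), mat);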

The complete snippet looks like this:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#first-sample

Flattening sequences in streams
-------------------------------

In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes
we might want to map from one element to a number of elements and receive a "flattened" stream, similarly to how ``flatMap``
works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat``
combinator:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#hashtags-mapConcat
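
A sketch of that flattening step (copying the hashtag set into a strict ``java.util.List`` as ``mapConcat`` requires)::

  final Source<Hashtag, NotUsed> hashtags =
    tweets.mapConcat(t -> new ArrayList<Hashtag>(t.hashtags()));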

.. note::
  The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition.
  It is problematic for two reasons: firstly, flattening by concatenation is often undesirable in bounded stream processing
  due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
  our implementation of flatMap (due to the liveness issues).

Please note that ``mapConcat`` requires the supplied function to return a strict collection (``Out f -> java.util.List<T>``),
whereas ``flatMap`` would have to operate on streams all the way through.

Broadcasting a stream
---------------------

Now let's say we want to persist all hashtags, as well as all author names from this one live stream.
For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
This means we have to split the source stream into two streams which will handle the writing to these different files.

Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its
input port to all of its output ports.

Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (Graphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations.

Graphs are constructed using :class:`GraphDSL` like this:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#graph-dsl-broadcast
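
In outline, the broadcast graph could be sketched as follows, assuming ``writeAuthors`` and ``writeHashtags`` are two sinks prepared elsewhere::

  RunnableGraph.fromGraph(GraphDSL.create(b -> {
    // one input, two outputs, each output receiving every tweet
    final UniformFanOutShape<Tweet, Tweet> bcast = b.add(Broadcast.create(2));
    final FlowShape<Tweet, Author> toAuthor =
      b.add(Flow.of(Tweet.class).map(t -> t.author));
    final FlowShape<Tweet, Hashtag> toTags =
      b.add(Flow.of(Tweet.class).mapConcat(t -> new ArrayList<Hashtag>(t.hashtags())));
    final SinkShape<Author> authors = b.add(writeAuthors);
    final SinkShape<Hashtag> hashtags = b.add(writeHashtags);

    // wire the graph: tweets fan out into the author and hashtag branches
    b.from(b.add(tweets)).viaFanOut(bcast).via(toAuthor).to(authors);
    b.from(bcast).via(toTags).to(hashtags);
    return ClosedShape.getInstance();
  })).run(mat);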

As you can see, we use graph builder ``b`` to construct the graph using ``UniformFanOutShape`` and ``Flow`` s.

``GraphDSL.create`` returns a :class:`Graph`, in this example a ``Graph<ClosedShape,Unit>`` where
:class:`ClosedShape` means that it is *a fully connected graph* or "closed" - there are no unconnected inputs or outputs.
Since it is closed it is possible to transform the graph into a :class:`RunnableGraph` using ``RunnableGraph.fromGraph``.
The runnable graph can then be ``run()`` to materialize a stream out of it.

Both :class:`Graph` and :class:`RunnableGraph` are *immutable, thread-safe, and freely shareable*.

A graph can also have one of several other shapes, with one or more unconnected ports. Having unconnected ports
expresses a graph that is a *partial graph*. Concepts around composing and nesting graphs in large structures are
explained in detail in :ref:`composition-java`. It is also possible to wrap complex computation graphs
as Flows, Sinks or Sources, which will be explained in detail in :ref:`partial-graph-dsl-java`.

Back-pressure in action
-----------------------

One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
:ref:`back-pressure-explained-java`.

A typical problem applications (not using Akka Streams) like this often face is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
in either ``OutOfMemoryError`` s or other severe degradations of service responsiveness. With Akka Streams buffering can
and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
elements*" this can be expressed using the ``buffer`` element:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-slow-consumption-dropHead

The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling failures etc. Be sure to pick and choose the strategy that fits your use case best.
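
A sketch of such a buffered pipeline with the ``dropHead`` strategy (``slowComputation`` standing in for some expensive per-tweet work)::

  tweets
    .buffer(10, OverflowStrategy.dropHead()) // keep only the 10 most recent tweets
    .map(t -> slowComputation(t))
    .runWith(Sink.ignore(), mat);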

.. _materialized-values-quick-java:

Materialized values
-------------------

So far we've only been processing data using Flows and consuming it into some kind of external Sink - be it by printing
values or storing them in some external system. However sometimes we may be interested in some value that can be
obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed.
While this question is not as obvious to answer in the case of an infinite stream of tweets (one way to answer
it in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.

First, let's write such an element counter using ``Flow.of(Class)`` and ``Sink.fold`` to see what the types look like:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-fold-count
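
A sketch of that counter, with names matching the explanation that follows::

  final Flow<Tweet, Integer, NotUsed> count =
    Flow.of(Tweet.class).map(t -> 1); // each tweet becomes the integer 1

  final Sink<Integer, CompletionStage<Integer>> sumSink =
    Sink.<Integer, Integer>fold(0, (acc, elem) -> acc + elem);

  final RunnableGraph<CompletionStage<Integer>> counterGraph =
    tweets.via(count).toMat(sumSink, Keep.right());

  final CompletionStage<Integer> sum = counterGraph.run(mat);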

First we prepare a reusable ``Flow`` that will change each incoming tweet into an integer of value ``1``. We'll use this in
order to combine those with a ``Sink.fold`` that will sum all ``Integer`` elements of the stream and make its result available as
a ``CompletionStage<Integer>``. Next we connect the ``tweets`` stream to ``count`` with ``via``. Finally we connect the Flow to the previously
prepared Sink using ``toMat``.

Remember those mysterious ``Mat`` type parameters on ``Source<Out, Mat>``, ``Flow<In, Out, Mat>`` and ``Sink<In, Mat>``?
They represent the type of values these processing parts return when materialized. When you chain these together,
you can explicitly combine their materialized values: in our example we used the ``Keep.right`` predefined function,
which tells the implementation to only care about the materialized type of the stage currently appended to the right.
The materialized type of ``sumSink`` is ``CompletionStage<Integer>`` and because of using ``Keep.right``, the resulting :class:`RunnableGraph`
also has a type parameter of ``CompletionStage<Integer>``.

This step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: ``RunnableGraph<CompletionStage<Integer>>``. Next we call ``run()`` which uses the :class:`ActorMaterializer`
to materialize and run the Flow. The value returned by calling ``run()`` on a ``RunnableGraph<T>`` is of type ``T``.
In our case this type is ``CompletionStage<Integer>`` which, when completed, will contain the total length of our tweets stream.
In case of the stream failing, this future would complete with a Failure.

A :class:`RunnableGraph` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream
twice, for example one that consumes a live stream of tweets within a minute, the materialized values of those two materializations
will be different, as illustrated by this example:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-runnable-flow-materialized-twice
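
Sketched with the ``counterGraph`` from above, running the same blueprint twice yields two independent materialized values (variable names illustrative)::

  final CompletionStage<Integer> morningCount = counterGraph.run(mat);
  // ... a minute later, the same blueprint is materialized again:
  final CompletionStage<Integer> eveningCount = counterGraph.run(mat);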

Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or
for steering these elements; this will be discussed in detail in :ref:`stream-materialization-java`. Summing up this section, now we know
what happens behind the scenes when we run this one-liner, which is equivalent to the multi-line version above:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocTest.java#tweets-fold-count-oneline
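
A sketch of that one-liner, reusing ``sumSink`` from the earlier snippet::

  final CompletionStage<Integer> sum = tweets.map(t -> 1).runWith(sumSink, mat);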

.. note::
  ``runWith()`` is a convenience method that automatically ignores the materialized value of any other stages except
  those appended by the ``runWith()`` itself. In the above example it translates to using ``Keep.right`` as the combiner
  for materialized values.

docs/stream/QuickStartDocSpec.scala

@@ -3,19 +3,22 @@
  */
 package docs.stream
 
-//#imports
+//#stream-imports
 import akka.stream._
 import akka.stream.scaladsl._
-//#imports
+//#stream-imports
 
+//#other-imports
 import akka.{ NotUsed, Done }
 import akka.actor.ActorSystem
 import akka.util.ByteString
-import org.scalatest._
-import org.scalatest.concurrent._
 import scala.concurrent._
 import scala.concurrent.duration._
 import java.nio.file.Paths
+//#other-imports
+
+import org.scalatest._
+import org.scalatest.concurrent._
 
 class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFutures {
   implicit val patience = PatienceConfig(5.seconds)

stream-quickstart.rst (Scala)

@ -1,329 +1,333 @@
.. _stream-quickstart-scala: .. _stream-quickstart-scala:
Quick Start Guide Quick Start Guide
================= =================
A stream usually begins at a source, so this is also how we start an Akka A stream usually begins at a source, so this is also how we start an Akka
Stream. Before we create one, we import the full complement of streaming tools: Stream. Before we create one, we import the full complement of streaming tools:
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#imports .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#stream-imports
Now we will start with a rather simple source, emitting the integers 1 to 100: If you want to execute the code samples while you read through the quick start guide, you will also need the following imports:
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#create-source .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#other-imports
The :class:`Source` type is parameterized with two types: the first one is the Now we will start with a rather simple source, emitting the integers 1 to 100:
type of element that this source emits and the second one may signal that
running the source produces some auxiliary value (e.g. a network source may .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#create-source
provide information about the bound port or the peers address). Where no
auxiliary information is produced, the type ``akka.NotUsed`` is used—and a The :class:`Source` type is parameterized with two types: the first one is the
simple range of integers surely falls into this category. type of element that this source emits and the second one may signal that
running the source produces some auxiliary value (e.g. a network source may
Having created this source means that we have a description of how to emit the provide information about the bound port or the peers address). Where no
first 100 natural numbers, but this source is not yet active. In order to get auxiliary information is produced, the type ``akka.NotUsed`` is used—and a
those numbers out we have to run it: simple range of integers surely falls into this category.
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#run-source Having created this source means that we have a description of how to emit the
first 100 natural numbers, but this source is not yet active. In order to get
This line will complement the source with a consumer function—in this example those numbers out we have to run it:
we simply print out the numbers to the console—and pass this little stream
setup to an Actor that runs it. This activation is signaled by having “run” be .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#run-source
part of the method name; there are other methods that run Akka Streams, and
they all follow this pattern. This line will complement the source with a consumer function—in this example
we simply print out the numbers to the console—and pass this little stream
You may wonder where the Actor gets created that runs the stream, and you are setup to an Actor that runs it. This activation is signaled by having “run” be
probably also asking yourself what this ``materializer`` means. In order to get part of the method name; there are other methods that run Akka Streams, and
this value we first need to create an Actor system: they all follow this pattern.
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#create-materializer You may wonder where the Actor gets created that runs the stream, and you are
probably also asking yourself what this ``materializer`` means. In order to get
There are other ways to create a materializer, e.g. from an this value we first need to create an Actor system:
:class:`ActorContext` when using streams from within Actors. The
:class:`Materializer` is a factory for stream execution engines, it is the .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#create-materializer
thing that makes streams run—you dont need to worry about any of the details
just now apart from that you need one for calling any of the ``run`` methods on There are other ways to create a materializer, e.g. from an
a :class:`Source`. The materializer is picked up implicitly if it is omitted :class:`ActorContext` when using streams from within Actors. The
from the ``run`` method call arguments, which we will do in the following. :class:`Materializer` is a factory for stream execution engines, it is the
thing that makes streams run—you dont need to worry about any of the details
The nice thing about Akka Streams is that the :class:`Source` is just a just now apart from that you need one for calling any of the ``run`` methods on
description of what you want to run, and like an architects blueprint it can a :class:`Source`. The materializer is picked up implicitly if it is omitted
be reused, incorporated into a larger design. We may choose to transform the from the ``run`` method call arguments, which we will do in the following.
source of integers and write it to a file instead:
The nice thing about Akka Streams is that the :class:`Source` is just a
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#transform-source description of what you want to run, and like an architects blueprint it can
be reused, incorporated into a larger design. We may choose to transform the
First we use the ``scan`` combinator to run a computation over the whole source of integers and write it to a file instead:
stream: starting with the number 1 (``BigInt(1)``) we multiple by each of
the incoming numbers, one after the other; the scan operation emits the initial .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#transform-source
value and then every calculation result. This yields the series of factorial
numbers which we stash away as a :class:`Source` for later reuse—it is First we use the ``scan`` combinator to run a computation over the whole
important to keep in mind that nothing is actually computed yet, this is just a stream: starting with the number 1 (``BigInt(1)``) we multiple by each of
description of what we want to have computed once we run the stream. Then we the incoming numbers, one after the other; the scan operation emits the initial
convert the resulting series of numbers into a stream of :class:`ByteString` value and then every calculation result. This yields the series of factorial
objects describing lines in a text file. This stream is then run by attaching a numbers which we stash away as a :class:`Source` for later reuse—it is
file as the receiver of the data. In the terminology of Akka Streams this is important to keep in mind that nothing is actually computed yet, this is just a
called a :class:`Sink`. :class:`IOResult` is a type that IO operations return in description of what we want to have computed once we run the stream. Then we
Akka Streams in order to tell you how many bytes or elements were consumed and convert the resulting series of numbers into a stream of :class:`ByteString`
whether the stream terminated normally or exceptionally. objects describing lines in a text file. This stream is then run by attaching a
file as the receiver of the data. In the terminology of Akka Streams this is
Reusable Pieces called a :class:`Sink`. :class:`IOResult` is a type that IO operations return in
--------------- Akka Streams in order to tell you how many bytes or elements were consumed and
whether the stream terminated normally or exceptionally.
One of the nice parts of Akka Streams—and something that other stream libraries
do not offer—is that not only sources can be reused like blueprints, all other Reusable Pieces
elements can be as well. We can take the file-writing :class:`Sink`, prepend ---------------
the processing steps necessary to get the :class:`ByteString` elements from
incoming strings and package that up as a reusable piece as well. Since the One of the nice parts of Akka Streams—and something that other stream libraries
language for writing these streams always flows from left to right (just like do not offer—is that not only sources can be reused like blueprints, all other
plain English), we need a starting point that is like a source but with an elements can be as well. We can take the file-writing :class:`Sink`, prepend
“open” input. In Akka Streams this is called a :class:`Flow`: the processing steps necessary to get the :class:`ByteString` elements from
incoming strings and package that up as a reusable piece as well. Since the
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#transform-sink language for writing these streams always flows from left to right (just like
plain English), we need a starting point that is like a source but with an
Starting from a flow of strings we convert each to :class:`ByteString` and then “open” input. In Akka Streams this is called a :class:`Flow`:
feed to the already known file-writing :class:`Sink`. The resulting blueprint
is a :class:`Sink[String, Future[IOResult]]`, which means that it .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#transform-sink
accepts strings as its input and when materialized it will create auxiliary
information of type ``Future[IOResult]`` (when chaining operations on Starting from a flow of strings we convert each to :class:`ByteString` and then
a :class:`Source` or :class:`Flow` the type of the auxiliary information—called feed to the already known file-writing :class:`Sink`. The resulting blueprint
the “materialized value”—is given by the leftmost starting point; since we want is a :class:`Sink[String, Future[IOResult]]`, which means that it
to retain what the ``FileIO.toFile`` sink has to offer, we need to say accepts strings as its input and when materialized it will create auxiliary
``Keep.right``). information of type ``Future[IOResult]`` (when chaining operations on
a :class:`Source` or :class:`Flow` the type of the auxiliary information—called
We can use the new and shiny :class:`Sink` we just created by the “materialized value”—is given by the leftmost starting point; since we want
attaching it to our ``factorials`` source—after a small adaptation to turn the to retain what the ``FileIO.toFile`` sink has to offer, we need to say
numbers into strings: ``Keep.right``).
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#use-transformed-sink We can use the new and shiny :class:`Sink` we just created by
attaching it to our ``factorials`` source—after a small adaptation to turn the
Time-Based Processing numbers into strings:
---------------------
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#use-transformed-sink
Before we start looking at a more involved example we explore the streaming
nature of what Akka Streams can do. Starting from the ``factorials`` source Time-Based Processing
we transform the stream by zipping it together with another stream, ---------------------
represented by a :class:`Source` that emits the number 0 to 100: the first
number emitted by the ``factorials`` source is the factorial of zero, the Before we start looking at a more involved example we explore the streaming
second is the factorial of one, and so on. We combine these two by forming nature of what Akka Streams can do. Starting from the ``factorials`` source
strings like ``"3! = 6"``. we transform the stream by zipping it together with another stream,
represented by a :class:`Source` that emits the number 0 to 100: the first
.. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#add-streams number emitted by the ``factorials`` source is the factorial of zero, the
second is the factorial of one, and so on. We combine these two by forming
All operations so far have been time-independent and could have been performed strings like ``"3! = 6"``.
in the same fashion on strict collections of elements. The next line
demonstrates that we are in fact dealing with streams that can flow at a .. includecode:: ../code/docs/stream/QuickStartDocSpec.scala#add-streams
certain speed: we use the ``throttle`` combinator to slow down the stream to 1
element per second (the second ``1`` in the argument list is the maximum size All operations so far have been time-independent and could have been performed
of a burst that we want to allow—passing ``1`` means that the first element in the same fashion on strict collections of elements. The next line
gets through immediately and the second then has to wait for one second and so demonstrates that we are in fact dealing with streams that can flow at a
on). certain speed: we use the ``throttle`` combinator to slow down the stream to 1

If you run this program you will see one line printed per second. One aspect
that is not immediately visible deserves mention, though: if you try and set
the streams to produce a billion numbers each then you will notice that your
JVM does not crash with an OutOfMemoryError, even though you will also notice
that running the streams happens in the background, asynchronously (this is the
reason for the auxiliary information to be provided as a :class:`Future`). The
secret that makes this work is that Akka Streams implicitly implement pervasive
flow control: all combinators respect back-pressure. This allows the throttle
combinator to signal to all its upstream sources of data that it can only
accept elements at a certain rate—when the incoming rate is higher than one per
second the throttle combinator will assert *back-pressure* upstream.

This is basically all there is to Akka Streams in a nutshell—glossing over the
fact that there are dozens of sources and sinks and many more stream
transformation combinators to choose from; see also :ref:`stages-overview_scala`.

Reactive Tweets
===============

A typical use case for stream processing is consuming a live stream of data from which we want to extract or aggregate
some other data. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.

We will also consider the problem inherent to all non-blocking streaming
solutions: *"What if the subscriber is too slow to consume the live stream of
data?"*. Traditionally the solution is often to buffer the elements, but this
can—and usually will—cause eventual buffer overflows and instability of such
systems. Instead Akka Streams depend on internal back-pressure signals that
allow us to control what should happen in such scenarios.

Here's the data model we'll be working with throughout the quickstart examples:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#model
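
If you don't have the companion sources at hand, a minimal sketch of such a model might look like the following (the field names and the naive hashtag extraction are illustrative, not necessarily the guide's exact definitions):

.. code-block:: scala

  final case class Author(handle: String)

  final case class Hashtag(name: String)

  final case class Tweet(author: Author, timestamp: Long, body: String) {
    // naive extraction: every whitespace-separated token starting with '#'
    def hashtags: Set[Hashtag] =
      body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
  }

  val akkaTag = Hashtag("#akka")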

.. note::
  If you would like to get an overview of the used vocabulary first, instead of diving head-first
  into an actual example, you can have a look at the :ref:`core-concepts-scala` and :ref:`defining-and-running-streams-scala`
  sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example application.

Transforming and consuming simple streams
-----------------------------------------
The example application we will be looking at is a simple Twitter feed stream from which we'll want to extract certain information,
such as finding all Twitter handles of users who tweet about ``#akka``.

First we prepare our environment by creating an :class:`ActorSystem` and an :class:`ActorMaterializer`,
which will be responsible for materializing and running the streams we are about to create:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#materializer-setup
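
That setup is roughly the following (the actor system name is arbitrary):

.. code-block:: scala

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer

  implicit val system = ActorSystem("reactive-tweets")
  implicit val materializer = ActorMaterializer()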

The :class:`ActorMaterializer` can optionally take :class:`ActorMaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also :ref:`async-stream-buffers-scala`), the dispatcher to
be used by the pipeline etc. These can be overridden with ``withAttributes`` on :class:`Flow`, :class:`Source`, :class:`Sink` and :class:`Graph`.

Let's assume we have a stream of tweets readily available. In Akka this is expressed as a :class:`Source[Out, M]`:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweet-source
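
For experimenting while reading along, a canned source will do; here is a sketch with hard-coded tweets (handles and contents are made up, and a real application would wire in a live feed instead):

.. code-block:: scala

  val tweets: Source[Tweet, NotUsed] = Source(
    Tweet(Author("thinkingInAkka"), System.currentTimeMillis, "#akka rocks!") ::
    Tweet(Author("streamsFan"), System.currentTimeMillis, "works with #akka streams") :: Nil)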

Streams always start flowing from a :class:`Source[Out,M1]` then can continue through :class:`Flow[In,Out,M2]` elements or
more advanced graph elements to finally be consumed by a :class:`Sink[In,M3]` (ignore the type parameters ``M1``, ``M2``
and ``M3`` for now; they are not relevant to the types of the elements produced/consumed by these classes; they are
"materialized types", which we'll talk about :ref:`below <materialized-values-quick-scala>`).

The operations should look familiar to anyone who has used the Scala Collections library,
however they operate on streams and not collections of data (which is a very important distinction, as some operations
only make sense in streaming and vice versa):

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-filter-map
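
Given the model sketched earlier, those operations amount to roughly:

.. code-block:: scala

  // keep only tweets about #akka, then project out their authors
  val authors: Source[Author, NotUsed] =
    tweets
      .filter(_.hashtags.contains(akkaTag))
      .map(_.author)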

Finally, in order to :ref:`materialize <stream-materialization-scala>` and run the stream computation, we need to attach
the Flow to a :class:`Sink` that will get the Flow running. The simplest way to do this is to call
``runWith(sink)`` on a ``Source``. For convenience a number of common Sinks are predefined and collected as methods on
the :class:`Sink` `companion object <http://doc.akka.io/api/akka-stream-and-http-experimental/@version@/#akka.stream.scaladsl.Sink$>`_.
For now let's simply print each author:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreachsink-println

or by using the shorthand version (such shorthands are defined only for the most popular Sinks, such as ``Sink.fold`` and ``Sink.foreach``):

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#authors-foreach-println
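
Both variants boil down to roughly the following:

.. code-block:: scala

  authors.runWith(Sink.foreach(println))
  // ...or, equivalently, the shorthand:
  authors.runForeach(println)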

Materializing and running a stream always requires a :class:`Materializer` to be in implicit scope (or passed in explicitly,
like this: ``.run(materializer)``).

The complete snippet looks like this:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#first-sample

Flattening sequences in streams
-------------------------------
In the previous section we were working on 1:1 relationships of elements, which is the most common case, but sometimes
we might want to map from one element to a number of elements and receive a "flattened" stream, similarly to how ``flatMap``
works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the ``mapConcat``
combinator:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#hashtags-mapConcat
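
With the ``Tweet`` model sketched earlier this is roughly:

.. code-block:: scala

  // one Tweet in, zero or more Hashtags out, concatenated in order
  val hashtags: Source[Hashtag, NotUsed] =
    tweets.mapConcat(_.hashtags.toList)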

.. note::
  The name ``flatMap`` was consciously avoided due to its proximity with for-comprehensions and monadic composition.
  It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream processing
  due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would not hold for
  our implementation of flatMap (due to the liveness issues).

Please note that ``mapConcat`` requires the supplied function to return a strict collection (``f: Out => immutable.Seq[T]``),
whereas ``flatMap`` would have to operate on streams all the way through.

Broadcasting a stream
---------------------
Now let's say we want to persist all hashtags, as well as all author names from this one live stream.
For example we'd like to write all author handles into one file, and all hashtags into another file on disk.
This means we have to split the source stream into two streams which will handle the writing to these different files.

Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
One of these that we'll be using in this example is called :class:`Broadcast`, and it simply emits elements from its
input port to all of its output ports.

Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (Graphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations.

Graphs are constructed using :class:`GraphDSL` like this:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#graph-dsl-broadcast
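
A sketch of such a broadcast graph; the two sinks are assumed to be defined elsewhere (for example via a ``lineSink``-style helper as in the first part of this guide):

.. code-block:: scala

  val writeAuthors: Sink[Author, NotUsed] = ???   // assumption: writes author handles to one file
  val writeHashtags: Sink[Hashtag, NotUsed] = ??? // assumption: writes hashtags to another file

  val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val bcast = b.add(Broadcast[Tweet](2))
    tweets ~> bcast.in
    bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
    bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
    ClosedShape
  })
  g.run()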

As you can see, inside the :class:`GraphDSL` we use an implicit graph builder ``b`` to mutably construct the graph
using the ``~>`` "edge operator" (also read as "connect" or "via" or "to"). The operator is provided implicitly
by importing ``GraphDSL.Implicits._``.

``GraphDSL.create`` returns a :class:`Graph`, in this example a :class:`Graph[ClosedShape, Unit]` where
:class:`ClosedShape` means that it is *a fully connected graph* or "closed": there are no unconnected inputs or outputs.
Since it is closed it is possible to transform the graph into a :class:`RunnableGraph` using ``RunnableGraph.fromGraph``.
The runnable graph can then be ``run()`` to materialize a stream out of it.

Both :class:`Graph` and :class:`RunnableGraph` are *immutable, thread-safe, and freely shareable*.

A graph can also have one of several other shapes, with one or more unconnected ports. Having unconnected ports
expresses a graph that is a *partial graph*. Concepts around composing and nesting graphs in large structures are
explained in detail in :ref:`composition-scala`. It is also possible to wrap complex computation graphs
as Flows, Sinks or Sources, which will be explained in detail in
:ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`.

Back-pressure in action
-----------------------
One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
:ref:`back-pressure-explained-scala`.

A typical problem that applications not using Akka Streams often face is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
either in ``OutOfMemoryError`` crashes or other severe degradations of service responsiveness. With Akka Streams buffering can
and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
elements*" this can be expressed using the ``buffer`` element:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-slow-consumption-dropHead
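
Dropping the oldest element when the buffer is full looks roughly like this (``slowComputation`` is a hypothetical stand-in for whatever makes the downstream slow):

.. code-block:: scala

  tweets
    .buffer(10, OverflowStrategy.dropHead)
    .map(slowComputation)
    .runWith(Sink.ignore)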

The ``buffer`` element takes an explicit and required ``OverflowStrategy``, which defines how the buffer should react
when it receives another element while it is full. Strategies provided include dropping the oldest element (``dropHead``),
dropping the entire buffer, signalling errors etc. Be sure to pick the strategy that fits your use case best.

.. _materialized-values-quick-scala:

Materialized values
-------------------
So far we've only been processing data using Flows and consuming it into some kind of external Sink - be it by printing
values or storing them in some external system. However sometimes we may be interested in some value that can be
obtained from the materialized processing pipeline. For example, we want to know how many tweets we have processed.
This question is not as obvious to answer in the case of an infinite stream of tweets (one way to answer it
in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.

First, let's write such an element counter using ``Sink.fold`` and see what the types look like:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count
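
Spelled out, the counter is roughly the following (``tweets`` as before; ``system.dispatcher`` provides the ``ExecutionContext`` for the ``foreach`` callback):

.. code-block:: scala

  import system.dispatcher // ExecutionContext for Future callbacks

  val count: Flow[Tweet, Int, NotUsed] = Flow[Tweet].map(_ => 1)

  val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  val counterGraph: RunnableGraph[Future[Int]] =
    tweets
      .via(count)
      .toMat(sumSink)(Keep.right)

  val sum: Future[Int] = counterGraph.run()

  sum.foreach(c => println(s"Total tweets processed: $c"))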

First we prepare a reusable ``Flow`` that will change each incoming tweet into an integer of value ``1``. We'll use this in
order to combine those with a ``Sink.fold`` that will sum all ``Int`` elements of the stream and make its result available as
a ``Future[Int]``. Next we connect the ``tweets`` stream to ``count`` with ``via``. Finally we connect the Flow to the previously
prepared Sink using ``toMat``.

Remember those mysterious ``Mat`` type parameters on ``Source[+Out, +Mat]``, ``Flow[-In, +Out, +Mat]`` and ``Sink[-In, +Mat]``?
They represent the type of values these processing parts return when materialized. When you chain these together,
you can explicitly combine their materialized values. In our example we used the ``Keep.right`` predefined function,
which tells the implementation to only care about the materialized type of the stage currently appended to the right.
The materialized type of ``sumSink`` is ``Future[Int]`` and because of using ``Keep.right``, the resulting :class:`RunnableGraph`
also has a type parameter of ``Future[Int]``.

This step does *not* yet materialize the
processing pipeline; it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be ``run()``, as indicated by its type: ``RunnableGraph[Future[Int]]``. Next we call ``run()`` which uses the implicit :class:`ActorMaterializer`
to materialize and run the Flow. The value returned by calling ``run()`` on a ``RunnableGraph[T]`` is of type ``T``.
In our case this type is ``Future[Int]`` which, when completed, will contain the total length of our ``tweets`` stream.
In case the stream fails, this future would complete with a Failure.

A :class:`RunnableGraph` may be reused
and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream twice,
for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
will be different, as illustrated by this example:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-runnable-flow-materialized-twice
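
A sketch of materializing the same blueprint twice, assuming a hypothetical ``tweetsInMinuteFromNow`` source that emits tweets for one minute:

.. code-block:: scala

  val sumSink = Sink.fold[Int, Int](0)(_ + _)
  val counterRunnableGraph: RunnableGraph[Future[Int]] =
    tweetsInMinuteFromNow
      .filter(_.hashtags contains akkaTag)
      .map(t => 1)
      .toMat(sumSink)(Keep.right)

  // materialize the stream once in the morning...
  val morningTweetsCount: Future[Int] = counterRunnableGraph.run()
  // ...and once in the evening, obtaining an independent materialized value
  val eveningTweetsCount: Future[Int] = counterRunnableGraph.run()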

Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or
steering these elements; this will be discussed in detail in :ref:`stream-materialization-scala`. Summing up this section, now we know
what happens behind the scenes when we run this one-liner, which is equivalent to the multi-line version above:

.. includecode:: ../code/docs/stream/TwitterStreamQuickstartDocSpec.scala#tweets-fold-count-oneline
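
The one-liner in question is roughly:

.. code-block:: scala

  // runWith(sumSink) keeps only the sink's materialized Future[Int]
  val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)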

.. note::
  ``runWith()`` is a convenience method that automatically ignores the materialized value of any other stages except
  those appended by ``runWith()`` itself. In the above example it translates to using ``Keep.right`` as the combiner
  for materialized values.