merge content in stream-quickstart.md (#23212)

This commit is contained in:
Nafer Sanabria 2017-06-26 04:52:36 -05:00 committed by Arnout Engelen
parent f0fba394ea
commit 9856e2f378
2 changed files with 173 additions and 419 deletions


@@ -3,24 +3,40 @@
# Quick Start Guide
Create a project and add the akka-streams dependency to the build tool of your
choice as described in @ref:[Using a build tool](../guide/quickstart.md).
choice as described in @ref[Using a build tool](../guide/quickstart.md).
A stream usually begins at a source, so this is also how we start an Akka
Stream. Before we create one, we import the full complement of streaming tools:
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #stream-imports }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #stream-imports }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #stream-imports }
If you want to execute the code samples while you read through the quick start guide, you will also need the following imports:
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #other-imports }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #other-imports }
And an object to hold your code, for example:
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #other-imports }
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #main-app }
And an @scala[object]@java[class] to hold your code, for example:
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #main-app }
Java
: @@snip [Main.java]($code$/java/jdocs/stream/Main.java) { #main-app }
Now we will start with a rather simple source, emitting the integers 1 to 100:
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #create-source }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #create-source }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #create-source }
The `Source` type is parameterized with two types: the first one is the
type of element that this source emits and the second one may signal that
@@ -33,7 +49,11 @@ Having created this source means that we have a description of how to emit the
first 100 natural numbers, but this source is not yet active. In order to get
those numbers out we have to run it:
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #run-source }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #run-source }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #run-source }
This line will complement the source with a consumer function—in this example
we simply print out the numbers to the console—and pass this little stream
@@ -41,35 +61,47 @@ setup to an Actor that runs it. This activation is signaled by having “run”
part of the method name; there are other methods that run Akka Streams, and
they all follow this pattern.
When running this source in a `scala.App` you might notice it does not
When running this @scala[source in a `scala.App`]@java[program] you might notice it does not
terminate, because the `ActorSystem` is never terminated. Luckily
`runForeach` returns a `Future[Done]` which resolves when the stream finishes:
`runForeach` returns a @scala[`Future[Done]`]@java[`CompletionStage<Done>`] which resolves when the stream finishes:
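The role that @java[`CompletionStage<Done>`]@scala[`Future[Done]`] plays here can be sketched without Akka at all. The following plain-Java analogy (illustrative only, not Akka API) shows the same pattern: work runs asynchronously, the caller gets back a future, and shutdown is attached as a completion callback instead of blocking:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy of runForeach + terminate-on-completion.
// The executor stands in for the ActorSystem; names are illustrative.
public class RunAndTerminate {
    static CompletableFuture<Void> runForeach(ExecutorService system) {
        return CompletableFuture.runAsync(() -> {
            for (int i = 1; i <= 100; i++) System.out.println(i);
        }, system);
    }

    public static void main(String[] args) {
        ExecutorService system = Executors.newSingleThreadExecutor();
        // shut the "system" down when the work finishes, mirroring
        // done.thenRun(() -> system.terminate()) in the guide
        runForeach(system).thenRun(system::shutdown);
    }
}
```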
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #run-source-and-terminate }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #run-source-and-terminate }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #run-source-and-terminate }
You may wonder where the Actor that runs the stream gets created, and you are
probably also asking yourself what this `materializer` means. In order to get
this value we first need to create an Actor system:
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #create-materializer }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #create-materializer }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #create-materializer }
There are other ways to create a materializer, e.g. from an
`ActorContext` when using streams from within Actors. The
`Materializer` is a factory for stream execution engines; it is the
thing that makes streams run—you don't need to worry about any of the details
just now, apart from the fact that you need one for calling any of the `run` methods on
a `Source`. The materializer is picked up implicitly if it is omitted
from the `run` method call arguments, which we will do in the following.
a `Source`. @scala[The materializer is picked up implicitly if it is omitted
from the `run` method call arguments, which we will do in the following.]@java[]
The nice thing about Akka Streams is that the `Source` is just a
description of what you want to run, and like an architect's blueprint it can
be reused, incorporated into a larger design. We may choose to transform the
source of integers and write it to a file instead:
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #transform-source }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #transform-source }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #transform-source }
First we use the `scan` combinator to run a computation over the whole
stream: starting with the number 1 (`BigInt(1)`) we multiply by each of
stream: starting with the number 1 (@scala[`BigInt(1)`]@java[`BigInteger.ONE`]) we multiply by each of
the incoming numbers, one after the other; the scan operation emits the initial
value and then every calculation result. This yields the series of factorial
numbers which we stash away as a `Source` for later reuse—it is
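What `scan` computes can be made concrete without running a stream. The following plain-Java sketch (illustrative only, not Akka API) shows the same running multiplication: the initial value is emitted first, then every intermediate result:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Sketch of scan's semantics using a plain collection as the "stream".
public class ScanSketch {
    static List<BigInteger> scanMultiply(int upTo) {
        List<BigInteger> out = new ArrayList<>();
        BigInteger acc = BigInteger.ONE;
        out.add(acc);                       // scan emits the initial value...
        for (int i = 1; i <= upTo; i++) {
            acc = acc.multiply(BigInteger.valueOf(i));
            out.add(acc);                   // ...and then each calculation result
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(scanMultiply(5)); // [1, 1, 2, 6, 24, 120]
    }
}
```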
@@ -99,23 +131,31 @@ language for writing these streams always flows from left to right (just like
plain English), we need a starting point that is like a source but with an
“open” input. In Akka Streams this is called a `Flow`:
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #transform-sink }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #transform-sink }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #transform-sink }
Starting from a flow of strings we convert each to `ByteString` and then
feed to the already known file-writing `Sink`. The resulting blueprint
is a `Sink[String, Future[IOResult]]`, which means that it
is a @scala[`Sink[String, Future[IOResult]]`]@java[`Sink<String, CompletionStage<IOResult>>`], which means that it
accepts strings as its input and when materialized it will create auxiliary
information of type `Future[IOResult]` (when chaining operations on
information of type @scala[`Future[IOResult]`]@java[`CompletionStage<IOResult>`] (when chaining operations on
a `Source` or `Flow` the type of the auxiliary information—called
the “materialized value”—is given by the leftmost starting point; since we want
to retain what the `FileIO.toPath` sink has to offer, we need to say
`Keep.right`).
@scala[`Keep.right`]@java[`Keep.right()`]).
We can use the new and shiny `Sink` we just created by
attaching it to our `factorials` source—after a small adaptation to turn the
numbers into strings:
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #use-transformed-sink }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #use-transformed-sink }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #use-transformed-sink }
## Time-Based Processing
@@ -127,7 +167,11 @@ number emitted by the `factorials` source is the factorial of zero, the
second is the factorial of one, and so on. We combine these two by forming
strings like `"3! = 6"`.
@@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #add-streams }
Scala
: @@snip [QuickStartDocSpec.scala]($code$/scala/docs/stream/QuickStartDocSpec.scala) { #add-streams }
Java
: @@snip [QuickStartDocTest.java]($code$/java/jdocs/stream/QuickStartDocTest.java) { #add-streams }
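The effect of zipping the factorials with an ascending index can be sketched with plain collections (illustrative only, not Akka API): each pair of (index, factorial) is rendered as a string like `"3! = 6"`:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Sketch of zipping a factorials "stream" with its index positions.
public class ZipSketch {
    static List<String> zipWithIndex(List<BigInteger> factorials) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < factorials.size(); i++) {
            out.add(i + "! = " + factorials.get(i)); // pair index with factorial
        }
        return out;
    }

    public static void main(String[] args) {
        List<BigInteger> factorials = List.of(
            BigInteger.valueOf(1), BigInteger.valueOf(1),
            BigInteger.valueOf(2), BigInteger.valueOf(6));
        System.out.println(zipWithIndex(factorials)); // [0! = 1, 1! = 1, 2! = 2, 3! = 6]
    }
}
```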
All operations so far have been time-independent and could have been performed
in the same fashion on strict collections of elements. The next line
@@ -143,7 +187,7 @@ that is not immediately visible deserves mention, though: if you try and set
the streams to produce a billion numbers each then you will notice that your
JVM does not crash with an OutOfMemoryError, even though you will also notice
that running the streams happens in the background, asynchronously (this is the
reason for the auxiliary information to be provided as a `Future`). The
reason for the auxiliary information to be provided as a @scala[`Future`]@java[`CompletionStage`], in the future). The
secret that makes this work is that Akka Streams implicitly implement pervasive
flow control, all combinators respect back-pressure. This allows the throttle
combinator to signal to all its upstream sources of data that it can only
@@ -152,7 +196,7 @@ second the throttle combinator will assert *back-pressure* upstream.
This is basically all there is to Akka Streams in a nutshell—glossing over the
fact that there are dozens of sources and sinks and many more stream
transformation combinators to choose from, see also @ref:[stages overview](stages-overview.md).
transformation combinators to choose from, see also @ref[stages overview](stages-overview.md).
# Reactive Tweets
@@ -168,12 +212,16 @@ allow to control what should happen in such scenarios.
Here's the data model we'll be working with throughout the quickstart examples:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #model }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #model }
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #model }
@@@ note
If you would like to get an overview of the used vocabulary first instead of diving head-first
into an actual example you can have a look at the @ref:[Core concepts](stream-flows-and-basics.md#core-concepts) and @ref:[Defining and running streams](stream-flows-and-basics.md#defining-and-running-streams)
into an actual example you can have a look at the @ref[Core concepts](stream-flows-and-basics.md#core-concepts) and @ref[Defining and running streams](stream-flows-and-basics.md#defining-and-running-streams)
sections of the docs, and then come back to this quickstart to see it all pieced together into a simple example application.
@@@
@@ -186,45 +234,73 @@ like for example finding all twitter handles of users who tweet about `#akka`.
To prepare our environment we create an `ActorSystem` and `ActorMaterializer`,
which will be responsible for materializing and running the streams we are about to create:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #materializer-setup }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #materializer-setup }
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #materializer-setup }
The `ActorMaterializer` can optionally take `ActorMaterializerSettings` which can be used to define
materialization properties, such as default buffer sizes (see also @ref:[Buffers for asynchronous stages](stream-rate.md#async-stream-buffers)), the dispatcher to
materialization properties, such as default buffer sizes (see also @ref[Buffers for asynchronous stages](stream-rate.md#async-stream-buffers)), the dispatcher to
be used by the pipeline etc. These can be overridden with `withAttributes` on `Flow`, `Source`, `Sink` and `Graph`.
Let's assume we have a stream of tweets readily available. In Akka this is expressed as a `Source[Out, M]`:
Let's assume we have a stream of tweets readily available. In Akka this is expressed as a @scala[`Source[Out, M]`]@java[`Source<Out, M>`]:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweet-source }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweet-source }
Streams always start flowing from a `Source[Out,M1]` then can continue through `Flow[In,Out,M2]` elements or
more advanced graph elements to finally be consumed by a `Sink[In,M3]` (ignore the type parameters `M1`, `M2`
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #tweet-source }
Streams always start flowing from a @scala[`Source[Out,M1]`]@java[`Source<Out,M1>`] then can continue through @scala[`Flow[In,Out,M2]`]@java[`Flow<In,Out,M2>`] elements or
more advanced graph elements to finally be consumed by a @scala[`Sink[In,M3]`]@java[`Sink<In,M3>`] @scala[(ignore the type parameters `M1`, `M2`
and `M3` for now, they are not relevant to the types of the elements produced/consumed by these classes; they are
"materialized types", which we'll talk about [below](#materialized-values-quick)).
"materialized types", which we'll talk about [below](#materialized-values-quick))]@java[. The first type parameter—`Tweet` in this case—designates the kind of elements produced
by the source while the `M` type parameters describe the object that is created during
materialization ([see below](#materialized-values-quick))—`NotUsed` (from the `akka`
package) means that no value is produced; it is the generic equivalent of `void`.]
The operations should look familiar to anyone who has used the Scala Collections library;
however, they operate on streams and not collections of data (which is a very important distinction, as some operations
only make sense in streaming and vice versa):
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-filter-map }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-filter-map }
Finally in order to @ref:[materialize](stream-flows-and-basics.md#stream-materialization) and run the stream computation we need to attach
the Flow to a `Sink` that will get the Flow running. The simplest way to do this is to call
`runWith(sink)` on a `Source`. For convenience a number of common Sinks are predefined and collected as methods on
the `Sink` companion object.
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #authors-filter-map }
Finally in order to @ref[materialize](stream-flows-and-basics.md#stream-materialization) and run the stream computation we need to attach
the Flow to a @scala[`Sink`]@java[`Sink<T, M>`] that will get the Flow running. The simplest way to do this is to call
`runWith(sink)` on a @scala[`Source`]@java[`Source<Out, M>`]. For convenience a number of common Sinks are predefined and collected as @scala[]@java[static] methods on
the @scala[`Sink` companion object]@java[`Sink` class].
For now let's simply print each author:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-foreachsink-println }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-foreachsink-println }
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #authors-foreachsink-println }
or by using the shorthand version (which is defined only for the most popular Sinks such as `Sink.fold` and `Sink.foreach`):
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-foreach-println }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-foreach-println }
Materializing and running a stream always requires a `Materializer` to be in implicit scope (or passed in explicitly,
like this: `.run(materializer)`).
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #authors-foreach-println }
Materializing and running a stream always requires a `Materializer` to be @scala[in implicit scope (or passed in explicitly,
like this: `.run(materializer)`)]@java[to be passed in explicitly, like this: `.run(mat)`].
The complete snippet looks like this:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #first-sample }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #first-sample }
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #first-sample }
## Flattening sequences in streams
@@ -233,16 +309,20 @@ we might want to map from one element to a number of elements and receive a "fla
works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the `mapConcat`
combinator:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #hashtags-mapConcat }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #hashtags-mapConcat }
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #hashtags-mapConcat }
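The flattening behaviour of `mapConcat` can be sketched with `java.util.stream` (illustrative only, not Akka API): each element maps to a strict collection, and the results are concatenated into one flat sequence:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of mapConcat-style flattening; the hashtag lists per tweet
// stand in for the guide's Tweet data model.
public class FlattenSketch {
    static List<String> flattenHashtags(List<List<String>> hashtagsPerTweet) {
        return hashtagsPerTweet.stream()
            .flatMap(List::stream)   // flatten by concatenation, like mapConcat
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> tweets = List.of(
            List.of("#akka", "#streams"),
            List.of(),
            List.of("#akka"));
        System.out.println(flattenHashtags(tweets)); // [#akka, #streams, #akka]
    }
}
```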
@@@ note
The name `flatMap` was consciously avoided due to its proximity with for-comprehensions and monadic composition.
It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream processing
due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would not hold for
It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream processing
due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would not hold for
our implementation of flatMap (due to the liveness issues).
Please note that the `mapConcat` requires the supplied function to return an iterable (`f: Out => immutable.Iterable[T]`),
Please note that the `mapConcat` requires the supplied function to return @scala[an iterable (`f: Out => immutable.Iterable[T]`)]@java[a strict collection (`Out f -> java.util.List<T>`)],
whereas `flatMap` would have to operate on streams all the way through.
@@@
@@ -263,13 +343,17 @@ at the expense of not reading as familiarly as collection transformations.
Graphs are constructed using `GraphDSL` like this:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #graph-dsl-broadcast }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #graph-dsl-broadcast }
As you can see, inside the `GraphDSL` we use an implicit graph builder `b` to mutably construct the graph
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #graph-dsl-broadcast }
As you can see, @scala[inside the `GraphDSL` we use an implicit graph builder `b` to mutably construct the graph
using the `~>` "edge operator" (also read as "connect" or "via" or "to"). The operator is provided implicitly
by importing `GraphDSL.Implicits._`.
by importing `GraphDSL.Implicits._`]@java[we use graph builder `b` to construct the graph using `UniformFanOutShape` and `Flow` s].
`GraphDSL.create` returns a `Graph`, in this example a `Graph[ClosedShape, NotUsed]` where
`GraphDSL.create` returns a `Graph`, in this example a @scala[`Graph[ClosedShape, NotUsed]`]@java[`Graph<ClosedShape,NotUsed>`] where
`ClosedShape` means that it is *a fully connected graph* or "closed" - there are no unconnected inputs or outputs.
Since it is closed it is possible to transform the graph into a `RunnableGraph` using `RunnableGraph.fromGraph`.
The runnable graph can then be `run()` to materialize a stream out of it.
@@ -278,16 +362,16 @@ Both `Graph` and `RunnableGraph` are *immutable, thread-safe, and freely shareab
A graph can also have one of several other shapes, with one or more unconnected ports. Having unconnected ports
expresses a graph that is a *partial graph*. Concepts around composing and nesting graphs in large structures are
explained in detail in @ref:[Modularity, Composition and Hierarchy](stream-composition.md). It is also possible to wrap complex computation graphs
explained in detail in @ref[Modularity, Composition and Hierarchy](stream-composition.md). It is also possible to wrap complex computation graphs
as Flows, Sinks or Sources, which will be explained in detail in
@ref:[Constructing Sources, Sinks and Flows from Partial Graphs](stream-graphs.md#constructing-sources-sinks-flows-from-partial-graphs).
@scala[@ref[Constructing Sources, Sinks and Flows from Partial Graphs](stream-graphs.md#constructing-sources-sinks-flows-from-partial-graphs)]@java[@ref[Constructing and combining Partial Graphs](stream-graphs.md#partial-graph-dsl)].
## Back-pressure in action
One of the main advantages of Akka Streams is that they *always* propagate back-pressure information from stream Sinks
(Subscribers) to their Sources (Publishers). It is not an optional feature, and is enabled at all times. To learn more
about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations read
@ref:[Back-pressure explained](stream-flows-and-basics.md#back-pressure-explained).
@ref[Back-pressure explained](stream-flows-and-basics.md#back-pressure-explained).
A typical problem that applications like this (ones not using Akka Streams) often face is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
@@ -295,11 +379,15 @@ in either `OutOfMemoryError` s or other severe degradations of service responsiv
and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
elements*" this can be expressed using the `buffer` element:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-slow-consumption-dropHead }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-slow-consumption-dropHead }
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #tweets-slow-consumption-dropHead }
The `buffer` element takes an explicit and required `OverflowStrategy`, which defines how the buffer should react
when it receives another element while it is full. Strategies provided include dropping the oldest element (`dropHead`),
dropping the entire buffer, signalling errors etc. Be sure to pick and choose the strategy that fits your use case best.
dropping the entire buffer, signalling @scala[errors]@java[failures] etc. Be sure to pick and choose the strategy that fits your use case best.
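The `dropHead` strategy can be sketched in a few lines of plain Java (an illustration only, not Akka's actual buffer implementation): a bounded buffer that, when full, discards the oldest element to make room for the newest:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of a dropHead-style bounded buffer.
public class DropHeadBuffer<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int capacity;

    public DropHeadBuffer(int capacity) { this.capacity = capacity; }

    public void offer(T element) {
        if (buffer.size() == capacity) {
            buffer.removeFirst();      // drop the oldest element when full
        }
        buffer.addLast(element);
    }

    public int size() { return buffer.size(); }
    public T oldest() { return buffer.peekFirst(); }

    public static void main(String[] args) {
        DropHeadBuffer<Integer> buf = new DropHeadBuffer<>(10);
        for (int i = 1; i <= 12; i++) buf.offer(i);
        System.out.println(buf.size() + " elements, oldest = " + buf.oldest()); // 10 elements, oldest = 3
    }
}
```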
<a id="materialized-values-quick"></a>
## Materialized values
@@ -311,27 +399,33 @@ While this question is not as obvious to give an answer to in case of an infinit
this question in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
First, let's write such an element counter using `Sink.fold` and see what the types look like:
First, let's write such an element counter using @scala[`Sink.fold` and]@java[`Flow.of(Class)` and `Sink.fold` to] see what the types look like:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-fold-count }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-fold-count }
First we prepare a reusable `Flow` that will change each incoming tweet into an integer of value `1`. We'll use this in
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #tweets-fold-count }
@scala[First we prepare a reusable `Flow` that will change each incoming tweet into an integer of value `1`. We'll use this in
order to combine those with a `Sink.fold` that will sum all `Int` elements of the stream and make its result available as
a `Future[Int]`. Next we connect the `tweets` stream to `count` with `via`. Finally we connect the Flow to the previously
prepared Sink using `toMat`.
prepared Sink using `toMat`]@java[`Sink.fold` will sum all `Integer` elements of the stream and make its result available as
a `CompletionStage<Integer>`. Next we use the `map` method of `tweets` `Source` which will change each incoming tweet
into an integer value `1`. Finally we connect the Flow to the previously prepared Sink using `toMat`].
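The counting recipe itself (map each element to `1`, then fold with `+`) can be sketched with `java.util.stream` (illustrative only, not Akka API):

```java
import java.util.List;

// Sketch of the map-to-1-then-fold counting pattern; the tweet strings
// stand in for the guide's Tweet data model.
public class FoldCountSketch {
    static int count(List<String> tweets) {
        return tweets.stream()
            .mapToInt(t -> 1)   // each incoming tweet becomes the integer 1
            .sum();             // fold: sum all the 1s to get the element count
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("#akka rocks", "#akka streams", "hello"))); // 3
    }
}
```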
Remember those mysterious `Mat` type parameters on ``Source[+Out, +Mat]``, `Flow[-In, +Out, +Mat]` and `Sink[-In, +Mat]`?
Remember those mysterious `Mat` type parameters on @scala[``Source[+Out, +Mat]``, `Flow[-In, +Out, +Mat]` and `Sink[-In, +Mat]`]@java[``Source<Out, Mat>``, `Flow<In, Out, Mat>` and `Sink<In, Mat>`]?
They represent the type of values these processing parts return when materialized. When you chain these together,
you can explicitly combine their materialized values. In our example we used the `Keep.right` predefined function,
you can explicitly combine their materialized values. In our example we used the @scala[`Keep.right`]@java[`Keep.right()`] predefined function,
which tells the implementation to only care about the materialized type of the stage currently appended to the right.
The materialized type of `sumSink` is `Future[Int]` and because of using `Keep.right`, the resulting `RunnableGraph`
also has a type parameter of `Future[Int]`.
The materialized type of `sumSink` is @scala[`Future[Int]`]@java[`CompletionStage<Integer>`] and because of using @scala[`Keep.right`]@java[`Keep.right()`], the resulting `RunnableGraph`
also has a type parameter of @scala[`Future[Int]`]@java[`CompletionStage<Integer>`].
This step does *not* yet materialize the
processing pipeline, it merely prepares the description of the Flow, which is now connected to a Sink, and therefore can
be `run()`, as indicated by its type: `RunnableGraph[Future[Int]]`. Next we call `run()` which uses the implicit `ActorMaterializer`
to materialize and run the Flow. The value returned by calling `run()` on a `RunnableGraph[T]` is of type `T`.
In our case this type is `Future[Int]` which, when completed, will contain the total length of our `tweets` stream.
be `run()`, as indicated by its type: @scala[`RunnableGraph[Future[Int]]`]@java[`RunnableGraph<CompletionStage<Integer>>`]. Next we call `run()` which uses the @scala[implicit]@java[] `ActorMaterializer`
to materialize and run the Flow. The value returned by calling `run()` on a @scala[`RunnableGraph[T]`]@java[`RunnableGraph<T>`] is of type `T`.
In our case this type is @scala[`Future[Int]`]@java[`CompletionStage<Integer>`] which, when completed, will contain the total length of our `tweets` stream.
In case of the stream failing, this future would complete with a Failure.
A `RunnableGraph` may be reused
@@ -339,18 +433,26 @@ and materialized multiple times, because it is just the "blueprint" of the strea
for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations
will be different, as illustrated by this example:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-runnable-flow-materialized-twice }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-runnable-flow-materialized-twice }
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #tweets-runnable-flow-materialized-twice }
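The "blueprint" idea can be sketched without Akka (names are illustrative, not Akka API): a description that can be run multiple times, with each run producing its own materialized value:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.IntStream;

// Sketch of a reusable blueprint: the Supplier plays the role of a
// RunnableGraph, and each get() "materializes" a fresh, independent future.
public class BlueprintSketch {
    static Supplier<CompletableFuture<Integer>> countingBlueprint(int elements) {
        return () -> CompletableFuture.supplyAsync(
            () -> (int) IntStream.rangeClosed(1, elements).count());
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<Integer>> blueprint = countingBlueprint(100);
        Integer first = blueprint.get().join();   // materialization #1
        Integer second = blueprint.get().join();  // materialization #2, independent value
        System.out.println(first + " " + second); // 100 100
    }
}
```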
Many elements in Akka Streams provide materialized values, which can be used for obtaining either results of computation or
steering these elements; this will be discussed in detail in @ref:[Stream Materialization](stream-flows-and-basics.md#stream-materialization). Summing up this section, now we know
steering these elements; this will be discussed in detail in @ref[Stream Materialization](stream-flows-and-basics.md#stream-materialization). Summing up this section, now we know
what happens behind the scenes when we run this one-liner, which is equivalent to the multi line version above:
@@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-fold-count-oneline }
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala]($code$/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-fold-count-oneline }
Java
: @@snip [TwitterStreamQuickstartDocTest.java]($code$/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #tweets-fold-count-oneline }
@@@ note
`runWith()` is a convenience method that automatically ignores the materialized value of any other stages except
those appended by the `runWith()` itself. In the above example it translates to using `Keep.right` as the combiner
those appended by the `runWith()` itself. In the above example it translates to using @scala[`Keep.right`]@java[`Keep.right()`] as the combiner
for materialized values.
@@@