Use apidoc directive in stream/stream-quickstart.md (#22904) (#31244)

Andrei Arlou 2022-03-23 17:31:48 +02:00 committed by GitHub
parent 44086b0a93
commit 816a4e2a3f

@@ -38,7 +38,7 @@ Scala
Java
: @@snip [QuickStartDocTest.java](/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java) { #other-imports }
-And @scala[an object]@java[a class] to start an Akka `ActorSystem` and hold your code @scala[. Making the `ActorSystem`
+And @scala[an object]@java[a class] to start an Akka @apidoc[akka.actor.ActorSystem] and hold your code @scala[. Making the `ActorSystem`
implicit makes it available to the streams without manually passing it when running them]:
Scala
@@ -55,11 +55,11 @@ Scala
Java
: @@snip [QuickStartDocTest.java](/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java) { #create-source }
-The `Source` type is parameterized with two types: the first one is the
+The @apidoc[akka.stream.*.Source] type is parameterized with two types: the first one is the
type of element that this source emits and the second one, the "materialized value", allows
running the source to produce some auxiliary value (e.g. a network source may
provide information about the bound port or the peer's address). Where no
-auxiliary information is produced, the type `akka.NotUsed` is used. A
+auxiliary information is produced, the type @apidoc[akka.NotUsed] is used. A
simple range of integers falls into this category - running our stream produces
a `NotUsed`.
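
To make the two type parameters concrete, here is a minimal sketch (not the snippet referenced above) of a source whose materialized value carries no useful information:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

// Emits the integers 1 to 100; running it yields no auxiliary value, hence NotUsed.
val source: Source[Int, NotUsed] = Source(1 to 100)
```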
@@ -80,8 +80,8 @@ part of the method name; there are other methods that run Akka Streams, and
they all follow this pattern.
When running this @scala[source in a `scala.App`]@java[program] you might notice it does not
-terminate, because the `ActorSystem` is never terminated. Luckily
-`runForeach` returns a @scala[`Future[Done]`]@java[`CompletionStage<Done>`] which resolves when the stream finishes:
+terminate, because the @apidoc[akka.actor.ActorSystem] is never terminated. Luckily
+@apidoc[runForeach](akka.stream.*.Source) {scala="#runForeach(f:Out=%3EUnit)(implicitmaterializer:akka.stream.Materializer):scala.concurrent.Future[akka.Done]" java="#runForeach(akka.japi.function.Procedure,akka.stream.Materializer)"} returns a @scala[@scaladoc[Future](scala.concurrent.Future)[@apidoc[akka.Done]]]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)<@apidoc[akka.Done]>] which resolves when the stream finishes:
Scala
: @@snip [QuickStartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/QuickStartDocSpec.scala) { #run-source-and-terminate }
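
A minimal sketch of running the source and shutting down afterwards, assuming Akka 2.6+ where an implicit `ActorSystem` supplies the materializer:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("QuickStart")
import system.dispatcher // ExecutionContext for the Future callback

val done: Future[Done] = Source(1 to 100).runForeach(println)
done.onComplete(_ => system.terminate()) // terminate once the stream completes
```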
@@ -100,17 +100,17 @@ Scala
Java
: @@snip [QuickStartDocTest.java](/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java) { #transform-source }
-First we use the `scan` operator to run a computation over the whole
+First we use the @apidoc[scan](akka.stream.*.Source) {scala="#scan[T](zero:T)(f:(T,Out)=%3ET):FlowOps.this.Repr[T]" java="#scan(T,akka.japi.function.Function2)"} operator to run a computation over the whole
stream: starting with the number 1 (@scala[`BigInt(1)`]@java[`BigInteger.ONE`]) we multiply by each of
the incoming numbers, one after the other; the scan operation emits the initial
value and then every calculation result. This yields the series of factorial
numbers which we stash away as a `Source` for later reuse—it is
important to keep in mind that nothing is actually computed yet, this is a
description of what we want to have computed once we run the stream. Then we
-convert the resulting series of numbers into a stream of `ByteString`
+convert the resulting series of numbers into a stream of @apidoc[akka.util.ByteString]
objects describing lines in a text file. This stream is then run by attaching a
file as the receiver of the data. In the terminology of Akka Streams this is
-called a `Sink`. `IOResult` is a type that IO operations return in
+called a @apidoc[akka.stream.*.Sink]. @apidoc[akka.stream.IOResult] is a type that IO operations return in
Akka Streams in order to tell you how many bytes or elements were consumed and
whether the stream terminated normally or exceptionally.
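
A sketch of the pipeline this paragraph describes, reusing the `source` and `system` from the sketches above; the file name `factorials.txt` is illustrative:

```scala
import java.nio.file.Paths

import akka.stream.IOResult
import akka.stream.scaladsl.FileIO
import akka.util.ByteString

import scala.concurrent.Future

// Only a description so far: nothing is computed until the stream is run.
val factorials = source.scan(BigInt(1))((acc, next) => acc * next)

// Render each number as a line of text and run the stream into a file sink.
val result: Future[IOResult] =
  factorials
    .map(num => ByteString(s"$num\n"))
    .runWith(FileIO.toPath(Paths.get("factorials.txt")))
```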
@@ -126,12 +126,12 @@ Here is another example that you can edit and run in the browser:
One of the nice parts of Akka Streams—and something that other stream libraries
do not offer—is that not only sources can be reused like blueprints, all other
-elements can be as well. We can take the file-writing `Sink`, prepend
-the processing steps necessary to get the `ByteString` elements from
+elements can be as well. We can take the file-writing @apidoc[akka.stream.*.Sink], prepend
+the processing steps necessary to get the @apidoc[akka.util.ByteString] elements from
incoming strings and package that up as a reusable piece as well. Since the
language for writing these streams always flows from left to right (just like
plain English), we need a starting point that is like a source but with an
-“open” input. In Akka Streams this is called a `Flow`:
+“open” input. In Akka Streams this is called a @apidoc[akka.stream.*.Flow]:
Scala
: @@snip [QuickStartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/QuickStartDocSpec.scala) { #transform-sink }
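
As a rough sketch of such a reusable piece (assuming the imports from the previous sketches), a `Flow` that turns strings into file lines and is capped with the file sink might look like this:

```scala
import akka.stream.scaladsl.{ Flow, Keep, Sink }

// Accepts Strings, appends newlines, and writes the bytes to the given file.
def lineSink(filename: String): Sink[String, Future[IOResult]] =
  Flow[String]
    .map(s => ByteString(s + "\n"))
    .toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
```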
@@ -143,13 +143,13 @@ Starting from a flow of strings we convert each to `ByteString` and then
feed to the already known file-writing `Sink`. The resulting blueprint
is a @scala[`Sink[String, Future[IOResult]]`]@java[`Sink<String, CompletionStage<IOResult>>`], which means that it
accepts strings as its input and when materialized it will create auxiliary
-information of type @scala[`Future[IOResult]`]@java[`CompletionStage<IOResult>`] (when chaining operations on
+information of type @scala[@scaladoc[Future](scala.concurrent.Future)[@apidoc[akka.stream.IOResult]]]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)<@apidoc[akka.stream.IOResult]>] (when chaining operations on
a `Source` or `Flow` the type of the auxiliary information—called
the “materialized value”—is given by the leftmost starting point; since we want
-to retain what the `FileIO.toPath` sink has to offer, we need to say
-@scala[`Keep.right`]@java[`Keep.right()`]).
+to retain what the @apidoc[FileIO.toPath](akka.stream.*.FileIO$) {scala="#toPath(f:java.nio.file.Path,options:Set[java.nio.file.OpenOption],startPosition:Long):akka.stream.scaladsl.Sink[akka.util.ByteString,scala.concurrent.Future[akka.stream.IOResult]]" java="#toPath(java.nio.file.Path)"} sink has to offer, we need to say
+@scala[@scaladoc[Keep.right](akka.stream.scaladsl.Keep$#right[L,R]:(L,R)=%3ER)]@java[@javadoc[Keep.right()](akka.stream.javadsl.Keep$#right())].
-We can use the new and shiny `Sink` we just created by
+We can use the new and shiny @apidoc[akka.stream.*.Sink] we just created by
attaching it to our `factorials` source—after a small adaptation to turn the
numbers into strings:
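
A sketch of that adaptation, reusing the hypothetical `lineSink` from above (the target file name is illustrative):

```scala
factorials.map(_.toString).runWith(lineSink("factorial2.txt"))
```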
@@ -164,7 +164,7 @@ Java
Before we start looking at a more involved example we explore the streaming
nature of what Akka Streams can do. Starting from the `factorials` source
we transform the stream by zipping it together with another stream,
-represented by a `Source` that emits the number 0 to 100: the first
+represented by a @apidoc[akka.stream.*.Source] that emits the number 0 to 100: the first
number emitted by the `factorials` source is the factorial of zero, the
second is the factorial of one, and so on. We combine these two by forming
strings like `"3! = 6"`.
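
A sketch of the zipping step described here, building on the `factorials` source from earlier:

```scala
// Pair each factorial with its index and render a human-readable line.
val withIndex =
  factorials.zipWith(Source(0 to 100))((num, idx) => s"$idx! = $num")
```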
@@ -178,7 +178,7 @@ Java
All operations so far have been time-independent and could have been performed
in the same fashion on strict collections of elements. The next line
demonstrates that we are in fact dealing with streams that can flow at a
-certain speed: we use the `throttle` operator to slow down the stream to 1
+certain speed: we use the @apidoc[throttle](akka.stream.*.Source) {scala="#throttle(elements:Int,per:scala.concurrent.duration.FiniteDuration):FlowOps.this.Repr[Out]" java="#throttle(int,java.time.Duration)"} operator to slow down the stream to 1
element per second.
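
Continuing the sketch, the throttled stream could be run like this:

```scala
import scala.concurrent.duration._

// At most one element per second is let through; upstream is back-pressured.
withIndex.throttle(1, 1.second).runForeach(println)
```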
If you run this program you will see one line printed per second. One aspect
@@ -186,7 +186,7 @@ that is not immediately visible deserves mention, though: if you try and set
the streams to produce a billion numbers each then you will notice that your
JVM does not crash with an OutOfMemoryError, even though you will also notice
that running the streams happens in the background, asynchronously (this is the
-reason for the auxiliary information to be provided as a @scala[`Future`]@java[`CompletionStage`], in the future). The
+reason for the auxiliary information to be provided as a @scala[@scaladoc[Future](scala.concurrent.Future)]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)], in the future). The
secret that makes this work is that Akka Streams implicitly implement pervasive
flow control, all operators respect back-pressure. This allows the throttle
operator to signal to all its upstream sources of data that it can only
@@ -230,7 +230,7 @@ sections of the docs, and then come back to this quickstart to see it all pieced
The example application we will be looking at is a simple Twitter feed stream from which we'll want to extract certain information,
like for example finding all twitter handles of users who tweet about `#akka`.
-In order to prepare our environment by creating an `ActorSystem` which will be responsible for running the streams we are about to create:
+In order to prepare our environment by creating an @apidoc[akka.actor.ActorSystem] which will be responsible for running the streams we are about to create:
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #system-setup }
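
The real domain model and tweet feed live in the snippet sources; as a stand-in for following along, a hypothetical setup could look like this (the `Tweet`, `Author`, and `Hashtag` definitions are assumptions, and `Source.empty` replaces a real feed):

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("reactive-tweets")
import system.dispatcher // ExecutionContext for Future callbacks below

final case class Author(handle: String)
final case class Hashtag(name: String)
final case class Tweet(author: Author, timestamp: Long, body: String) {
  def hashtags: Set[Hashtag] =
    body.split(" ").collect {
      case t if t.startsWith("#") => Hashtag(t.replaceAll("[^#\\w]", ""))
    }.toSet
}

val akkaTag = Hashtag("#akka")
val tweets: Source[Tweet, NotUsed] = Source.empty[Tweet] // stand-in for a live feed
```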
@@ -267,7 +267,7 @@ Java
Finally in order to @ref:[materialize](stream-flows-and-basics.md#stream-materialization) and run the stream computation we need to attach
the Flow to a @scala[`Sink`]@java[`Sink<T, M>`] that will get the Flow running. The simplest way to do this is to call
-`runWith(sink)` on a @scala[`Source`]@java[`Source<Out, M>`]. For convenience a number of common Sinks are predefined and collected as @java[static] methods on
+@apidoc[runWith(sink)](akka.stream.*.Source) {scala="#runWith[Mat2](sink:akka.stream.Graph[akka.stream.SinkShape[Out],Mat2])(implicitmaterializer:akka.stream.Materializer):Mat2" java="#runWith(akka.stream.Graph,akka.actor.ClassicActorSystemProvider)"} on a @scala[`Source`]@java[`Source<Out, M>`]. For convenience a number of common Sinks are predefined and collected as @java[static] methods on
the @scala[`Sink` companion object]@java[`Sink class`].
For now let's print each author:
@@ -277,7 +277,7 @@ Scala
Java
: @@snip [TwitterStreamQuickstartDocTest.java](/akka-docs/src/test/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #authors-foreachsink-println }
-or by using the shorthand version (which are defined only for the most popular Sinks such as `Sink.fold` and `Sink.foreach`):
+or by using the shorthand version (which are defined only for the most popular Sinks such as @apidoc[Sink.fold](akka.stream.*.Sink$) {scala="#fold[U,T](zero:U)(f:(U,T)=%3EU):akka.stream.scaladsl.Sink[T,scala.concurrent.Future[U]]" java="#fold(U,akka.japi.function.Function2)"} and @apidoc[Sink.foreach](akka.stream.*.Sink$) {scala="#foreach[T](f:T=%3EUnit):akka.stream.scaladsl.Sink[T,scala.concurrent.Future[akka.Done]]" java="#foreach(akka.japi.function.Procedure)"}):
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-foreach-println }
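
Building on the hypothetical setup above, printing each author of an `#akka` tweet is a short pipeline:

```scala
import akka.stream.scaladsl.Sink

tweets
  .filter(_.hashtags.contains(akkaTag))
  .map(_.author)
  .runWith(Sink.foreach(println))
```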
@@ -286,7 +286,7 @@ Java
: @@snip [TwitterStreamQuickstartDocTest.java](/akka-docs/src/test/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #authors-foreach-println }
Materializing and running a stream always requires an `ActorSystem` to be @scala[in implicit scope (or passed in explicitly,
-like this: `.runWith(sink)(system)`)]@java[passed in explicitly, like this: `.runWith(sink, system)`].
+like this: @scaladoc[.runWith(sink)(system)](akka.stream.scaladsl.Source#runWith[Mat2](sink:akka.stream.Graph[akka.stream.SinkShape[Out],Mat2])(implicitmaterializer:akka.stream.Materializer):Mat2))]@java[passed in explicitly, like this: @javadoc[runWith(sink, system)](akka.stream.javadsl.Source#runWith(akka.stream.Graph,akka.stream.Materializer))].
The complete snippet looks like this:
@@ -300,7 +300,7 @@ Java
In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes
we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like `flatMap`
-works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the `mapConcat`
+works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the @apidoc[mapConcat](akka.stream.*.Source) {scala="#mapConcat[T](f:Out=%3EIterableOnce[T]):FlowOps.this.Repr[T]" java="#mapConcat(akka.japi.function.Function)"}
operator:
Scala
@@ -316,7 +316,7 @@ It is problematic for two reasons: @scala[first]@java[firstly], flattening by co
due to the risk of deadlock (with merge being the preferred strategy), and @scala[second]@java[secondly], the monad laws would not hold for
our implementation of flatMap (due to the liveness issues).
-Please note that the `mapConcat` requires the supplied function to return @scala[an iterable (`f: Out => immutable.Iterable[T]`]@java[a strict collection (`Out f -> java.util.List<T>`)],
+Please note that the @apidoc[mapConcat](akka.stream.*.Source) {scala="#mapConcat[T](f:Out=%3EIterableOnce[T]):FlowOps.this.Repr[T]" java="#mapConcat(akka.japi.function.Function)"} requires the supplied function to return @scala[an iterable (`f: Out => immutable.Iterable[T]`]@java[a strict collection (`Out f -> java.util.List<T>`)],
whereas `flatMap` would have to operate on streams all the way through.
@@@
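
A sketch of the flattening step, again on the hypothetical model:

```scala
// Each tweet fans out into zero or more hashtag elements.
val hashtags: Source[Hashtag, NotUsed] = tweets.mapConcat(_.hashtags.toList)
```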
@@ -328,14 +328,14 @@ For example we'd like to write all author handles into one file, and all hashtag
This means we have to split the source stream into two streams which will handle the writing to these different files.
Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams.
-One of these that we'll be using in this example is called `Broadcast`, and it emits elements from its
+One of these that we'll be using in this example is called @apidoc[akka.stream.*.Broadcast$], and it emits elements from its
input port to all of its output ports.
Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (Graphs)
in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups
at the expense of not reading as familiarly as collection transformations.
-Graphs are constructed using `GraphDSL` like this:
+Graphs are constructed using @apidoc[akka.stream.*.GraphDSL$] like this:
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #graph-dsl-broadcast }
@@ -345,12 +345,12 @@ Java
As you can see, @scala[inside the `GraphDSL` we use an implicit graph builder `b` to mutably construct the graph
using the `~>` "edge operator" (also read as "connect" or "via" or "to"). The operator is provided implicitly
-by importing `GraphDSL.Implicits._`]@java[we use graph builder `b` to construct the graph using `UniformFanOutShape` and `Flow` s].
+by importing `GraphDSL.Implicits._`]@java[we use graph builder `b` to construct the graph using @apidoc[akka.stream.UniformFanOutShape] and @apidoc[akka.stream.*.Flow] s].
-`GraphDSL.create` returns a `Graph`, in this example a @scala[`Graph[ClosedShape, NotUsed]`]@java[`Graph<ClosedShape,NotUsed>`] where
-`ClosedShape` means that it is *a fully connected graph* or "closed" - there are no unconnected inputs or outputs.
-Since it is closed it is possible to transform the graph into a `RunnableGraph` using `RunnableGraph.fromGraph`.
-The `RunnableGraph` can then be `run()` to materialize a stream out of it.
+@apidoc[GraphDSL.create](akka.stream.*.GraphDSL$) {scala="#create[S&lt;:akka.stream.Shape,IS&lt;:akka.stream.Shape,Mat](graphs:Seq[akka.stream.Graph[IS,Mat]])(buildBlock:akka.stream.scaladsl.GraphDSL.Builder[Seq[Mat]]=%3E(Seq[IS]=%3ES)):akka.stream.Graph[S,Seq[Mat]]" java="#create(java.util.List,akka.japi.function.Function2)"} returns a @apidoc[akka.stream.Graph], in this example a @scala[`Graph[ClosedShape, NotUsed]`]@java[`Graph<ClosedShape,NotUsed>`] where
+@apidoc[akka.stream.ClosedShape] means that it is *a fully connected graph* or "closed" - there are no unconnected inputs or outputs.
+Since it is closed it is possible to transform the graph into a @apidoc[akka.stream.*.RunnableGraph] using @apidoc[RunnableGraph.fromGraph](akka.stream.*.RunnableGraph$) {scala="#fromGraph[Mat](g:akka.stream.Graph[akka.stream.ClosedShape,Mat]):akka.stream.scaladsl.RunnableGraph[Mat]" java="#fromGraph(akka.stream.Graph)"}.
+The `RunnableGraph` can then be @apidoc[run()](akka.stream.*.RunnableGraph) {scala="#run()(implicitmaterializer:akka.stream.Materializer):Mat" java="#run(akka.stream.Materializer)"} to materialize a stream out of it.
Both `Graph` and `RunnableGraph` are *immutable, thread-safe, and freely shareable*.
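
A sketch of such a closed graph over the hypothetical model; the two sinks here just print, whereas the real example writes authors and hashtags to files:

```scala
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ Broadcast, Flow, GraphDSL, RunnableGraph, Sink }

val writeAuthors  = Sink.foreach[Author](println)  // placeholder sink
val writeHashtags = Sink.foreach[Hashtag](println) // placeholder sink

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  val bcast = b.add(Broadcast[Tweet](2))
  tweets ~> bcast.in
  bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
  bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
  ClosedShape
})
g.run() // materializes the graph using the implicit ActorSystem
```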
@@ -369,9 +369,9 @@ about the back-pressure protocol used by Akka Streams and all other Reactive Str
A typical problem applications (not using Akka Streams) like this often face is that they are unable to process the incoming data fast enough,
either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting
-in either `OutOfMemoryError` s or other severe degradations of service responsiveness. With Akka Streams buffering can
+in either @javadoc[OutOfMemoryError](java.lang.OutOfMemoryError) s or other severe degradations of service responsiveness. With Akka Streams buffering can
and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10
-elements*" this can be expressed using the `buffer` element:
+elements*" this can be expressed using the @apidoc[buffer](akka.stream.*.Source) {scala="#buffer(size:Int,overflowStrategy:akka.stream.OverflowStrategy):FlowOps.this.Repr[Out]" java="#buffer(int,akka.stream.OverflowStrategy)"} element:
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-slow-consumption-dropHead }
@@ -379,7 +379,7 @@ Scala
Java
: @@snip [TwitterStreamQuickstartDocTest.java](/akka-docs/src/test/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #tweets-slow-consumption-dropHead }
-The `buffer` element takes an explicit and required `OverflowStrategy`, which defines how the buffer should react
+The `buffer` element takes an explicit and required @apidoc[akka.stream.OverflowStrategy], which defines how the buffer should react
when it receives another element while it is full. Strategies provided include dropping the oldest element (`dropHead`),
dropping the entire buffer, signalling @scala[errors]@java[failures] etc. Be sure to pick and choose the strategy that fits your use case best.
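
A sketch of explicit buffering with a drop-oldest strategy; `slowComputation` is a hypothetical stand-in for an expensive step:

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Sink

def slowComputation(t: Tweet): Tweet = { Thread.sleep(500); t } // pretend this is expensive

tweets
  .buffer(10, OverflowStrategy.dropHead) // keep only the 10 most recent elements
  .map(slowComputation)
  .runWith(Sink.ignore)
```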
@@ -393,7 +393,7 @@ While this question is not as obvious to give an answer to in case of an infinit
this question in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"),
but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.
-First, let's write such an element counter using @scala[`Sink.fold` and]@java[`Flow.of(Class)` and `Sink.fold` to] see how the types look like:
+First, let's write such an element counter using @scala[@scaladoc[Sink.fold](akka.stream.scaladsl.Sink$#fold[U,T](zero:U)(f:(U,T)=%3EU):akka.stream.scaladsl.Sink[T,scala.concurrent.Future[U]]) and]@java[@javadoc[Flow.of(Class)](akka.stream.javadsl.Flow#of(java.lang.Class)) and @javadoc[Sink.fold](akka.stream.javadsl.Sink$#fold(U,akka.japi.function.Function2)) to] see how the types look like:
Scala
: @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-fold-count }
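
A sketch of such a counter over the hypothetical tweet stream; `Keep.right` retains the sink's materialized value, the eventual total:

```scala
import akka.NotUsed
import akka.stream.scaladsl.{ Flow, Keep, Sink }

import scala.concurrent.Future

val count: Flow[Tweet, Int, NotUsed] = Flow[Tweet].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val sum: Future[Int] = tweets.via(count).toMat(sumSink)(Keep.right).run()
sum.foreach(c => println(s"Total tweets processed: $c")) // uses system.dispatcher
```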