diff --git a/akka-docs/src/main/paradox/stream/stream-quickstart.md b/akka-docs/src/main/paradox/stream/stream-quickstart.md index c10ccabe00..e1576680ba 100644 --- a/akka-docs/src/main/paradox/stream/stream-quickstart.md +++ b/akka-docs/src/main/paradox/stream/stream-quickstart.md @@ -38,7 +38,7 @@ Scala Java : @@snip [QuickStartDocTest.java](/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java) { #other-imports } -And @scala[an object]@java[a class] to start an Akka `ActorSystem` and hold your code @scala[. Making the `ActorSystem` +And @scala[an object]@java[a class] to start an Akka @apidoc[akka.actor.ActorSystem] and hold your code @scala[. Making the `ActorSystem` implicit makes it available to the streams without manually passing it when running them]: Scala @@ -55,11 +55,11 @@ Scala Java : @@snip [QuickStartDocTest.java](/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java) { #create-source } -The `Source` type is parameterized with two types: the first one is the +The @apidoc[akka.stream.*.Source] type is parameterized with two types: the first one is the type of element that this source emits and the second one, the "materialized value", allows running the source to produce some auxiliary value (e.g. a network source may provide information about the bound port or the peer’s address). Where no -auxiliary information is produced, the type `akka.NotUsed` is used. A +auxiliary information is produced, the type @apidoc[akka.NotUsed] is used. A simple range of integers falls into this category - running our stream produces a `NotUsed`. @@ -80,8 +80,8 @@ part of the method name; there are other methods that run Akka Streams, and they all follow this pattern. When running this @scala[source in a `scala.App`]@java[program] you might notice it does not -terminate, because the `ActorSystem` is never terminated. Luckily -`runForeach` returns a @scala[`Future[Done]`]@java[`CompletionStage`] which resolves when the stream finishes: +terminate, because the @apidoc[akka.actor.ActorSystem] is never terminated. Luckily +@apidoc[runForeach](akka.stream.*.Source) {scala="#runForeach(f:Out=%3EUnit)(implicitmaterializer:akka.stream.Materializer):scala.concurrent.Future[akka.Done]" java="#runForeach(akka.japi.function.Procedure,akka.stream.Materializer)"} returns a @scala[@scaladoc[Future](scala.concurrent.Future)[@apidoc[akka.Done]]]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)<@apidoc[akka.Done]>] which resolves when the stream finishes: Scala : @@snip [QuickStartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/QuickStartDocSpec.scala) { #run-source-and-terminate } @@ -100,17 +100,17 @@ Scala Java : @@snip [QuickStartDocTest.java](/akka-docs/src/test/java/jdocs/stream/QuickStartDocTest.java) { #transform-source } -First we use the `scan` operator to run a computation over the whole +First we use the @apidoc[scan](akka.stream.*.Source) {scala="#scan[T](zero:T)(f:(T,Out)=%3ET):FlowOps.this.Repr[T]" java="#scan(T,akka.japi.function.Function2)"} operator to run a computation over the whole stream: starting with the number 1 (@scala[`BigInt(1)`]@java[`BigInteger.ONE`]) we multiply by each of the incoming numbers, one after the other; the scan operation emits the initial value and then every calculation result. This yields the series of factorial numbers which we stash away as a `Source` for later reuse—it is important to keep in mind that nothing is actually computed yet, this is a description of what we want to have computed once we run the stream. Then we -convert the resulting series of numbers into a stream of `ByteString` +convert the resulting series of numbers into a stream of @apidoc[akka.util.ByteString] objects describing lines in a text file. This stream is then run by attaching a file as the receiver of the data. In the terminology of Akka Streams this is -called a `Sink`. `IOResult` is a type that IO operations return in +called a @apidoc[akka.stream.*.Sink]. @apidoc[akka.stream.IOResult] is a type that IO operations return in Akka Streams in order to tell you how many bytes or elements were consumed and whether the stream terminated normally or exceptionally. @@ -126,12 +126,12 @@ Here is another example that you can edit and run in the browser: One of the nice parts of Akka Streams—and something that other stream libraries do not offer—is that not only sources can be reused like blueprints, all other -elements can be as well. We can take the file-writing `Sink`, prepend -the processing steps necessary to get the `ByteString` elements from +elements can be as well. We can take the file-writing @apidoc[akka.stream.*.Sink], prepend +the processing steps necessary to get the @apidoc[akka.util.ByteString] elements from incoming strings and package that up as a reusable piece as well. Since the language for writing these streams always flows from left to right (just like plain English), we need a starting point that is like a source but with an -“open” input. In Akka Streams this is called a `Flow`: +“open” input. In Akka Streams this is called a @apidoc[akka.stream.*.Flow]: Scala : @@snip [QuickStartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/QuickStartDocSpec.scala) { #transform-sink } @@ -143,13 +143,13 @@ Starting from a flow of strings we convert each to `ByteString` and then feed to the already known file-writing `Sink`. The resulting blueprint is a @scala[`Sink[String, Future[IOResult]]`]@java[`Sink>`], which means that it accepts strings as its input and when materialized it will create auxiliary -information of type @scala[`Future[IOResult]`]@java[`CompletionStage`] (when chaining operations on +information of type @scala[@scaladoc[Future](scala.concurrent.Future)[@apidoc[akka.stream.IOResult]]]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)<@apidoc[akka.stream.IOResult]>] (when chaining operations on a `Source` or `Flow` the type of the auxiliary information—called the “materialized value”—is given by the leftmost starting point; since we want -to retain what the `FileIO.toPath` sink has to offer, we need to say -@scala[`Keep.right`]@java[`Keep.right()`]). +to retain what the @apidoc[FileIO.toPath](akka.stream.*.FileIO$) {scala="#toPath(f:java.nio.file.Path,options:Set[java.nio.file.OpenOption],startPosition:Long):akka.stream.scaladsl.Sink[akka.util.ByteString,scala.concurrent.Future[akka.stream.IOResult]]" java="#toPath(java.nio.file.Path)"} sink has to offer, we need to say +@scala[@scaladoc[Keep.right](akka.stream.scaladsl.Keep$#right[L,R]:(L,R)=%3ER)]@java[@javadoc[Keep.right()](akka.stream.javadsl.Keep$#right())]. -We can use the new and shiny `Sink` we just created by +We can use the new and shiny @apidoc[akka.stream.*.Sink] we just created by attaching it to our `factorials` source—after a small adaptation to turn the numbers into strings: @@ -164,7 +164,7 @@ Java Before we start looking at a more involved example we explore the streaming nature of what Akka Streams can do. Starting from the `factorials` source we transform the stream by zipping it together with another stream, -represented by a `Source` that emits the number 0 to 100: the first +represented by a @apidoc[akka.stream.*.Source] that emits the number 0 to 100: the first number emitted by the `factorials` source is the factorial of zero, the second is the factorial of one, and so on. We combine these two by forming strings like `"3! = 6"`. @@ -178,7 +178,7 @@ Java All operations so far have been time-independent and could have been performed in the same fashion on strict collections of elements. The next line demonstrates that we are in fact dealing with streams that can flow at a -certain speed: we use the `throttle` operator to slow down the stream to 1 +certain speed: we use the @apidoc[throttle](akka.stream.*.Source) {scala="#throttle(elements:Int,per:scala.concurrent.duration.FiniteDuration):FlowOps.this.Repr[Out]" java="#throttle(int,java.time.Duration)"} operator to slow down the stream to 1 element per second. If you run this program you will see one line printed per second. One aspect @@ -186,7 +186,7 @@ that is not immediately visible deserves mention, though: if you try and set the streams to produce a billion numbers each then you will notice that your JVM does not crash with an OutOfMemoryError, even though you will also notice that running the streams happens in the background, asynchronously (this is the -reason for the auxiliary information to be provided as a @scala[`Future`]@java[`CompletionStage`], in the future). The +reason for the auxiliary information to be provided as a @scala[@scaladoc[Future](scala.concurrent.Future)]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)], in the future). The secret that makes this work is that Akka Streams implicitly implement pervasive flow control, all operators respect back-pressure. This allows the throttle operator to signal to all its upstream sources of data that it can only @@ -230,7 +230,7 @@ sections of the docs, and then come back to this quickstart to see it all pieced The example application we will be looking at is a simple Twitter feed stream from which we'll want to extract certain information, like for example finding all twitter handles of users who tweet about `#akka`. -In order to prepare our environment by creating an `ActorSystem` which will be responsible for running the streams we are about to create: +In order to prepare our environment by creating an @apidoc[akka.actor.ActorSystem] which will be responsible for running the streams we are about to create: Scala : @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #system-setup } @@ -267,7 +267,7 @@ Java Finally in order to @ref:[materialize](stream-flows-and-basics.md#stream-materialization) and run the stream computation we need to attach the Flow to a @scala[`Sink`]@java[`Sink`] that will get the Flow running. The simplest way to do this is to call -`runWith(sink)` on a @scala[`Source`]@java[`Source`]. For convenience a number of common Sinks are predefined and collected as @java[static] methods on +@apidoc[runWith(sink)](akka.stream.*.Source) {scala="#runWith[Mat2](sink:akka.stream.Graph[akka.stream.SinkShape[Out],Mat2])(implicitmaterializer:akka.stream.Materializer):Mat2" java="#runWith(akka.stream.Graph,akka.actor.ClassicActorSystemProvider)"} on a @scala[`Source`]@java[`Source`]. For convenience a number of common Sinks are predefined and collected as @java[static] methods on the @scala[`Sink` companion object]@java[`Sink class`]. For now let's print each author: @@ -277,7 +277,7 @@ Scala Java : @@snip [TwitterStreamQuickstartDocTest.java](/akka-docs/src/test/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #authors-foreachsink-println } -or by using the shorthand version (which are defined only for the most popular Sinks such as `Sink.fold` and `Sink.foreach`): +or by using the shorthand version (which are defined only for the most popular Sinks such as @apidoc[Sink.fold](akka.stream.*.Sink$) {scala="#fold[U,T](zero:U)(f:(U,T)=%3EU):akka.stream.scaladsl.Sink[T,scala.concurrent.Future[U]]" java="#fold(U,akka.japi.function.Function2)"} and @apidoc[Sink.foreach](akka.stream.*.Sink$) {scala="#foreach[T](f:T=%3EUnit):akka.stream.scaladsl.Sink[T,scala.concurrent.Future[akka.Done]]" java="#foreach(akka.japi.function.Procedure)"}): Scala : @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #authors-foreach-println } @@ -286,7 +286,7 @@ Java : @@snip [TwitterStreamQuickstartDocTest.java](/akka-docs/src/test/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #authors-foreach-println } Materializing and running a stream always requires an `ActorSystem` to be @scala[in implicit scope (or passed in explicitly, -like this: `.runWith(sink)(system)`)]@java[passed in explicitly, like this: `.runWith(sink, system)`]. +like this: @scaladoc[.runWith(sink)(system)](akka.stream.scaladsl.Source#runWith[Mat2](sink:akka.stream.Graph[akka.stream.SinkShape[Out],Mat2])(implicitmaterializer:akka.stream.Materializer):Mat2))]@java[passed in explicitly, like this: @javadoc[runWith(sink, system)](akka.stream.javadsl.Source#runWith(akka.stream.Graph,akka.stream.Materializer))]. The complete snippet looks like this: @@ -300,7 +300,7 @@ Java In the previous section we were working on 1:1 relationships of elements which is the most common case, but sometimes we might want to map from one element to a number of elements and receive a "flattened" stream, similarly like `flatMap` -works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the `mapConcat` +works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the @apidoc[mapConcat](akka.stream.*.Source) {scala="#mapConcat[T](f:Out=%3EIterableOnce[T]):FlowOps.this.Repr[T]" java="#mapConcat(akka.japi.function.Function)"} operator: Scala @@ -316,7 +316,7 @@ It is problematic for two reasons: @scala[first]@java[firstly], flattening by co due to the risk of deadlock (with merge being the preferred strategy), and @scala[second]@java[secondly], the monad laws would not hold for our implementation of flatMap (due to the liveness issues). -Please note that the `mapConcat` requires the supplied function to return @scala[an iterable (`f: Out => immutable.Iterable[T]`]@java[a strict collection (`Out f -> java.util.List`)], +Please note that the @apidoc[mapConcat](akka.stream.*.Source) {scala="#mapConcat[T](f:Out=%3EIterableOnce[T]):FlowOps.this.Repr[T]" java="#mapConcat(akka.japi.function.Function)"} requires the supplied function to return @scala[an iterable (`f: Out => immutable.Iterable[T]`]@java[a strict collection (`Out f -> java.util.List`)], whereas `flatMap` would have to operate on streams all the way through. @@@ @@ -328,14 +328,14 @@ For example we'd like to write all author handles into one file, and all hashtag This means we have to split the source stream into two streams which will handle the writing to these different files. Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams. -One of these that we'll be using in this example is called `Broadcast`, and it emits elements from its +One of these that we'll be using in this example is called @apidoc[akka.stream.*.Broadcast$], and it emits elements from its input port to all of its output ports. Akka Streams intentionally separate the linear stream structures (Flows) from the non-linear, branching ones (Graphs) in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups at the expense of not reading as familiarly as collection transformations. -Graphs are constructed using `GraphDSL` like this: +Graphs are constructed using @apidoc[akka.stream.*.GraphDSL$] like this: Scala : @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #graph-dsl-broadcast } @@ -345,12 +345,12 @@ Java As you can see, @scala[inside the `GraphDSL` we use an implicit graph builder `b` to mutably construct the graph using the `~>` "edge operator" (also read as "connect" or "via" or "to"). The operator is provided implicitly -by importing `GraphDSL.Implicits._`]@java[we use graph builder `b` to construct the graph using `UniformFanOutShape` and `Flow` s]. +by importing `GraphDSL.Implicits._`]@java[we use graph builder `b` to construct the graph using @apidoc[akka.stream.UniformFanOutShape] and @apidoc[akka.stream.*.Flow] s]. -`GraphDSL.create` returns a `Graph`, in this example a @scala[`Graph[ClosedShape, NotUsed]`]@java[`Graph`] where -`ClosedShape` means that it is *a fully connected graph* or "closed" - there are no unconnected inputs or outputs. -Since it is closed it is possible to transform the graph into a `RunnableGraph` using `RunnableGraph.fromGraph`. -The `RunnableGraph` can then be `run()` to materialize a stream out of it. +@apidoc[GraphDSL.create](akka.stream.*.GraphDSL$) {scala="#create[S<:akka.stream.Shape,IS<:akka.stream.Shape,Mat](graphs:Seq[akka.stream.Graph[IS,Mat]])(buildBlock:akka.stream.scaladsl.GraphDSL.Builder[Seq[Mat]]=%3E(Seq[IS]=%3ES)):akka.stream.Graph[S,Seq[Mat]]" java="#create(java.util.List,akka.japi.function.Function2)"} returns a @apidoc[akka.stream.Graph], in this example a @scala[`Graph[ClosedShape, NotUsed]`]@java[`Graph`] where +@apidoc[akka.stream.ClosedShape] means that it is *a fully connected graph* or "closed" - there are no unconnected inputs or outputs. +Since it is closed it is possible to transform the graph into a @apidoc[akka.stream.*.RunnableGraph] using @apidoc[RunnableGraph.fromGraph](akka.stream.*.RunnableGraph$) {scala="#fromGraph[Mat](g:akka.stream.Graph[akka.stream.ClosedShape,Mat]):akka.stream.scaladsl.RunnableGraph[Mat]" java="#fromGraph(akka.stream.Graph)"}. +The `RunnableGraph` can then be @apidoc[run()](akka.stream.*.RunnableGraph) {scala="#run()(implicitmaterializer:akka.stream.Materializer):Mat" java="#run(akka.stream.Materializer)"} to materialize a stream out of it. Both `Graph` and `RunnableGraph` are *immutable, thread-safe, and freely shareable*. @@ -369,9 +369,9 @@ about the back-pressure protocol used by Akka Streams and all other Reactive Str A typical problem applications (not using Akka Streams) like this often face is that they are unable to process the incoming data fast enough, either temporarily or by design, and will start buffering incoming data until there's no more space to buffer, resulting -in either `OutOfMemoryError` s or other severe degradations of service responsiveness. With Akka Streams buffering can +in either @javadoc[OutOfMemoryError](java.lang.OutOfMemoryError) s or other severe degradations of service responsiveness. With Akka Streams buffering can and must be handled explicitly. For example, if we are only interested in the "*most recent tweets, with a buffer of 10 -elements*" this can be expressed using the `buffer` element: +elements*" this can be expressed using the @apidoc[buffer](akka.stream.*.Source) {scala="#buffer(size:Int,overflowStrategy:akka.stream.OverflowStrategy):FlowOps.this.Repr[Out]" java="#buffer(int,akka.stream.OverflowStrategy)"} element: Scala : @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-slow-consumption-dropHead } @@ -379,7 +379,7 @@ Scala Java : @@snip [TwitterStreamQuickstartDocTest.java](/akka-docs/src/test/java/jdocs/stream/TwitterStreamQuickstartDocTest.java) { #tweets-slow-consumption-dropHead } -The `buffer` element takes an explicit and required `OverflowStrategy`, which defines how the buffer should react +The `buffer` element takes an explicit and required @apidoc[akka.stream.OverflowStrategy], which defines how the buffer should react when it receives another element while it is full. Strategies provided include dropping the oldest element (`dropHead`), dropping the entire buffer, signalling @scala[errors]@java[failures] etc. Be sure to pick and choose the strategy that fits your use case best. @@ -393,7 +393,7 @@ While this question is not as obvious to give an answer to in case of an infinit this question in a streaming setting would be to create a stream of counts described as "*up until now*, we've processed N tweets"), but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements. -First, let's write such an element counter using @scala[`Sink.fold` and]@java[`Flow.of(Class)` and `Sink.fold` to] see how the types look like: +First, let's write such an element counter using @scala[@scaladoc[Sink.fold](akka.stream.scaladsl.Sink$#fold[U,T](zero:U)(f:(U,T)=%3EU):akka.stream.scaladsl.Sink[T,scala.concurrent.Future[U]]) and]@java[@javadoc[Flow.of(Class)](akka.stream.javadsl.Flow#of(java.lang.Class)) and @javadoc[Sink.fold](akka.stream.javadsl.Sink$#fold(U,akka.japi.function.Function2)) to] see how the types look like: Scala : @@snip [TwitterStreamQuickstartDocSpec.scala](/akka-docs/src/test/scala/docs/stream/TwitterStreamQuickstartDocSpec.scala) { #tweets-fold-count }