Use apidoc directive in stream/stream-error.md (#22904) (#31220)

This commit is contained in:
Andrei Arlou 2022-03-23 11:28:56 +02:00 committed by GitHub
parent f8caaaff37
commit 5e434998f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -20,8 +20,8 @@ Each of the operators downstream gets informed about the failure and each upstre
In many cases you may want to avoid complete stream failure, this can be done in a few different ways: In many cases you may want to avoid complete stream failure, this can be done in a few different ways:
* `recover` to emit a final element then complete the stream normally on upstream failure * @apidoc[recover](akka.stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,java.util.function.Supplier)"} to emit a final element then complete the stream normally on upstream failure
* `recoverWithRetries` to create a new upstream and start consuming from that on failure * @apidoc[recoverWithRetries](akka.stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,akka.stream.Graph[akka.stream.SourceShape[T],akka.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"} to create a new upstream and start consuming from that on failure
* Restarting sections of the stream after a backoff * Restarting sections of the stream after a backoff
* Using a supervision strategy for operators that support it * Using a supervision strategy for operators that support it
@ -30,8 +30,8 @@ inside an actor, and have the actor restart the entire stream on failure.
## Logging errors ## Logging errors
`log()` enables logging of a stream, which is typically useful for error logging. @apidoc[log()](akka.stream.*.Source) {scala="#log(name:String,extract:Out=%3EAny)(implicitlog:akka.event.LoggingAdapter):FlowOps.this.Repr[Out]" java="#log(java.lang.String,akka.japi.function.Function)"} enables logging of a stream, which is typically useful for error logging.
The below stream fails with `ArithmeticException` when the element `0` goes through the `map` operator, The below stream fails with @javadoc[ArithmeticException](java.lang.ArithmeticException) when the element `0` goes through the @apidoc[map](akka.stream.*.Source) {scala="#map[T](f:Out=%3ET):FlowOps.this.Repr[T]" java="#map(akka.japi.function.Function)"} operator,
Scala Scala
: @@snip [RecipeLoggingElements.scala](/akka-docs/src/test/scala/docs/stream/cookbook/RecipeLoggingElements.scala) { #log-error } : @@snip [RecipeLoggingElements.scala](/akka-docs/src/test/scala/docs/stream/cookbook/RecipeLoggingElements.scala) { #log-error }
@ -52,8 +52,8 @@ in @ref:[Logging in streams](stream-cookbook.md#logging-in-streams).
## Recover ## Recover
`recover` allows you to emit a final element and then complete the stream on an upstream failure. @apidoc[recover](akka.stream.*.Source) {scala="#recover[T>:Out](pf:PartialFunction[Throwable,T]):FlowOps.this.Repr[T]" java="#recover(java.lang.Class,java.util.function.Supplier)"} allows you to emit a final element and then complete the stream on an upstream failure.
Deciding which exceptions should be recovered is done through a `PartialFunction`. If an exception Deciding which exceptions should be recovered is done through a @scaladoc[PartialFunction](scala.PartialFunction). If an exception
does not have a @scala[matching case] @java[match defined] the stream is failed. does not have a @scala[matching case] @java[match defined] the stream is failed.
Recovering can be useful if you want to gracefully complete a stream on failure while letting Recovering can be useful if you want to gracefully complete a stream on failure while letting
@ -80,10 +80,10 @@ Java
## Recover with retries ## Recover with retries
`recoverWithRetries` allows you to put a new upstream in place of the failed one, recovering @apidoc[recoverWithRetries](akka.stream.*.Source) {scala="#recoverWithRetries[T>:Out](attempts:Int,pf:PartialFunction[Throwable,akka.stream.Graph[akka.stream.SourceShape[T],akka.NotUsed]]):FlowOps.this.Repr[T]" java="#recoverWithRetries(int,java.lang.Class,java.util.function.Supplier)"} allows you to put a new upstream in place of the failed one, recovering
stream failures up to a specified maximum number of times. stream failures up to a specified maximum number of times.
Deciding which exceptions should be recovered is done through a `PartialFunction`. If an exception Deciding which exceptions should be recovered is done through a @scaladoc[PartialFunction](scala.PartialFunction). If an exception
does not have a @scala[matching case] @java[match defined] the stream is failed. does not have a @scala[matching case] @java[match defined] the stream is failed.
Scala Scala
@ -106,7 +106,7 @@ Java
## Delayed restarts with a backoff operator ## Delayed restarts with a backoff operator
Akka streams provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff Akka streams provides a @apidoc[akka.stream.*.RestartSource$], @apidoc[akka.stream.*.RestartSink$] and @apidoc[akka.stream.*.RestartFlow$] for implementing the so-called *exponential backoff
supervision strategy*, starting an operator again when it fails or completes, each time with a growing time delay between restarts. supervision strategy*, starting an operator again when it fails or completes, each time with a growing time delay between restarts.
This pattern is useful when the operator fails or completes because some external resource is not available This pattern is useful when the operator fails or completes because some external resource is not available
@ -115,7 +115,7 @@ when a WebSocket connection fails due to the HTTP server it's running on going d
By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some time By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some time
to recover, and it avoids using needless resources on the client side. to recover, and it avoids using needless resources on the client side.
The various restart shapes mentioned all expect an `akka.stream.RestartSettings` which configures the restart behaviour. The various restart shapes mentioned all expect an @apidoc[akka.stream.RestartSettings] which configures the restart behaviour.
Configurable parameters are: Configurable parameters are:
* `minBackoff` is the initial duration until the underlying stream is restarted * `minBackoff` is the initial duration until the underlying stream is restarted
@ -124,8 +124,8 @@ Configurable parameters are:
* `maxRestarts` caps the total number of restarts * `maxRestarts` caps the total number of restarts
* `maxRestartsWithin` sets a timeframe during which restarts are counted towards the same total for `maxRestarts` * `maxRestartsWithin` sets a timeframe during which restarts are counted towards the same total for `maxRestarts`
The following snippet shows how to create a backoff supervisor using @scala[`akka.stream.scaladsl.RestartSource`] The following snippet shows how to create a backoff supervisor using @apidoc[akka.stream.*.RestartSource$]
@java[`akka.stream.javadsl.RestartSource`] which will supervise the given `Source`. The `Source` in this case is a which will supervise the given @apidoc[akka.stream.*.Source]. The `Source` in this case is a
stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will stream of Server Sent Events, produced by akka-http. If the stream fails or completes at any point, the request will
be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due
to the `maxBackoff` parameter): to the `maxBackoff` parameter):
@ -143,7 +143,7 @@ and re-starting after the same configured interval. By adding additional randomn
re-start intervals the streams will start in slightly different points in time, thus avoiding re-start intervals the streams will start in slightly different points in time, thus avoiding
large spikes of traffic hitting the recovering server or other resource that they all need to contact. large spikes of traffic hitting the recovering server or other resource that they all need to contact.
The above `RestartSource` will never terminate unless the `Sink` it's fed into cancels. It will often be handy to use The above `RestartSource` will never terminate unless the @apidoc[akka.stream.*.Sink] it's fed into cancels. It will often be handy to use
it in combination with a @ref:[`KillSwitch`](stream-dynamic.md#kill-switch), so that you can terminate it when needed: it in combination with a @ref:[`KillSwitch`](stream-dynamic.md#kill-switch), so that you can terminate it when needed:
Scala Scala
@ -152,24 +152,23 @@ Scala
Java Java
: @@snip [RestartDocTest.java](/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java) { #with-kill-switch } : @@snip [RestartDocTest.java](/akka-docs/src/test/java/jdocs/stream/RestartDocTest.java) { #with-kill-switch }
Sinks and flows can also be supervised, using @scala[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`] Sinks and flows can also be supervised, using @apidoc[akka.stream.*.RestartSink$] and @apidoc[akka.stream.*.RestartFlow$]. The `RestartSink` is restarted when
@java[`akka.stream.javadsl.RestartSink` and `akka.stream.javadsl.RestartFlow`]. The `RestartSink` is restarted when
it cancels, while the `RestartFlow` is restarted when either the in port cancels, the out port completes, or the out it cancels, while the `RestartFlow` is restarted when either the in port cancels, the out port completes, or the out
port sends an error. port sends an error.
@@@ note @@@ note
Care should be taken when using @ref[`GraphStage`s](stream-customize.md) that conditionally propagate termination signals inside a Care should be taken when using @ref[`GraphStage`s](stream-customize.md) that conditionally propagate termination signals inside a
`RestartSource`, `RestartSink` or `RestartFlow`. @apidoc[akka.stream.*.RestartSource$], @apidoc[akka.stream.*.RestartSink$] or @apidoc[akka.stream.*.RestartFlow$].
An example is a `Broadcast` operator with the default `eagerCancel = false` where An example is a @scaladoc[Broadcast](akka.stream.scaladsl.Broadcast) operator with the default `eagerCancel = false` where
some of the outlets are for side-effecting branches (that do not re-join e.g. via a `Merge`). some of the outlets are for side-effecting branches (that do not re-join e.g. via a `Merge`).
A failure on a side branch will not terminate the supervised stream which will A failure on a side branch will not terminate the supervised stream which will
not be restarted. Conversely, a failure on the main branch can trigger a restart but leave behind old not be restarted. Conversely, a failure on the main branch can trigger a restart but leave behind old
running instances of side branches. running instances of side branches.
In this example `eagerCancel` should probably be set to `true`, or, when only a single side branch is used, `alsoTo` In this example `eagerCancel` should probably be set to `true`, or, when only a single side branch is used, @ref[`alsoTo`](operators/Source-or-Flow/alsoTo.md)
or `divertTo` should be considered as alternatives. or @ref[`divertTo`](operators/Source-or-Flow/divertTo.md) should be considered as alternatives.
@@@ @@@
@ -198,9 +197,9 @@ processing stream elements can be selected when materializing the stream through
There are three ways to handle exceptions from application code: There are three ways to handle exceptions from application code:
* `Stop` - The stream is completed with failure. * @scala[@scaladoc[Stop](akka.stream.Supervision$$Stop$)]@java[@javadoc[Supervision.stop()](akka.stream.Supervision#stop())] - The stream is completed with failure.
* `Resume` - The element is dropped and the stream continues. * @scala[@scaladoc[Resume](akka.stream.Supervision$$Resume$)]@java[@javadoc[Supervision.resume()](akka.stream.Supervision#resume())] - The element is dropped and the stream continues.
* `Restart` - The element is dropped and the stream continues after restarting the operator. * @scala[@scaladoc[Restart](akka.stream.Supervision$$Restart$)]@java[@javadoc[Supervision.restart()](akka.stream.Supervision#restart())] - The element is dropped and the stream continues after restarting the operator.
Restarting an operator means that any accumulated state is cleared. This is typically Restarting an operator means that any accumulated state is cleared. This is typically
performed by creating a new instance of the operator. performed by creating a new instance of the operator.
@ -213,7 +212,7 @@ Scala
Java Java
: @@snip [FlowErrorDocTest.java](/akka-docs/src/test/java/jdocs/stream/FlowErrorDocTest.java) { #stop } : @@snip [FlowErrorDocTest.java](/akka-docs/src/test/java/jdocs/stream/FlowErrorDocTest.java) { #stop }
The default supervision strategy for a stream can be defined on the complete `RunnableGraph`. The default supervision strategy for a stream can be defined on the complete @apidoc[akka.stream.*.RunnableGraph].
Scala Scala
: @@snip [FlowErrorDocSpec.scala](/akka-docs/src/test/scala/docs/stream/FlowErrorDocSpec.scala) { #resume } : @@snip [FlowErrorDocSpec.scala](/akka-docs/src/test/scala/docs/stream/FlowErrorDocSpec.scala) { #resume }
@ -221,7 +220,7 @@ Scala
Java Java
: @@snip [FlowErrorDocTest.java](/akka-docs/src/test/java/jdocs/stream/FlowErrorDocTest.java) { #resume } : @@snip [FlowErrorDocTest.java](/akka-docs/src/test/java/jdocs/stream/FlowErrorDocTest.java) { #resume }
Here you can see that all `ArithmeticException` will resume the processing, i.e. the Here you can see that all @javadoc[ArithmeticException](java.lang.ArithmeticException) will resume the processing, i.e. the
elements that cause the division by zero are effectively dropped. elements that cause the division by zero are effectively dropped.
@@@ note @@@ note
@ -239,7 +238,7 @@ Scala
Java Java
: @@snip [FlowErrorDocTest.java](/akka-docs/src/test/java/jdocs/stream/FlowErrorDocTest.java) { #resume-section } : @@snip [FlowErrorDocTest.java](/akka-docs/src/test/java/jdocs/stream/FlowErrorDocTest.java) { #resume-section }
`Restart` works in a similar way as `Resume` with the addition that accumulated state, @scala[@scaladoc[Restart](akka.stream.Supervision$$Restart$)]@java[@javadoc[Supervision.restart()](akka.stream.Supervision#restart())] works in a similar way as @scala[@scaladoc[Resume](akka.stream.Supervision$$Resume$)]@java[@javadoc[Supervision.resume()](akka.stream.Supervision#resume())] with the addition that accumulated state,
if any, of the failing processing operator will be reset. if any, of the failing processing operator will be reset.
Scala Scala
@ -250,7 +249,7 @@ Java
### Errors from mapAsync ### Errors from mapAsync
Stream supervision can also be applied to the futures of `mapAsync` and `mapAsyncUnordered` even if such Stream supervision can also be applied to the futures of @apidoc[mapAsync](akka.stream.*.Source) {scala="#mapAsync[T](parallelism:Int)(f:Out=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsync(int,akka.japi.function.Function)"} and @apidoc[mapAsyncUnordered](akka.stream.*.Source) {scala="#mapAsyncUnordered[T](parallelism:Int)(f:Out=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncUnordered(int,akka.japi.function.Function)"} even if such
failures happen in the future rather than inside the operator itself. failures happen in the future rather than inside the operator itself.
Let's say that we use an external service to lookup email addresses and we would like to Let's say that we use an external service to lookup email addresses and we would like to
@ -272,10 +271,10 @@ Scala
Java Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #email-address-lookup2 } : @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #email-address-lookup2 }
The @scala[`Future`] @java[`CompletionStage`] is completed @scala[with `Failure`] @java[normally] if the email is not found. The @scala[@scaladoc[Future](scala.concurrent.Future)] @java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)] is completed @scala[with `Failure`] @java[normally] if the email is not found.
Transforming the stream of authors to a stream of email addresses by using the `lookupEmail` Transforming the stream of authors to a stream of email addresses by using the `lookupEmail`
service can be done with `mapAsync` and we use @scala[`Supervision.resumingDecider`] @java[`Supervision.getResumingDecider`] to drop service can be done with @apidoc[mapAsync](akka.stream.*.Source) {scala="#mapAsync[T](parallelism:Int)(f:Out=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsync(int,akka.japi.function.Function)"} and we use @scala[@scaladoc[Supervision.resumingDecider](akka.stream.Supervision$#resumingDecider:akka.stream.Supervision.Deciderwithakka.japi.function.Function[Throwable,akka.stream.Supervision.Directive])] @java[@javadoc[Supervision.getResumingDecider()](akka.stream.Supervision#getResumingDecider())] to drop
unknown email addresses: unknown email addresses:
Scala Scala
@ -284,6 +283,6 @@ Scala
Java Java
: @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync-supervision } : @@snip [IntegrationDocTest.java](/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync-supervision }
If we would not use `Resume` the default stopping strategy would complete the stream If we would not use @scala[@scaladoc[Resume](akka.stream.Supervision$$Resume$)]@java[@javadoc[Supervision.resume()](akka.stream.Supervision#resume())] the default stopping strategy would complete the stream
with failure on the first @scala[`Future`] @java[`CompletionStage`] that was completed @scala[with `Failure`]@java[exceptionally]. with failure on the first @scala[`Future`] @java[`CompletionStage`] that was completed @scala[with `Failure`]@java[exceptionally].