Merge scala/stream/stream-error.md and java/stream/stream-error.md (#23183)

This commit is contained in:
Atiq Sayyed 2017-06-20 18:19:57 +07:00 committed by Arnout Engelen
parent 972fb0001b
commit 21c702f36a
2 changed files with 39 additions and 86 deletions

View file

@ -1,76 +0,0 @@
# Error Handling
Strategies for how to handle exceptions from processing stream elements can be defined when
materializing the stream. The error handling strategies are inspired by actor supervision
strategies, but the semantics have been adapted to the domain of stream processing.
@@@ warning
*ZipWith*, *GraphStage* junction, *ActorPublisher* source and *ActorSubscriber* sink
components do not honour the supervision strategy attribute yet.
@@@
## Supervision Strategies
There are three ways to handle exceptions from application code:
* `Stop` - The stream is completed with failure.
* `Resume` - The element is dropped and the stream continues.
* `Restart` - The element is dropped and the stream continues after restarting the stage.
Restarting a stage means that any accumulated state is cleared. This is typically
performed by creating a new instance of the stage.
By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with
failure when an exception is thrown.
@@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #stop }
The default supervision strategy for a stream can be defined on the settings of the materializer.
@@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #resume }
Here you can see that all `ArithmeticException` will resume the processing, i.e. the
elements that cause the division by zero are effectively dropped.
@@@ note
Be aware that dropping elements may result in deadlocks in graphs with
cycles, as explained in @ref:[Graph cycles, liveness and deadlocks](stream-graphs.md#graph-cycles).
@@@
The supervision strategy can also be defined for all operators of a flow.
@@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #resume-section }
`Restart` works in a similar way as `Resume` with the addition that accumulated state,
if any, of the failing processing stage will be reset.
@@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #restart-section }
## Errors from mapAsync
Stream supervision can also be applied to the futures of `mapAsync`.
Let's say that we use an external service to lookup email addresses and we would like to
discard those that cannot be found.
We start with the tweet stream of authors:
@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #tweet-authors }
Assume that we can lookup their email address using:
@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-address-lookup2 }
The `CompletionStage` is completed normally if the email is not found.
Transforming the stream of authors to a stream of email addresses by using the `lookupEmail`
service can be done with `mapAsync` and we use `Supervision.getResumingDecider` to drop
unknown email addresses:
@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync-supervision }
If we would not use `Resume` the default stopping strategy would complete the stream
with failure on the first `CompletionStage` that was completed exceptionally.

View file

@ -0,0 +1 @@
../../scala/stream/stream-error.md

View file

@ -24,11 +24,19 @@ performed by creating a new instance of the stage.
By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with
failure when an exception is thrown. failure when an exception is thrown.
@@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #stop } Scala
: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #stop }
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #stop }
The default supervision strategy for a stream can be defined on the settings of the materializer. The default supervision strategy for a stream can be defined on the settings of the materializer.
@@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #resume } Scala
: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #resume }
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #resume }
Here you can see that all `ArithmeticException` will resume the processing, i.e. the Here you can see that all `ArithmeticException` will resume the processing, i.e. the
elements that cause the division by zero are effectively dropped. elements that cause the division by zero are effectively dropped.
@ -42,12 +50,20 @@ cycles, as explained in @ref:[Graph cycles, liveness and deadlocks](stream-graph
The supervision strategy can also be defined for all operators of a flow. The supervision strategy can also be defined for all operators of a flow.
@@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #resume-section } Scala
: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #resume-section }
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #resume-section }
`Restart` works in a similar way as `Resume` with the addition that accumulated state, `Restart` works in a similar way as `Resume` with the addition that accumulated state,
if any, of the failing processing stage will be reset. if any, of the failing processing stage will be reset.
@@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #restart-section } Scala
: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #restart-section }
Java
: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #restart-section }
## Errors from mapAsync ## Errors from mapAsync
@ -58,19 +74,31 @@ discard those that cannot be found.
We start with the tweet stream of authors: We start with the tweet stream of authors:
@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #tweet-authors } Scala
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #tweet-authors }
Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #tweet-authors }
Assume that we can lookup their email address using: Assume that we can lookup their email address using:
@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-address-lookup2 } Scala
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-address-lookup2 }
The `Future` is completed with `Failure` if the email is not found. Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-address-lookup2 }
The @scala[`Future`] @java[`CompletionStage`] is completed @scala[with `Failure`] @java[normally] if the email is not found.
Transforming the stream of authors to a stream of email addresses by using the `lookupEmail` Transforming the stream of authors to a stream of email addresses by using the `lookupEmail`
service can be done with `mapAsync` and we use `Supervision.resumingDecider` to drop service can be done with `mapAsync` and we use @scala[`Supervision.resumingDecider`] @java[`Supervision.getResumingDecider`] to drop
unknown email addresses: unknown email addresses:
@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-addresses-mapAsync-supervision } Scala
: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-addresses-mapAsync-supervision }
Java
: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync-supervision }
If we would not use `Resume` the default stopping strategy would complete the stream If we would not use `Resume` the default stopping strategy would complete the stream
with failure on the first `Future` that was completed with `Failure`. with failure on the first @scala[`Future`] @java[`CompletionStage`] that was completed @scala[with `Failure`]@java[exceptionally].