diff --git a/akka-docs/src/main/paradox/java/stream/stream-error.md b/akka-docs/src/main/paradox/java/stream/stream-error.md deleted file mode 100644 index a77fd99535..0000000000 --- a/akka-docs/src/main/paradox/java/stream/stream-error.md +++ /dev/null @@ -1,76 +0,0 @@ -# Error Handling - -Strategies for how to handle exceptions from processing stream elements can be defined when -materializing the stream. The error handling strategies are inspired by actor supervision -strategies, but the semantics have been adapted to the domain of stream processing. - -@@@ warning - -*ZipWith*, *GraphStage* junction, *ActorPublisher* source and *ActorSubscriber* sink -components do not honour the supervision strategy attribute yet. - -@@@ - -## Supervision Strategies - -There are three ways to handle exceptions from application code: - - * `Stop` - The stream is completed with failure. - * `Resume` - The element is dropped and the stream continues. - * `Restart` - The element is dropped and the stream continues after restarting the stage. -Restarting a stage means that any accumulated state is cleared. This is typically -performed by creating a new instance of the stage. - -By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with -failure when an exception is thrown. - -@@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #stop } - -The default supervision strategy for a stream can be defined on the settings of the materializer. - -@@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #resume } - -Here you can see that all `ArithmeticException` will resume the processing, i.e. the -elements that cause the division by zero are effectively dropped. - -@@@ note - -Be aware that dropping elements may result in deadlocks in graphs with -cycles, as explained in @ref:[Graph cycles, liveness and deadlocks](stream-graphs.md#graph-cycles). - -@@@ - -The supervision strategy can also be defined for all operators of a flow. - -@@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #resume-section } - -`Restart` works in a similar way as `Resume` with the addition that accumulated state, -if any, of the failing processing stage will be reset. - -@@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #restart-section } - -## Errors from mapAsync - -Stream supervision can also be applied to the futures of `mapAsync`. - -Let's say that we use an external service to lookup email addresses and we would like to -discard those that cannot be found. - -We start with the tweet stream of authors: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #tweet-authors } - -Assume that we can lookup their email address using: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-address-lookup2 } - -The `CompletionStage` is completed normally if the email is not found. - -Transforming the stream of authors to a stream of email addresses by using the `lookupEmail` -service can be done with `mapAsync` and we use `Supervision.getResumingDecider` to drop -unknown email addresses: - -@@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync-supervision } - -If we would not use `Resume` the default stopping strategy would complete the stream -with failure on the first `CompletionStage` that was completed exceptionally. \ No newline at end of file diff --git a/akka-docs/src/main/paradox/java/stream/stream-error.md b/akka-docs/src/main/paradox/java/stream/stream-error.md new file mode 120000 index 0000000000..f4bab4d977 --- /dev/null +++ b/akka-docs/src/main/paradox/java/stream/stream-error.md @@ -0,0 +1 @@ +../../scala/stream/stream-error.md \ No newline at end of file diff --git a/akka-docs/src/main/paradox/scala/stream/stream-error.md b/akka-docs/src/main/paradox/scala/stream/stream-error.md index 25bbb57aae..73655c4e55 100644 --- a/akka-docs/src/main/paradox/scala/stream/stream-error.md +++ b/akka-docs/src/main/paradox/scala/stream/stream-error.md @@ -24,11 +24,19 @@ performed by creating a new instance of the stage. By default the stopping strategy is used for all exceptions, i.e. the stream will be completed with failure when an exception is thrown. -@@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #stop } +Scala +: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #stop } + +Java +: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #stop } The default supervision strategy for a stream can be defined on the settings of the materializer. -@@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #resume } +Scala +: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #resume } + +Java +: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #resume } Here you can see that all `ArithmeticException` will resume the processing, i.e. the elements that cause the division by zero are effectively dropped. @@ -42,12 +50,20 @@ cycles, as explained in @ref:[Graph cycles, liveness and deadlocks](stream-graph The supervision strategy can also be defined for all operators of a flow. -@@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #resume-section } +Scala +: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #resume-section } + +Java +: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #resume-section } `Restart` works in a similar way as `Resume` with the addition that accumulated state, if any, of the failing processing stage will be reset. -@@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #restart-section } +Scala +: @@snip [FlowErrorDocSpec.scala]($code$/scala/docs/stream/FlowErrorDocSpec.scala) { #restart-section } + +Java +: @@snip [FlowErrorDocTest.java]($code$/java/jdocs/stream/FlowErrorDocTest.java) { #restart-section } ## Errors from mapAsync @@ -58,19 +74,31 @@ discard those that cannot be found. We start with the tweet stream of authors: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #tweet-authors } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #tweet-authors } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #tweet-authors } Assume that we can lookup their email address using: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-address-lookup2 } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-address-lookup2 } -The `Future` is completed with `Failure` if the email is not found. +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-address-lookup2 } + +The @scala[`Future`] @java[`CompletionStage`] is completed @scala[with `Failure`] @java[normally] if the email is not found. Transforming the stream of authors to a stream of email addresses by using the `lookupEmail` -service can be done with `mapAsync` and we use `Supervision.resumingDecider` to drop +service can be done with `mapAsync` and we use @scala[`Supervision.resumingDecider`] @java[`Supervision.getResumingDecider`] to drop unknown email addresses: -@@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-addresses-mapAsync-supervision } +Scala +: @@snip [IntegrationDocSpec.scala]($code$/scala/docs/stream/IntegrationDocSpec.scala) { #email-addresses-mapAsync-supervision } + +Java +: @@snip [IntegrationDocTest.java]($code$/java/jdocs/stream/IntegrationDocTest.java) { #email-addresses-mapAsync-supervision } If we would not use `Resume` the default stopping strategy would complete the stream -with failure on the first `Future` that was completed with `Failure`. \ No newline at end of file +with failure on the first @scala[`Future`] @java[`CompletionStage`] that was completed @scala[with `Failure`]@java[exceptionally]. \ No newline at end of file