diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md new file mode 100644 index 0000000000..2091be459e --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/onErrorResume.md @@ -0,0 +1,34 @@ +# onErrorResume + +Allows transforming a failure signal into a stream of elements provided by a factory function. + +@ref[Error handling](../index.md#error-handling) + +This method is Java API only, use @ref[recoverWith](recoverWith.md) in Scala. + +## Signature + +@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(org.apache.pekko.japi.function.Function)" }
+@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(java.lang.Class,org.apache.pekko.japi.function.Function)" }
+@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)" }
+@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(org.apache.pekko.japi.function.Function)" }
+@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(java.lang.Class,org.apache.pekko.japi.function.Function)" }
+@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)" } + + +## Description + +Transform a failure signal into a stream of elements provided by a factory function. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** element is available from the upstream or upstream is failed and fallback Source produces an element + +**backpressures** downstream backpressures + +**completes** upstream completes or upstream failed with exception and fallback Source completes + +**Cancels when** downstream cancels +@@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 8f46341db3..c1cd3e21e0 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -370,6 +370,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) |--|--|--| |Source/Flow|@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.| |Source/Flow|@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows completing the stream when an upstream error occurs.| +|Source/Flow|@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows transforming a failure signal into a stream of elements provided by a factory function.| |RestartSource|@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.| |RestartFlow|@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.| |Source/Flow|@ref[recover](Source-or-Flow/recover.md)|Allow sending of one last element downstream when a failure has happened upstream.| @@ -546,6 +547,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [none](Sink/none.md) * [onComplete](Sink/onComplete.md) * [onErrorComplete](Source-or-Flow/onErrorComplete.md) +* [onErrorResume](Source-or-Flow/onErrorResume.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) * [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md) * [optionalVia](Source-or-Flow/optionalVia.md) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f11c7a7e31..2137589d6d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1338,13 +1338,32 @@ public class FlowTest extends StreamTest { return elem; } }) - .onErrorComplete() + .via(Flow.of(Integer.class).onErrorComplete()) .runWith(TestSink.probe(system), system) .request(2) .expectNext(1) .expectComplete(); } + @Test + public void mustBeAbleToOnErrorResume() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .via(Flow.of(Integer.class).onErrorResume(e -> Source.single(0))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } + @Test public void mustBeAbleToOnErrorCompleteWithDedicatedException() { Source.from(Arrays.asList(1, 2)) @@ -1356,13 +1375,34 @@ public class FlowTest extends StreamTest { return elem; } }) - .onErrorComplete(IllegalArgumentException.class) + .via(Flow.of(Integer.class).onErrorComplete(IllegalArgumentException.class)) .runWith(TestSink.probe(system), system) .request(2) .expectNext(1) .expectComplete(); } + @Test + public void mustBeAbleToOnErrorResumeWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .via( + Flow.of(Integer.class) + .onErrorResume(IllegalArgumentException.class, e -> Source.single(0))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } + @Test public void mustBeAbleToFailWhenExceptionTypeNotMatch() { final IllegalArgumentException ex = new IllegalArgumentException("ex"); @@ -1375,7 +1415,26 @@ public class FlowTest extends StreamTest { return elem; } }) - .onErrorComplete(TimeoutException.class) + .via(Flow.of(Integer.class).onErrorComplete(TimeoutException.class)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + + @Test + public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .via(Flow.of(Integer.class).onErrorResume(TimeoutException.class, e -> Source.single(0))) .runWith(TestSink.probe(system), system) .request(2) .expectNext(1) @@ -1393,13 +1452,34 @@ public class FlowTest extends StreamTest { return elem; } }) - .onErrorComplete(ex -> ex.getMessage().contains("Boom")) + .via(Flow.of(Integer.class).onErrorComplete(ex -> ex.getMessage().contains("Boom"))) .runWith(TestSink.probe(system), system) .request(2) .expectNext(1) .expectComplete(); } + @Test + public void mustBeAbleToOnErrorResumeWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .via( + Flow.of(Integer.class) + .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> Source.single(0))) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } + @Test public void mustBeAbleToMapErrorClass() { final String head = "foo"; diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index b3331212e3..3e47d66c3a 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1647,4 +1647,153 @@ public class SourceTest extends StreamTest { .expectNext("Message1", "Message2") .expectComplete(); } + + @Test + public void mustBeAbleToOnErrorComplete() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .onErrorComplete() + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorResume() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new RuntimeException("ex"); + } else { + return elem; + } + }) + .onErrorResume(e -> Source.single(0)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorCompleteWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .onErrorComplete(IllegalArgumentException.class) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorResumeWithDedicatedException() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("ex"); + } else { + return elem; + } + }) + .onErrorResume(IllegalArgumentException.class, e -> Source.single(0)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } + + @Test + public void mustBeAbleToFailWhenExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .onErrorComplete(TimeoutException.class) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + + @Test + public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() { + final IllegalArgumentException ex = new IllegalArgumentException("ex"); + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw ex; + } else { + return elem; + } + }) + .onErrorResume(TimeoutException.class, e -> Source.single(0)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectError(ex); + } + + @Test + public void mustBeAbleToOnErrorCompleteWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .onErrorComplete(ex -> ex.getMessage().contains("Boom")) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectComplete(); + } + + @Test + public void mustBeAbleToOnErrorResumeWithPredicate() { + Source.from(Arrays.asList(1, 2)) + .map( + elem -> { + if (elem == 2) { + throw new IllegalArgumentException("Boom"); + } else { + return elem; + } + }) + .onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> Source.single(0)) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(1) + .expectNext(0) + .expectComplete(); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala index 7c78fe1af3..544e418fdf 100755 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala @@ -70,7 +70,9 @@ class DslConsistencySpec extends AnyWordSpec with Matchers { "andThenMat", "isIdentity", "withAttributes", - "transformMaterializing") ++ + "transformMaterializing", + "onErrorResume" // Java Only, Scala use `recoverWith` + ) ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") val graphHelpers = Set( diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 7a2ec9018e..637bf91541 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -2109,6 +2109,81 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr case ex: Throwable if predicate.test(ex) => true }) + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out](fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]) + : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith { + case ex: Throwable => fallback(ex) + }) + + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class object of the failure cause + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + clazz: Class[_ <: Throwable], + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]) + : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith { + case ex: Throwable if clazz.isInstance(ex) => fallback(ex) + }) + + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param predicate Predicate which determines if the exception should be handled + * @param function Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + predicate: function.Predicate[_ >: Throwable], + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]) + : javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith { + case ex: Throwable if predicate.test(ex) => fallback(ex) + }) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 3594211de5..3649a7ef44 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2343,6 +2343,82 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ case ex: Throwable if predicate.test(ex) => true }) + /** + * Transform a failure signal into a Source of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] = + new Source(delegate.recoverWith { + case ex: Throwable => fallback(ex) + }) + + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param clazz the class object of the failure cause + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + clazz: Class[_ <: Throwable], + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] = + new Source(delegate.recoverWith { + case ex: Throwable if clazz.isInstance(ex) => fallback(ex) + }) + + /** + * Transform a failure signal into a stream of elements provided by a factory function. + * This allows to continue processing with another stream when a failure occurs. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes + * + * '''Cancels when''' downstream cancels + * + * @param predicate Predicate which determines if the exception should be handled + * @param fallback Function which produces a Source to continue the stream + * @since 2.0.0 + */ + def onErrorResume[T >: Out]( + predicate: function.Predicate[_ >: Throwable], + fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] = + new Source(delegate.recoverWith { + case ex: Throwable if predicate.test(ex) => fallback(ex) + }) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream.