feat: Add Flow/Source#onErrorResume for javadsl. (#2120)

This commit is contained in:
He-Pin(kerr) 2025-08-31 20:34:34 +08:00 committed by GitHub
parent 617d1c5faa
commit b5131ae689
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 423 additions and 5 deletions

View file

@ -0,0 +1,34 @@
# onErrorResume
Allows transforming a failure signal into a stream of elements provided by a factory function.
@ref[Error handling](../index.md#error-handling)
This method is Java API only, use @ref[recoverWith](recoverWith.md) in Scala.
## Signature
@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(org.apache.pekko.japi.function.Function)" }<br>
@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(java.lang.Class,org.apache.pekko.japi.function.Function)" }<br>
@apidoc[Source.onErrorResume](Source) { java="#onErrorResume(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)" }<br>
@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(org.apache.pekko.japi.function.Function)" }<br>
@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(java.lang.Class,org.apache.pekko.japi.function.Function)" }<br>
@apidoc[Flow.onErrorResume](Flow) { java="#onErrorResume(org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)" }
## Description
Transform a failure signal into a stream of elements provided by a factory function.
## Reactive Streams semantics
@@@div { .callout }
**emits** element is available from the upstream or upstream is failed and fallback Source produces an element
**backpressures** downstream backpressures
**completes** upstream completes or upstream failed with exception and fallback Source completes
**Cancels when** downstream cancels
@@@

View file

@ -370,6 +370,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|--|--|--|
|Source/Flow|<a name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.|
|Source/Flow|<a name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows completing the stream when an upstream error occurs.|
|Source/Flow|<a name="onerrorresume"></a>@ref[onErrorResume](Source-or-Flow/onErrorResume.md)|Allows transforming a failure signal into a stream of elements provided by a factory function.|
|RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.|
|RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.|
|Source/Flow|<a name="recover"></a>@ref[recover](Source-or-Flow/recover.md)|Allow sending of one last element downstream when a failure has happened upstream.|
@ -546,6 +547,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [none](Sink/none.md)
* [onComplete](Sink/onComplete.md)
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
* [onErrorResume](Source-or-Flow/onErrorResume.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
* [optionalVia](Source-or-Flow/optionalVia.md)

View file

@ -1338,13 +1338,32 @@ public class FlowTest extends StreamTest {
return elem;
}
})
.onErrorComplete()
.via(Flow.of(Integer.class).onErrorComplete())
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorResume() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new RuntimeException("ex");
} else {
return elem;
}
})
.via(Flow.of(Integer.class).onErrorResume(e -> Source.single(0)))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectNext(0)
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorCompleteWithDedicatedException() {
Source.from(Arrays.asList(1, 2))
@ -1356,13 +1375,34 @@ public class FlowTest extends StreamTest {
return elem;
}
})
.onErrorComplete(IllegalArgumentException.class)
.via(Flow.of(Integer.class).onErrorComplete(IllegalArgumentException.class))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorResumeWithDedicatedException() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new IllegalArgumentException("ex");
} else {
return elem;
}
})
.via(
Flow.of(Integer.class)
.onErrorResume(IllegalArgumentException.class, e -> Source.single(0)))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectNext(0)
.expectComplete();
}
@Test
public void mustBeAbleToFailWhenExceptionTypeNotMatch() {
final IllegalArgumentException ex = new IllegalArgumentException("ex");
@ -1375,7 +1415,26 @@ public class FlowTest extends StreamTest {
return elem;
}
})
.onErrorComplete(TimeoutException.class)
.via(Flow.of(Integer.class).onErrorComplete(TimeoutException.class))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectError(ex);
}
@Test
public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
final IllegalArgumentException ex = new IllegalArgumentException("ex");
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw ex;
} else {
return elem;
}
})
.via(Flow.of(Integer.class).onErrorResume(TimeoutException.class, e -> Source.single(0)))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
@ -1393,13 +1452,34 @@ public class FlowTest extends StreamTest {
return elem;
}
})
.onErrorComplete(ex -> ex.getMessage().contains("Boom"))
.via(Flow.of(Integer.class).onErrorComplete(ex -> ex.getMessage().contains("Boom")))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorResumeWithPredicate() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new IllegalArgumentException("Boom");
} else {
return elem;
}
})
.via(
Flow.of(Integer.class)
.onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> Source.single(0)))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectNext(0)
.expectComplete();
}
@Test
public void mustBeAbleToMapErrorClass() {
final String head = "foo";

View file

@ -1647,4 +1647,153 @@ public class SourceTest extends StreamTest {
.expectNext("Message1", "Message2")
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorComplete() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new RuntimeException("ex");
} else {
return elem;
}
})
.onErrorComplete()
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorResume() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new RuntimeException("ex");
} else {
return elem;
}
})
.onErrorResume(e -> Source.single(0))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectNext(0)
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorCompleteWithDedicatedException() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new IllegalArgumentException("ex");
} else {
return elem;
}
})
.onErrorComplete(IllegalArgumentException.class)
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorResumeWithDedicatedException() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new IllegalArgumentException("ex");
} else {
return elem;
}
})
.onErrorResume(IllegalArgumentException.class, e -> Source.single(0))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectNext(0)
.expectComplete();
}
@Test
public void mustBeAbleToFailWhenExceptionTypeNotMatch() {
final IllegalArgumentException ex = new IllegalArgumentException("ex");
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw ex;
} else {
return elem;
}
})
.onErrorComplete(TimeoutException.class)
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectError(ex);
}
@Test
public void onErrorResumeMustBeAbleToFailWhenExceptionTypeNotMatch() {
final IllegalArgumentException ex = new IllegalArgumentException("ex");
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw ex;
} else {
return elem;
}
})
.onErrorResume(TimeoutException.class, e -> Source.single(0))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectError(ex);
}
@Test
public void mustBeAbleToOnErrorCompleteWithPredicate() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new IllegalArgumentException("Boom");
} else {
return elem;
}
})
.onErrorComplete(ex -> ex.getMessage().contains("Boom"))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}
@Test
public void mustBeAbleToOnErrorResumeWithPredicate() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new IllegalArgumentException("Boom");
} else {
return elem;
}
})
.onErrorResume(ex -> ex.getMessage().contains("Boom"), e -> Source.single(0))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectNext(0)
.expectComplete();
}
}

View file

@ -70,7 +70,9 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
"andThenMat",
"isIdentity",
"withAttributes",
"transformMaterializing") ++
"transformMaterializing",
"onErrorResume" // Java Only, Scala use `recoverWith`
) ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat")
val graphHelpers = Set(

View file

@ -2109,6 +2109,81 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
case ex: Throwable if predicate.test(ex) => true
})
/**
* Transform a failure signal into a stream of elements provided by a factory function.
* This allows to continue processing with another stream when a failure occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes
*
* '''Cancels when''' downstream cancels
*
* @param fallback Function which produces a Source to continue the stream
* @since 2.0.0
*/
def onErrorResume[T >: Out](fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]])
: javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith {
case ex: Throwable => fallback(ex)
})
/**
* Transform a failure signal into a stream of elements provided by a factory function.
* This allows to continue processing with another stream when a failure occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes
*
* '''Cancels when''' downstream cancels
*
* @param clazz the class object of the failure cause
* @param fallback Function which produces a Source to continue the stream
* @since 2.0.0
*/
def onErrorResume[T >: Out](
clazz: Class[_ <: Throwable],
fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]])
: javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith {
case ex: Throwable if clazz.isInstance(ex) => fallback(ex)
})
/**
* Transform a failure signal into a stream of elements provided by a factory function.
* This allows to continue processing with another stream when a failure occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes
*
* '''Cancels when''' downstream cancels
*
* @param predicate Predicate which determines if the exception should be handled
* @param function Function which produces a Source to continue the stream
* @since 2.0.0
*/
def onErrorResume[T >: Out](
predicate: function.Predicate[_ >: Throwable],
fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]])
: javadsl.Flow[In, T, Mat] = new Flow(delegate.recoverWith {
case ex: Throwable if predicate.test(ex) => fallback(ex)
})
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been

View file

@ -2343,6 +2343,82 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
case ex: Throwable if predicate.test(ex) => true
})
/**
* Transform a failure signal into a Source of elements provided by a factory function.
* This allows to continue processing with another stream when a failure occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes
*
* '''Cancels when''' downstream cancels
*
* @param fallback Function which produces a Source to continue the stream
* @since 2.0.0
*/
def onErrorResume[T >: Out](
fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] =
new Source(delegate.recoverWith {
case ex: Throwable => fallback(ex)
})
/**
* Transform a failure signal into a stream of elements provided by a factory function.
* This allows to continue processing with another stream when a failure occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes
*
* '''Cancels when''' downstream cancels
*
* @param clazz the class object of the failure cause
* @param fallback Function which produces a Source to continue the stream
* @since 2.0.0
*/
def onErrorResume[T >: Out](
clazz: Class[_ <: Throwable],
fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] =
new Source(delegate.recoverWith {
case ex: Throwable if clazz.isInstance(ex) => fallback(ex)
})
/**
* Transform a failure signal into a stream of elements provided by a factory function.
* This allows to continue processing with another stream when a failure occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and fallback Source produces an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception and fallback Source completes
*
* '''Cancels when''' downstream cancels
*
* @param predicate Predicate which determines if the exception should be handled
* @param fallback Function which produces a Source to continue the stream
* @since 2.0.0
*/
def onErrorResume[T >: Out](
predicate: function.Predicate[_ >: Throwable],
fallback: function.Function[_ >: Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Source[T, Mat] =
new Source(delegate.recoverWith {
case ex: Throwable if predicate.test(ex) => fallback(ex)
})
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream.