From 87b94b65fd1ad5ccf9b0cf5b7080e2dc3f181d74 Mon Sep 17 00:00:00 2001 From: Andrey Yamshchikov <404-@users.noreply.github.com> Date: Tue, 10 Dec 2019 07:05:56 -0500 Subject: [PATCH] Add Java DSL Flow#mapError version without PartialFunction (#24992) (#26310) * Add an overloaded version of the Flow#mapError (Java DSL) which does not use a Scala PartialFunction. * Add test verifying mapError matching on parent class * Add to Source, SubSource and SubFlow as well --- .../java/akka/stream/javadsl/FlowTest.java | 49 +++++++++++++++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 24 +++++++++ .../scala/akka/stream/javadsl/Source.scala | 24 +++++++++ .../scala/akka/stream/javadsl/SubFlow.scala | 26 ++++++++++ .../scala/akka/stream/javadsl/SubSource.scala | 24 +++++++++ 5 files changed, 147 insertions(+) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 7e3991c995..e485c78eac 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -13,6 +13,7 @@ import akka.japi.function.*; import akka.japi.pf.PFBuilder; import akka.stream.*; import akka.stream.scaladsl.FlowSpec; +import akka.stream.testkit.javadsl.TestSink; import akka.util.ConstantFun; import akka.stream.javadsl.GraphDSL.Builder; import akka.stream.stage.*; @@ -1080,6 +1081,54 @@ public class FlowTest extends StreamTest { future.toCompletableFuture().get(3, TimeUnit.SECONDS); } + @Test + public void mustBeAbleToMapErrorClass() { + final String head = "foo"; + final Source, NotUsed> source = + Source.from(Arrays.asList(Optional.of(head), Optional.empty())); + final IllegalArgumentException boom = new IllegalArgumentException("boom"); + final Flow, String, NotUsed> flow = + Flow., String>fromFunction(Optional::get) + .mapError(NoSuchElementException.class, (NoSuchElementException e) -> boom); + + source + .via(flow) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(head) + .expectError(boom); + } + + @Test + public void mustBeAbleToMapErrorClassExactly() { + final Source source = Source.single("foo"); + final Flow flow = + Flow.fromFunction(str -> str.charAt(-1)) + .mapError(NoSuchElementException.class, IllegalArgumentException::new); + + final Throwable actual = + source.via(flow).runWith(TestSink.probe(system), system).request(1).expectError(); + org.junit.Assert.assertTrue(actual instanceof IndexOutOfBoundsException); + } + + @Test + public void mustBeAbleToMapErrorSuperClass() { + final String head = "foo"; + final Source, NotUsed> source = + Source.from(Arrays.asList(Optional.of(head), Optional.empty())); + final IllegalArgumentException boom = new IllegalArgumentException("boom"); + final Flow, String, NotUsed> flow = + Flow., String>fromFunction(Optional::get) + .mapError(RuntimeException.class, (RuntimeException e) -> boom); + + source + .via(flow) + .runWith(TestSink.probe(system), system) + .request(2) + .expectNext(head) + .expectError(boom); + } + @Test public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index a8e7d38c7b..6fdab2bd97 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1555,6 +1555,30 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.mapError(pf)) + /** + * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError[E <: Throwable](clazz: Class[E], f: function.Function[E, Throwable]): javadsl.Flow[In, Out, Mat] = + mapError { + case err if clazz.isInstance(err) => f(clazz.cast(err)) + } + /** * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index f592266b54..1ec3a29310 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1847,6 +1847,30 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Source[Out, Mat] = new Source(delegate.mapError(pf)) + /** + * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError[E <: Throwable](clazz: Class[E], f: function.Function[E, Throwable]): javadsl.Source[Out, Mat] = + mapError { + case err if clazz.isInstance(err) => f(clazz.cast(err)) + } + /** * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index dd6588785e..8869597614 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1026,6 +1026,32 @@ class SubFlow[In, Out, Mat]( def mapError(pf: PartialFunction[Throwable, Throwable]): SubFlow[In, Out, Mat @uncheckedVariance] = new SubFlow(delegate.mapError(pf)) + /** + * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError[E <: Throwable]( + clazz: Class[E], + f: function.Function[E, Throwable]): javadsl.SubFlow[In, Out, Mat @uncheckedVariance] = + mapError { + case err if clazz.isInstance(err) => f(clazz.cast(err)) + } + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 5cc3d6be8c..7111d29127 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1007,6 +1007,30 @@ class SubSource[Out, Mat]( def mapError(pf: PartialFunction[Throwable, Throwable]): SubSource[Out, Mat] = new SubSource(delegate.mapError(pf)) + /** + * While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging + * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover + * would log the `t2` error. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This operator can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def mapError[E <: Throwable](clazz: Class[E], f: function.Function[E, Throwable]): javadsl.SubSource[Out, Mat] = + mapError { + case err if clazz.isInstance(err) => f(clazz.cast(err)) + } + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been