* Add an overloaded version of the Flow#mapError (Java DSL) which does not use a Scala PartialFunction. * Add test verifying mapError matching on parent class * Add to Source, SubSource and SubFlow as well
This commit is contained in:
parent
653d05e7d6
commit
87b94b65fd
5 changed files with 147 additions and 0 deletions
|
|
@ -13,6 +13,7 @@ import akka.japi.function.*;
|
||||||
import akka.japi.pf.PFBuilder;
|
import akka.japi.pf.PFBuilder;
|
||||||
import akka.stream.*;
|
import akka.stream.*;
|
||||||
import akka.stream.scaladsl.FlowSpec;
|
import akka.stream.scaladsl.FlowSpec;
|
||||||
|
import akka.stream.testkit.javadsl.TestSink;
|
||||||
import akka.util.ConstantFun;
|
import akka.util.ConstantFun;
|
||||||
import akka.stream.javadsl.GraphDSL.Builder;
|
import akka.stream.javadsl.GraphDSL.Builder;
|
||||||
import akka.stream.stage.*;
|
import akka.stream.stage.*;
|
||||||
|
|
@ -1080,6 +1081,54 @@ public class FlowTest extends StreamTest {
|
||||||
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToMapErrorClass() {
|
||||||
|
final String head = "foo";
|
||||||
|
final Source<Optional<String>, NotUsed> source =
|
||||||
|
Source.from(Arrays.asList(Optional.of(head), Optional.empty()));
|
||||||
|
final IllegalArgumentException boom = new IllegalArgumentException("boom");
|
||||||
|
final Flow<Optional<String>, String, NotUsed> flow =
|
||||||
|
Flow.<Optional<String>, String>fromFunction(Optional::get)
|
||||||
|
.mapError(NoSuchElementException.class, (NoSuchElementException e) -> boom);
|
||||||
|
|
||||||
|
source
|
||||||
|
.via(flow)
|
||||||
|
.runWith(TestSink.probe(system), system)
|
||||||
|
.request(2)
|
||||||
|
.expectNext(head)
|
||||||
|
.expectError(boom);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToMapErrorClassExactly() {
|
||||||
|
final Source<String, NotUsed> source = Source.single("foo");
|
||||||
|
final Flow<String, Character, NotUsed> flow =
|
||||||
|
Flow.<String, Character>fromFunction(str -> str.charAt(-1))
|
||||||
|
.mapError(NoSuchElementException.class, IllegalArgumentException::new);
|
||||||
|
|
||||||
|
final Throwable actual =
|
||||||
|
source.via(flow).runWith(TestSink.probe(system), system).request(1).expectError();
|
||||||
|
org.junit.Assert.assertTrue(actual instanceof IndexOutOfBoundsException);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToMapErrorSuperClass() {
|
||||||
|
final String head = "foo";
|
||||||
|
final Source<Optional<String>, NotUsed> source =
|
||||||
|
Source.from(Arrays.asList(Optional.of(head), Optional.empty()));
|
||||||
|
final IllegalArgumentException boom = new IllegalArgumentException("boom");
|
||||||
|
final Flow<Optional<String>, String, NotUsed> flow =
|
||||||
|
Flow.<Optional<String>, String>fromFunction(Optional::get)
|
||||||
|
.mapError(RuntimeException.class, (RuntimeException e) -> boom);
|
||||||
|
|
||||||
|
source
|
||||||
|
.via(flow)
|
||||||
|
.runWith(TestSink.probe(system), system)
|
||||||
|
.request(2)
|
||||||
|
.expectNext(head)
|
||||||
|
.expectError(boom);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception {
|
public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception {
|
||||||
final TestKit probe = new TestKit(system);
|
final TestKit probe = new TestKit(system);
|
||||||
|
|
|
||||||
|
|
@ -1555,6 +1555,30 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
||||||
def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Flow[In, Out, Mat] =
|
def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Flow[In, Out, Mat] =
|
||||||
new Flow(delegate.mapError(pf))
|
new Flow(delegate.mapError(pf))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
|
||||||
|
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
|
||||||
|
* would log the `t2` error.
|
||||||
|
*
|
||||||
|
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
||||||
|
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
|
||||||
|
*
|
||||||
|
* Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged.
|
||||||
|
*
|
||||||
|
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def mapError[E <: Throwable](clazz: Class[E], f: function.Function[E, Throwable]): javadsl.Flow[In, Out, Mat] =
|
||||||
|
mapError {
|
||||||
|
case err if clazz.isInstance(err) => f(clazz.cast(err))
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
|
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
|
||||||
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
|
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
|
||||||
|
|
|
||||||
|
|
@ -1847,6 +1847,30 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
|
||||||
def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Source[Out, Mat] =
|
def mapError(pf: PartialFunction[Throwable, Throwable]): javadsl.Source[Out, Mat] =
|
||||||
new Source(delegate.mapError(pf))
|
new Source(delegate.mapError(pf))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
|
||||||
|
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
|
||||||
|
* would log the `t2` error.
|
||||||
|
*
|
||||||
|
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
||||||
|
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
|
||||||
|
*
|
||||||
|
* Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged.
|
||||||
|
*
|
||||||
|
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def mapError[E <: Throwable](clazz: Class[E], f: function.Function[E, Throwable]): javadsl.Source[Out, Mat] =
|
||||||
|
mapError {
|
||||||
|
case err if clazz.isInstance(err) => f(clazz.cast(err))
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
|
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
|
||||||
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
|
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
|
||||||
|
|
|
||||||
|
|
@ -1026,6 +1026,32 @@ class SubFlow[In, Out, Mat](
|
||||||
def mapError(pf: PartialFunction[Throwable, Throwable]): SubFlow[In, Out, Mat @uncheckedVariance] =
|
def mapError(pf: PartialFunction[Throwable, Throwable]): SubFlow[In, Out, Mat @uncheckedVariance] =
|
||||||
new SubFlow(delegate.mapError(pf))
|
new SubFlow(delegate.mapError(pf))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
|
||||||
|
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
|
||||||
|
* would log the `t2` error.
|
||||||
|
*
|
||||||
|
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
||||||
|
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
|
||||||
|
*
|
||||||
|
* Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged.
|
||||||
|
*
|
||||||
|
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def mapError[E <: Throwable](
|
||||||
|
clazz: Class[E],
|
||||||
|
f: function.Function[E, Throwable]): javadsl.SubFlow[In, Out, Mat @uncheckedVariance] =
|
||||||
|
mapError {
|
||||||
|
case err if clazz.isInstance(err) => f(clazz.cast(err))
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Terminate processing (and cancel the upstream publisher) after the given
|
* Terminate processing (and cancel the upstream publisher) after the given
|
||||||
* number of elements. Due to input buffering some elements may have been
|
* number of elements. Due to input buffering some elements may have been
|
||||||
|
|
|
||||||
|
|
@ -1007,6 +1007,30 @@ class SubSource[Out, Mat](
|
||||||
def mapError(pf: PartialFunction[Throwable, Throwable]): SubSource[Out, Mat] =
|
def mapError(pf: PartialFunction[Throwable, Throwable]): SubSource[Out, Mat] =
|
||||||
new SubSource(delegate.mapError(pf))
|
new SubSource(delegate.mapError(pf))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* While similar to [[recover]] this operator can be used to transform an error signal to a different one *without* logging
|
||||||
|
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
|
||||||
|
* would log the `t2` error.
|
||||||
|
*
|
||||||
|
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
||||||
|
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
|
||||||
|
*
|
||||||
|
* Similarly to [[recover]] throwing an exception inside `mapError` _will_ be logged.
|
||||||
|
*
|
||||||
|
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
|
||||||
|
*
|
||||||
|
* '''Backpressures when''' downstream backpressures
|
||||||
|
*
|
||||||
|
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
|
||||||
|
*
|
||||||
|
* '''Cancels when''' downstream cancels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
def mapError[E <: Throwable](clazz: Class[E], f: function.Function[E, Throwable]): javadsl.SubSource[Out, Mat] =
|
||||||
|
mapError {
|
||||||
|
case err if clazz.isInstance(err) => f(clazz.cast(err))
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Terminate processing (and cancel the upstream publisher) after the given
|
* Terminate processing (and cancel the upstream publisher) after the given
|
||||||
* number of elements. Due to input buffering some elements may have been
|
* number of elements. Due to input buffering some elements may have been
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue