Add convenient version of recover, recoverWith and recoverWithRetries for javadsl.Flow (#25036)

* Add more convenient version of recover, recoverWith and recoverWithRetries for javadsl.Flow.

* The new method take a Class parameter to decide which failure to recover from.
* Also add corresponding unit tests for them.

* use case expression to express partial function

* make time out larger in unit test

* checkstyle

* fix parameter type
This commit is contained in:
Song Kun 2018-05-14 01:15:00 +08:00 committed by Patrik Nordwall
parent 254914c1ad
commit 55fb092bb2
2 changed files with 222 additions and 24 deletions

View file

@ -10,6 +10,7 @@ import akka.actor.ActorRef;
import akka.japi.JavaPartialFunction;
import akka.japi.Pair;
import akka.japi.function.*;
import akka.japi.pf.PFBuilder;
import akka.stream.*;
import akka.stream.scaladsl.FlowSpec;
import akka.util.ConstantFun;
@ -106,7 +107,7 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals(2);
probe.expectMsgEquals(3);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -149,7 +150,7 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals(",");
probe.expectMsgEquals("3");
probe.expectMsgEquals("]");
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -169,7 +170,7 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("2");
probe.expectMsgEquals(",");
probe.expectMsgEquals("3");
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -192,7 +193,7 @@ public class FlowTest extends StreamTest {
FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS);
probe.expectNoMsg(duration);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@ -386,7 +387,7 @@ public class FlowTest extends StreamTest {
final Publisher<String> pub = source.runWith(publisher, materializer);
final CompletionStage<List<String>> all = Source.fromPublisher(pub).limit(100).runWith(Sink.<String>seq(), materializer);
final List<String> result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
final List<String> result = all.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result));
}
@ -435,7 +436,7 @@ public class FlowTest extends StreamTest {
final Publisher<String> pub = source.runWith(publisher, materializer);
final CompletionStage<List<String>> all = Source.fromPublisher(pub).limit(100).runWith(Sink.<String>seq(), materializer);
final List<String> result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
final List<String> result = all.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result));
}
@ -711,12 +712,10 @@ public class FlowTest extends StreamTest {
final TestKit probe = new TestKit(system);
final Source<Integer, NotUsed> source = Source.fromPublisher(publisherProbe);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(
new Function<Integer, Integer>() {
public Integer apply(Integer elem) {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
}
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class)
.map(elem -> {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
})
.recover(new JavaPartialFunction<Throwable, Integer>() {
public Integer apply(Throwable elem, boolean isCheck) {
@ -736,7 +735,37 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals(1);
s.sendNext(2);
probe.expectMsgEquals(0);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToRecoverClass() throws Exception {
final TestPublisher.ManualProbe<Integer> publisherProbe = TestPublisher.manualProbe(true,system);
final TestKit probe = new TestKit(system);
final Source<Integer, NotUsed> source = Source.fromPublisher(publisherProbe);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class)
.map(elem -> {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
})
.recover(
RuntimeException.class,
() -> 0
);
final CompletionStage<Done> future =
source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
final PublisherProbeSubscription<Integer> s = publisherProbe.expectSubscription();
s.sendNext(0);
probe.expectMsgEquals(0);
s.sendNext(1);
probe.expectMsgEquals(1);
s.sendNext(2);
probe.expectMsgEquals(0);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -746,12 +775,10 @@ public class FlowTest extends StreamTest {
final Iterable<Integer> recover = Arrays.asList(55, 0);
final Source<Integer, NotUsed> source = Source.fromPublisher(publisherProbe);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(
new Function<Integer, Integer>() {
public Integer apply(Integer elem) {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
}
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class)
.map(elem -> {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
})
.recoverWith(new JavaPartialFunction<Throwable, Source<Integer, NotUsed>>() {
public Source<Integer, NotUsed> apply(Throwable elem, boolean isCheck) {
@ -772,9 +799,105 @@ public class FlowTest extends StreamTest {
s.sendNext(2);
probe.expectMsgEquals(55);
probe.expectMsgEquals(0);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToRecoverWithClass() throws Exception {
final TestPublisher.ManualProbe<Integer> publisherProbe = TestPublisher.manualProbe(true,system);
final TestKit probe = new TestKit(system);
final Iterable<Integer> recover = Arrays.asList(55, 0);
final Source<Integer, NotUsed> source = Source.fromPublisher(publisherProbe);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class)
.map(elem -> {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
})
.recoverWith(
RuntimeException.class,
() -> Source.from(recover));
final CompletionStage<Done> future =
source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
final PublisherProbeSubscription<Integer> s = publisherProbe.expectSubscription();
s.sendNext(0);
probe.expectMsgEquals(0);
s.sendNext(1);
probe.expectMsgEquals(1);
s.sendNext(2);
probe.expectMsgEquals(55);
probe.expectMsgEquals(0);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToRecoverWithRetries() throws Exception {
final TestPublisher.ManualProbe<Integer> publisherProbe = TestPublisher.manualProbe(true,system);
final TestKit probe = new TestKit(system);
final Iterable<Integer> recover = Arrays.asList(55, 0);
final Source<Integer, NotUsed> source = Source.fromPublisher(publisherProbe);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class)
.map(elem -> {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
})
.recoverWithRetries(
3,
new PFBuilder()
.match(RuntimeException.class, ex -> Source.from(recover))
.build());
final CompletionStage<Done> future =
source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
final PublisherProbeSubscription<Integer> s = publisherProbe.expectSubscription();
s.sendNext(0);
probe.expectMsgEquals(0);
s.sendNext(1);
probe.expectMsgEquals(1);
s.sendNext(2);
probe.expectMsgEquals(55);
probe.expectMsgEquals(0);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToRecoverWithRetriesClass() throws Exception {
final TestPublisher.ManualProbe<Integer> publisherProbe = TestPublisher.manualProbe(true,system);
final TestKit probe = new TestKit(system);
final Iterable<Integer> recover = Arrays.asList(55, 0);
final Source<Integer, NotUsed> source = Source.fromPublisher(publisherProbe);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class)
.map(elem -> {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
})
.recoverWithRetries(
3,
RuntimeException.class,
() -> Source.from(recover));
final CompletionStage<Done> future =
source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
final PublisherProbeSubscription<Integer> s = publisherProbe.expectSubscription();
s.sendNext(0);
probe.expectMsgEquals(0);
s.sendNext(1);
probe.expectMsgEquals(1);
s.sendNext(2);
probe.expectMsgEquals(55);
probe.expectMsgEquals(0);
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception {
final TestKit probe = new TestKit(system);

View file

@ -7,16 +7,16 @@ package akka.stream.javadsl
import akka.util.{ ConstantFun, Timeout }
import akka.{ Done, NotUsed }
import akka.event.LoggingAdapter
import akka.japi.{ Pair, function }
import akka.japi.{ Pair, Util, function }
import akka.stream._
import org.reactivestreams.Processor
import scala.concurrent.duration.FiniteDuration
import akka.japi.Util
import java.util.{ Comparator, Optional }
import java.util.concurrent.CompletionStage
import akka.util.JavaDurationConverters._
import java.util.function.Supplier
import akka.util.JavaDurationConverters._
import akka.actor.ActorRef
import akka.dispatch.ExecutionContexts
import akka.stream.impl.fusing.LazyFlow
@ -1313,6 +1313,26 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def recover(pf: PartialFunction[Throwable, Out]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.recover(pf))
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*/
def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Flow[In, Out, Mat] =
recover {
case elem if clazz.isInstance(elem) supplier.get()
}
/**
* While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
@ -1355,10 +1375,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
* '''Cancels when''' downstream cancels
*
*/
@deprecated("Use recoverWithRetries instead.", "2.4.4")
def recoverWith(pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.recoverWith(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWith(clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
recoverWith {
case elem if clazz.isInstance(elem) supplier.get()
}
/**
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
@ -1387,6 +1431,37 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.recoverWithRetries(attempts, pf))
/**
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
* @param attempts Maximum number of retries or -1 to retry indefinitely
* @param clazz the class object of the failure cause
* @param supplier supply the new Source to be materialized
*/
def recoverWithRetries(attempts: Int, clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
recoverWithRetries(attempts, {
case elem if clazz.isInstance(elem) supplier.get()
})
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been