From 55fb092bb2466898b547190660e88cc6a2e057c4 Mon Sep 17 00:00:00 2001 From: Song Kun Date: Mon, 14 May 2018 01:15:00 +0800 Subject: [PATCH] Add convenient version of recover, recoverWith and recoverWithRetries for javadsl.Flow (#25036) * Add more convenient version of recover, recoverWith and recoverWithRetries for javadsl.Flow. * The new method take a Class parameter to decide which failure to recover from. * Also add corresponding unit tests for them. * use case expression to express partial function * make time out larger in unit test * checkstyle * fix parameter type --- .../java/akka/stream/javadsl/FlowTest.java | 163 +++++++++++++++--- .../main/scala/akka/stream/javadsl/Flow.scala | 83 ++++++++- 2 files changed, 222 insertions(+), 24 deletions(-) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 81bfb5787f..7a6b84ee90 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -10,6 +10,7 @@ import akka.actor.ActorRef; import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.japi.function.*; +import akka.japi.pf.PFBuilder; import akka.stream.*; import akka.stream.scaladsl.FlowSpec; import akka.util.ConstantFun; @@ -106,7 +107,7 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals(2); probe.expectMsgEquals(3); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -149,7 +150,7 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals(","); probe.expectMsgEquals("3"); probe.expectMsgEquals("]"); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -169,7 +170,7 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("2"); probe.expectMsgEquals(","); probe.expectMsgEquals("3"); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -192,7 +193,7 @@ public class FlowTest extends StreamTest { FiniteDuration duration = Duration.apply(200, TimeUnit.MILLISECONDS); probe.expectNoMsg(duration); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @@ -386,7 +387,7 @@ public class FlowTest extends StreamTest { final Publisher pub = source.runWith(publisher, materializer); final CompletionStage> all = Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), materializer); - final List result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + final List result = all.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); } @@ -435,7 +436,7 @@ public class FlowTest extends StreamTest { final Publisher pub = source.runWith(publisher, materializer); final CompletionStage> all = Source.fromPublisher(pub).limit(100).runWith(Sink.seq(), materializer); - final List result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + final List result = all.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); } @@ -711,12 +712,10 @@ public class FlowTest extends StreamTest { final TestKit probe = new TestKit(system); final Source source = Source.fromPublisher(publisherProbe); - final Flow flow = Flow.of(Integer.class).map( - new Function() { - public Integer apply(Integer elem) { - if (elem == 2) throw new RuntimeException("ex"); - else return elem; - } + final Flow flow = Flow.of(Integer.class) + .map(elem -> { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; }) .recover(new JavaPartialFunction() { public Integer apply(Throwable elem, boolean isCheck) { @@ -736,7 +735,37 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals(1); s.sendNext(2); probe.expectMsgEquals(0); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + } + + @Test + public void mustBeAbleToRecoverClass() throws Exception { + final TestPublisher.ManualProbe publisherProbe = TestPublisher.manualProbe(true,system); + final TestKit probe = new TestKit(system); + + final Source source = Source.fromPublisher(publisherProbe); + final Flow flow = Flow.of(Integer.class) + .map(elem -> { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; + }) + .recover( + RuntimeException.class, + () -> 0 + ); + + final CompletionStage future = + source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer); + + final PublisherProbeSubscription s = publisherProbe.expectSubscription(); + + s.sendNext(0); + probe.expectMsgEquals(0); + s.sendNext(1); + probe.expectMsgEquals(1); + s.sendNext(2); + probe.expectMsgEquals(0); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -746,12 +775,10 @@ public class FlowTest extends StreamTest { final Iterable recover = Arrays.asList(55, 0); final Source source = Source.fromPublisher(publisherProbe); - final Flow flow = Flow.of(Integer.class).map( - new Function() { - public Integer apply(Integer elem) { - if (elem == 2) throw new RuntimeException("ex"); - else return elem; - } + final Flow flow = Flow.of(Integer.class) + .map(elem -> { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; }) .recoverWith(new JavaPartialFunction>() { public Source apply(Throwable elem, boolean isCheck) { @@ -772,9 +799,105 @@ public class FlowTest extends StreamTest { s.sendNext(2); probe.expectMsgEquals(55); probe.expectMsgEquals(0); - future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); } + @Test + public void mustBeAbleToRecoverWithClass() throws Exception { + final TestPublisher.ManualProbe publisherProbe = TestPublisher.manualProbe(true,system); + final TestKit probe = new TestKit(system); + final Iterable recover = Arrays.asList(55, 0); + + final Source source = Source.fromPublisher(publisherProbe); + final Flow flow = Flow.of(Integer.class) + .map(elem -> { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; + }) + .recoverWith( + RuntimeException.class, + () -> Source.from(recover)); + + final CompletionStage future = + source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer); + + final PublisherProbeSubscription s = publisherProbe.expectSubscription(); + + s.sendNext(0); + probe.expectMsgEquals(0); + s.sendNext(1); + probe.expectMsgEquals(1); + s.sendNext(2); + probe.expectMsgEquals(55); + probe.expectMsgEquals(0); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + } + + @Test + public void mustBeAbleToRecoverWithRetries() throws Exception { + final TestPublisher.ManualProbe publisherProbe = TestPublisher.manualProbe(true,system); + final TestKit probe = new TestKit(system); + final Iterable recover = Arrays.asList(55, 0); + + final Source source = Source.fromPublisher(publisherProbe); + final Flow flow = Flow.of(Integer.class) + .map(elem -> { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; + }) + .recoverWithRetries( + 3, + new PFBuilder() + .match(RuntimeException.class, ex -> Source.from(recover)) + .build()); + + final CompletionStage future = + source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer); + + final PublisherProbeSubscription s = publisherProbe.expectSubscription(); + + s.sendNext(0); + probe.expectMsgEquals(0); + s.sendNext(1); + probe.expectMsgEquals(1); + s.sendNext(2); + probe.expectMsgEquals(55); + probe.expectMsgEquals(0); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + } + + @Test + public void mustBeAbleToRecoverWithRetriesClass() throws Exception { + final TestPublisher.ManualProbe publisherProbe = TestPublisher.manualProbe(true,system); + final TestKit probe = new TestKit(system); + final Iterable recover = Arrays.asList(55, 0); + + final Source source = Source.fromPublisher(publisherProbe); + final Flow flow = Flow.of(Integer.class) + .map(elem -> { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; + }) + .recoverWithRetries( + 3, + RuntimeException.class, + () -> Source.from(recover)); + + final CompletionStage future = + source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer); + + final PublisherProbeSubscription s = publisherProbe.expectSubscription(); + + s.sendNext(0); + probe.expectMsgEquals(0); + s.sendNext(1); + probe.expectMsgEquals(1); + s.sendNext(2); + probe.expectMsgEquals(55); + probe.expectMsgEquals(0); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); + } + @Test public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 8ccd7d3b1b..6f6c1388e4 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -7,16 +7,16 @@ package akka.stream.javadsl import akka.util.{ ConstantFun, Timeout } import akka.{ Done, NotUsed } import akka.event.LoggingAdapter -import akka.japi.{ Pair, function } +import akka.japi.{ Pair, Util, function } import akka.stream._ import org.reactivestreams.Processor import scala.concurrent.duration.FiniteDuration -import akka.japi.Util import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage -import akka.util.JavaDurationConverters._ +import java.util.function.Supplier +import akka.util.JavaDurationConverters._ import akka.actor.ActorRef import akka.dispatch.ExecutionContexts import akka.stream.impl.fusing.LazyFlow @@ -1313,6 +1313,26 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def recover(pf: PartialFunction[Throwable, Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.recover(pf)) + /** + * Recover allows to send last element on failure and gracefully complete the stream + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + */ + def recover(clazz: Class[_ <: Throwable], supplier: Supplier[Out]): javadsl.Flow[In, Out, Mat] = + recover { + case elem if clazz.isInstance(elem) ⇒ supplier.get() + } + /** * While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover @@ -1355,10 +1375,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * '''Cancels when''' downstream cancels * */ - @deprecated("Use recoverWithRetries instead.", "2.4.4") def recoverWith(pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.recoverWith(pf)) + /** + * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new + * Source may be materialized. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWith(clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = + recoverWith { + case elem if clazz.isInstance(elem) ⇒ supplier.get() + } + /** * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure @@ -1387,6 +1431,37 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.recoverWithRetries(attempts, pf)) + /** + * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered up to `attempts` number of times so that each time there is a failure + * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't + * attempt to recover at all. + * + * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + * @param attempts Maximum number of retries or -1 to retry indefinitely + * @param clazz the class object of the failure cause + * @param supplier supply the new Source to be materialized + */ + def recoverWithRetries(attempts: Int, clazz: Class[_ <: Throwable], supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = + recoverWithRetries(attempts, { + case elem if clazz.isInstance(elem) ⇒ supplier.get() + }) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been