From 5a8f4135b7f343eda6d728595c74a597877141bd Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Fri, 29 Jan 2016 22:06:36 -0500 Subject: [PATCH] +str #18045 add recoverWith(Source) --- akka-docs/rst/java/stream/stages-overview.rst | 10 ++ .../rst/scala/stream/stages-overview.rst | 10 ++ .../java/akka/stream/javadsl/FlowTest.java | 36 ++++++ .../stream/scaladsl/FlowRecoverSpec.scala | 55 +++------ .../stream/scaladsl/FlowRecoverWithSpec.scala | 116 ++++++++++++++++++ .../main/scala/akka/stream/impl/Stages.scala | 1 + .../scala/akka/stream/impl/fusing/Ops.scala | 52 +++++++- .../stream/impl/fusing/StreamOfStreams.scala | 1 + .../main/scala/akka/stream/javadsl/Flow.scala | 21 ++++ .../scala/akka/stream/javadsl/Source.scala | 20 +++ .../scala/akka/stream/javadsl/SubFlow.scala | 20 +++ .../scala/akka/stream/javadsl/SubSource.scala | 20 +++ .../scala/akka/stream/scaladsl/Flow.scala | 20 +++ 13 files changed, 346 insertions(+), 36 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 3a9a6cca54..e39bc5cb61 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -580,6 +580,16 @@ Allow sending of one last element downstream when a failure has happened upstrea *completes* when upstream completes or upstream failed with exception pf can handle +recoverWith +^^^^^^^^^^^ +Allow switching to alternative Source when a failure has happened upstream. + +*emits* the element is available from the upstream or upstream is failed and pf returns alternative Source + +*backpressures* downstream backpressures, after failure happened it backprssures to alternative Source + +*completes* upstream completes or upstream failed with exception pf can handle + detach ^^^^^^ Detach upstream demand from downstream demand without detaching the stream rates. diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index d94920d576..b46a7aaa7e 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -569,6 +569,16 @@ Allow sending of one last element downstream when a failure has happened upstrea *completes* when upstream completes or upstream failed with exception pf can handle +recoverWith +^^^^^^^^^^^ +Allow switching to alternative Source when a failure has happened upstream. + +*emits* the element is available from the upstream or upstream is failed and pf returns alternative Source + +*backpressures* downstream backpressures, after failure happened it backprssures to alternative Source + +*completes* upstream completes or upstream failed with exception pf can handle + detach ^^^^^^ Detach upstream demand from downstream demand without detaching the stream rates. diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 1a5f56ce79..df13d98a9d 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -628,6 +628,42 @@ public class FlowTest extends StreamTest { future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); } + @Test + public void mustBeAbleToRecoverWithSource() throws Exception { + final TestPublisher.ManualProbe publisherProbe = TestPublisher.manualProbe(true,system); + final JavaTestKit probe = new JavaTestKit(system); + final Iterable recover = Arrays.asList(55, 0); + + final Source source = Source.fromPublisher(publisherProbe); + final Flow flow = Flow.of(Integer.class).map( + new Function() { + public Integer apply(Integer elem) { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; + } + }) + .recoverWith(new JavaPartialFunction>() { + public Source apply(Throwable elem, boolean isCheck) { + if (isCheck) return null; + return Source.from(recover); + } + }); + + final CompletionStage future = + source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer); + + final PublisherProbeSubscription s = publisherProbe.expectSubscription(); + + s.sendNext(0); + probe.expectMsgEquals(0); + s.sendNext(1); + probe.expectMsgEquals(1); + s.sendNext(2); + probe.expectMsgEquals(55); + probe.expectMsgEquals(0); + future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); + } + @Test public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala index 2815e89429..949c9e701e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala @@ -3,9 +3,10 @@ */ package akka.stream.scaladsl +import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit.Utils._ -import akka.stream.testkit.{ AkkaSpec, TestSubscriber } +import akka.stream.testkit.AkkaSpec import scala.util.control.NoStackTrace @@ -19,56 +20,40 @@ class FlowRecoverSpec extends AkkaSpec { "A Recover" must { "recover when there is a handler" in assertAllStagesStopped { - val subscriber = TestSubscriber.probe[Int]() - Source(1 to 4).map { a ⇒ if (a == 3) throw ex else a } .recover { case t: Throwable ⇒ 0 } - .runWith(Sink.fromSubscriber(subscriber)) - - subscriber.requestNext(1) - subscriber.requestNext(2) - - subscriber.request(1) - subscriber.expectNext(0) - - subscriber.request(1) - subscriber.expectComplete() + .runWith(TestSink.probe[Int]) + .requestNext(1) + .requestNext(2) + .requestNext(0) + .request(1) + .expectComplete() } "failed stream if handler is not for such exception type" in assertAllStagesStopped { - val subscriber = TestSubscriber.probe[Int]() - Source(1 to 3).map { a ⇒ if (a == 2) throw ex else a } .recover { case t: IndexOutOfBoundsException ⇒ 0 } - .runWith(Sink.fromSubscriber(subscriber)) - - subscriber.requestNext(1) - subscriber.request(1) - subscriber.expectError(ex) + .runWith(TestSink.probe[Int]) + .requestNext(1) + .request(1) + .expectError(ex) } "not influence stream when there is no exceptions" in assertAllStagesStopped { - val subscriber = TestSubscriber.probe[Int]() - - val k = Source(1 to 3).map(identity) + Source(1 to 3).map(identity) .recover { case t: Throwable ⇒ 0 } - .runWith(Sink.fromSubscriber(subscriber)) - - subscriber.requestNext(1) - subscriber.requestNext(2) - subscriber.requestNext(3) - subscriber.expectComplete() + .runWith(TestSink.probe[Int]) + .request(3) + .expectNextN(1 to 3) + .expectComplete() } "finish stream if it's empty" in assertAllStagesStopped { - val subscriber = TestSubscriber.probe[Int]() Source.empty.map(identity) .recover { case t: Throwable ⇒ 0 } - .runWith(Sink.fromSubscriber(subscriber)) - - subscriber.request(1) - subscriber.expectComplete() - + .runWith(TestSink.probe[Int]) + .request(1) + .expectComplete() } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala new file mode 100644 index 0000000000..7903eff232 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.Utils._ +import akka.stream.testkit.AkkaSpec + +import scala.util.control.NoStackTrace + +class FlowRecoverWithSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1) + + implicit val materializer = ActorMaterializer(settings) + + val ex = new RuntimeException("ex") with NoStackTrace + + "A RecoverWith" must { + "recover when there is a handler" in assertAllStagesStopped { + Source(1 to 4).map { a ⇒ if (a == 3) throw ex else a } + .recoverWith { case t: Throwable ⇒ Source(List(0, -1)) } + .runWith(TestSink.probe[Int]) + .request(2) + .expectNextN(1 to 2) + .request(1) + .expectNext(0) + .request(1) + .expectNext(-1) + .expectComplete() + } + + "cancel substream if parent is terminated when there is a handler" in assertAllStagesStopped { + Source(1 to 4).map { a ⇒ if (a == 3) throw ex else a } + .recoverWith { case t: Throwable ⇒ Source(List(0, -1)) } + .runWith(TestSink.probe[Int]) + .request(2) + .expectNextN(1 to 2) + .request(1) + .expectNext(0) + .cancel() + } + + "failed stream if handler is not for such exception type" in assertAllStagesStopped { + Source(1 to 3).map { a ⇒ if (a == 2) throw ex else a } + .recoverWith { case t: IndexOutOfBoundsException ⇒ Source.single(0) } + .runWith(TestSink.probe[Int]) + .request(1) + .expectNext(1) + .request(1) + .expectError(ex) + } + + "be able to recover with th same unmaterialized source if configured" in assertAllStagesStopped { + val src = Source(1 to 3).map { a ⇒ if (a == 3) throw ex else a } + src.recoverWith { case t: Throwable ⇒ src } + .runWith(TestSink.probe[Int]) + .request(2) + .expectNextN(1 to 2) + .request(2) + .expectNextN(1 to 2) + .request(2) + .expectNextN(1 to 2) + .cancel() + } + + "not influence stream when there is no exceptions" in assertAllStagesStopped { + Source(1 to 3).map(identity) + .recoverWith { case t: Throwable ⇒ Source.single(0) } + .runWith(TestSink.probe[Int]) + .request(3) + .expectNextN(1 to 3) + .expectComplete() + } + + "finish stream if it's empty" in assertAllStagesStopped { + Source.empty.map(identity) + .recoverWith { case t: Throwable ⇒ Source.single(0) } + .runWith(TestSink.probe[Int]) + .request(3) + .expectComplete() + } + + "switch the second time if alternative source throws exception" in assertAllStagesStopped { + val k = Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a } + .recoverWith { + case t: IndexOutOfBoundsException ⇒ + Source(List(11, 22)).map(m ⇒ if (m == 22) throw new IllegalArgumentException() else m) + case t: IllegalArgumentException ⇒ Source(List(33, 44)) + }.runWith(TestSink.probe[Int]) + .request(2) + .expectNextN(List(1, 2)) + .request(2) + .expectNextN(List(11, 33)) + .request(1) + .expectNext(44) + .expectComplete() + } + + "terminate with exception if altrnative source failed" in assertAllStagesStopped { + Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a } + .recoverWith { + case t: IndexOutOfBoundsException ⇒ + Source(List(11, 22)).map(m ⇒ if (m == 22) throw ex else m) + }.runWith(TestSink.probe[Int]) + .request(2) + .expectNextN(List(1, 2)) + .request(1) + .expectNextN(List(11)) + .request(1) + .expectError(ex) + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 771664d742..9eaa14261a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -65,6 +65,7 @@ private[stream] object Stages { val merge = name("merge") val mergePreferred = name("mergePreferred") val flattenMerge = name("flattenMerge") + val recoverWith = name("recoverWith") val broadcast = name("broadcast") val balance = name("balance") val zip = name("zip") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 74dd9e9942..2a5d17d0d1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -6,10 +6,10 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.{ InputBuffer, LogLevels } -import akka.stream.impl.Stages.DefaultAttributes import akka.stream.OverflowStrategies._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ Buffer ⇒ BufferImpl, ReactiveStreamsCompliance } +import akka.stream.scaladsl.Source import akka.stream.stage._ import akka.stream.{ Supervision, _ } import scala.annotation.tailrec @@ -1119,3 +1119,53 @@ private[stream] final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraph } override def toString = "Reduce" } + +/** + * INTERNAL API + */ +private[stream] final class RecoverWith[T, M](pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { + override def initialAttributes = DefaultAttributes.recoverWith + + override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onUpstreamFailure(ex: Throwable) = onFailure(ex) + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + }) + + def onFailure(ex: Throwable) = if (pf.isDefinedAt(ex)) switchTo(pf(ex)) else failStage(ex) + + def switchTo(source: Graph[SourceShape[T], M]): Unit = { + val sinkIn = new SubSinkInlet[T]("RecoverWithSink") + sinkIn.setHandler(new InHandler { + override def onPush(): Unit = + if (isAvailable(out)) { + push(out, sinkIn.grab()) + sinkIn.pull() + } + override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) completeStage() + override def onUpstreamFailure(ex: Throwable) = onFailure(ex) + }) + + def pushOut(): Unit = { + push(out, sinkIn.grab()) + if (!sinkIn.isClosed) sinkIn.pull() + else completeStage() + } + + val outHandler = new OutHandler { + override def onPull(): Unit = if (sinkIn.isAvailable) pushOut() + override def onDownstreamFinish(): Unit = sinkIn.cancel() + } + + Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer) + setHandler(out, outHandler) + sinkIn.pull() + } + } + + override def toString: String = "RecoverWith" +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 7e423a1309..a692f359d7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -8,6 +8,7 @@ import akka.NotUsed import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.SubscriptionTimeoutException +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.stage._ import akka.stream.scaladsl._ import akka.stream.actor.ActorSubscriberMessage diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index c41d6832cf..d0f01441fa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -3,6 +3,7 @@ */ package akka.stream.javadsl +import akka.stream.impl.fusing.RecoverWith import akka.{ NotUsed, Done } import akka.event.LoggingAdapter import akka.japi.{ function, Pair } @@ -760,6 +761,26 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.recover(pf)) + /** + * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new + * Source may be materialized. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] = + new Flow(delegate.recoverWith(pf)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 653c344c46..974d1ea494 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -775,6 +775,26 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Source[T, Mat] = new Source(delegate.recover(pf)) + /** + * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new + * Source may be materialized. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] = + new Source(delegate.recoverWith(pf)) + /** * Transform each input element into an `Iterable of output elements that is * then flattened into the output stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index aa61e8830b..5176084a10 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -606,6 +606,26 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def recover[T >: Out](pf: PartialFunction[Throwable, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.recover(pf)) + /** + * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new + * Source may be materialized. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] = + new SubFlow(delegate.recoverWith(pf)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 80faf6b12e..e804341e6f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -602,6 +602,26 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def recover[T >: Out](pf: PartialFunction[Throwable, T]): SubSource[T, Mat] = new SubSource(delegate.recover(pf)) + /** + * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new + * Source may be materialized. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] = + new SubSource(delegate.recoverWith(pf)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index dee90c8b19..3287666e7d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -420,6 +420,26 @@ trait FlowOps[+Out, +Mat] { */ def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = andThen(Recover(pf)) + /** + * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new + * Source may be materialized. + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWith[T >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = + via(new RecoverWith(pf)) + /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step.