diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala index e1fae1205c..f98c6c0b8b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala @@ -53,7 +53,7 @@ class FlowRecoverWithSpec extends AkkaSpec { .expectError(ex) } - "be able to recover with th same unmaterialized source if configured" in assertAllStagesStopped { + "be able to recover with the same unmaterialized source if configured" in assertAllStagesStopped { val src = Source(1 to 3).map { a ⇒ if (a == 3) throw ex else a } src.recoverWith { case t: Throwable ⇒ src } .runWith(TestSink.probe[Int]) @@ -99,7 +99,7 @@ class FlowRecoverWithSpec extends AkkaSpec { .expectComplete() } - "terminate with exception if altrnative source failed" in assertAllStagesStopped { + "terminate with exception if partial function fails to match after an alternative source failure" in assertAllStagesStopped { Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a } .recoverWith { case t: IndexOutOfBoundsException ⇒ @@ -112,5 +112,23 @@ class FlowRecoverWithSpec extends AkkaSpec { .request(1) .expectError(ex) } + + "terminate with exception after set number of retries" in assertAllStagesStopped { + Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a } + .recoverWithRetries(3, { + case t: Throwable ⇒ + Source(List(11, 22)).concat(Source.failed(ex)) + }).runWith(TestSink.probe[Int]) + .request(2) + .expectNextN(List(1, 2)) + .request(2) + .expectNextN(List(11, 22)) + .request(2) + .expectNextN(List(11, 22)) + .request(2) + .expectNextN(List(11, 22)) + .request(1) + .expectError(ex) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index e16fae74a2..3dcf9e0e2f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1239,10 +1239,16 @@ private[stream] final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraph /** * INTERNAL API */ -private[stream] final class RecoverWith[T, M](pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { +private[stream] object RecoverWith { + val InfiniteRetries = -1 +} + +private[stream] final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { override def initialAttributes = DefaultAttributes.recoverWith override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + var attempt = 0 + setHandler(in, new InHandler { override def onPush(): Unit = push(out, grab(in)) override def onUpstreamFailure(ex: Throwable) = onFailure(ex) @@ -1252,7 +1258,13 @@ private[stream] final class RecoverWith[T, M](pf: PartialFunction[Throwable, Gra override def onPull(): Unit = pull(in) }) - def onFailure(ex: Throwable) = if (pf.isDefinedAt(ex)) switchTo(pf(ex)) else failStage(ex) + def onFailure(ex: Throwable) = + if ((maximumRetries == RecoverWith.InfiniteRetries || attempt < maximumRetries) && pf.isDefinedAt(ex)) { + switchTo(pf(ex)) + attempt += 1 + } + else + failStage(ex) def switchTo(source: Graph[SourceShape[T], M]): Unit = { val sinkIn = new SubSinkInlet[T]("RecoverWithSink") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f1973402cf..fe3ce1dded 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -795,9 +795,32 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Cancels when''' downstream cancels * */ + @deprecated("Use recoverWithRetries instead.", "2.4.4") def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] = new Flow(delegate.recoverWith(pf)) + /** + * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered up to `attempts` number of times so that each time there is a failure + * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't + * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] = + new Flow(delegate.recoverWithRetries(attempts, pf)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 4da17ced0e..ee5964a973 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -818,6 +818,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ + @deprecated("Use recoverWithRetries instead.", "2.4.4") def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Source[T, Mat] = new Source(delegate.recover(pf)) @@ -842,6 +843,28 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] = new Source(delegate.recoverWith(pf)) + + /** + * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered up to `attempts` number of times so that each time there is a failure + * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't + * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] = + new Source(delegate.recoverWithRetries(attempts, pf)) /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index c0cde79c27..76efdbc8a7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -629,9 +629,34 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * '''Cancels when''' downstream cancels * */ + @deprecated("Use recoverWithRetries instead.", "2.4.4") def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] = new SubFlow(delegate.recoverWith(pf)) + /** + * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered up to `attempts` number of times so that each time there is a failure + * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't + * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] = + new SubFlow(delegate.recoverWithRetries(attempts, pf)) + + + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index cf69ad3abd..fcd153e2a0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -627,9 +627,32 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * '''Cancels when''' downstream cancels * */ + @deprecated("Use recoverWithRetries instead.", "2.4.4") def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] = new SubSource(delegate.recoverWith(pf)) + /** + * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered up to `attempts` number of times so that each time there is a failure + * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't + * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] = + new SubSource(delegate.recoverWithRetries(attempts, pf)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 5c6adafeb5..e6761c8710 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -445,8 +445,31 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * */ + @deprecated("Use recoverWithRetries instead.", "2.4.4") def recoverWith[T >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = - via(new RecoverWith(pf)) + via(new RecoverWith(-1, pf)) + + /** + * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after + * a failure has been recovered up to `attempts` number of times so that each time there is a failure + * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't + * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and element is available + * from alternative Source + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = + via(new RecoverWith(attempts, pf)) /** * Transform this stream by applying the given function to each of the elements diff --git a/project/MiMa.scala b/project/MiMa.scala index ed915dd970..6487630f79 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -738,8 +738,11 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$Running"), ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$IsShutdown"), ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$NewIncarnation"), - ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$State") - ) + ProblemFilters.exclude[MissingClassProblem]("akka.http.impl.engine.client.PoolGateway$State"), + + // #20123 + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.recoverWithRetries") + ) ) } }