diff --git a/akka-docs/src/main/paradox/scala/stream/stages-overview.md b/akka-docs/src/main/paradox/scala/stream/stages-overview.md index 0ebafc4a2a..72b901d42f 100644 --- a/akka-docs/src/main/paradox/scala/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/scala/stream/stages-overview.md @@ -939,7 +939,7 @@ Throwing an exception inside `recoverWith` _will_ be logged on ERROR level autom RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after a failure has been recovered up to *attempts* number of times so that each time there is a failure it is fed into the *pf* and a new Source may be materialized. Note that if you pass in 0, this won't -attempt to recover at all. Passing -1 will behave exactly the same as *recoverWith*. +attempt to recover at all. A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. This stage can recover the failure signal, but not the skipped elements, which will be dropped. diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala index e3813f8e7d..a909a7816f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala @@ -128,10 +128,35 @@ class FlowRecoverWithSpec extends StreamSpec { .expectError(ex) } - "throw IllegalArgumentException if number of retries is less than -1" in assertAllStagesStopped { - intercept[IllegalArgumentException] { - Flow[Int].recoverWithRetries(-2, { case t: Throwable ⇒ Source.empty[Int] }) - } + "not attempt recovering when attempts is zero" in assertAllStagesStopped { + Source(1 to 3).map { a ⇒ if (a == 3) throw ex else a } + .recoverWithRetries(0, { case t: Throwable ⇒ Source(List(22, 33)) }) + .runWith(TestSink.probe[Int]) + .request(100) + .expectNextN(List(1, 2)) + .expectError(ex) + } + + "recover infinitely when negative (-1) number of attempts given" in assertAllStagesStopped { + val oneThenBoom = Source(1 to 2).map { a ⇒ if (a == 2) throw ex else a } + + oneThenBoom + .recoverWithRetries(-1, { case t: Throwable ⇒ oneThenBoom }) + .runWith(TestSink.probe[Int]) + .request(5) + .expectNextN(List(1, 2, 3, 4, 5).map(_ ⇒ 1)) + .cancel() + } + + "recover infinitely when negative (smaller than -1) number of attempts given" in assertAllStagesStopped { + val oneThenBoom = Source(1 to 2).map { a ⇒ if (a == 2) throw ex else a } + + oneThenBoom + .recoverWithRetries(-10, { case t: Throwable ⇒ oneThenBoom }) + .runWith(TestSink.probe[Int]) + .request(5) + .expectNextN(List(1, 2, 3, 4, 5).map(_ ⇒ 1)) + .cancel() } "fail correctly when materialization of recover source fails" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index f1af46a881..00d65a1c9b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1768,12 +1768,9 @@ private[stream] object Collect { /** * INTERNAL API */ -@InternalApi private[stream] object RecoverWith { - val InfiniteRetries = -1 -} +@InternalApi private[stream] object RecoverWith @InternalApi private[akka] final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { - require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1") override def initialAttributes = DefaultAttributes.recoverWith @@ -1791,7 +1788,7 @@ private[stream] object Collect { }) def onFailure(ex: Throwable) = - if ((maximumRetries == RecoverWith.InfiniteRetries || attempt < maximumRetries) && pf.isDefinedAt(ex)) { + if ((maximumRetries < 0 || attempt < maximumRetries) && pf.isDefinedAt(ex)) { switchTo(pf(ex)) attempt += 1 } else diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 4153bd3351..3dacf70e17 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -999,7 +999,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't - * attempt to recover at all. Passing in -1 will behave exactly the same as `recoverWith`. + * attempt to recover at all. + * + * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 9fed0bfa93..72d5885060 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1008,7 +1008,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't - * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * attempt to recover at all. + * + * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 57aa2cf385..cb5bd94418 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -729,7 +729,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't - * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * attempt to recover at all. + * + * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index d871b04ce7..f9037a83a8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -725,7 +725,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't - * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * attempt to recover at all. + * + * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7e17fef307..3c07d6e448 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -536,7 +536,9 @@ trait FlowOps[+Out, +Mat] { * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't - * attempt to recover at all. Passing -1 will behave exactly the same as `recoverWith`. + * attempt to recover at all. + * + * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. diff --git a/project/MiMa.scala b/project/MiMa.scala index d2b513fa8f..1c3c47f1d5 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1216,7 +1216,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.apply"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnRestartSupervisor.this"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.replyWhileStopped"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.BackoffOptions.withReplyWhileStopped") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.BackoffOptions.withReplyWhileStopped"), + + // #23144 recoverWithRetries cleanup + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries") ) )