=str #22917 recoverWithRetries allow 0 and negative values

This commit is contained in:
Konrad `ktoso` Malawski 2017-06-13 18:18:46 +09:00 committed by Johan Andrén
parent ee79a3d1a8
commit b7d7316c1c
9 changed files with 51 additions and 16 deletions

View file

@ -939,7 +939,7 @@ Throwing an exception inside `recoverWith` _will_ be logged on ERROR level autom
RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
a failure has been recovered up to *attempts* number of times so that each time there is a failure
it is fed into the *pf* and a new Source may be materialized. Note that if you pass in 0, this won't
attempt to recover at all. Passing -1 will behave exactly the same as *recoverWith*.
attempt to recover at all. A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
This stage can recover the failure signal, but not the skipped elements, which will be dropped.

View file

@ -128,10 +128,35 @@ class FlowRecoverWithSpec extends StreamSpec {
.expectError(ex)
}
"throw IllegalArgumentException if number of retries is less than -1" in assertAllStagesStopped {
intercept[IllegalArgumentException] {
Flow[Int].recoverWithRetries(-2, { case t: Throwable Source.empty[Int] })
}
"not attempt recovering when attempts is zero" in assertAllStagesStopped {
Source(1 to 3).map { a if (a == 3) throw ex else a }
.recoverWithRetries(0, { case t: Throwable Source(List(22, 33)) })
.runWith(TestSink.probe[Int])
.request(100)
.expectNextN(List(1, 2))
.expectError(ex)
}
"recover infinitely when negative (-1) number of attempts given" in assertAllStagesStopped {
val oneThenBoom = Source(1 to 2).map { a if (a == 2) throw ex else a }
oneThenBoom
.recoverWithRetries(-1, { case t: Throwable oneThenBoom })
.runWith(TestSink.probe[Int])
.request(5)
.expectNextN(List(1, 2, 3, 4, 5).map(_ 1))
.cancel()
}
"recover infinitely when negative (smaller than -1) number of attempts given" in assertAllStagesStopped {
val oneThenBoom = Source(1 to 2).map { a if (a == 2) throw ex else a }
oneThenBoom
.recoverWithRetries(-10, { case t: Throwable oneThenBoom })
.runWith(TestSink.probe[Int])
.request(5)
.expectNextN(List(1, 2, 3, 4, 5).map(_ 1))
.cancel()
}
"fail correctly when materialization of recover source fails" in assertAllStagesStopped {

View file

@ -1768,12 +1768,9 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
@InternalApi private[stream] object RecoverWith {
val InfiniteRetries = -1
}
@InternalApi private[stream] object RecoverWith
@InternalApi private[akka] final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1")
override def initialAttributes = DefaultAttributes.recoverWith
@ -1791,7 +1788,7 @@ private[stream] object Collect {
})
def onFailure(ex: Throwable) =
if ((maximumRetries == RecoverWith.InfiniteRetries || attempt < maximumRetries) && pf.isDefinedAt(ex)) {
if ((maximumRetries < 0 || attempt < maximumRetries) && pf.isDefinedAt(ex)) {
switchTo(pf(ex))
attempt += 1
} else

View file

@ -999,7 +999,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all. Passing in -1 will behave exactly the same as `recoverWith`.
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.

View file

@ -1008,7 +1008,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`.
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.

View file

@ -729,7 +729,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`.
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.

View file

@ -725,7 +725,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`.
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.

View file

@ -536,7 +536,9 @@ trait FlowOps[+Out, +Mat] {
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all. Passing -1 will behave exactly the same as `recoverWith`.
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.

View file

@ -1216,7 +1216,10 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.apply"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnRestartSupervisor.this"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.replyWhileStopped"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.BackoffOptions.withReplyWhileStopped")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.BackoffOptions.withReplyWhileStopped"),
// #23144 recoverWithRetries cleanup
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries")
)
)