This commit is contained in:
parent
acb71ac4e5
commit
e0c64f59d8
4 changed files with 16 additions and 2 deletions
|
|
@ -130,5 +130,11 @@ class FlowRecoverWithSpec extends AkkaSpec {
|
|||
.request(1)
|
||||
.expectError(ex)
|
||||
}
|
||||
|
||||
"throw IllegalArgumentException if number of retries is less than -1" in assertAllStagesStopped {
|
||||
intercept[IllegalArgumentException] {
|
||||
Flow[Int].recoverWithRetries(-2, { case t: Throwable ⇒ Source.empty[Int] })
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1259,6 +1259,7 @@ private[stream] object RecoverWith {
|
|||
}
|
||||
|
||||
private[stream] final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
|
||||
require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1")
|
||||
override def initialAttributes = DefaultAttributes.recoverWith
|
||||
|
||||
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
|
||||
|
|
|
|||
|
|
@ -803,7 +803,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
|
||||
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
|
||||
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
|
||||
* attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`.
|
||||
* attempt to recover at all. Passing in -1 will behave exactly the same as `recoverWith`.
|
||||
*
|
||||
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
||||
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
|
||||
|
|
@ -817,6 +817,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @param attempts Maximum number of retries or -1 to retry indefinitely
|
||||
* @param pf Receives the failure cause and returns the new Source to be materialized if any
|
||||
* @throws IllegalArgumentException if `attempts` is a negative number other than -1
|
||||
*/
|
||||
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] =
|
||||
new Flow(delegate.recoverWithRetries(attempts, pf))
|
||||
|
|
|
|||
|
|
@ -453,7 +453,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
|
||||
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
|
||||
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
|
||||
* attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`.
|
||||
* attempt to recover at all. Passing -1 will behave exactly the same as `recoverWith`.
|
||||
*
|
||||
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
|
||||
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
|
||||
|
|
@ -467,6 +467,10 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @param attempts Maximum number of retries or -1 to retry indefinitely
|
||||
* @param pf Receives the failure cause and returns the new Source to be materialized if any
|
||||
* @throws IllegalArgumentException if `attempts` is a negative number other than -1
|
||||
*
|
||||
*/
|
||||
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
|
||||
via(new RecoverWith(attempts, pf))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue