diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala index 942333be7a..ccdd739a8a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala @@ -130,5 +130,11 @@ class FlowRecoverWithSpec extends AkkaSpec { .request(1) .expectError(ex) } + + "throw IllegalArgumentException if number of retries is less than -1" in assertAllStagesStopped { + intercept[IllegalArgumentException] { + Flow[Int].recoverWithRetries(-2, { case t: Throwable ⇒ Source.empty[Int] }) + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 3a9db78910..017a1d3da6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1259,6 +1259,7 @@ private[stream] object RecoverWith { } private[stream] final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { + require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1") override def initialAttributes = DefaultAttributes.recoverWith override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 32524be0f5..bf896f55e6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -803,7 +803,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't - * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * attempt to recover at all. Passing in -1 will behave exactly the same as `recoverWith`. * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. @@ -817,6 +817,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels * + * @param attempts Maximum number of retries or -1 to retry indefinitely + * @param pf Receives the failure cause and returns the new Source to be materialized if any + * @throws IllegalArgumentException if `attempts` is a negative number other than -1 */ def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] = new Flow(delegate.recoverWithRetries(attempts, pf)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 5d034e905d..2551439d63 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -453,7 +453,7 @@ trait FlowOps[+Out, +Mat] { * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't - * attempt to recover at all. Passing in a negative number will behave exactly the same as `recoverWith`. + * attempt to recover at all. Passing -1 will behave exactly the same as `recoverWith`. * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. @@ -467,6 +467,10 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels * + * @param attempts Maximum number of retries or -1 to retry indefinitely + * @param pf Receives the failure cause and returns the new Source to be materialized if any + * @throws IllegalArgumentException if `attempts` is a negative number other than -1 + * */ def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = via(new RecoverWith(attempts, pf))