From 77532c1bb5d4e8078b3e031ecac03697edbc58fc Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Tue, 23 Apr 2024 20:34:16 +0800 Subject: [PATCH] feat: Add retry with predicate (#1269) --- .../org/apache/pekko/pattern/RetrySpec.scala | 34 +++ .../org/apache/pekko/pattern/Patterns.scala | 244 ++++++++++++++++- .../apache/pekko/pattern/RetrySupport.scala | 246 ++++++++++++++++-- .../test/java/jdocs/future/FutureDocTest.java | 13 + 4 files changed, 500 insertions(+), 37 deletions(-) diff --git a/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala b/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala index 607ed61d65..5aa9e093c3 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/pattern/RetrySpec.scala @@ -22,6 +22,8 @@ import org.apache.pekko import pekko.actor.Scheduler import pekko.testkit.PekkoSpec +import java.util.concurrent.atomic.AtomicInteger + class RetrySpec extends PekkoSpec with RetrySupport { implicit val ec: ExecutionContextExecutor = system.dispatcher implicit val scheduler: Scheduler = system.scheduler @@ -152,6 +154,38 @@ class RetrySpec extends PekkoSpec with RetrySupport { Await.result(retried, remaining) should ===(5) } } + + "be able to retry with predicate for value" in { + val counter = new AtomicInteger(0) + def attempt(): Future[Int] = { + Future.successful(counter.incrementAndGet()) + } + + val retried = retry(() => attempt(), (t: Int, _) => t < 5, 10, 100 milliseconds) + + within(3 seconds) { + Await.result(retried, remaining) should ===(5) + } + } + + "be able to skip retry with predicate for exception" in { + val counter = new AtomicInteger(0) + + def attempt(): Future[Int] = { + counter.incrementAndGet() + // should not retry on this exception + Future.failed(new IllegalArgumentException()) + } + + val retried = + retry(() => attempt(), (_: Int, e) => !e.isInstanceOf[IllegalArgumentException], 10, 100 milliseconds) + + within(3 seconds) { + retried.failed.futureValue shouldBe an[IllegalArgumentException] + counter.get() should ===(1) + } + } + } } diff --git a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala index 09e90526e0..2b8759e026 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/Patterns.scala @@ -15,6 +15,7 @@ package org.apache.pekko.pattern import java.util.Optional import java.util.concurrent.{ Callable, CompletionStage, TimeUnit } +import java.util.function.BiPredicate import scala.concurrent.ExecutionContext @@ -491,13 +492,42 @@ object Patterns { * * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries - * and therefore must be thread safe (not touch unsafe mutable state). + * and therefore must be thread safe (i.e. not touch unsafe mutable state). */ def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, ec: ExecutionContext): CompletionStage[T] = { require(attempt != null, "Parameter attempt should not be null.") scalaRetry(() => attempt.call().asScala, attempts)(ec).asJava } + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Return an empty [[Optional]] instance for no delay. + * + * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries + * and therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param ec the execution context + * @return the result [[java.util.concurrent.CompletionStage]] which maybe retried + * + * @since 1.1.0 + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + shouldRetry: BiPredicate[T, Throwable], + attempts: Int, + ec: ExecutionContext): CompletionStage[T] = { + require(attempt != null, "Parameter attempt should not be null.") + scalaRetry(() => attempt.call().asScala, (t, e) => shouldRetry.test(t, e), attempts)(ec).asJava + } + /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]] * The first attempt will be made immediately, each subsequent attempt will be made with a backoff time, @@ -505,7 +535,7 @@ object Patterns { * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries and - * therefore must be thread safe (not touch unsafe mutable state). + * therefore must be thread safe (i.e. not touch unsafe mutable state). * * @param minBackoff minimum (initial) duration until the child actor will * started again, if it is terminated @@ -530,6 +560,50 @@ object Patterns { system.classicSystem.scheduler, system.classicSystem.dispatcher) + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Return an empty [[Optional]] instance for no delay. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries and + * therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param system the actor system + * @return the result [[java.util.concurrent.CompletionStage]] which maybe retried + * + * @since 1.1.0 + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + shouldRetry: BiPredicate[T, Throwable], + attempts: Int, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + system: ClassicActorSystemProvider): CompletionStage[T] = + retry( + attempt, + shouldRetry, + attempts, + minBackoff, + maxBackoff, + randomFactor, + system.classicSystem.scheduler, + system.classicSystem.dispatcher) + /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]] * The first attempt will be made immediately, each subsequent attempt will be made with a backoff time, @@ -537,7 +611,7 @@ object Patterns { * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries and - * therefore must be thread safe (not touch unsafe mutable state). + * therefore must be thread safe (i.e. not touch unsafe mutable state). * * @param minBackoff minimum (initial) duration until the child actor will * started again, if it is terminated @@ -562,6 +636,53 @@ object Patterns { scheduler).asJava } + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Return an empty [[Optional]] instance for no delay. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries and + * therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param scheduler the scheduler for scheduling a delay + * @param ec the execution context + * @return the result [[java.util.concurrent.CompletionStage]] which maybe retried + * + * @since 1.1.0 + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + shouldRetry: BiPredicate[T, Throwable], + attempts: Int, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + scheduler: Scheduler, + ec: ExecutionContext): CompletionStage[T] = { + require(attempt != null, "Parameter attempt should not be null.") + require(minBackoff != null, "Parameter minBackoff should not be null.") + require(maxBackoff != null, "Parameter minBackoff should not be null.") + scalaRetry( + () => attempt.call().asScala, + (t, e) => shouldRetry.test(t, e), + attempts, minBackoff.asScala, maxBackoff.asScala, randomFactor)( + ec, + scheduler).asJava + } + /** * Returns an internally retrying [[scala.concurrent.Future]] * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. @@ -569,7 +690,7 @@ object Patterns { * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries and - * therefore must be thread safe (not touch unsafe mutable state). + * therefore must be thread safe (i.e. not touch unsafe mutable state). */ def retry[T]( attempt: Callable[Future[T]], @@ -588,7 +709,7 @@ object Patterns { * * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries - * and therefore must be thread safe (not touch unsafe mutable state). + * and therefore must be thread safe (i.e. not touch unsafe mutable state). */ def retry[T]( attempt: Callable[CompletionStage[T]], @@ -597,6 +718,35 @@ object Patterns { system: ClassicActorSystemProvider): CompletionStage[T] = retry(attempt, attempts, delay, system.classicSystem.scheduler, system.classicSystem.dispatcher) + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Return an empty [[Optional]] instance for no delay. + * + * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries + * and therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param delay the delay between each attempt + * @param system the actor system + * @return the result [[java.util.concurrent.CompletionStage]] which maybe retried + * + * @since 1.1.0 + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + shouldRetry: BiPredicate[T, Throwable], + attempts: Int, + delay: java.time.Duration, + system: ClassicActorSystemProvider): CompletionStage[T] = + retry(attempt, shouldRetry, attempts, delay, system.classicSystem.scheduler, system.classicSystem.dispatcher) + /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]] * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. @@ -604,7 +754,7 @@ object Patterns { * * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries - * and therefore must be thread safe (not touch unsafe mutable state). + * and therefore must be thread safe (i.e. not touch unsafe mutable state). */ def retry[T]( attempt: Callable[CompletionStage[T]], @@ -616,10 +766,44 @@ object Patterns { scalaRetry(() => attempt.call().asScala, attempts, delay.asScala)(ec, scheduler).asJava } + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Return an empty [[Optional]] instance for no delay. + * + * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries + * and therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param delay the delay between each attempt + * @param scheduler the scheduler for scheduling a delay + * @param ec the execution context + * @return the result [[java.util.concurrent.CompletionStage]] which maybe retried * + * + * @since 1.1.0 + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + shouldRetry: BiPredicate[T, Throwable], + attempts: Int, + delay: java.time.Duration, + scheduler: Scheduler, + ec: ExecutionContext): CompletionStage[T] = { + require(attempt != null, "Parameter attempt should not be null.") + scalaRetry(() => attempt.call().asScala, (t, e) => shouldRetry.test(t, e), attempts, delay.asScala)(ec, + scheduler).asJava + } + /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. * The first attempt will be made immediately, each subsequent attempt will be made after - * the 'delay' return by `delayFunction`(the input next attempt count start from 1). + * the 'delay' return by `delayFunction` (the input next attempt count start from 1). * Return an empty [[Optional]] instance for no delay. * A scheduler (eg context.system.scheduler) must be provided to delay each retry. * You could provide a function to generate the next delay duration after first attempt, @@ -627,7 +811,7 @@ object Patterns { * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries and - * therefore must be thread safe (not touch unsafe mutable state). + * therefore must be thread safe (i.e. not touch unsafe mutable state). */ def retry[T]( attempt: Callable[CompletionStage[T]], @@ -642,6 +826,48 @@ object Patterns { attempts, attempted => delayFunction.apply(attempted).toScala.map(_.asScala))(context, scheduler).asJava } + + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Return an empty [[Optional]] instance for no delay. + * + * A scheduler (eg context.system.scheduler) must be provided to delay each retry. + * You could provide a function to generate the next delay duration after first attempt, + * this function should never return `null`, otherwise an [[java.lang.IllegalArgumentException]] will be through. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries and + * therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param delayFunction the function to generate the next delay duration, `None` for no delay + * @param scheduler the scheduler for scheduling a delay + * @param context the execution context + * @return the result [[java.util.concurrent.CompletionStage]] which maybe retried + * + * @since 1.1.0 + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + shouldRetry: BiPredicate[T, Throwable], + attempts: Int, + delayFunction: java.util.function.IntFunction[Optional[java.time.Duration]], + scheduler: Scheduler, + context: ExecutionContext): CompletionStage[T] = { + import pekko.util.OptionConverters._ + require(attempt != null, "Parameter attempt should not be null.") + scalaRetry( + () => attempt.call().asScala, + (t, e) => shouldRetry.test(t, e), + attempts, + attempted => delayFunction.apply(attempted).toScala.map(_.asScala))(context, scheduler).asJava + } } /** @@ -1077,7 +1303,7 @@ object PatternsCS { * A scheduler (eg context.system.scheduler) must be provided to delay each retry * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries - * and therefore must be thread safe (not touch unsafe mutable state). + * and therefore must be thread safe (i.e. not touch unsafe mutable state). */ @deprecated("Use Patterns.retry instead.", since = "Akka 2.5.19") def retry[T]( diff --git a/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala index 95bad97a73..2fccd0912e 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/RetrySupport.scala @@ -15,6 +15,7 @@ package org.apache.pekko.pattern import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.util.{ Failure, Success } import scala.util.control.NonFatal import org.apache.pekko @@ -33,7 +34,7 @@ trait RetrySupport { * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent - * tries and therefore must be thread safe (not touch unsafe mutable state). + * tries and therefore must be thread safe (i.e. not touch unsafe mutable state). * * Example usage: * @@ -46,6 +47,40 @@ trait RetrySupport { RetrySupport.retry(attempt, attempts, attempted = 0) } + /** + * Given a function from Unit to Future, returns an internally retrying Future. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Returns [[scala.None]] for no delay. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent + * tries and therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * Example usage: + * + * {{{ + * def possiblyFailing(): Future[Something] = ??? + * val shouldRetry: (Something, Throwable) => throwable ne null + * val withRetry: Future[Something] = retry(attempt = possiblyFailing, shouldRetry, attempts = 10) + * }}} + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param ec the execution context + * @return the result future which maybe retried + * + * @since 1.1.0 + */ + def retry[T](attempt: () => Future[T], + shouldRetry: (T, Throwable) => Boolean, + attempts: Int)(implicit ec: ExecutionContext): Future[T] = { + RetrySupport.retry(attempt, shouldRetry, attempts, ConstantFun.scalaAnyToNone, attempted = 0)(ec, null) + } + /** * Given a function from Unit to Future, returns an internally retrying Future. * The first attempt will be made immediately, each subsequent attempt will be made with a backoff time, @@ -53,7 +88,7 @@ trait RetrySupport { * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent - * tries and therefore must be thread safe (not touch unsafe mutable state). + * tries and therefore must be thread safe (i.e. not touch unsafe mutable state). * * Example usage: * @@ -81,7 +116,64 @@ trait RetrySupport { minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { - require(attempt != null, "Parameter attempt should not be null.") + retry( + attempt, + RetrySupport.retryOnException, + attempts, + minBackoff, + maxBackoff, + randomFactor) + } + + /** + * Given a function from Unit to Future, returns an internally retrying Future. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Returns [[scala.None]] for no delay. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent + * tries and therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * Example usage: + * + * {{{ + * protected val sendAndReceive: HttpRequest => Future[HttpResponse] + * protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null + * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse]( + * attempt = () => sendAndReceive(req), + * shouldRetry, + * attempts = 10, + * minBackoff = 1.seconds, + * maxBackoff = 2.seconds, + * randomFactor = 0.5 + * ) + * }}} + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + * @param ec the execution context + * @param scheduler the scheduler for scheduling a delay + * @return the result future which maybe retried + * + * @since 1.1.0 + */ + def retry[T]( + attempt: () => Future[T], + shouldRetry: (T, Throwable) => Boolean, + attempts: Int, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { require(minBackoff != null, "Parameter minBackoff should not be null.") require(maxBackoff != null, "Parameter maxBackoff should not be null.") require(minBackoff > Duration.Zero, "Parameter minBackoff must be > 0") @@ -89,6 +181,7 @@ trait RetrySupport { require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") retry( attempt, + shouldRetry, attempts, attempted => Some(BackoffSupervisor.calculateDelay(attempted, minBackoff, maxBackoff, randomFactor))) } @@ -100,7 +193,7 @@ trait RetrySupport { * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent - * tries and therefore must be thread safe (not touch unsafe mutable state). + * tries and therefore must be thread safe (i.e. not touch unsafe mutable state). * * Example usage: * @@ -121,16 +214,61 @@ trait RetrySupport { /** * Given a function from Unit to Future, returns an internally retrying Future. - * The first attempt will be made immediately, each subsequent attempt will be made after - * the 'delay' return by `delayFunction`(the input next attempt count start from 1). + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). * Returns [[scala.None]] for no delay. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent + * tries and therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * Example usage: + * + * {{{ + * protected val sendAndReceive: HttpRequest => Future[HttpResponse] + * protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null + * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse]( + * attempt = () => sendAndReceive(req), + * shouldRetry, + * attempts = 10, + * delay = 2.seconds + * ) + * }}} + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param delay the delay duration + * @param ec the execution context + * @param scheduler the scheduler for scheduling a delay + * @return the result future which maybe retried + * + * @since 1.1.0 + */ + def retry[T](attempt: () => Future[T], + shouldRetry: (T, Throwable) => Boolean, + attempts: Int, + delay: FiniteDuration)( + implicit ec: ExecutionContext, + scheduler: Scheduler): Future[T] = { + retry(attempt, shouldRetry, attempts, _ => Some(delay)) + } + + /** + * Given a function from Unit to Future, returns an internally retrying Future. + * The first attempt will be made immediately, each subsequent attempt will be made after + * the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Returns [[scala.None]] for no delay. + * * A scheduler (eg context.system.scheduler) must be provided to delay each retry. * You could provide a function to generate the next delay duration after first attempt, * this function should never return `null`, otherwise an [[java.lang.IllegalArgumentException]] will be through. * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent - * tries and therefore must be thread safe (not touch unsafe mutable state). + * tries and therefore must be thread safe (i.e. not touch unsafe mutable state). * * Example usage: * @@ -148,22 +286,70 @@ trait RetrySupport { implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { - RetrySupport.retry(attempt, attempts, delayFunction, attempted = 0) + RetrySupport.retry(attempt, RetrySupport.retryOnException, attempts, delayFunction, attempted = 0) + } + + /** + * Given a function from Unit to Future, returns an internally retrying Future. + * + * When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none) + * and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made, + * each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1). + * Returns [[scala.None]] for no delay. + * + * A scheduler (eg context.system.scheduler) must be provided to delay each retry. + * You could provide a function to generate the next delay duration after first attempt, + * this function should never return `null`, otherwise an [[java.lang.IllegalArgumentException]] will be through. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent + * tries and therefore must be thread safe (i.e. not touch unsafe mutable state). + * + * Example usage: + * + * //retry with back off + * {{{ + * protected val sendAndReceive: HttpRequest => Future[HttpResponse] + * protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null + * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse]( + * attempt = () => sendAndReceive(req), + * shouldRetry, + * attempts = 10, + * delayFunction = attempted => Option(2.seconds * attempted) + * ) + * }}} + * + * @param attempt the function to be attempted + * @param shouldRetry the predicate to determine if the attempt should be retried + * @param attempts the maximum number of attempts + * @param delayFunction the function to generate the next delay duration, `None` for no delay + * @param ec the execution context + * @param scheduler the scheduler for scheduling a delay + * @return the result future which maybe retried + * + * @since 1.1.0 + */ + def retry[T](attempt: () => Future[T], + shouldRetry: (T, Throwable) => Boolean, + attempts: Int, + delayFunction: Int => Option[FiniteDuration])(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { + RetrySupport.retry(attempt, shouldRetry, attempts, delayFunction, attempted = 0) } } object RetrySupport extends RetrySupport { + private val retryOnException: (Any, Throwable) => Boolean = (_: Any, e: Throwable) => e != null private def retry[T](attempt: () => Future[T], maxAttempts: Int, attempted: Int)( implicit ec: ExecutionContext): Future[T] = - retry(attempt, maxAttempts, ConstantFun.scalaAnyToNone, attempted)(ec, null) + retry(attempt, retryOnException, maxAttempts, ConstantFun.scalaAnyToNone, attempted)(ec, null) private def retry[T]( attempt: () => Future[T], + shouldRetry: (T, Throwable) => Boolean, maxAttempts: Int, delayFunction: Int => Option[FiniteDuration], attempted: Int)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { - def tryAttempt(): Future[T] = { try { attempt() @@ -172,30 +358,34 @@ object RetrySupport extends RetrySupport { } } - require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.") + def doRetry(nextAttempt: Int): Future[T] = delayFunction(nextAttempt) match { + case Some(delay) => + if (delay.length < 1) + retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt) + else + after(delay, scheduler) { + retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt) + } + case None => + retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt) + case null => + Future.failed(new IllegalArgumentException("The delayFunction of retry should not return null.")) + } + require(attempt != null, "Parameter attempt should not be null.") + require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.") + require(delayFunction != null, "Parameter delayFunction should not be null.") + require(attempted >= 0, "Parameter attempted must >= 0.") + if (maxAttempts - attempted > 0) { val result = tryAttempt() if (result eq null) result else { - val nextAttempt = attempted + 1 - result.recoverWith { - case NonFatal(_) => - delayFunction(nextAttempt) match { - case Some(delay) => - if (delay.length < 1) - retry(attempt, maxAttempts, delayFunction, nextAttempt) - else - after(delay, scheduler) { - retry(attempt, maxAttempts, delayFunction, nextAttempt) - } - case None => - retry(attempt, maxAttempts, delayFunction, nextAttempt) - case null => - Future.failed(new IllegalArgumentException("The delayFunction of retry should not return null.")) - } - + result.transformWith { + case Success(value) if shouldRetry(value, null) => doRetry(attempted + 1) + case Failure(e) if NonFatal(e) && shouldRetry(null.asInstanceOf[T], e) => doRetry(attempted + 1) + case _ => result } } diff --git a/docs/src/test/java/jdocs/future/FutureDocTest.java b/docs/src/test/java/jdocs/future/FutureDocTest.java index 861ebd9fae..5627b1d4e1 100644 --- a/docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/docs/src/test/java/jdocs/future/FutureDocTest.java @@ -85,4 +85,17 @@ public class FutureDocTest extends AbstractJavaTest { retriedFuture.toCompletableFuture().get(2, SECONDS); } + + @Test + public void useRetryWithPredicate() throws Exception { + // #retry + Callable> attempt = () -> CompletableFuture.completedFuture("test"); + + CompletionStage retriedFuture = + Patterns.retry( + attempt, (notUsed, e) -> e != null, 3, java.time.Duration.ofMillis(200), system); + // #retry + + retriedFuture.toCompletableFuture().get(2, SECONDS); + } }