feat: Add retry with predicate (#1269)
This commit is contained in:
parent
64e07dcf1b
commit
77532c1bb5
4 changed files with 500 additions and 37 deletions
|
|
@ -22,6 +22,8 @@ import org.apache.pekko
|
|||
import pekko.actor.Scheduler
|
||||
import pekko.testkit.PekkoSpec
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
class RetrySpec extends PekkoSpec with RetrySupport {
|
||||
implicit val ec: ExecutionContextExecutor = system.dispatcher
|
||||
implicit val scheduler: Scheduler = system.scheduler
|
||||
|
|
@ -152,6 +154,38 @@ class RetrySpec extends PekkoSpec with RetrySupport {
|
|||
Await.result(retried, remaining) should ===(5)
|
||||
}
|
||||
}
|
||||
|
||||
"be able to retry with predicate for value" in {
|
||||
val counter = new AtomicInteger(0)
|
||||
def attempt(): Future[Int] = {
|
||||
Future.successful(counter.incrementAndGet())
|
||||
}
|
||||
|
||||
val retried = retry(() => attempt(), (t: Int, _) => t < 5, 10, 100 milliseconds)
|
||||
|
||||
within(3 seconds) {
|
||||
Await.result(retried, remaining) should ===(5)
|
||||
}
|
||||
}
|
||||
|
||||
"be able to skip retry with predicate for exception" in {
|
||||
val counter = new AtomicInteger(0)
|
||||
|
||||
def attempt(): Future[Int] = {
|
||||
counter.incrementAndGet()
|
||||
// should not retry on this exception
|
||||
Future.failed(new IllegalArgumentException())
|
||||
}
|
||||
|
||||
val retried =
|
||||
retry(() => attempt(), (_: Int, e) => !e.isInstanceOf[IllegalArgumentException], 10, 100 milliseconds)
|
||||
|
||||
within(3 seconds) {
|
||||
retried.failed.futureValue shouldBe an[IllegalArgumentException]
|
||||
counter.get() should ===(1)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ package org.apache.pekko.pattern
|
|||
|
||||
import java.util.Optional
|
||||
import java.util.concurrent.{ Callable, CompletionStage, TimeUnit }
|
||||
import java.util.function.BiPredicate
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
|
|
@ -491,13 +492,42 @@ object Patterns {
|
|||
*
|
||||
* If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries
|
||||
* and therefore must be thread safe (not touch unsafe mutable state).
|
||||
* and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*/
|
||||
def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, ec: ExecutionContext): CompletionStage[T] = {
|
||||
require(attempt != null, "Parameter attempt should not be null.")
|
||||
scalaRetry(() => attempt.call().asScala, attempts)(ec).asJava
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]].
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Return an empty [[Optional]] instance for no delay.
|
||||
*
|
||||
* If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries
|
||||
* and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param ec the execution context
|
||||
* @return the result [[java.util.concurrent.CompletionStage]] which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
shouldRetry: BiPredicate[T, Throwable],
|
||||
attempts: Int,
|
||||
ec: ExecutionContext): CompletionStage[T] = {
|
||||
require(attempt != null, "Parameter attempt should not be null.")
|
||||
scalaRetry(() => attempt.call().asScala, (t, e) => shouldRetry.test(t, e), attempts)(ec).asJava
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]
|
||||
* The first attempt will be made immediately, each subsequent attempt will be made with a backoff time,
|
||||
|
|
@ -505,7 +535,7 @@ object Patterns {
|
|||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries and
|
||||
* therefore must be thread safe (not touch unsafe mutable state).
|
||||
* therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* @param minBackoff minimum (initial) duration until the child actor will
|
||||
* started again, if it is terminated
|
||||
|
|
@ -530,6 +560,50 @@ object Patterns {
|
|||
system.classicSystem.scheduler,
|
||||
system.classicSystem.dispatcher)
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]].
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Return an empty [[Optional]] instance for no delay.
|
||||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries and
|
||||
* therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param minBackoff minimum (initial) duration until the child actor will
|
||||
* started again, if it is terminated
|
||||
* @param maxBackoff the exponential back-off is capped to this duration
|
||||
* @param randomFactor after calculation of the exponential back-off an additional
|
||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||
* In order to skip this additional delay pass in `0`.
|
||||
* @param system the actor system
|
||||
* @return the result [[java.util.concurrent.CompletionStage]] which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
shouldRetry: BiPredicate[T, Throwable],
|
||||
attempts: Int,
|
||||
minBackoff: java.time.Duration,
|
||||
maxBackoff: java.time.Duration,
|
||||
randomFactor: Double,
|
||||
system: ClassicActorSystemProvider): CompletionStage[T] =
|
||||
retry(
|
||||
attempt,
|
||||
shouldRetry,
|
||||
attempts,
|
||||
minBackoff,
|
||||
maxBackoff,
|
||||
randomFactor,
|
||||
system.classicSystem.scheduler,
|
||||
system.classicSystem.dispatcher)
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]
|
||||
* The first attempt will be made immediately, each subsequent attempt will be made with a backoff time,
|
||||
|
|
@ -537,7 +611,7 @@ object Patterns {
|
|||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries and
|
||||
* therefore must be thread safe (not touch unsafe mutable state).
|
||||
* therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* @param minBackoff minimum (initial) duration until the child actor will
|
||||
* started again, if it is terminated
|
||||
|
|
@ -562,6 +636,53 @@ object Patterns {
|
|||
scheduler).asJava
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]].
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Return an empty [[Optional]] instance for no delay.
|
||||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries and
|
||||
* therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param minBackoff minimum (initial) duration until the child actor will
|
||||
* started again, if it is terminated
|
||||
* @param maxBackoff the exponential back-off is capped to this duration
|
||||
* @param randomFactor after calculation of the exponential back-off an additional
|
||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||
* In order to skip this additional delay pass in `0`.
|
||||
* @param scheduler the scheduler for scheduling a delay
|
||||
* @param ec the execution context
|
||||
* @return the result [[java.util.concurrent.CompletionStage]] which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
shouldRetry: BiPredicate[T, Throwable],
|
||||
attempts: Int,
|
||||
minBackoff: java.time.Duration,
|
||||
maxBackoff: java.time.Duration,
|
||||
randomFactor: Double,
|
||||
scheduler: Scheduler,
|
||||
ec: ExecutionContext): CompletionStage[T] = {
|
||||
require(attempt != null, "Parameter attempt should not be null.")
|
||||
require(minBackoff != null, "Parameter minBackoff should not be null.")
|
||||
require(maxBackoff != null, "Parameter minBackoff should not be null.")
|
||||
scalaRetry(
|
||||
() => attempt.call().asScala,
|
||||
(t, e) => shouldRetry.test(t, e),
|
||||
attempts, minBackoff.asScala, maxBackoff.asScala, randomFactor)(
|
||||
ec,
|
||||
scheduler).asJava
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[scala.concurrent.Future]]
|
||||
* The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'.
|
||||
|
|
@ -569,7 +690,7 @@ object Patterns {
|
|||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries and
|
||||
* therefore must be thread safe (not touch unsafe mutable state).
|
||||
* therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[Future[T]],
|
||||
|
|
@ -588,7 +709,7 @@ object Patterns {
|
|||
*
|
||||
* If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries
|
||||
* and therefore must be thread safe (not touch unsafe mutable state).
|
||||
* and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
|
|
@ -597,6 +718,35 @@ object Patterns {
|
|||
system: ClassicActorSystemProvider): CompletionStage[T] =
|
||||
retry(attempt, attempts, delay, system.classicSystem.scheduler, system.classicSystem.dispatcher)
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]].
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Return an empty [[Optional]] instance for no delay.
|
||||
*
|
||||
* If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries
|
||||
* and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param delay the delay between each attempt
|
||||
* @param system the actor system
|
||||
* @return the result [[java.util.concurrent.CompletionStage]] which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
shouldRetry: BiPredicate[T, Throwable],
|
||||
attempts: Int,
|
||||
delay: java.time.Duration,
|
||||
system: ClassicActorSystemProvider): CompletionStage[T] =
|
||||
retry(attempt, shouldRetry, attempts, delay, system.classicSystem.scheduler, system.classicSystem.dispatcher)
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]]
|
||||
* The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'.
|
||||
|
|
@ -604,7 +754,7 @@ object Patterns {
|
|||
*
|
||||
* If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries
|
||||
* and therefore must be thread safe (not touch unsafe mutable state).
|
||||
* and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
|
|
@ -616,10 +766,44 @@ object Patterns {
|
|||
scalaRetry(() => attempt.call().asScala, attempts, delay.asScala)(ec, scheduler).asJava
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]].
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Return an empty [[Optional]] instance for no delay.
|
||||
*
|
||||
* If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries
|
||||
* and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param delay the delay between each attempt
|
||||
* @param scheduler the scheduler for scheduling a delay
|
||||
* @param ec the execution context
|
||||
* @return the result [[java.util.concurrent.CompletionStage]] which maybe retried *
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
shouldRetry: BiPredicate[T, Throwable],
|
||||
attempts: Int,
|
||||
delay: java.time.Duration,
|
||||
scheduler: Scheduler,
|
||||
ec: ExecutionContext): CompletionStage[T] = {
|
||||
require(attempt != null, "Parameter attempt should not be null.")
|
||||
scalaRetry(() => attempt.call().asScala, (t, e) => shouldRetry.test(t, e), attempts, delay.asScala)(ec,
|
||||
scheduler).asJava
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]].
|
||||
* The first attempt will be made immediately, each subsequent attempt will be made after
|
||||
* the 'delay' return by `delayFunction`(the input next attempt count start from 1).
|
||||
* the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Return an empty [[Optional]] instance for no delay.
|
||||
* A scheduler (eg context.system.scheduler) must be provided to delay each retry.
|
||||
* You could provide a function to generate the next delay duration after first attempt,
|
||||
|
|
@ -627,7 +811,7 @@ object Patterns {
|
|||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries and
|
||||
* therefore must be thread safe (not touch unsafe mutable state).
|
||||
* therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
|
|
@ -642,6 +826,48 @@ object Patterns {
|
|||
attempts,
|
||||
attempted => delayFunction.apply(attempted).toScala.map(_.asScala))(context, scheduler).asJava
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an internally retrying [[java.util.concurrent.CompletionStage]].
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Return an empty [[Optional]] instance for no delay.
|
||||
*
|
||||
* A scheduler (eg context.system.scheduler) must be provided to delay each retry.
|
||||
* You could provide a function to generate the next delay duration after first attempt,
|
||||
* this function should never return `null`, otherwise an [[java.lang.IllegalArgumentException]] will be through.
|
||||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries and
|
||||
* therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param delayFunction the function to generate the next delay duration, `None` for no delay
|
||||
* @param scheduler the scheduler for scheduling a delay
|
||||
* @param context the execution context
|
||||
* @return the result [[java.util.concurrent.CompletionStage]] which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: Callable[CompletionStage[T]],
|
||||
shouldRetry: BiPredicate[T, Throwable],
|
||||
attempts: Int,
|
||||
delayFunction: java.util.function.IntFunction[Optional[java.time.Duration]],
|
||||
scheduler: Scheduler,
|
||||
context: ExecutionContext): CompletionStage[T] = {
|
||||
import pekko.util.OptionConverters._
|
||||
require(attempt != null, "Parameter attempt should not be null.")
|
||||
scalaRetry(
|
||||
() => attempt.call().asScala,
|
||||
(t, e) => shouldRetry.test(t, e),
|
||||
attempts,
|
||||
attempted => delayFunction.apply(attempted).toScala.map(_.asScala))(context, scheduler).asJava
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1077,7 +1303,7 @@ object PatternsCS {
|
|||
* A scheduler (eg context.system.scheduler) must be provided to delay each retry
|
||||
* If attempts are exhausted the returned completion operator is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent tries
|
||||
* and therefore must be thread safe (not touch unsafe mutable state).
|
||||
* and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*/
|
||||
@deprecated("Use Patterns.retry instead.", since = "Akka 2.5.19")
|
||||
def retry[T](
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ package org.apache.pekko.pattern
|
|||
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.util.{ Failure, Success }
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.pekko
|
||||
|
|
@ -33,7 +34,7 @@ trait RetrySupport {
|
|||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent
|
||||
* tries and therefore must be thread safe (not touch unsafe mutable state).
|
||||
* tries and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* <b>Example usage:</b>
|
||||
*
|
||||
|
|
@ -46,6 +47,40 @@ trait RetrySupport {
|
|||
RetrySupport.retry(attempt, attempts, attempted = 0)
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a function from Unit to Future, returns an internally retrying Future.
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Returns [[scala.None]] for no delay.
|
||||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent
|
||||
* tries and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* <b>Example usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* def possiblyFailing(): Future[Something] = ???
|
||||
* val shouldRetry: (Something, Throwable) => throwable ne null
|
||||
* val withRetry: Future[Something] = retry(attempt = possiblyFailing, shouldRetry, attempts = 10)
|
||||
* }}}
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param ec the execution context
|
||||
* @return the result future which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](attempt: () => Future[T],
|
||||
shouldRetry: (T, Throwable) => Boolean,
|
||||
attempts: Int)(implicit ec: ExecutionContext): Future[T] = {
|
||||
RetrySupport.retry(attempt, shouldRetry, attempts, ConstantFun.scalaAnyToNone, attempted = 0)(ec, null)
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a function from Unit to Future, returns an internally retrying Future.
|
||||
* The first attempt will be made immediately, each subsequent attempt will be made with a backoff time,
|
||||
|
|
@ -53,7 +88,7 @@ trait RetrySupport {
|
|||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent
|
||||
* tries and therefore must be thread safe (not touch unsafe mutable state).
|
||||
* tries and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* <b>Example usage:</b>
|
||||
*
|
||||
|
|
@ -81,7 +116,64 @@ trait RetrySupport {
|
|||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = {
|
||||
require(attempt != null, "Parameter attempt should not be null.")
|
||||
retry(
|
||||
attempt,
|
||||
RetrySupport.retryOnException,
|
||||
attempts,
|
||||
minBackoff,
|
||||
maxBackoff,
|
||||
randomFactor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a function from Unit to Future, returns an internally retrying Future.
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Returns [[scala.None]] for no delay.
|
||||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent
|
||||
* tries and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* <b>Example usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* protected val sendAndReceive: HttpRequest => Future[HttpResponse]
|
||||
* protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null
|
||||
* private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
|
||||
* attempt = () => sendAndReceive(req),
|
||||
* shouldRetry,
|
||||
* attempts = 10,
|
||||
* minBackoff = 1.seconds,
|
||||
* maxBackoff = 2.seconds,
|
||||
* randomFactor = 0.5
|
||||
* )
|
||||
* }}}
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param minBackoff minimum (initial) duration until the child actor will
|
||||
* started again, if it is terminated
|
||||
* @param maxBackoff the exponential back-off is capped to this duration
|
||||
* @param randomFactor after calculation of the exponential back-off an additional
|
||||
* random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
|
||||
* In order to skip this additional delay pass in `0`.
|
||||
* @param ec the execution context
|
||||
* @param scheduler the scheduler for scheduling a delay
|
||||
* @return the result future which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](
|
||||
attempt: () => Future[T],
|
||||
shouldRetry: (T, Throwable) => Boolean,
|
||||
attempts: Int,
|
||||
minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration,
|
||||
randomFactor: Double)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = {
|
||||
require(minBackoff != null, "Parameter minBackoff should not be null.")
|
||||
require(maxBackoff != null, "Parameter maxBackoff should not be null.")
|
||||
require(minBackoff > Duration.Zero, "Parameter minBackoff must be > 0")
|
||||
|
|
@ -89,6 +181,7 @@ trait RetrySupport {
|
|||
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0")
|
||||
retry(
|
||||
attempt,
|
||||
shouldRetry,
|
||||
attempts,
|
||||
attempted => Some(BackoffSupervisor.calculateDelay(attempted, minBackoff, maxBackoff, randomFactor)))
|
||||
}
|
||||
|
|
@ -100,7 +193,7 @@ trait RetrySupport {
|
|||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent
|
||||
* tries and therefore must be thread safe (not touch unsafe mutable state).
|
||||
* tries and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* <b>Example usage:</b>
|
||||
*
|
||||
|
|
@ -121,16 +214,61 @@ trait RetrySupport {
|
|||
|
||||
/**
|
||||
* Given a function from Unit to Future, returns an internally retrying Future.
|
||||
* The first attempt will be made immediately, each subsequent attempt will be made after
|
||||
* the 'delay' return by `delayFunction`(the input next attempt count start from 1).
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Returns [[scala.None]] for no delay.
|
||||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent
|
||||
* tries and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* <b>Example usage:</b>
|
||||
*
|
||||
* {{{
|
||||
* protected val sendAndReceive: HttpRequest => Future[HttpResponse]
|
||||
* protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null
|
||||
* private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
|
||||
* attempt = () => sendAndReceive(req),
|
||||
* shouldRetry,
|
||||
* attempts = 10,
|
||||
* delay = 2.seconds
|
||||
* )
|
||||
* }}}
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param delay the delay duration
|
||||
* @param ec the execution context
|
||||
* @param scheduler the scheduler for scheduling a delay
|
||||
* @return the result future which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](attempt: () => Future[T],
|
||||
shouldRetry: (T, Throwable) => Boolean,
|
||||
attempts: Int,
|
||||
delay: FiniteDuration)(
|
||||
implicit ec: ExecutionContext,
|
||||
scheduler: Scheduler): Future[T] = {
|
||||
retry(attempt, shouldRetry, attempts, _ => Some(delay))
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a function from Unit to Future, returns an internally retrying Future.
|
||||
* The first attempt will be made immediately, each subsequent attempt will be made after
|
||||
* the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Returns [[scala.None]] for no delay.
|
||||
*
|
||||
* A scheduler (eg context.system.scheduler) must be provided to delay each retry.
|
||||
* You could provide a function to generate the next delay duration after first attempt,
|
||||
* this function should never return `null`, otherwise an [[java.lang.IllegalArgumentException]] will be through.
|
||||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent
|
||||
* tries and therefore must be thread safe (not touch unsafe mutable state).
|
||||
* tries and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* <b>Example usage:</b>
|
||||
*
|
||||
|
|
@ -148,22 +286,70 @@ trait RetrySupport {
|
|||
implicit
|
||||
ec: ExecutionContext,
|
||||
scheduler: Scheduler): Future[T] = {
|
||||
RetrySupport.retry(attempt, attempts, delayFunction, attempted = 0)
|
||||
RetrySupport.retry(attempt, RetrySupport.retryOnException, attempts, delayFunction, attempted = 0)
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a function from Unit to Future, returns an internally retrying Future.
|
||||
*
|
||||
* When the future is completed, the `shouldRetry` predicate is always been invoked with the result (or `null` if none)
|
||||
* and the exception (or `null` if none). If the `shouldRetry` predicate returns true, then a new attempt is made,
|
||||
* each subsequent attempt will be made after the 'delay' return by `delayFunction` (the input next attempt count start from 1).
|
||||
* Returns [[scala.None]] for no delay.
|
||||
*
|
||||
* A scheduler (eg context.system.scheduler) must be provided to delay each retry.
|
||||
* You could provide a function to generate the next delay duration after first attempt,
|
||||
* this function should never return `null`, otherwise an [[java.lang.IllegalArgumentException]] will be through.
|
||||
*
|
||||
* If attempts are exhausted the returned future is simply the result of invoking attempt.
|
||||
* Note that the attempt function will be invoked on the given execution context for subsequent
|
||||
* tries and therefore must be thread safe (i.e. not touch unsafe mutable state).
|
||||
*
|
||||
* <b>Example usage:</b>
|
||||
*
|
||||
* //retry with back off
|
||||
* {{{
|
||||
* protected val sendAndReceive: HttpRequest => Future[HttpResponse]
|
||||
* protected val shouldRetry: (HttpResponse, Throwable) => throwable ne null
|
||||
* private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse](
|
||||
* attempt = () => sendAndReceive(req),
|
||||
* shouldRetry,
|
||||
* attempts = 10,
|
||||
* delayFunction = attempted => Option(2.seconds * attempted)
|
||||
* )
|
||||
* }}}
|
||||
*
|
||||
* @param attempt the function to be attempted
|
||||
* @param shouldRetry the predicate to determine if the attempt should be retried
|
||||
* @param attempts the maximum number of attempts
|
||||
* @param delayFunction the function to generate the next delay duration, `None` for no delay
|
||||
* @param ec the execution context
|
||||
* @param scheduler the scheduler for scheduling a delay
|
||||
* @return the result future which maybe retried
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def retry[T](attempt: () => Future[T],
|
||||
shouldRetry: (T, Throwable) => Boolean,
|
||||
attempts: Int,
|
||||
delayFunction: Int => Option[FiniteDuration])(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = {
|
||||
RetrySupport.retry(attempt, shouldRetry, attempts, delayFunction, attempted = 0)
|
||||
}
|
||||
}
|
||||
|
||||
object RetrySupport extends RetrySupport {
|
||||
private val retryOnException: (Any, Throwable) => Boolean = (_: Any, e: Throwable) => e != null
|
||||
|
||||
private def retry[T](attempt: () => Future[T], maxAttempts: Int, attempted: Int)(
|
||||
implicit ec: ExecutionContext): Future[T] =
|
||||
retry(attempt, maxAttempts, ConstantFun.scalaAnyToNone, attempted)(ec, null)
|
||||
retry(attempt, retryOnException, maxAttempts, ConstantFun.scalaAnyToNone, attempted)(ec, null)
|
||||
|
||||
private def retry[T](
|
||||
attempt: () => Future[T],
|
||||
shouldRetry: (T, Throwable) => Boolean,
|
||||
maxAttempts: Int,
|
||||
delayFunction: Int => Option[FiniteDuration],
|
||||
attempted: Int)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = {
|
||||
|
||||
def tryAttempt(): Future[T] = {
|
||||
try {
|
||||
attempt()
|
||||
|
|
@ -172,30 +358,34 @@ object RetrySupport extends RetrySupport {
|
|||
}
|
||||
}
|
||||
|
||||
require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.")
|
||||
def doRetry(nextAttempt: Int): Future[T] = delayFunction(nextAttempt) match {
|
||||
case Some(delay) =>
|
||||
if (delay.length < 1)
|
||||
retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt)
|
||||
else
|
||||
after(delay, scheduler) {
|
||||
retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt)
|
||||
}
|
||||
case None =>
|
||||
retry(attempt, shouldRetry, maxAttempts, delayFunction, nextAttempt)
|
||||
case null =>
|
||||
Future.failed(new IllegalArgumentException("The delayFunction of retry should not return null."))
|
||||
}
|
||||
|
||||
require(attempt != null, "Parameter attempt should not be null.")
|
||||
require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.")
|
||||
require(delayFunction != null, "Parameter delayFunction should not be null.")
|
||||
require(attempted >= 0, "Parameter attempted must >= 0.")
|
||||
|
||||
if (maxAttempts - attempted > 0) {
|
||||
val result = tryAttempt()
|
||||
if (result eq null)
|
||||
result
|
||||
else {
|
||||
val nextAttempt = attempted + 1
|
||||
result.recoverWith {
|
||||
case NonFatal(_) =>
|
||||
delayFunction(nextAttempt) match {
|
||||
case Some(delay) =>
|
||||
if (delay.length < 1)
|
||||
retry(attempt, maxAttempts, delayFunction, nextAttempt)
|
||||
else
|
||||
after(delay, scheduler) {
|
||||
retry(attempt, maxAttempts, delayFunction, nextAttempt)
|
||||
}
|
||||
case None =>
|
||||
retry(attempt, maxAttempts, delayFunction, nextAttempt)
|
||||
case null =>
|
||||
Future.failed(new IllegalArgumentException("The delayFunction of retry should not return null."))
|
||||
}
|
||||
|
||||
result.transformWith {
|
||||
case Success(value) if shouldRetry(value, null) => doRetry(attempted + 1)
|
||||
case Failure(e) if NonFatal(e) && shouldRetry(null.asInstanceOf[T], e) => doRetry(attempted + 1)
|
||||
case _ => result
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -85,4 +85,17 @@ public class FutureDocTest extends AbstractJavaTest {
|
|||
|
||||
retriedFuture.toCompletableFuture().get(2, SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void useRetryWithPredicate() throws Exception {
|
||||
// #retry
|
||||
Callable<CompletionStage<String>> attempt = () -> CompletableFuture.completedFuture("test");
|
||||
|
||||
CompletionStage<String> retriedFuture =
|
||||
Patterns.retry(
|
||||
attempt, (notUsed, e) -> e != null, 3, java.time.Duration.ofMillis(200), system);
|
||||
// #retry
|
||||
|
||||
retriedFuture.toCompletableFuture().get(2, SECONDS);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue