diff --git a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java index db6dbaf286..dd70ed5d08 100644 --- a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java +++ b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java @@ -21,6 +21,8 @@ import scala.concurrent.duration.FiniteDuration; import java.util.Arrays; import java.util.concurrent.*; import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipe; @@ -240,6 +242,43 @@ public class PatternsTest extends JUnitSuite { probe.expectMsg("hi!"); } + @Test + public void testRetryCompletionStageNoDelay() throws Exception { + final String expected = "hello"; + + CompletionStage retriedFuture = + Patterns.retry(() -> CompletableFuture.completedFuture(expected), 3, ec); + + String actual = retriedFuture.toCompletableFuture().get(3, SECONDS); + assertEquals(expected, actual); + } + + @Test + public void testRetryCompletionStageRandomDelay() throws Exception { + final String expected = "hello"; + final AtomicInteger counter = new AtomicInteger(0); + CompletionStage retriedFuture = + Patterns.retry( + () -> { + if (counter.incrementAndGet() <= 3) { + final CompletableFuture empty = new CompletableFuture<>(); + empty.completeExceptionally(new RuntimeException("failed by purpose.")); + return empty; + } else { + return CompletableFuture.completedFuture(expected); + } + }, + 3, + Duration.ofMillis(100), + Duration.ofMillis(200), + 0.2d, + system.scheduler(), + ec); + + String actual = retriedFuture.toCompletableFuture().get(3, SECONDS); + assertEquals(expected, actual); + } + @Test public void testRetry() throws Exception { final String expected = "hello"; diff --git a/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala index cb2007553e..2f89e8a10e 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala @@ -82,6 +82,48 @@ class RetrySpec extends AkkaSpec with RetrySupport { intercept[IllegalStateException] { Await.result(retried, remaining) }.getMessage should ===("6") } } + + "return a failure for a Future that would have succeeded but retires were exhausted with delay function" in { + @volatile var failCount = 0 + @volatile var attemptedCount = 0; + + def attempt() = { + if (failCount < 10) { + failCount += 1 + Future.failed(new IllegalStateException(failCount.toString)) + } else Future.successful(5) + } + + val retried = retry(() => attempt, 5, attempted => { + attemptedCount = attempted + Some(100.milliseconds * attempted) + }) + within(30000000 seconds) { + intercept[IllegalStateException] { Await.result(retried, remaining) }.getMessage should ===("6") + attemptedCount shouldBe 5 + } + } + + "retry can be attempted without any delay" in { + @volatile var failCount = 0 + + def attempt() = { + if (failCount < 1000) { + failCount += 1 + Future.failed(new IllegalStateException(failCount.toString)) + } else Future.successful(1) + } + val start = System.currentTimeMillis() + val retried = retry(() => attempt, 999) + + within(1 seconds) { + intercept[IllegalStateException] { + Await.result(retried, remaining) + }.getMessage should ===("1000") + val elapse = System.currentTimeMillis() - start + elapse <= 100 shouldBe true + } + } } } diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 993d66af69..d0c9ddf4c2 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -4,6 +4,7 @@ package akka.pattern +import java.util.Optional import java.util.concurrent.{ Callable, CompletionStage, TimeUnit } import akka.actor.{ ActorSelection, Scheduler } @@ -454,10 +455,57 @@ object Patterns { value: CompletionStage[T]): CompletionStage[T] = afterCompletionStage(duration.asScala, scheduler)(value)(context) + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]] + * The first attempt will be made immediately, each subsequent attempt will be made immediately + * if the previous attempt failed. + * + * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries + * and therefore must be thread safe (not touch unsafe mutable state). + */ + def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, ec: ExecutionContext): CompletionStage[T] = { + require(attempt != null, "Parameter attempt should not be null.") + scalaRetry(() => attempt.call().toScala, attempts)(ec).toJava + } + + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]] + * The first attempt will be made immediately, each subsequent attempt will be made with a backoff time, + * if the previous attempt failed. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries and + * therefore must be thread safe (not touch unsafe mutable state). + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + attempts: Int, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + scheduler: Scheduler, + ec: ExecutionContext): CompletionStage[T] = { + require(attempt != null, "Parameter attempt should not be null.") + require(minBackoff != null, "Parameter minBackoff should not be null.") + require(maxBackoff != null, "Parameter minBackoff should not be null.") + scalaRetry(() => attempt.call().toScala, attempts, minBackoff.asScala, maxBackoff.asScala, randomFactor)( + ec, + scheduler).toJava + } + /** * Returns an internally retrying [[scala.concurrent.Future]] * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. * A scheduler (eg context.system.scheduler) must be provided to delay each retry + * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries and * therefore must be thread safe (not touch unsafe mutable state). @@ -467,13 +515,16 @@ object Patterns { attempts: Int, delay: FiniteDuration, scheduler: Scheduler, - context: ExecutionContext): Future[T] = + context: ExecutionContext): Future[T] = { + require(attempt != null, "Parameter attempt should not be null.") scalaRetry(() => attempt.call, attempts, delay)(context, scheduler) + } /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]] * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. * A scheduler (eg context.system.scheduler) must be provided to delay each retry + * * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent tries * and therefore must be thread safe (not touch unsafe mutable state). @@ -483,8 +534,37 @@ object Patterns { attempts: Int, delay: java.time.Duration, scheduler: Scheduler, - ec: ExecutionContext): CompletionStage[T] = + ec: ExecutionContext): CompletionStage[T] = { + require(attempt != null, "Parameter attempt should not be null.") scalaRetry(() => attempt.call().toScala, attempts, delay.asScala)(ec, scheduler).toJava + } + + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]]. + * The first attempt will be made immediately, each subsequent attempt will be made after + * the 'delay' return by `delayFunction`(the input next attempt count start from 1). + * Return an empty [[Optional]] instance for no delay. + * A scheduler (eg context.system.scheduler) must be provided to delay each retry. + * You could provide a function to generate the next delay duration after first attempt, + * this function should never return `null`, otherwise an [[IllegalArgumentException]] will be through. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries and + * therefore must be thread safe (not touch unsafe mutable state). + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + attempts: Int, + delayFunction: java.util.function.IntFunction[Optional[java.time.Duration]], + scheduler: Scheduler, + context: ExecutionContext): CompletionStage[T] = { + import scala.compat.java8.OptionConverters._ + require(attempt != null, "Parameter attempt should not be null.") + scalaRetry( + () => attempt.call().toScala, + attempts, + attempted => delayFunction.apply(attempted).asScala.map(_.asScala))(context, scheduler).toJava + } } /** diff --git a/akka-actor/src/main/scala/akka/pattern/RetrySupport.scala b/akka-actor/src/main/scala/akka/pattern/RetrySupport.scala index 1ac71c0baa..dbdb051bf9 100644 --- a/akka-actor/src/main/scala/akka/pattern/RetrySupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/RetrySupport.scala @@ -5,8 +5,9 @@ package akka.pattern import akka.actor.Scheduler +import akka.util.ConstantFun -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.{ ExecutionContext, Future } import scala.util.control.NonFatal @@ -16,9 +17,77 @@ import scala.util.control.NonFatal trait RetrySupport { /** - * Given a function from Unit to Future, returns an internally retrying Future - * The first attempt will be made immediately, each subsequent attempt will be made after 'delay' - * A scheduler (eg context.system.scheduler) must be provided to delay each retry + * Given a function from Unit to Future, returns an internally retrying Future. + * The first attempt will be made immediately, each subsequent attempt will be made immediately + * if the previous attempt failed. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent + * tries and therefore must be thread safe (not touch unsafe mutable state). + * + * Example usage: + * + * {{{ + * def possiblyFailing(): Future[Something] = ??? + * val withRetry: Future[Something] = retry(attempt = possiblyFailing, attempts = 10) + * }}} + */ + def retry[T](attempt: () => Future[T], attempts: Int)(implicit ec: ExecutionContext): Future[T] = { + RetrySupport.retry(attempt, attempts, attempted = 0) + } + + /** + * Given a function from Unit to Future, returns an internally retrying Future. + * The first attempt will be made immediately, each subsequent attempt will be made with a backoff time, + * if the previous attempt failed. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent + * tries and therefore must be thread safe (not touch unsafe mutable state). + * + * Example usage: + * + * {{{ + * protected val sendAndReceive: HttpRequest => Future[HttpResponse] + * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse]( + * attempt = () => sendAndReceive(req), + * attempts = 10, + * minBackoff = 1.seconds, + * maxBackoff = 2.seconds, + * randomFactor = 0.5 + * ) + * }}} + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def retry[T]( + attempt: () => Future[T], + attempts: Int, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { + require(attempt != null, "Parameter attempt should not be null.") + require(minBackoff != null, "Parameter minBackoff should not be null.") + require(maxBackoff != null, "Parameter maxBackoff should not be null.") + require(minBackoff > Duration.Zero, "Parameter minBackoff must be > 0") + require(maxBackoff >= minBackoff, "Parameter maxBackoff must be >= minBackoff") + require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") + retry( + attempt, + attempts, + attempted => Some(BackoffSupervisor.calculateDelay(attempted, minBackoff, maxBackoff, randomFactor))) + } + + /** + * Given a function from Unit to Future, returns an internally retrying Future. + * The first attempt will be made immediately, each subsequent attempt will be made after 'delay'. + * A scheduler (eg context.system.scheduler) must be provided to delay each retry. + * * If attempts are exhausted the returned future is simply the result of invoking attempt. * Note that the attempt function will be invoked on the given execution context for subsequent * tries and therefore must be thread safe (not touch unsafe mutable state). @@ -37,14 +106,81 @@ trait RetrySupport { def retry[T](attempt: () => Future[T], attempts: Int, delay: FiniteDuration)( implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { + retry(attempt, attempts, _ => Some(delay)) + } + + /** + * Given a function from Unit to Future, returns an internally retrying Future. + * The first attempt will be made immediately, each subsequent attempt will be made after + * the 'delay' return by `delayFunction`(the input next attempt count start from 1). + * Returns [[None]] for no delay. + * A scheduler (eg context.system.scheduler) must be provided to delay each retry. + * You could provide a function to generate the next delay duration after first attempt, + * this function should never return `null`, otherwise an [[IllegalArgumentException]] will be through. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent + * tries and therefore must be thread safe (not touch unsafe mutable state). + * + * Example usage: + * + * //retry with back off + * {{{ + * protected val sendAndReceive: HttpRequest => Future[HttpResponse] + * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse]( + * attempt = () => sendAndReceive(req), + * attempts = 10, + * delayFunction = attempted => Option(2.seconds * attempted) + * ) + * }}} + */ + def retry[T](attempt: () => Future[T], attempts: Int, delayFunction: Int => Option[FiniteDuration])( + implicit + ec: ExecutionContext, + scheduler: Scheduler): Future[T] = { + RetrySupport.retry(attempt, attempts, delayFunction, attempted = 0) + } +} + +object RetrySupport extends RetrySupport { + + private def retry[T](attempt: () => Future[T], maxAttempts: Int, attempted: Int)( + implicit ec: ExecutionContext): Future[T] = + retry(attempt, maxAttempts, ConstantFun.scalaAnyToNone, attempted)(ec, null) + + private def retry[T]( + attempt: () => Future[T], + maxAttempts: Int, + delayFunction: Int => Option[FiniteDuration], + attempted: Int)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { try { - if (attempts > 0) { - attempt().recoverWith { - case NonFatal(_) => - after(delay, scheduler) { - retry(attempt, attempts - 1, delay) - } + require(maxAttempts >= 0, "Parameter maxAttempts must >= 0.") + require(attempt != null, "Parameter attempt should not be null.") + if (maxAttempts - attempted > 0) { + val result = attempt() + if (result eq null) + result + else { + val nextAttempt = attempted + 1 + result.recoverWith { + case NonFatal(_) => + delayFunction(nextAttempt) match { + case Some(delay) => + if (delay.length < 1) + retry(attempt, maxAttempts, delayFunction, nextAttempt) + else + after(delay, scheduler) { + retry(attempt, maxAttempts, delayFunction, nextAttempt) + } + case None => + retry(attempt, maxAttempts, delayFunction, nextAttempt) + case _ => + Future.failed(new IllegalArgumentException("The delayFunction of retry should not return null.")) + } + + } } + } else { attempt() } @@ -53,5 +189,3 @@ trait RetrySupport { } } } - -object RetrySupport extends RetrySupport