diff --git a/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala new file mode 100644 index 0000000000..56b53f3c30 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/RetrySpec.scala @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.pattern + +import akka.actor.Scheduler + +import language.postfixOps +import akka.testkit.AkkaSpec + +import scala.concurrent.{ Await, ExecutionContextExecutor, Future } +import scala.concurrent.duration._ + +class RetrySpec extends AkkaSpec with RetrySupport { + implicit val ec: ExecutionContextExecutor = system.dispatcher + implicit val scheduler: Scheduler = system.scheduler + + "pattern.retry" must { + "run a successful Future immediately" in { + val retried = retry( + () ⇒ Future.successful(5), + 5, + 1 second + ) + + within(3 seconds) { + Await.result(retried, remaining) should ===(5) + } + } + + "run a successful Future only once" in { + @volatile var counter = 0 + val retried = retry( + () ⇒ Future.successful({ + counter += 1 + counter + }), + 5, + 1 second + ) + + within(3 seconds) { + Await.result(retried, remaining) should ===(1) + } + } + + "eventually return a failure for a Future that will never succeed" in { + val retried = retry( + () ⇒ Future.failed(new IllegalStateException("Mexico")), + 5, + 100 milliseconds + ) + + within(3 second) { + intercept[IllegalStateException] { Await.result(retried, remaining) }.getMessage should ===("Mexico") + } + } + + "return a success for a Future that succeeds eventually" in { + @volatile var failCount = 0 + + def attempt() = { + if (failCount < 5) { + failCount += 1 + Future.failed(new IllegalStateException(failCount.toString)) + } else Future.successful(5) + } + + val retried = retry( + attempt, + 10, + 100 milliseconds + ) + + within(3 seconds) { + Await.result(retried, remaining) should ===(5) + } + } + + "return a failure for a Future that would have succeeded but retires were exhausted" in { + @volatile var failCount = 0 + + def attempt() = { + if (failCount < 10) { + failCount += 1 + Future.failed(new IllegalStateException(failCount.toString)) + } else Future.successful(5) + } + + val retried = retry( + attempt, + 5, + 100 milliseconds + ) + + within(3 seconds) { + intercept[IllegalStateException] { Await.result(retried, remaining) }.getMessage should ===("6") + } + } + } + +} diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 541f4ec026..f2bd90e861 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -23,7 +23,13 @@ import scala.concurrent.ExecutionContext object Patterns { import akka.actor.ActorRef import akka.japi - import akka.pattern.{ after ⇒ scalaAfter, ask ⇒ scalaAsk, gracefulStop ⇒ scalaGracefulStop, pipe ⇒ scalaPipe } + import akka.pattern.{ + after ⇒ scalaAfter, + ask ⇒ scalaAsk, + gracefulStop ⇒ scalaGracefulStop, + pipe ⇒ scalaPipe, + retry ⇒ scalaRetry + } import akka.util.Timeout import scala.concurrent.Future @@ -268,6 +274,18 @@ object Patterns { */ def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] = scalaAfter(duration, scheduler)(value)(context) + + /** + * Returns an internally retrying [[scala.concurrent.Future]] + * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. + * A scheduler (eg context.system.scheduler) must be provided to delay each retry + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries and + * therefore must be thread safe (not touch unsafe mutable state). + */ + def retry[T](attempt: Callable[Future[T]], attempts: Int, delay: FiniteDuration, scheduler: Scheduler, + context: ExecutionContext): Future[T] = + scalaRetry(() ⇒ attempt.call, attempts, delay)(context, scheduler) } /** @@ -278,7 +296,7 @@ object Patterns { object PatternsCS { import akka.actor.ActorRef import akka.japi - import akka.pattern.{ ask ⇒ scalaAsk, gracefulStop ⇒ scalaGracefulStop } + import akka.pattern.{ ask ⇒ scalaAsk, gracefulStop ⇒ scalaGracefulStop, retry ⇒ scalaRetry } import akka.util.Timeout import scala.concurrent.duration._ @@ -526,4 +544,15 @@ object PatternsCS { */ def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] = afterCompletionStage(duration, scheduler)(value)(context) + + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]] + * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. + * A scheduler (eg context.system.scheduler) must be provided to delay each retry + * If attempts are exhausted the returned completion stage is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries + * and therefore must be thread safe (not touch unsafe mutable state). + */ + def retry[T](attempt: Callable[CompletionStage[T]], attempts: Int, delay: FiniteDuration, scheduler: Scheduler, ec: ExecutionContext): CompletionStage[T] = + scalaRetry(() ⇒ attempt.call().toScala, attempts, delay)(ec, scheduler).toJava } diff --git a/akka-actor/src/main/scala/akka/pattern/RetrySupport.scala b/akka-actor/src/main/scala/akka/pattern/RetrySupport.scala new file mode 100644 index 0000000000..7fbbeae534 --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/RetrySupport.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.pattern + +import akka.actor.Scheduler + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.control.NonFatal + +/** + * This trait provides the retry utility function + */ +trait RetrySupport { + + /** + * Given a function from Unit to Future, returns an internally retrying Future + * The first attempt will be made immediately, each subsequent attempt will be made after 'delay' + * A scheduler (eg context.system.scheduler) must be provided to delay each retry + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent + * tries and therefore must be thread safe (not touch unsafe mutable state). + * + * Example usage: + * + * {{{ + * protected val sendAndReceive: HttpRequest => Future[HttpResponse] + * private val sendReceiveRetry: HttpRequest => Future[HttpResponse] = (req: HttpRequest) => retry[HttpResponse]( + * attempt = () => sendAndReceive(req), + * attempts = 10, + * delay = 2 seconds, + * scheduler = context.system.scheduler + * ) + * }}} + */ + def retry[T](attempt: () ⇒ Future[T], attempts: Int, delay: FiniteDuration)(implicit ec: ExecutionContext, scheduler: Scheduler): Future[T] = { + try { + if (attempts > 0) { + attempt() recoverWith { + case NonFatal(_) ⇒ after(delay, scheduler) { + retry(attempt, attempts - 1, delay) + } + } + } else { + attempt() + } + } catch { + case NonFatal(error) ⇒ Future.failed(error) + } + } +} + +object RetrySupport extends RetrySupport diff --git a/akka-actor/src/main/scala/akka/pattern/package.scala b/akka-actor/src/main/scala/akka/pattern/package.scala index 2d07ad75aa..35793b432c 100644 --- a/akka-actor/src/main/scala/akka/pattern/package.scala +++ b/akka-actor/src/main/scala/akka/pattern/package.scala @@ -40,4 +40,5 @@ package akka * }}} */ package object pattern extends PipeToSupport with AskSupport with GracefulStopSupport with FutureTimeoutSupport + with RetrySupport diff --git a/akka-docs/src/main/paradox/futures.md b/akka-docs/src/main/paradox/futures.md index 254d687df9..6a2735e783 100644 --- a/akka-docs/src/main/paradox/futures.md +++ b/akka-docs/src/main/paradox/futures.md @@ -389,8 +389,19 @@ Scala Java : @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports7 #after } +## Retry + +`akka.pattern.retry` will retry a @scala[`Future` class]@java[`CompletionStage` class] some number of times with a delay between each attempt. + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #retry } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports8 #retry } + @@@ div { .group-java } + ## Java 8, CompletionStage and CompletableFuture Starting with Akka 2.4.2 we have begun to introduce Java 8 `java.util.concurrent.CompletionStage` in Java APIs. diff --git a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java index 28a586c39d..91c1e3111e 100644 --- a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java @@ -7,6 +7,7 @@ package jdocs.future; //#imports1 import akka.dispatch.*; import jdocs.AbstractJavaTest; +import scala.Function0; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.Await; @@ -19,7 +20,9 @@ import akka.util.Timeout; //#imports2 import scala.concurrent.duration.Duration; import akka.japi.Function; -import java.util.concurrent.Callable; + +import java.util.concurrent.*; + import static akka.dispatch.Futures.future; import static java.util.concurrent.TimeUnit.SECONDS; @@ -58,13 +61,15 @@ import static akka.pattern.Patterns.after; import java.util.Arrays; //#imports7 + +//#imports8 + +import static akka.pattern.PatternsCS.retry; + +//#imports8 + import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; import scala.compat.java8.FutureConverters; @@ -541,6 +546,18 @@ public class FutureDocTest extends AbstractJavaTest { Await.result(result, Duration.create(2, SECONDS)); } + @Test + public void useRetry() throws Exception { + + //#retry + final ExecutionContext ec = system.dispatcher(); + Callable> attempt = () -> CompletableFuture.completedFuture("test"); + CompletionStage retriedFuture = retry(attempt, 3, Duration.create(200, "millis"), system.scheduler(), ec); + //#retry + + retriedFuture.toCompletableFuture().get(2, SECONDS); + } + @Test public void thenApplyCompletionThread() throws Exception { //#apply-completion-thread diff --git a/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala b/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala index cb2d154638..8c962346a5 100644 --- a/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala +++ b/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala @@ -430,6 +430,27 @@ class FutureDocSpec extends AkkaSpec { intercept[IllegalStateException] { Await.result(result, 2 second) } } + "demonstrate pattern.retry" in { + //#retry + implicit val scheduler = system.scheduler + //Given some future that will succeed eventually + @volatile var failCount = 0 + def attempt() = { + if (failCount < 5) { + failCount += 1 + Future.failed(new IllegalStateException(failCount.toString)) + } else Future.successful(5) + } + //Return a new future that will retry up to 10 times + val retried = akka.pattern.retry( + attempt, + 10, + 100 milliseconds) + //#retry + + Await.result(retried, 1 second) should ===(5) + } + "demonstrate context.dispatcher" in { //#context-dispatcher class A extends Actor {