From 7d328c9984a968ec3879e24e49606ca081b338cd Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Thu, 2 Jul 2020 10:19:11 +0200 Subject: [PATCH] Docs: re-add akka.patterns page (#29024) * Docs: re-add akka.patterns page * Add overloads with ClassicActorSystemProvider * docs for Java `after` --- .../akka/pattern/FutureTimeoutSupport.scala | 17 + .../main/scala/akka/pattern/Patterns.scala | 61 +- akka-docs/src/main/paradox/.htaccess | 1 - akka-docs/src/main/paradox/futures.md | 31 + akka-docs/src/main/paradox/index-utilities.md | 1 + .../test/java/jdocs/future/FutureDocTest.java | 932 +----------------- .../scala/docs/future/FutureDocSpec.scala | 25 +- 7 files changed, 164 insertions(+), 904 deletions(-) create mode 100644 akka-docs/src/main/paradox/futures.md diff --git a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala index dd9a3f08bf..4e77e1a232 100644 --- a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala @@ -17,6 +17,23 @@ import akka.dispatch.Futures trait FutureTimeoutSupport { + /** + * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value + * after the specified duration. + */ + def after[T](duration: FiniteDuration)(value: => Future[T])( + implicit system: ClassicActorSystemProvider): Future[T] = { + after(duration, using = system.classicSystem.scheduler)(value)(system.classicSystem.dispatcher) + } + + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value + * after the specified duration. + */ + def afterCompletionStage[T](duration: FiniteDuration)(value: => CompletionStage[T])( + implicit system: ClassicActorSystemProvider): CompletionStage[T] = + afterCompletionStage(duration, system.classicSystem.scheduler)(value)(system.classicSystem.dispatcher) + /** * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value * after the specified duration. diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 8197a42ca1..2bac6b8250 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -9,8 +9,7 @@ import java.util.concurrent.{ Callable, CompletionStage, TimeUnit } import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext - -import akka.actor.{ ActorSelection, Scheduler } +import akka.actor.{ ActorSelection, ClassicActorSystemProvider, Scheduler } import akka.util.JavaDurationConverters._ /** @@ -424,6 +423,16 @@ object Patterns { value: Callable[Future[T]]): Future[T] = scalaAfter(duration, scheduler)(value.call())(context) + /** + * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable + * after the specified duration. + */ + def after[T]( + duration: java.time.Duration, + system: ClassicActorSystemProvider, + value: Callable[CompletionStage[T]]): CompletionStage[T] = + after(duration, system.classicSystem.scheduler, system.classicSystem.dispatcher, value) + /** * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable * after the specified duration. @@ -469,6 +478,38 @@ object Patterns { scalaRetry(() => attempt.call().toScala, attempts)(ec).toJava } + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]] + * The first attempt will be made immediately, each subsequent attempt will be made with a backoff time, + * if the previous attempt failed. + * + * If attempts are exhausted the returned future is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries and + * therefore must be thread safe (not touch unsafe mutable state). + * + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. + * In order to skip this additional delay pass in `0`. + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + attempts: Int, + minBackoff: java.time.Duration, + maxBackoff: java.time.Duration, + randomFactor: Double, + system: ClassicActorSystemProvider): CompletionStage[T] = + retry( + attempt, + attempts, + minBackoff, + maxBackoff, + randomFactor, + system.classicSystem.scheduler, + system.classicSystem.dispatcher) + /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]] * The first attempt will be made immediately, each subsequent attempt will be made with a backoff time, @@ -520,6 +561,22 @@ object Patterns { scalaRetry(() => attempt.call, attempts, delay)(context, scheduler) } + /** + * Returns an internally retrying [[java.util.concurrent.CompletionStage]] + * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. + * A scheduler (eg context.system.scheduler) must be provided to delay each retry + * + * If attempts are exhausted the returned completion operator is simply the result of invoking attempt. + * Note that the attempt function will be invoked on the given execution context for subsequent tries + * and therefore must be thread safe (not touch unsafe mutable state). + */ + def retry[T]( + attempt: Callable[CompletionStage[T]], + attempts: Int, + delay: java.time.Duration, + system: ClassicActorSystemProvider): CompletionStage[T] = + retry(attempt, attempts, delay, system.classicSystem.scheduler, system.classicSystem.dispatcher) + /** * Returns an internally retrying [[java.util.concurrent.CompletionStage]] * The first attempt will be made immediately, and each subsequent attempt will be made after 'delay'. diff --git a/akka-docs/src/main/paradox/.htaccess b/akka-docs/src/main/paradox/.htaccess index 533788fb19..97fd4e4726 100644 --- a/akka-docs/src/main/paradox/.htaccess +++ b/akka-docs/src/main/paradox/.htaccess @@ -31,7 +31,6 @@ RedirectMatch 301 ^(.*)/additional/rolling-deploys\.html$ $1/additional/rolling- RedirectMatch 301 ^(.*)/additional/index\.html$ $1/project/index.html RedirectMatch 301 ^(.*)/howto\.html$ https://doc.akka.io/docs/akka/2.5/howto.html RedirectMatch 301 ^(.*)/common/duration\.html$ https://doc.akka.io/docs/akka/2.5/common/duration.html -RedirectMatch 301 ^(.*)/futures\.html$ https://doc.akka.io/docs/akka/2.5/futures.html RedirectMatch 301 ^(.*)/java8-compat\.html$ https://doc.akka.io/docs/akka/2.5/java8-compat.html RedirectMatch 301 ^(.*)/common/cluster\.html$ $1/typed/cluster-concepts.html diff --git a/akka-docs/src/main/paradox/futures.md b/akka-docs/src/main/paradox/futures.md new file mode 100644 index 0000000000..a8fcc6a297 --- /dev/null +++ b/akka-docs/src/main/paradox/futures.md @@ -0,0 +1,31 @@ +# Futures patterns + +## Dependency + +Akka offers tiny helpers for use with @scala[@scaladoc[Future](scala.concurrent.Future)s]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)]. These are part of Akka's core module: + +@@dependency[sbt,Maven,Gradle] { + group="com.typesafe.akka" + artifact="akka-actor_$scala.binary_version$" + version="$akka.version$" +} + +## After + +@scala[`akka.pattern.after`]@java[@javadoc[akka.pattern.Patterns.after](akka.pattern.Patterns#after)] makes it easy to complete a @scala[@scaladoc[Future](scala.concurrent.Future)]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)] with a value or exception after a timeout. + +Scala +: @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #after } + +Java +: @@snip [FutureDocTest.java](/akka-docs/src/test/java/jdocs/future/FutureDocTest.java) { #imports #after } + +## Retry + +@scala[`akka.pattern.retry`]@java[@javadoc[akka.pattern.Patterns.retry](akka.pattern.Patterns#retry)] will retry a @scala[@scaladoc[Future](scala.concurrent.Future)]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)] some number of times with a delay between each attempt. + +Scala +: @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #retry } + +Java +: @@snip [FutureDocTest.java](/akka-docs/src/test/java/jdocs/future/FutureDocTest.java) { #imports #retry } diff --git a/akka-docs/src/main/paradox/index-utilities.md b/akka-docs/src/main/paradox/index-utilities.md index 018bbc5989..9f52f8d40d 100644 --- a/akka-docs/src/main/paradox/index-utilities.md +++ b/akka-docs/src/main/paradox/index-utilities.md @@ -6,6 +6,7 @@ * [logging](typed/logging.md) * [common/circuitbreaker](common/circuitbreaker.md) +* [futures](futures.md) * [extensions](typed/extending.md) @@@ diff --git a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java index 2749983757..c77d4789a1 100644 --- a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java @@ -4,90 +4,33 @@ package jdocs.future; -// #imports1 -import akka.dispatch.*; -import jdocs.AbstractJavaTest; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.Await; -import scala.concurrent.Promise; -import akka.util.Timeout; - -// #imports1 - -// #imports2 -import java.time.Duration; -import java.util.concurrent.*; - -import scala.util.Try; - -import akka.japi.Function; - -import static akka.dispatch.Futures.future; -import static java.util.concurrent.TimeUnit.SECONDS; - -// #imports2 - -// #imports3 -import static akka.dispatch.Futures.sequence; - -// #imports3 - -// #imports4 -import static akka.dispatch.Futures.traverse; - -// #imports4 - -// #imports5 -import akka.japi.Function2; -import static akka.dispatch.Futures.fold; - -// #imports5 - -// #imports6 -import static akka.dispatch.Futures.reduce; - -// #imports6 - -// #imports7 -import static akka.pattern.Patterns.after; - -import java.util.Arrays; -// #imports7 - -// #imports8 - -import static akka.pattern.Patterns.retry; - -// #imports8 - -// #imports-ask -import static akka.pattern.Patterns.ask; -// #imports-ask -// #imports-pipe -import static akka.pattern.Patterns.pipe; -// #imports-pipe - -import java.util.ArrayList; -import java.util.List; - -import scala.compat.java8.FutureConverters; - +import akka.actor.typed.ActorSystem; +import akka.dispatch.Futures; +import akka.pattern.Patterns; import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import akka.util.Timeout; +import jdocs.AbstractJavaTest; import org.junit.ClassRule; import org.junit.Test; +import scala.compat.java8.FutureConverters; +import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; -import akka.testkit.AkkaSpec; -import akka.actor.Status.Failure; -import akka.actor.ActorSystem; -import akka.actor.AbstractActor; -import akka.actor.ActorRef; -import akka.actor.Props; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import static akka.actor.typed.javadsl.Adapter.toTyped; +import static akka.dispatch.Futures.future; +// #imports import akka.pattern.Patterns; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.core.StringContains.containsString; -import static org.junit.Assert.*; +// #imports +import static java.util.concurrent.TimeUnit.SECONDS; public class FutureDocTest extends AbstractJavaTest { @@ -95,841 +38,44 @@ public class FutureDocTest extends AbstractJavaTest { public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FutureDocTest", AkkaSpec.testConf()); - private final ActorSystem system = actorSystemResource.getSystem(); + private final ActorSystem system = toTyped(actorSystemResource.getSystem()); - public static final class PrintResult extends OnSuccess { - @Override - public final void onSuccess(T t) { - // print t - } - } - - public static final class Demo { - // #print-result - public static final class PrintResult extends OnSuccess { - @Override - public final void onSuccess(T t) { - System.out.println(t); - } - } - // #print-result - } - - // #pipe-to-usage - public class ActorUsingPipeTo extends AbstractActor { - ActorRef target; - Duration timeout; - - ActorUsingPipeTo(ActorRef target) { - this.target = target; - this.timeout = Duration.ofSeconds(5); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - String.class, - msg -> { - CompletableFuture fut = - ask(target, "some message", timeout).toCompletableFuture(); - - // the pipe pattern - pipe(fut, getContext().dispatcher()).to(getSender()); - }) - .build(); - } - } - // #pipe-to-usage - - // #pipe-to-returned-data - public class UserData { - final String data; - - UserData(String data) { - this.data = data; - } - } - - public class UserActivity { - final String activity; - - UserActivity(String activity) { - this.activity = activity; - } - } - // #pipe-to-returned-data - - // #pipe-to-user-data-actor - public class UserDataActor extends AbstractActor { - UserData internalData; - - UserDataActor() { - this.internalData = new UserData("initial data"); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match(GetFromUserDataActor.class, msg -> sender().tell(internalData, self())) - .build(); - } - } - - public class GetFromUserDataActor {} - // #pipe-to-user-data-actor - - // #pipe-to-user-activity-actor - interface UserActivityRepository { - CompletableFuture> queryHistoricalActivities(String userId); - } - - public class UserActivityActor extends AbstractActor { - String userId; - UserActivityRepository repository; - - UserActivityActor(String userId, UserActivityRepository repository) { - this.userId = userId; - this.repository = repository; - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - GetFromUserActivityActor.class, - msg -> { - CompletableFuture> fut = - repository.queryHistoricalActivities(userId); - - pipe(fut, getContext().dispatcher()).to(sender()); - }) - .build(); - } - } - - public class GetFromUserActivityActor {} - // #pipe-to-user-activity-actor - - // #pipe-to-proxy-actor - public class UserProxyActor extends AbstractActor { - ActorRef userActor; - ActorRef userActivityActor; - Duration timeout = Duration.ofSeconds(5); - - UserProxyActor(ActorRef userActor, ActorRef userActivityActor) { - this.userActor = userActor; - this.userActivityActor = userActivityActor; - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - GetUserData.class, - msg -> { - CompletableFuture fut = - ask(userActor, new GetUserData(), timeout).toCompletableFuture(); - - pipe(fut, getContext().dispatcher()); - }) - .match( - GetUserActivities.class, - msg -> { - CompletableFuture fut = - ask(userActivityActor, new GetFromUserActivityActor(), timeout) - .toCompletableFuture(); - - pipe(fut, getContext().dispatcher()).to(sender()); - }) - .build(); - } - } - // #pipe-to-proxy-actor - - // #pipe-to-proxy-messages - public class GetUserData {} - - public class GetUserActivities {} - // #pipe-to-proxy-messages - - @SuppressWarnings("unchecked") - @Test - public void useCustomExecutionContext() throws Exception { - ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor(); - // #diy-execution-context - ExecutionContext ec = ExecutionContexts.fromExecutorService(yourExecutorServiceGoesHere); - - // Use ec with your Futures - Future f1 = Futures.successful("foo"); - - // Then you shut down the ExecutorService at the end of your application. - yourExecutorServiceGoesHere.shutdown(); - // #diy-execution-context - } - - @Test - public void useBlockingFromActor() throws Exception { - ActorRef actor = system.actorOf(Props.create(MyActor.class)); - String msg = "hello"; - // #ask-blocking - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - Future future = Patterns.ask(actor, msg, timeout); - String result = (String) Await.result(future, timeout.duration()); - // #ask-blocking - - assertEquals("HELLO", result); - } - - @Test - public void useFutureEval() throws Exception { - // #future-eval - Future f = - future( - new Callable() { - public String call() { - return "Hello" + "World"; - } - }, - system.dispatcher()); - - f.onComplete(new PrintResult>(), system.dispatcher()); - // #future-eval - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - String result = (String) Await.result(f, timeout.duration()); - assertEquals("HelloWorld", result); - } - - @Test - public void useMap() throws Exception { - // #map - final ExecutionContext ec = system.dispatcher(); - - Future f1 = - future( - new Callable() { - public String call() { - return "Hello" + "World"; - } - }, - ec); - - Future f2 = - f1.map( - new Mapper() { - public Integer apply(String s) { - return s.length(); - } - }, - ec); - - f2.onComplete(new PrintResult>(), system.dispatcher()); - // #map - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - int result = Await.result(f2, timeout.duration()); - assertEquals(10, result); - } - - @Test - public void useFlatMap() throws Exception { - // #flat-map - final ExecutionContext ec = system.dispatcher(); - - Future f1 = - future( - new Callable() { - public String call() { - return "Hello" + "World"; - } - }, - ec); - - Future f2 = - f1.flatMap( - new Mapper>() { - public Future apply(final String s) { - return future( - new Callable() { - public Integer call() { - return s.length(); - } - }, - ec); - } - }, - ec); - - f2.onComplete(new PrintResult>(), system.dispatcher()); - // #flat-map - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - int result = Await.result(f2, timeout.duration()); - assertEquals(10, result); - } - - @Test - public void useSequence() throws Exception { - List> source = new ArrayList>(); - source.add(Futures.successful(1)); - source.add(Futures.successful(2)); - - // #sequence - final ExecutionContext ec = system.dispatcher(); - // Some source generating a sequence of Future:s - Iterable> listOfFutureInts = source; - - // now we have a Future[Iterable[Integer]] - Future> futureListOfInts = sequence(listOfFutureInts, ec); - - // Find the sum of the odd numbers - Future futureSum = - futureListOfInts.map( - new Mapper, Long>() { - public Long apply(Iterable ints) { - long sum = 0; - for (Integer i : ints) sum += i; - return sum; - } - }, - ec); - - futureSum.onComplete(new PrintResult>(), system.dispatcher()); - // #sequence - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - long result = Await.result(futureSum, timeout.duration()); - assertEquals(3L, result); - } - - @Test - public void useTraverse() throws Exception { - // #traverse - final ExecutionContext ec = system.dispatcher(); - // Just a sequence of Strings - Iterable listStrings = Arrays.asList("a", "b", "c"); - - Future> futureResult = - traverse( - listStrings, - new Function>() { - public Future apply(final String r) { - return future( - new Callable() { - public String call() { - return r.toUpperCase(); - } - }, - ec); - } - }, - ec); - - // Returns the sequence of strings as upper case - futureResult.onComplete(new PrintResult>>(), system.dispatcher()); - // #traverse - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - Iterable result = Await.result(futureResult, timeout.duration()); - assertEquals(Arrays.asList("A", "B", "C"), result); - } - - @Test - public void useFold() throws Exception { - List> source = new ArrayList>(); - source.add(Futures.successful("a")); - source.add(Futures.successful("b")); - // #fold - - final ExecutionContext ec = system.dispatcher(); - - // A sequence of Futures, in this case Strings - Iterable> futures = source; - - // Start value is the empty string - Future resultFuture = - fold( - "", - futures, - new Function2() { - public String apply(String r, String t) { - return r + t; // Just concatenate - } - }, - ec); - - resultFuture.onComplete(new PrintResult>(), system.dispatcher()); - // #fold - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - String result = Await.result(resultFuture, timeout.duration()); - assertEquals("ab", result); - } - - @Test - public void useReduce() throws Exception { - List> source = new ArrayList>(); - source.add(Futures.successful("a")); - source.add(Futures.successful("b")); - // #reduce - - final ExecutionContext ec = system.dispatcher(); - // A sequence of Futures, in this case Strings - Iterable> futures = source; - - Future resultFuture = - reduce( - futures, - new Function2() { - public Object apply(Object r, String t) { - return r + t; // Just concatenate - } - }, - ec); - - resultFuture.onComplete(new PrintResult>(), system.dispatcher()); - // #reduce - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - Object result = Await.result(resultFuture, timeout.duration()); - - assertEquals("ab", result); - } - - @Test - public void useSuccessfulAndFailedAndPromise() throws Exception { - final ExecutionContext ec = system.dispatcher(); - // #successful - Future future = Futures.successful("Yay!"); - // #successful - // #failed - Future otherFuture = Futures.failed(new IllegalArgumentException("Bang!")); - // #failed - // #promise - Promise promise = Futures.promise(); - Future theFuture = promise.future(); - promise.success("hello"); - // #promise - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - Object result = Await.result(future, timeout.duration()); - assertEquals("Yay!", result); - Throwable result2 = Await.result(otherFuture.failed(), timeout.duration()); - assertEquals("Bang!", result2.getMessage()); - String out = Await.result(theFuture, timeout.duration()); - assertEquals("hello", out); - } - - @Test - public void useFilter() throws Exception { - // #filter - final ExecutionContext ec = system.dispatcher(); - Future future1 = Futures.successful(4); - Future successfulFilter = - future1.filter( - Filter.filterOf( - new Function() { - public Boolean apply(Integer i) { - return i % 2 == 0; - } - }), - ec); - - Future failedFilter = - future1.filter( - Filter.filterOf( - new Function() { - public Boolean apply(Integer i) { - return i % 2 != 0; - } - }), - ec); - // When filter fails, the returned Future will be failed with a scala.MatchError - // #filter - } - - public void sendToTheInternetz(String s) {} - - public void sendToIssueTracker(Throwable t) {} - - @Test - public void useAndThen() { - // #and-then - final ExecutionContext ec = system.dispatcher(); - Future future1 = - Futures.successful("value") - .andThen( - new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (failure != null) sendToIssueTracker(failure); - } - }, - ec) - .andThen( - new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (result != null) sendToTheInternetz(result); - } - }, - ec); - // #and-then - } - - @Test - public void useRecover() throws Exception { - // #recover - final ExecutionContext ec = system.dispatcher(); - - Future future = - future( - new Callable() { - public Integer call() { - return 1 / 0; - } - }, - ec) - .recover( - new Recover() { - public Integer recover(Throwable problem) throws Throwable { - if (problem instanceof ArithmeticException) return 0; - else throw problem; - } - }, - ec); - - future.onComplete(new PrintResult>(), system.dispatcher()); - // #recover - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - int result = Await.result(future, timeout.duration()); - assertEquals(result, 0); - } - - @Test - public void useTryRecover() throws Exception { - // #try-recover - final ExecutionContext ec = system.dispatcher(); - - Future future = - future( - new Callable() { - public Integer call() { - return 1 / 0; - } - }, - ec) - .recoverWith( - new Recover>() { - public Future recover(Throwable problem) throws Throwable { - if (problem instanceof ArithmeticException) { - return future( - new Callable() { - public Integer call() { - return 0; - } - }, - ec); - } else throw problem; - } - }, - ec); - - future.onComplete(new PrintResult>(), system.dispatcher()); - // #try-recover - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - int result = Await.result(future, timeout.duration()); - assertEquals(result, 0); - } - - @Test - public void useOnOnComplete() throws Exception { - { - Future future = Futures.successful("foo"); - // #onComplete - final ExecutionContext ec = system.dispatcher(); - - future.onComplete( - new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (failure != null) { - // We got a failure, handle it here - } else { - // We got a result, do something with it - } - } - }, - ec); - // #onComplete - } - } - - @Test - public void useOrAndZip() throws Exception { - { - // #zip - final ExecutionContext ec = system.dispatcher(); - Future future1 = Futures.successful("foo"); - Future future2 = Futures.successful("bar"); - Future future3 = - future1 - .zip(future2) - .map( - new Mapper, String>() { - public String apply(scala.Tuple2 zipped) { - return zipped._1() + " " + zipped._2(); - } - }, - ec); - - future3.onComplete(new PrintResult>(), system.dispatcher()); - // #zip - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - String result = Await.result(future3, timeout.duration()); - assertEquals("foo bar", result); - } - - { - // #fallback-to - Future future1 = Futures.failed(new IllegalStateException("OHNOES1")); - Future future2 = Futures.failed(new IllegalStateException("OHNOES2")); - Future future3 = Futures.successful("bar"); - // Will have "bar" in this case - Future future4 = future1.fallbackTo(future2).fallbackTo(future3); - future4.onComplete(new PrintResult>(), system.dispatcher()); - // #fallback-to - Timeout timeout = Timeout.create(Duration.ofSeconds(5)); - String result = Await.result(future4, timeout.duration()); - assertEquals("bar", result); - } - } - - @Test(expected = IllegalStateException.class) + @Test(expected = java.util.concurrent.CompletionException.class) @SuppressWarnings("unchecked") public void useAfter() throws Exception { + final ExecutionContext ec = system.executionContext(); + // #after + CompletionStage failWithException = + CompletableFuture.supplyAsync( + () -> { + throw new IllegalStateException("OHNOES1"); + }); + CompletionStage delayed = + Patterns.after(Duration.ofMillis(200), system, () -> failWithException); // #after - final ExecutionContext ec = system.dispatcher(); - Future failExc = Futures.failed(new IllegalStateException("OHNOES1")); - Timeout delay = Timeout.create(Duration.ofMillis(200)); - Future delayed = Patterns.after(delay.duration(), system.scheduler(), ec, failExc); Future future = future( - new Callable() { - public String call() throws InterruptedException { - Thread.sleep(1000); - return "foo"; - } + () -> { + Thread.sleep(1000); + return "foo"; }, ec); Future result = - Futures.firstCompletedOf(Arrays.>asList(future, delayed), ec); - // #after + Futures.firstCompletedOf( + Arrays.>asList(future, FutureConverters.toScala(delayed)), ec); Timeout timeout = Timeout.create(Duration.ofSeconds(2)); Await.result(result, timeout.duration()); } @Test public void useRetry() throws Exception { - // #retry - final ExecutionContext ec = system.dispatcher(); Callable> attempt = () -> CompletableFuture.completedFuture("test"); + CompletionStage retriedFuture = - retry(attempt, 3, java.time.Duration.ofMillis(200), system.scheduler(), ec); + Patterns.retry(attempt, 3, java.time.Duration.ofMillis(200), system); // #retry retriedFuture.toCompletableFuture().get(2, SECONDS); } - - @Test - public void thenApplyCompletionThread() throws Exception { - // #apply-completion-thread - final ExecutionContext ec = system.dispatcher(); - final CountDownLatch countDownLatch = new CountDownLatch(1); - - Future scalaFuture = - Futures.future( - () -> { - assertThat( - Thread.currentThread().getName(), - containsString("akka.actor.default-dispatcher")); - countDownLatch.await(); // do not complete yet - return "hello"; - }, - ec); - - CompletionStage fromScalaFuture = - FutureConverters.toJava(scalaFuture) - .thenApply( - s -> { // 1 - assertThat( - Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); - return s; - }) - .thenApply( - s -> { // 2 - assertThat( - Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); - return s; - }) - .thenApply( - s -> { // 3 - assertThat( - Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); - return s; - }); - - countDownLatch.countDown(); // complete scalaFuture - // #apply-completion-thread - - fromScalaFuture.toCompletableFuture().get(2, SECONDS); - } - - @Test - public void thenApplyMainThread() throws Exception { - final ExecutionContext ec = system.dispatcher(); - - // #apply-main-thread - Future scalaFuture = - Futures.future( - () -> { - assertThat( - Thread.currentThread().getName(), - containsString("akka.actor.default-dispatcher")); - return "hello"; - }, - ec); - - CompletionStage completedStage = - FutureConverters.toJava(scalaFuture) - .thenApply( - s -> { // 1 - assertThat( - Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); - return s; - }); - - completedStage.toCompletableFuture().get(2, SECONDS); // complete current CompletionStage - final String currentThread = Thread.currentThread().getName(); - - CompletionStage stage2 = - completedStage - .thenApply( - s -> { // 2 - assertThat(Thread.currentThread().getName(), is(currentThread)); - return s; - }) - .thenApply( - s -> { // 3 - assertThat(Thread.currentThread().getName(), is(currentThread)); - return s; - }); - // #apply-main-thread - - stage2.toCompletableFuture().get(2, SECONDS); - } - - @Test - public void thenApplyAsyncDefault() throws Exception { - final ExecutionContext ec = system.dispatcher(); - - Future scalaFuture = - Futures.future( - () -> { - assertThat( - Thread.currentThread().getName(), - containsString("akka.actor.default-dispatcher")); - return "hello"; - }, - ec); - - // #apply-async-default - CompletionStage fromScalaFuture = - FutureConverters.toJava(scalaFuture) - .thenApplyAsync( - s -> { // 1 - assertThat( - Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); - return s; - }) - .thenApplyAsync( - s -> { // 2 - assertThat( - Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); - return s; - }) - .thenApplyAsync( - s -> { // 3 - assertThat( - Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); - return s; - }); - // #apply-async-default - - fromScalaFuture.toCompletableFuture().get(2, SECONDS); - } - - @Test - public void thenApplyAsyncExecutor() throws Exception { - final ExecutionContext ec = system.dispatcher(); - - Future scalaFuture = - Futures.future( - () -> { - assertThat( - Thread.currentThread().getName(), - containsString("akka.actor.default-dispatcher")); - return "hello"; - }, - ec); - - // #apply-async-executor - final Executor ex = system.dispatcher(); - - CompletionStage fromScalaFuture = - FutureConverters.toJava(scalaFuture) - .thenApplyAsync( - s -> { - assertThat( - Thread.currentThread().getName(), - containsString("akka.actor.default-dispatcher")); - return s; - }, - ex) - .thenApplyAsync( - s -> { - assertThat( - Thread.currentThread().getName(), - containsString("akka.actor.default-dispatcher")); - return s; - }, - ex) - .thenApplyAsync( - s -> { - assertThat( - Thread.currentThread().getName(), - containsString("akka.actor.default-dispatcher")); - return s; - }, - ex); - // #apply-async-executor - - fromScalaFuture.toCompletableFuture().get(2, SECONDS); - } - - public static class MyActor extends AbstractActor { - @Override - public Receive createReceive() { - return receiveBuilder() - .match( - String.class, - msg -> { - getSender().tell(msg.toUpperCase(), getSelf()); - }) - .match( - Integer.class, - i -> { - if (i < 0) { - getSender() - .tell( - new Failure(new ArithmeticException("Negative values not supported")), - getSelf()); - } else { - getSender().tell(i, getSelf()); - } - }) - .build(); - } - } } diff --git a/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala b/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala index c383e7241f..a3b76b1bc2 100644 --- a/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala +++ b/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala @@ -5,12 +5,15 @@ package docs.future import language.postfixOps - import akka.testkit._ import akka.actor.{ Actor, ActorRef, Props, Status } import akka.util.Timeout + import scala.concurrent.duration._ import java.lang.IllegalStateException + +import akka.actor.typed.ActorSystem + import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.util.{ Failure, Success } @@ -479,12 +482,12 @@ class FutureDocSpec extends AkkaSpec { } "demonstrate usage of pattern.after" in { + import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps + implicit val system: ActorSystem[Nothing] = this.system.toTyped //#after - // TODO after is unfortunately shadowed by ScalaTest, fix as part of #3759 - // import akka.pattern.after - val delayed = - akka.pattern.after(200 millis, using = system.scheduler)(Future.failed(new IllegalStateException("OHNOES"))) + akka.pattern.after(200.millis)(Future.failed(new IllegalStateException("OHNOES"))) + val future = Future { Thread.sleep(1000); "foo" } val result = Future.firstCompletedOf(Seq(future, delayed)) //#after @@ -492,18 +495,24 @@ class FutureDocSpec extends AkkaSpec { } "demonstrate pattern.retry" in { + import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps + val system: ActorSystem[Nothing] = this.system.toTyped //#retry - implicit val scheduler = system.scheduler + import akka.actor.typed.scaladsl.adapter._ + implicit val scheduler: akka.actor.Scheduler = system.scheduler.toClassic + implicit val ec: ExecutionContext = system.executionContext + //Given some future that will succeed eventually @volatile var failCount = 0 - def attempt() = { + def futureToAttempt() = { if (failCount < 5) { failCount += 1 Future.failed(new IllegalStateException(failCount.toString)) } else Future.successful(5) } + //Return a new future that will retry up to 10 times - val retried = akka.pattern.retry(() => attempt(), 10, 100 milliseconds) + val retried: Future[Int] = akka.pattern.retry(() => futureToAttempt(), attempts = 10, 100 milliseconds) //#retry Await.result(retried, 1 second) should ===(5)