diff --git a/akka-docs/rst/java/code/docs/future/FutureDocTest.java b/akka-docs/rst/java/code/docs/future/FutureDocTest.java index 73cf36ad48..d79edba779 100644 --- a/akka-docs/rst/java/code/docs/future/FutureDocTest.java +++ b/akka-docs/rst/java/code/docs/future/FutureDocTest.java @@ -50,8 +50,13 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; + +import scala.compat.java8.FutureConverters; import akka.testkit.AkkaJUnitActorSystemResource; import org.junit.ClassRule; @@ -65,6 +70,8 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.pattern.Patterns; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.StringContains.containsString; import static org.junit.Assert.*; public class FutureDocTest extends AbstractJavaTest { @@ -524,6 +531,131 @@ public class FutureDocTest extends AbstractJavaTest { Await.result(result, Duration.create(2, SECONDS)); } + @Test + public void thenApplyCompletionThread() throws Exception { + //#apply-completion-thread + final ExecutionContext ec = system.dispatcher(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + + Future scalaFuture = Futures.future(() -> { + assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher")); + countDownLatch.await(); // do not complete yet + return "hello"; + }, ec); + + CompletionStage fromScalaFuture = FutureConverters.toJava(scalaFuture) + .thenApply(s -> { // 1 + assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); + return s; + }) + .thenApply(s -> { // 2 + assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); + return s; + }) + .thenApply(s -> { // 3 + assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); + return s; + }); + + countDownLatch.countDown(); // complete scalaFuture + //#apply-completion-thread + + fromScalaFuture.toCompletableFuture().get(2, SECONDS); + } + + @Test + public void thenApplyMainThread() throws Exception { + final ExecutionContext ec = system.dispatcher(); + + //#apply-main-thread + Future scalaFuture = Futures.future(() -> { + assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher")); + return "hello"; + }, ec); + + CompletionStage completedStage = FutureConverters.toJava(scalaFuture) + .thenApply(s -> { // 1 + assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); + return s; + }); + + completedStage.toCompletableFuture().get(2, SECONDS); // complete current CompletionStage + final String currentThread = Thread.currentThread().getName(); + + CompletionStage stage2 = completedStage + .thenApply(s -> { // 2 + assertThat(Thread.currentThread().getName(), is(currentThread)); + return s; + }) + .thenApply(s -> { // 3 + assertThat(Thread.currentThread().getName(), is(currentThread)); + return s; + }); + //#apply-main-thread + + stage2.toCompletableFuture().get(2, SECONDS); + } + + @Test + public void thenApplyAsyncDefault() throws Exception { + final ExecutionContext ec = system.dispatcher(); + + Future scalaFuture = Futures.future(() -> { + assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher")); + return "hello"; + }, ec); + + //#apply-async-default + CompletionStage fromScalaFuture = FutureConverters.toJava(scalaFuture) + .thenApplyAsync(s -> { // 1 + assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); + return s; + }) + .thenApplyAsync(s -> { // 2 + assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); + return s; + }) + .thenApplyAsync(s -> { // 3 + assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool")); + return s; + }); + //#apply-async-default + + fromScalaFuture.toCompletableFuture().get(2, SECONDS); + } + + @Test + public void thenApplyAsyncExecutor() throws Exception { + final ExecutionContext ec = system.dispatcher(); + + Future scalaFuture = Futures.future(() -> { + assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher")); + return "hello"; + }, ec); + + //#apply-async-executor + final Executor ex = system.dispatcher(); + + CompletionStage fromScalaFuture = FutureConverters.toJava(scalaFuture) + .thenApplyAsync(s -> { + assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher")); + return s; + }, ex) + .thenApplyAsync(s -> { + assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher")); + return s; + }, ex) + .thenApplyAsync(s -> { + assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher")); + return s; + }, ex); + //#apply-async-executor + + fromScalaFuture.toCompletableFuture().get(2, SECONDS); + } + + + public static class MyActor extends UntypedActor { public void onReceive(Object message) { if (message instanceof String) { diff --git a/akka-docs/rst/java/futures.rst b/akka-docs/rst/java/futures.rst index ea39963a45..4054eb064b 100644 --- a/akka-docs/rst/java/futures.rst +++ b/akka-docs/rst/java/futures.rst @@ -260,3 +260,88 @@ After .. includecode:: code/docs/future/FutureDocTest.java :include: after + +Java 8, CompletionStage and CompletableFuture +--------------------------------------------- + +Starting with Akka 2.4.2 we have begun to introduce Java 8 ``java.util.concurrent.CompletionStage`` in Java APIs. +It's a ``scala.concurrent.Future`` counterpart in Java; conversion from ``scala.concurrent.Future`` is done using +``scala-java8-compat`` library. + +Unlike ``scala.concurrent.Future`` which has async methods only, ``CompletionStage`` has *async* and *non-async* methods. + +The ``scala-java8-compat`` library returns its own implementation of ``CompletionStage`` which delegates all *non-async* +methods to their *async* counterparts. The implementation extends standard Java ``CompletableFuture``. +Java 8 ``CompletableFuture`` creates a new instance of ``CompletableFuture`` for any new stage, +which means ``scala-java8-compat`` implementation is not used after the first mapping method. + +.. note:: + After adding any additional computation stage to ``CompletionStage`` returned by ``scala-java8-compat`` + (e.g. ``CompletionStage`` instances returned by Akka) it falls back to standard behaviour of Java ``CompletableFuture``. + +Actions supplied for dependent completions of *non-async* methods may be performed by the thread +that completes the current ``CompletableFuture``, or by any other caller of a completion method. + +All *async* methods without an explicit Executor are performed using the ``ForkJoinPool.commonPool()`` executor. + +Non-async methods +^^^^^^^^^^^^^^^^^ + +When non-async methods are applied on a not yet completed ``CompletionStage``, they are completed by +the thread which completes initial ``CompletionStage``: + +.. includecode:: code/docs/future/FutureDocTest.java + :include: apply-completion-thread + +In this example Scala ``Future`` is converted to ``CompletionStage`` just like Akka does. +The completion is delayed: we are calling ``thenApply`` multiple times on a not yet complete ``CompletionStage``, then +complete the ``Future``. + +First ``thenApply`` is actually performed on ``scala-java8-compat`` instance and computational stage (lambda) execution +is delegated to default Java ``thenApplyAsync`` which is executed on ``ForkJoinPool.commonPool()``. + +Second and third ``thenApply`` methods are executed on Java 8 ``CompletableFuture`` instance which executes computational +stages on the thread which completed the first stage. It is never executed on a thread of Scala ``Future`` because +default ``thenApply`` breaks the chain and executes on ``ForkJoinPool.commonPool()``. + + +In the next example ``thenApply`` methods are executed on an already completed ``Future``/``CompletionStage``: + +.. includecode:: code/docs/future/FutureDocTest.java + :include: apply-main-thread + +First ``thenApply`` is still executed on ``ForkJoinPool.commonPool()`` (because it is actually ``thenApplyAsync`` +which is always executed on global Java pool). + +Then we wait for stages to complete so second and third ``thenApply`` are executed on completed ``CompletionStage``, +and stages are executed on the current thread - the thread which called second and third ``thenApply``. + + +Async methods +^^^^^^^^^^^^^ + +As mentioned above, default *async* methods are always executed on ``ForkJoinPool.commonPool()``: + +.. includecode:: code/docs/future/FutureDocTest.java + :include: apply-async-default + + +``CompletionStage`` also has *async* methods which take ``Executor`` as a second parameter, just like ``Future``: + +.. includecode:: code/docs/future/FutureDocTest.java + :include: apply-async-executor + +This example is behaving like ``Future``: every stage is executed on an explicitly specified ``Executor``. + +.. note:: + When in doubt, async methods with explicit executor should be used. Always async methods with a dedicated + executor/dispatcher for long-running or blocking computations, such as IO operations. + +See also: + +- `CompletionStage `_ + +- `CompletableFuture `_ + +- `scala-java8-compat `_ +