+doc Documented how CompletionStages execute #20513 (#21845)

* Documented how CompletionStages execute #20513

* Fixed the failing test: tests are not necessarily run from "main" thread.

* Addressed pull request comment #20513

* Addressed more PR comments #20513
This commit is contained in:
Oleg Poleshuk 2016-12-12 21:46:37 +01:00 committed by Konrad `ktoso` Malawski
parent 5bcffa2acc
commit c50ecf3287
2 changed files with 217 additions and 0 deletions

View file

@ -50,8 +50,13 @@ import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import scala.compat.java8.FutureConverters;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
@ -65,6 +70,8 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import akka.pattern.Patterns;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.*;
public class FutureDocTest extends AbstractJavaTest {
@ -524,6 +531,131 @@ public class FutureDocTest extends AbstractJavaTest {
Await.result(result, Duration.create(2, SECONDS));
}
@Test
public void thenApplyCompletionThread() throws Exception {
//#apply-completion-thread
final ExecutionContext ec = system.dispatcher();
final CountDownLatch countDownLatch = new CountDownLatch(1);
Future<String> scalaFuture = Futures.future(() -> {
assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
countDownLatch.await(); // do not complete yet
return "hello";
}, ec);
CompletionStage<String> fromScalaFuture = FutureConverters.toJava(scalaFuture)
.thenApply(s -> { // 1
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
})
.thenApply(s -> { // 2
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
})
.thenApply(s -> { // 3
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
});
countDownLatch.countDown(); // complete scalaFuture
//#apply-completion-thread
fromScalaFuture.toCompletableFuture().get(2, SECONDS);
}
@Test
public void thenApplyMainThread() throws Exception {
final ExecutionContext ec = system.dispatcher();
//#apply-main-thread
Future<String> scalaFuture = Futures.future(() -> {
assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
return "hello";
}, ec);
CompletionStage<String> completedStage = FutureConverters.toJava(scalaFuture)
.thenApply(s -> { // 1
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
});
completedStage.toCompletableFuture().get(2, SECONDS); // complete current CompletionStage
final String currentThread = Thread.currentThread().getName();
CompletionStage<String> stage2 = completedStage
.thenApply(s -> { // 2
assertThat(Thread.currentThread().getName(), is(currentThread));
return s;
})
.thenApply(s -> { // 3
assertThat(Thread.currentThread().getName(), is(currentThread));
return s;
});
//#apply-main-thread
stage2.toCompletableFuture().get(2, SECONDS);
}
@Test
public void thenApplyAsyncDefault() throws Exception {
final ExecutionContext ec = system.dispatcher();
Future<String> scalaFuture = Futures.future(() -> {
assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
return "hello";
}, ec);
//#apply-async-default
CompletionStage<String> fromScalaFuture = FutureConverters.toJava(scalaFuture)
.thenApplyAsync(s -> { // 1
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
})
.thenApplyAsync(s -> { // 2
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
})
.thenApplyAsync(s -> { // 3
assertThat(Thread.currentThread().getName(), containsString("ForkJoinPool.commonPool"));
return s;
});
//#apply-async-default
fromScalaFuture.toCompletableFuture().get(2, SECONDS);
}
@Test
public void thenApplyAsyncExecutor() throws Exception {
final ExecutionContext ec = system.dispatcher();
Future<String> scalaFuture = Futures.future(() -> {
assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
return "hello";
}, ec);
//#apply-async-executor
final Executor ex = system.dispatcher();
CompletionStage<String> fromScalaFuture = FutureConverters.toJava(scalaFuture)
.thenApplyAsync(s -> {
assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
return s;
}, ex)
.thenApplyAsync(s -> {
assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
return s;
}, ex)
.thenApplyAsync(s -> {
assertThat(Thread.currentThread().getName(), containsString("akka.actor.default-dispatcher"));
return s;
}, ex);
//#apply-async-executor
fromScalaFuture.toCompletableFuture().get(2, SECONDS);
}
public static class MyActor extends UntypedActor {
public void onReceive(Object message) {
if (message instanceof String) {

View file

@ -260,3 +260,88 @@ After
.. includecode:: code/docs/future/FutureDocTest.java
:include: after
Java 8, CompletionStage and CompletableFuture
---------------------------------------------
Starting with Akka 2.4.2 we have begun to introduce Java 8 ``java.util.concurrent.CompletionStage`` in Java APIs.
It's a ``scala.concurrent.Future`` counterpart in Java; conversion from ``scala.concurrent.Future`` is done using
``scala-java8-compat`` library.
Unlike ``scala.concurrent.Future`` which has async methods only, ``CompletionStage`` has *async* and *non-async* methods.
The ``scala-java8-compat`` library returns its own implementation of ``CompletionStage`` which delegates all *non-async*
methods to their *async* counterparts. The implementation extends standard Java ``CompletableFuture``.
Java 8 ``CompletableFuture`` creates a new instance of ``CompletableFuture`` for any new stage,
which means ``scala-java8-compat`` implementation is not used after the first mapping method.
.. note::
After adding any additional computation stage to ``CompletionStage`` returned by ``scala-java8-compat``
(e.g. ``CompletionStage`` instances returned by Akka) it falls back to standard behaviour of Java ``CompletableFuture``.
Actions supplied for dependent completions of *non-async* methods may be performed by the thread
that completes the current ``CompletableFuture``, or by any other caller of a completion method.
All *async* methods without an explicit Executor are performed using the ``ForkJoinPool.commonPool()`` executor.
Non-async methods
^^^^^^^^^^^^^^^^^
When non-async methods are applied on a not yet completed ``CompletionStage``, they are completed by
the thread which completes initial ``CompletionStage``:
.. includecode:: code/docs/future/FutureDocTest.java
:include: apply-completion-thread
In this example Scala ``Future`` is converted to ``CompletionStage`` just like Akka does.
The completion is delayed: we are calling ``thenApply`` multiple times on a not yet complete ``CompletionStage``, then
complete the ``Future``.
First ``thenApply`` is actually performed on ``scala-java8-compat`` instance and computational stage (lambda) execution
is delegated to default Java ``thenApplyAsync`` which is executed on ``ForkJoinPool.commonPool()``.
Second and third ``thenApply`` methods are executed on Java 8 ``CompletableFuture`` instance which executes computational
stages on the thread which completed the first stage. It is never executed on a thread of Scala ``Future`` because
default ``thenApply`` breaks the chain and executes on ``ForkJoinPool.commonPool()``.
In the next example ``thenApply`` methods are executed on an already completed ``Future``/``CompletionStage``:
.. includecode:: code/docs/future/FutureDocTest.java
:include: apply-main-thread
First ``thenApply`` is still executed on ``ForkJoinPool.commonPool()`` (because it is actually ``thenApplyAsync``
which is always executed on global Java pool).
Then we wait for stages to complete so second and third ``thenApply`` are executed on completed ``CompletionStage``,
and stages are executed on the current thread - the thread which called second and third ``thenApply``.
Async methods
^^^^^^^^^^^^^
As mentioned above, default *async* methods are always executed on ``ForkJoinPool.commonPool()``:
.. includecode:: code/docs/future/FutureDocTest.java
:include: apply-async-default
``CompletionStage`` also has *async* methods which take ``Executor`` as a second parameter, just like ``Future``:
.. includecode:: code/docs/future/FutureDocTest.java
:include: apply-async-executor
This example is behaving like ``Future``: every stage is executed on an explicitly specified ``Executor``.
.. note::
When in doubt, async methods with explicit executor should be used. Always async methods with a dedicated
executor/dispatcher for long-running or blocking computations, such as IO operations.
See also:
- `CompletionStage <https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html>`_
- `CompletableFuture <https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html>`_
- `scala-java8-compat <https://github.com/scala/scala-java8-compat>`_