Improve Java perspective dispatcher docs (#27521)

This commit is contained in:
Arnout Engelen 2019-08-29 14:22:50 +02:00 committed by Patrik Nordwall
parent acee1b4185
commit 29b95f166a
14 changed files with 230 additions and 207 deletions

View file

@ -11,24 +11,14 @@ import akka.actor.typed.DispatcherSelector;
public class DispatchersDocTest {
private static Behavior<String> yourBehavior = Behaviors.empty();
private static Behavior<Object> example =
Behaviors.receive(
(context, message) -> {
public static void spawnDispatchers(ActorContext<Integer> context, Behavior<String> behavior) {
// #spawn-dispatcher
context.spawn(yourBehavior, "DefaultDispatcher");
context.spawn(behavior, "DefaultDispatcher");
context.spawn(behavior, "ExplicitDefaultDispatcher", DispatcherSelector.defaultDispatcher());
context.spawn(behavior, "BlockingDispatcher", DispatcherSelector.blocking());
context.spawn(behavior, "ParentDispatcher", DispatcherSelector.sameAsParent());
context.spawn(
yourBehavior, "ExplicitDefaultDispatcher", DispatcherSelector.defaultDispatcher());
context.spawn(yourBehavior, "BlockingDispatcher", DispatcherSelector.blocking());
context.spawn(
yourBehavior,
"DispatcherFromConfig",
DispatcherSelector.fromConfig("your-dispatcher"));
context.spawn(yourBehavior, "ParentDispatcher", DispatcherSelector.sameAsParent());
behavior, "DispatcherFromConfig", DispatcherSelector.fromConfig("your-dispatcher"));
// #spawn-dispatcher
return Behaviors.same();
});
}
}

View file

@ -21,13 +21,16 @@ Dispatchers are part of core Akka, which means that they are part of the akka-ac
version="$akka.version$"
}
## Introduction
<a id="dispatcher-lookup"></a>
## Looking up a Dispatcher
An Akka `MessageDispatcher` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
All `MessageDispatcher` implementations are also an `ExecutionContext`, which means that they can be used
to execute arbitrary code.
Dispatchers implement the @scala[`ExecutionContext`]@java[`Executor`] interface and can thus be used to run @scala[`Future`]@java[`CompletableFuture`] invocations etc.
For full details on how to work with dispatchers see the @ref:[main dispatcher docs](typed/dispatchers.md#types-of-dispatchers).
Scala
: @@snip [DispatcherDocSpec.scala](/akka-docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala) { #lookup }
Java
: @@snip [DispatcherDocTest.java](/akka-docs/src/test/java/jdocs/dispatcher/DispatcherDocTest.java) { #lookup }
## Setting the dispatcher for an Actor

View file

@ -14,15 +14,13 @@ Dispatchers are part of core Akka, which means that they are part of the akka-ac
An Akka `MessageDispatcher` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
All `MessageDispatcher` implementations are also an @scala[`ExecutionContext`]@java[`Executor`], which means that they can be used
to execute arbitrary code.
to execute arbitrary code, for instance @scala[`Future`s]@java[`CompletableFuture`s].
## Default dispatcher
Every `ActorSystem` will have a default dispatcher that will be used in case nothing else is configured for an `Actor`.
The default dispatcher can be configured, and is by default a `Dispatcher` with the specified `default-executor`.
If an ActorSystem is created with an ExecutionContext passed in, this ExecutionContext will be used as the default executor for all
dispatchers in this ActorSystem. If no ExecutionContext is given, it will fallback to the executor specified in
`akka.actor.default-dispatcher.default-executor.fallback`. By default this is a "fork-join-executor", which
The default dispatcher can be configured, and is by default a `Dispatcher` with the configured `akka.actor.default-dispatcher.executor`.
If no executor is selected a "fork-join-executor" is selected, which
gives excellent performance in most cases.
## Internal dispatcher
@ -34,13 +32,13 @@ be replaced by another dispatcher by making `akka.actor.internal-dispatcher` an
<a id="dispatcher-lookup"></a>
## Looking up a Dispatcher
Dispatchers implement the `ExecutionContext` interface and can thus be used to run `Future` invocations etc.
Dispatchers implement the @scala[`ExecutionContext`]@java[`Executor`] interface and can thus be used to run @scala[`Future`]@java[`CompletableFuture`] invocations etc.
Scala
: @@snip [DispatcherDocSpec.scala](/akka-docs/src/test/scala/docs/dispatcher/DispatcherDocSpec.scala) { #lookup }
: @@snip [DispatcherDocSpec.scala](/akka-docs/src/test/scala/docs/actor/typed/DispatcherDocSpec.scala) { #lookup }
Java
: @@snip [DispatcherDocTest.java](/akka-docs/src/test/java/jdocs/dispatcher/DispatcherDocTest.java) { #lookup }
: @@snip [DispatcherDocTest.java](/akka-docs/src/test/java/jdocs/actor/typed/DispatcherDocTest.java) { #lookup }
## Selecting a dispatcher
@ -62,13 +60,10 @@ Java
* `DispatcherSelector.blocking` can be used to execute actors that block e.g. a legacy database API that does not support @scala[`Future`]@java[`CompletionStage`]s
* `DispatcherSelector.sameAsParent` to use the same dispatcher as the parent actor
The final example shows how to load a custom dispatcher from configuration and replies on this being in your application.conf:
The final example shows how to load a custom dispatcher from configuration and relies on this being in your application.conf:
Scala
: @@snip [DispatcherDocSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/DispatchersDocSpec.scala) { #config }
Java
: @@snip [DispatcherDocSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/DispatchersDocSpec.scala) { #config }
<!-- Same between Java and Scala -->
@@snip [DispatcherDocSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/DispatchersDocSpec.scala) { #config }
## Types of dispatchers
@ -130,50 +125,19 @@ to sleep for an indeterminate time, waiting for an external event to occur.
Examples are legacy RDBMS drivers or messaging APIs, and the underlying reason
is typically that (network) I/O occurs under the covers.
### Problem: Blocking on default dispatcher
Simply add blocking calls to your actor message processing like this is problematic:
Scala
: @@snip [BlockingDispatcherSample.scala](/akka-docs/src/test/scala/docs/actor/typed/BlockingDispatcherSample.scala) { #blocking-in-actor }
: @@snip [BlockingDispatcherSample.scala](/akka-docs/src/test/scala/docs/actor/typed/BlockingActor.scala) { #blocking-in-actor }
Java
: @@snip [BlockingActor.java](/akka-docs/src/test/java/jdocs/actor/typed/BlockingActor.java)
When facing this, you
may be tempted to wrap the blocking call inside a `Future` and work
with that instead, but this strategy is too simple: you are quite likely to
find bottlenecks or run out of memory or threads when the application runs
under increased load.
Scala
: @@snip [BlockingDispatcherSample.scala](/akka-docs/src/test/scala/docs/actor/typed/BlockingDispatcherSample.scala) { #blocking-in-future }
Java
: @@snip [BlockingFutureActor.java](/akka-docs/src/test/java/jdocs/actor/typed/BlockingFutureActor.java) { #blocking-in-future }
### Problem: Blocking on default dispatcher
The key here is this line:
@@@ div { .group-scala }
```scala
implicit val executionContext: ExecutionContext = context.executionContext
```
@@@
@@@ div { .group-java }
```java
ExecutionContext ec = getContext().getExecutionContext();
```
@@@
Using @scala[`context.executionContext`] @java[`getContext().getExecutionContext()`] as the dispatcher on which the blocking `Future`
executes can be a problem, since this dispatcher is by default used for all other actor processing
unless you @ref:[set up a separate dispatcher for the actor](../dispatchers.md#setting-the-dispatcher-for-an-actor).
If all of the available threads are blocked, then all the actors on the same dispatcher will starve for threads and
Without any further configuration the default dispatcher runs this actor along
with all other actors. This is very efficient when all actor message processing is
non-blocking. If all of the available threads are blocked, however, then all the actors on the same dispatcher will starve for threads and
will not be able to process incoming messages.
@@@ note
@ -190,10 +154,10 @@ including Streams, Http and other reactive libraries built on top of it.
@@@
Let's set up an application with the above `BlockingFutureActor` and the following `PrintActor`.
To demonstrate this problem, let's set up an application with the above `BlockingActor` and the following `PrintActor`:
Scala
: @@snip [BlockingDispatcherSample.scala](/akka-docs/src/test/scala/docs/actor/typed/BlockingDispatcherSample.scala) { #print-actor }
: @@snip [PrintActor.scala](/akka-docs/src/test/scala/docs/actor/typed/PrintActor.scala) { #print-actor }
Java
: @@snip [PrintActor.java](/akka-docs/src/test/java/jdocs/actor/typed/PrintActor.java) { #print-actor }
@ -206,7 +170,7 @@ Java
: @@snip [BlockingDispatcherTest.java](/akka-docs/src/test/java/jdocs/actor/typed/BlockingDispatcherTest.java) { #blocking-main }
Here the app is sending 100 messages to `BlockingFutureActor` and `PrintActor` and large numbers
Here the app is sending 100 messages to `BlockingActor` and `PrintActor` and large numbers
of `akka.actor.default-dispatcher` threads are handling requests. When you run the above code,
you will likely to see the entire application gets stuck somewhere like this:
@ -253,6 +217,37 @@ In essence, the `Thread.sleep` operation has dominated all threads and caused an
executing on the default dispatcher to starve for resources (including any actor
that you have not configured an explicit dispatcher for).
@@@ div { .group-scala }
### Non-solution: Wrapping in a Future
<!--
A CompletableFuture by default on ForkJoinPool.commonPool(), so
because that is already separate from the default dispatcher
the problem described in these sections do not apply:
-->
When facing this, you
may be tempted to wrap the blocking call inside a `Future` and work
with that instead, but this strategy is too simple: you are quite likely to
find bottlenecks or run out of memory or threads when the application runs
under increased load.
Scala
: @@snip [BlockingDispatcherSample.scala](/akka-docs/src/test/scala/docs/actor/typed/BlockingDispatcherSample.scala) { #blocking-in-future }
The key problematic line here is this:
```scala
implicit val executionContext: ExecutionContext = context.executionContext
```
Using @scala[`context.executionContext`] as the dispatcher on which the blocking `Future`
executes can still be a problem, since this dispatcher is by default used for all other actor processing
unless you @ref:[set up a separate dispatcher for the actor](../dispatchers.md#setting-the-dispatcher-for-an-actor).
@@@
### Solution: Dedicated dispatcher for blocking operations
One of the most efficient methods of isolating the blocking behavior such that it does not impact the rest of the system
@ -278,14 +273,13 @@ Scala
: @@snip [BlockingDispatcherSample.scala](/akka-docs/src/test/scala/docs/actor/typed/BlockingDispatcherSample.scala) { #separate-dispatcher }
Java
: @@snip [SeparateDispatcherFutureActor.java](/akka-docs/src/test/java/jdocs/actor/typed/SeparateDispatcherFutureActor.java) { #separate-dispatcher }
: @@snip [SeparateDispatcherCompletionStageActor.java](/akka-docs/src/test/java/jdocs/actor/typed/SeparateDispatcherCompletionStageActor.java) { #separate-dispatcher }
The thread pool behavior is shown in the below diagram.
![dispatcher-behaviour-on-good-code.png](../images/dispatcher-behaviour-on-good-code.png)
Messages sent to `SeparateDispatcherFutureActor` and `PrintActor` are handled by the default dispatcher - the
Messages sent to `SeparateDispatcherCompletionStageActor` and `PrintActor` are handled by the default dispatcher - the
green lines, which represent the actual execution.
When blocking operations are run on the `my-blocking-dispatcher`,
@ -300,14 +294,14 @@ they were still served on the default dispatcher.
This is the recommended way of dealing with any kind of blocking in reactive
applications.
For a similar discussion specific about Akka HTTP refer to, @scala[@extref[Handling blocking operations in Akka HTTP](akka.http:scala/http/handling-blocking-operations-in-akka-http-routes.html#handling-blocking-operations-in-akka-http)]@java[@extref[Handling blocking operations in Akka HTTP](akka.http:java/http/handling-blocking-operations-in-akka-http-routes.html#handling-blocking-operations-in-akka-http)].
For a similar discussion specifically about Akka HTTP, refer to @extref[Handling blocking operations in Akka HTTP](akka.http:handling-blocking-operations-in-akka-http-routes.html).
### Available solutions to blocking operations
The non-exhaustive list of adequate solutions to the “blocking problem”
includes the following suggestions:
* Do the blocking call within a `Future`, ensuring an upper bound on
* Do the blocking call within a @scala[`Future`]@java[`CompletionStage`], ensuring an upper bound on
the number of such calls at any point in time (submitting an unbounded
number of tasks of this nature will exhaust your memory or thread limits).
* Do the blocking call within a `Future`, providing a thread pool with

View file

@ -1,35 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.actor;
import akka.actor.AbstractActor;
import akka.dispatch.Futures;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
// #blocking-in-future
class BlockingFutureActor extends AbstractActor {
ExecutionContext ec = getContext().getDispatcher();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
Integer.class,
i -> {
System.out.println("Calling blocking Future: " + i);
Future<Integer> f =
Futures.future(
() -> {
Thread.sleep(5000);
System.out.println("Blocking future finished: " + i);
return i;
},
ec);
})
.build();
}
}
// #blocking-in-future

View file

@ -20,7 +20,11 @@ public class BlockingActor extends AbstractBehavior<Integer> {
.onMessage(
Integer.class,
i -> {
Thread.sleep(5000); // block for 5 seconds, representing blocking I/O, etc
// DO NOT DO THIS HERE: this is an example of incorrect code,
// better alternatives are described futher on.
// block for 5 seconds, representing blocking I/O, etc
Thread.sleep(5000);
System.out.println("Blocking operation finished: " + i);
return Behaviors.same();
})

View file

@ -13,8 +13,7 @@ public class BlockingDispatcherTest {
Behavior<Void> root =
Behaviors.setup(
context -> {
ActorRef<Integer> actor1 =
context.spawn(BlockingFutureActor.create(), "BlockingFutureActor");
ActorRef<Integer> actor1 = context.spawn(BlockingActor.create(), "BlockingActor");
ActorRef<Integer> actor2 = context.spawn(new PrintActor(), "PrintActor");
for (int i = 0; i < 100; i++) {

View file

@ -1,49 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.actor.typed;
import akka.actor.typed.*;
import akka.actor.typed.javadsl.*;
import akka.dispatch.Futures;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
// #blocking-in-future
class BlockingFutureActor extends AbstractBehavior<Integer> {
private final ExecutionContext ec;
public static Behavior<Integer> create() {
return Behaviors.setup(BlockingFutureActor::new);
}
private BlockingFutureActor(ActorContext<Integer> context) {
ec = context.getExecutionContext();
}
@Override
public Receive createReceive() {
return newReceiveBuilder()
.onMessage(
Integer.class,
i -> {
triggerFutureBlockingOperation(i, ec);
return Behaviors.same();
})
.build();
}
private static final void triggerFutureBlockingOperation(Integer i, ExecutionContext ec) {
System.out.println("Calling blocking Future: " + i);
Future<Integer> f =
Futures.future(
() -> {
Thread.sleep(5000);
System.out.println("Blocking future finished: " + i);
return i;
},
ec);
}
}
// #blocking-in-future

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.actor.typed;
import java.util.concurrent.Executor;
import scala.concurrent.ExecutionContextExecutor;
import akka.actor.typed.*;
import akka.actor.typed.javadsl.*;
@SuppressWarnings("unused")
public class DispatcherDocTest {
private final ActorSystem<Void> system = null;
private final ActorContext<Void> context = null;
public void defineDispatcherInCode() {
// #defining-dispatcher-in-code
ActorRef<Integer> myActor =
context.spawn(
new PrintActor(), "PrintActor", DispatcherSelector.fromConfig("my-dispatcher"));
// #defining-dispatcher-in-code
}
public void defineFixedPoolSizeDispatcher() {
// #defining-fixed-pool-size-dispatcher
ActorRef<Integer> myActor =
context.spawn(
new PrintActor(),
"PrintActor",
DispatcherSelector.fromConfig("blocking-io-dispatcher"));
// #defining-fixed-pool-size-dispatcher
}
public void definePinnedDispatcher() {
// #defining-pinned-dispatcher
ActorRef<Integer> myActor =
context.spawn(
new PrintActor(), "PrintActor", DispatcherSelector.fromConfig("my-pinned-dispatcher"));
// #defining-pinned-dispatcher
}
public void compileLookup() {
// #lookup
// this is scala.concurrent.ExecutionContextExecutor, which implements
// both scala.concurrent.ExecutionContext (for use with Futures, Scheduler, etc.)
// and java.util.concurrent.Executor (for use with CompletableFuture etc.)
final ExecutionContextExecutor ex =
system.dispatchers().lookup(DispatcherSelector.fromConfig("my-dispatcher"));
// #lookup
}
}

View file

@ -4,15 +4,15 @@
package jdocs.actor.typed;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import akka.actor.typed.*;
import akka.actor.typed.javadsl.*;
import akka.dispatch.Futures;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
// #separate-dispatcher
class SeparateDispatcherFutureActor extends AbstractBehavior<Integer> {
final ExecutionContext ec;
final Executor ec;
public static Behavior<Integer> create() {
return Behaviors.setup(SeparateDispatcherFutureActor::new);
@ -38,14 +38,18 @@ class SeparateDispatcherFutureActor extends AbstractBehavior<Integer> {
.build();
}
private static final void triggerFutureBlockingOperation(Integer i, ExecutionContext ec) {
private static final void triggerFutureBlockingOperation(Integer i, Executor ec) {
System.out.println("Calling blocking Future on separate dispatcher: " + i);
Future<Integer> f =
Futures.future(
CompletableFuture<Integer> f =
CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(5000);
System.out.println("Blocking future finished: " + i);
return i;
} catch (InterruptedException e) {
return -1;
}
},
ec);
}

View file

@ -16,6 +16,7 @@ import jdocs.actor.MyActor;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContextExecutor;
// #imports
import akka.actor.*;
@ -88,9 +89,10 @@ public class DispatcherDocTest extends AbstractJavaTest {
@SuppressWarnings("unused")
public void compileLookup() {
// #lookup
// this is scala.concurrent.ExecutionContext
// for use with Futures, Scheduler, etc.
final ExecutionContext ex = system.dispatchers().lookup("my-dispatcher");
// this is scala.concurrent.ExecutionContextExecutor, which implements
// both scala.concurrent.ExecutionContext (for use with Futures, Scheduler, etc.)
// and java.util.concurrent.Executor (for use with CompletableFuture etc.)
final ExecutionContextExecutor ex = system.dispatchers().lookup("my-dispatcher");
// #lookup
}

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.actor.typed
// #blocking-in-actor
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
object BlockingActor {
val behavior: Behavior[Int] = Behaviors.receiveMessage {
case i: Int =>
// DO NOT DO THIS HERE: this is an example of incorrect code,
// better alternatives are described futher on.
//block for 5 seconds, representing blocking I/O, etc
Thread.sleep(5000)
println(s"Blocking operation finished: ${i}")
Behaviors.same
}
}
// #blocking-in-actor

View file

@ -10,17 +10,6 @@ import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.{ ExecutionContext, Future }
// #blocking-in-actor
object BlockingActor {
val behavior: Behavior[Int] = Behaviors.receiveMessage {
case i: Int =>
Thread.sleep(5000) //block for 5 seconds, representing blocking I/O, etc
println(s"Blocking operation finished: ${i}")
Behaviors.same
}
}
// #blocking-in-actor
// #blocking-in-future
object BlockingFutureActor {
def apply(): Behavior[Int] =
@ -68,16 +57,6 @@ object SeparateDispatcherFutureActor {
}
// #separate-dispatcher
// #print-actor
object PrintActor {
val behavior: Behavior[Integer] =
Behaviors.receiveMessage(i => {
println(s"PrintActor: ${i}")
Behaviors.same
})
}
// #print-actor
object BlockingDispatcherSample {
def main(args: Array[String]) = {
// #blocking-main

View file

@ -0,0 +1,36 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.actor.typed
import akka.actor.typed.scaladsl.ActorContext
object DispatcherDocSpec {
val context: ActorContext[Integer] = ???
{
//#defining-dispatcher-in-code
import akka.actor.typed.DispatcherSelector
val myActor =
context.spawn(PrintActor.behavior, "PrintActor", DispatcherSelector.fromConfig("PrintActor"))
//#defining-dispatcher-in-code
}
{
//#defining-fixed-pool-size-dispatcher
import akka.actor.typed.DispatcherSelector
val myActor =
context.spawn(PrintActor.behavior, "PrintActor", DispatcherSelector.fromConfig("blocking-io-dispatcher"))
//#defining-fixed-pool-size-dispatcher
}
{
//#lookup
// for use with Futures, Scheduler, etc.
import akka.actor.typed.DispatcherSelector
implicit val executionContext = context.system.dispatchers.lookup(DispatcherSelector.fromConfig("my-dispatcher"))
//#lookup
}
}

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.actor.typed
// #print-actor
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
object PrintActor {
val behavior: Behavior[Integer] =
Behaviors.receiveMessage(i => {
println(s"PrintActor: ${i}")
Behaviors.same
})
}
// #print-actor