More description about blocking operations (Fixes #22803)
Improve wordings in actor-systems.md Adding Java samples in actor-systems.md, and description for scala.concurrent.blocking() inside Future Move the section for blocking operations to dispatchers.md Fix minor issues in dispatchers.md Remove note about scala.concurrent.blocking which would be unnecessary. Correcting a typo "run run"
This commit is contained in:
parent
2449aeeb14
commit
dfef449268
13 changed files with 552 additions and 198 deletions
Binary file not shown.
|
After Width: | Height: | Size: 202 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 218 KiB |
|
|
@ -1,142 +0,0 @@
|
|||
# Dispatchers
|
||||
|
||||
An Akka `MessageDispatcher` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
|
||||
All `MessageDispatcher` implementations are also an `ExecutionContext`, which means that they can be used
|
||||
to execute arbitrary code, for instance @ref:[Futures](futures.md).
|
||||
|
||||
## Default dispatcher
|
||||
|
||||
Every `ActorSystem` will have a default dispatcher that will be used in case nothing else is configured for an `Actor`.
|
||||
The default dispatcher can be configured, and is by default a `Dispatcher` with the specified `default-executor`.
|
||||
If an ActorSystem is created with an ExecutionContext passed in, this ExecutionContext will be used as the default executor for all
|
||||
dispatchers in this ActorSystem. If no ExecutionContext is given, it will fallback to the executor specified in
|
||||
`akka.actor.default-dispatcher.default-executor.fallback`. By default this is a "fork-join-executor", which
|
||||
gives excellent performance in most cases.
|
||||
|
||||
<a id="dispatcher-lookup"></a>
|
||||
## Looking up a Dispatcher
|
||||
|
||||
Dispatchers implement the `ExecutionContext` interface and can thus be used to run `Future` invocations etc.
|
||||
|
||||
@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #lookup }
|
||||
|
||||
## Setting the dispatcher for an Actor
|
||||
|
||||
So in case you want to give your `Actor` a different dispatcher than the default, you need to do two things, of which the first is
|
||||
is to configure the dispatcher:
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-dispatcher-config }
|
||||
|
||||
@@@ note
|
||||
|
||||
Note that the `parallelism-max` does not set the upper bound on the total number of threads
|
||||
allocated by the ForkJoinPool. It is a setting specifically talking about the number of *hot*
|
||||
threads the pool keep running in order to reduce the latency of handling a new incoming task.
|
||||
You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/jdocs/api/java/util/concurrent/ForkJoinPool.html).
|
||||
|
||||
@@@
|
||||
|
||||
Another example that uses the "thread-pool-executor":
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #fixed-pool-size-dispatcher-config }
|
||||
|
||||
@@@ note
|
||||
|
||||
The thread pool executor dispatcher is implemented using by a `java.util.concurrent.ThreadPoolExecutor`.
|
||||
You can read more about it in the JDK's [ThreadPoolExecutor documentation](https://docs.oracle.com/javase/8/jdocs/api/java/util/concurrent/ThreadPoolExecutor.html).
|
||||
|
||||
@@@
|
||||
|
||||
For more options, see the default-dispatcher section of the @ref:[configuration](general/configuration.md).
|
||||
|
||||
Then you create the actor as usual and define the dispatcher in the deployment configuration.
|
||||
|
||||
@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-dispatcher-in-config }
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #dispatcher-deployment-config }
|
||||
|
||||
An alternative to the deployment configuration is to define the dispatcher in code.
|
||||
If you define the `dispatcher` in the deployment configuration then this value will be used instead
|
||||
of programmatically provided parameter.
|
||||
|
||||
@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-dispatcher-in-code }
|
||||
|
||||
@@@ note
|
||||
|
||||
The dispatcher you specify in `withDispatcher` and the `dispatcher` property in the deployment
|
||||
configuration is in fact a path into your configuration.
|
||||
So in this example it's a top-level section, but you could for instance put it as a sub-section,
|
||||
where you'd use periods to denote sub-sections, like this: `"foo.bar.my-dispatcher"`
|
||||
|
||||
@@@
|
||||
|
||||
## Types of dispatchers
|
||||
|
||||
There are 3 different types of message dispatchers:
|
||||
|
||||
* **Dispatcher**
|
||||
|
||||
This is an event-based dispatcher that binds a set of Actors to a thread
|
||||
pool. It is the default dispatcher used if one is not specified.
|
||||
|
||||
* Sharability: Unlimited
|
||||
* Mailboxes: Any, creates one per Actor
|
||||
* Use cases: Default dispatcher, Bulkheading
|
||||
* Driven by: `java.util.concurrent.ExecutorService`.
|
||||
Specify using "executor" using "fork-join-executor", "thread-pool-executor" or the FQCN of
|
||||
an `akka.dispatcher.ExecutorServiceConfigurator`.
|
||||
|
||||
* **PinnedDispatcher**
|
||||
|
||||
This dispatcher dedicates a unique thread for each actor using it; i.e.
|
||||
each actor will have its own thread pool with only one thread in the pool.
|
||||
|
||||
* Sharability: None
|
||||
* Mailboxes: Any, creates one per Actor
|
||||
* Use cases: Bulkheading
|
||||
* Driven by: Any `akka.dispatch.ThreadPoolExecutorConfigurator`.
|
||||
By default a "thread-pool-executor".
|
||||
|
||||
* **CallingThreadDispatcher**
|
||||
|
||||
This dispatcher runs invocations on the current thread only. This
|
||||
dispatcher does not create any new threads, but it can be used from
|
||||
different threads concurrently for the same actor.
|
||||
See @ref:[CallingThreadDispatcher](testing.md#callingthreaddispatcher)
|
||||
for details and restrictions.
|
||||
|
||||
* Sharability: Unlimited
|
||||
* Mailboxes: Any, creates one per Actor per Thread (on demand)
|
||||
* Use cases: Testing
|
||||
* Driven by: The calling thread (duh)
|
||||
|
||||
### More dispatcher configuration examples
|
||||
|
||||
Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO:
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #fixed-pool-size-dispatcher-config }
|
||||
|
||||
And then using it:
|
||||
|
||||
@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-fixed-pool-size-dispatcher }
|
||||
|
||||
Another example that uses the thread pool based on the number of cores (e.g. for CPU bound tasks)
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-thread-pool-dispatcher-config }
|
||||
|
||||
Configuring a `PinnedDispatcher`:
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-pinned-dispatcher-config }
|
||||
|
||||
And then using it:
|
||||
|
||||
@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-pinned-dispatcher }
|
||||
|
||||
Note that `thread-pool-executor` configuration as per the above `my-thread-pool-dispatcher` example is
|
||||
NOT applicable. This is because every actor will have its own thread pool when using `PinnedDispatcher`,
|
||||
and that pool will have only one thread.
|
||||
|
||||
Note that it's not guaranteed that the *same* thread is used over time, since the core pool timeout
|
||||
is used for `PinnedDispatcher` to keep resource usage down in case of idle actors. To use the same
|
||||
thread all the time you need to add `thread-pool-executor.allow-core-timeout=off` to the
|
||||
configuration of the `PinnedDispatcher`.
|
||||
1
akka-docs/src/main/paradox/java/dispatchers.md
Symbolic link
1
akka-docs/src/main/paradox/java/dispatchers.md
Symbolic link
|
|
@ -0,0 +1 @@
|
|||
../scala/dispatchers.md
|
||||
|
|
@ -18,13 +18,17 @@ gives excellent performance in most cases.
|
|||
|
||||
Dispatchers implement the `ExecutionContext` interface and can thus be used to run `Future` invocations etc.
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #lookup }
|
||||
Scala
|
||||
: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #lookup }
|
||||
|
||||
Java
|
||||
: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #lookup }
|
||||
## Setting the dispatcher for an Actor
|
||||
|
||||
So in case you want to give your `Actor` a different dispatcher than the default, you need to do two things, of which the first
|
||||
So in case you want to give your `Actor` a different dispatcher than the default, you need to do two things, of which the first
|
||||
is to configure the dispatcher:
|
||||
|
||||
<!--same config text for Scala & Java-->
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-dispatcher-config }
|
||||
|
||||
@@@ note
|
||||
|
|
@ -32,12 +36,13 @@ is to configure the dispatcher:
|
|||
Note that the `parallelism-max` does not set the upper bound on the total number of threads
|
||||
allocated by the ForkJoinPool. It is a setting specifically talking about the number of *hot*
|
||||
threads the pool keep running in order to reduce the latency of handling a new incoming task.
|
||||
You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html).
|
||||
You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html)]
|
||||
|
||||
@@@
|
||||
|
||||
Another example that uses the "thread-pool-executor":
|
||||
|
||||
<!--same config text for Scala & Java-->
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #fixed-pool-size-dispatcher-config }
|
||||
|
||||
@@@ note
|
||||
|
|
@ -51,15 +56,24 @@ For more options, see the default-dispatcher section of the @ref:[configuration]
|
|||
|
||||
Then you create the actor as usual and define the dispatcher in the deployment configuration.
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-dispatcher-in-config }
|
||||
Scala
|
||||
: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-dispatcher-in-config }
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #dispatcher-deployment-config }
|
||||
Java
|
||||
: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-dispatcher-in-config }
|
||||
|
||||
<!--same config text for Scala & Java-->
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #dispatcher-deployment-config }
|
||||
|
||||
An alternative to the deployment configuration is to define the dispatcher in code.
|
||||
If you define the `dispatcher` in the deployment configuration then this value will be used instead
|
||||
of programmatically provided parameter.
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-dispatcher-in-code }
|
||||
Scala
|
||||
: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-dispatcher-in-code }
|
||||
|
||||
Java
|
||||
: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-dispatcher-in-code }
|
||||
|
||||
@@@ note
|
||||
|
||||
|
|
@ -76,7 +90,7 @@ There are 3 different types of message dispatchers:
|
|||
|
||||
* **Dispatcher**
|
||||
|
||||
This is an event-based dispatcher that binds a set of Actors to a thread
|
||||
This is an event-based dispatcher that binds a set of Actors to a thread
|
||||
pool. It is the default dispatcher used if one is not specified.
|
||||
|
||||
* Sharability: Unlimited
|
||||
|
|
@ -96,7 +110,7 @@ There are 3 different types of message dispatchers:
|
|||
* Use cases: Bulkheading
|
||||
* Driven by: Any `akka.dispatch.ThreadPoolExecutorConfigurator`.
|
||||
By default a "thread-pool-executor".
|
||||
|
||||
|
||||
* **CallingThreadDispatcher**
|
||||
|
||||
This dispatcher runs invocations on the current thread only. This
|
||||
|
|
@ -118,19 +132,29 @@ Configuring a dispatcher with fixed thread pool size, e.g. for actors that perfo
|
|||
|
||||
And then using it:
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-fixed-pool-size-dispatcher }
|
||||
Scala
|
||||
: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-fixed-pool-size-dispatcher }
|
||||
|
||||
Java
|
||||
: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-fixed-pool-size-dispatcher }
|
||||
|
||||
Another example that uses the thread pool based on the number of cores (e.g. for CPU bound tasks)
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-thread-pool-dispatcher-config }
|
||||
<!--same config text for Scala & Java-->
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) {#my-thread-pool-dispatcher-config }
|
||||
|
||||
Configuring a `PinnedDispatcher`:
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-pinned-dispatcher-config }
|
||||
<!--same config text for Scala & Java-->
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) {#my-pinned-dispatcher-config }
|
||||
|
||||
And then using it:
|
||||
|
||||
@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-pinned-dispatcher }
|
||||
Scala
|
||||
: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-pinned-dispatcher }
|
||||
|
||||
Java
|
||||
: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-pinned-dispatcher }
|
||||
|
||||
Note that `thread-pool-executor` configuration as per the above `my-thread-pool-dispatcher` example is
|
||||
NOT applicable. This is because every actor will have its own thread pool when using `PinnedDispatcher`,
|
||||
|
|
@ -140,3 +164,222 @@ Note that it's not guaranteed that the *same* thread is used over time, since th
|
|||
is used for `PinnedDispatcher` to keep resource usage down in case of idle actors. To use the same
|
||||
thread all the time you need to add `thread-pool-executor.allow-core-timeout=off` to the
|
||||
configuration of the `PinnedDispatcher`.
|
||||
|
||||
## Blocking Needs Careful Management
|
||||
|
||||
In some cases it is unavoidable to do blocking operations, i.e. to put a thread
|
||||
to sleep for an indeterminate time, waiting for an external event to occur.
|
||||
Examples are legacy RDBMS drivers or messaging APIs, and the underlying reason
|
||||
is typically that (network) I/O occurs under the covers.
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #blocking-in-actor }
|
||||
|
||||
Java
|
||||
: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/BlockingActor.java) { #blocking-in-actor }
|
||||
|
||||
|
||||
When facing this, you
|
||||
may be tempted to just wrap the blocking call inside a `Future` and work
|
||||
with that instead, but this strategy is too simple: you are quite likely to
|
||||
find bottlenecks or run out of memory or threads when the application runs
|
||||
under increased load.
|
||||
|
||||
Scala
|
||||
: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #blocking-in-future }
|
||||
|
||||
Java
|
||||
: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java) { #blocking-in-future }
|
||||
|
||||
|
||||
### Problem: Blocking on default dispatcher
|
||||
|
||||
The key here is this line:
|
||||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
```scala
|
||||
implicit val executionContext: ExecutionContext = context.dispatcher
|
||||
```
|
||||
|
||||
@@@
|
||||
|
||||
@@@ div { .group-java }
|
||||
|
||||
```java
|
||||
ExecutionContext ec = getContext().dispatcher();
|
||||
```
|
||||
|
||||
@@@
|
||||
|
||||
Using @scala[`context.dispatcher`] @java[`getContext().dispatcher()`] as the dispatcher on which the blocking `Future`
|
||||
executes can be a problem, since this dispatcher is by default used for all other actor processing
|
||||
unless you @ref:[set up a separate dispatcher for the actor](dispatchers.md#setting-the-dispatcher-for-an-actor).
|
||||
|
||||
If all of the available threads are blocked, then all the actors on the same dispatcher will starve for threads and
|
||||
will not be able to process incoming messages.
|
||||
|
||||
@@@ note
|
||||
|
||||
Blocking APIs should also be avoided if possible. Try to find or build Reactive APIs,
|
||||
such that blocking is minimised, or moved over to dedicated dispatchers.
|
||||
|
||||
Often when integrating with existing libraries or systems it is not possible to
|
||||
avoid blocking APIs. The following solution explains how to handle blocking
|
||||
operations properly.
|
||||
|
||||
Note that the same hints apply to managing blocking operations anywhere in Akka,
|
||||
including Streams, Http and other reactive libraries built on top of it.
|
||||
|
||||
@@@
|
||||
|
||||
Let's set up an application with the above `BlockingFutureActor` and the following `PrintActor`.
|
||||
|
||||
Scala
|
||||
: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #print-actor }
|
||||
|
||||
Java
|
||||
: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/PrintActor.java) { #print-actor }
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #blocking-main }
|
||||
|
||||
Java
|
||||
: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/BlockingDispatcherTest.java) { #blocking-main }
|
||||
|
||||
|
||||
Here the app is sending 100 messages to `BlockingFutureActor` and `PrintActor` and large numbers
|
||||
of `akka.actor.default-dispatcher` threads are handling requests. When you run the above code,
|
||||
you will likely to see the entire application gets stuck somewhere like this:
|
||||
|
||||
```
|
||||
> PrintActor: 44
|
||||
> PrintActor: 45
|
||||
```
|
||||
|
||||
`PrintActor` is considered non-blocking, however it is not able to proceed with handling the remaining messages,
|
||||
since all the threads are occupied and blocked by the other blocking actor - thus leading to thread starvation.
|
||||
|
||||
In the thread state diagrams below the colours have the following meaning:
|
||||
|
||||
* Turquoise - Sleeping state
|
||||
* Orange - Waiting state
|
||||
* Green - Runnable state
|
||||
|
||||
The thread information was recorded using the YourKit profiler, however any good JVM profiler
|
||||
has this feature (including the free and bundled with the Oracle JDK VisualVM, as well as Oracle Flight Recorder).
|
||||
|
||||
The orange portion of the thread shows that it is idle. Idle threads are fine -
|
||||
they're ready to accept new work. However, large amount of turquoise (blocked, or sleeping as in our example) threads
|
||||
is very bad and leads to thread starvation.
|
||||
|
||||
@@@ note
|
||||
|
||||
If you own a Lightbend subscription you can use the commercial [Thread Starvation Detector](http://developer.lightbend.com/docs/akka-commercial-addons/current/starvation-detector.html)
|
||||
which will issue warning log statements if it detects any of your dispatchers suffering from starvation and other.
|
||||
It is a helpful first step to identify the problem is occurring in a production system,
|
||||
and then you can apply the proposed solutions as explained below.
|
||||
|
||||
@@@
|
||||
|
||||

|
||||
|
||||
In the above example we put the code under load by sending hundreds of messages to the blocking actor
|
||||
which causes threads of the default dispatcher to be blocked.
|
||||
The fork join pool based dispatcher in Akka then attempts to compensate for this blocking by adding more threads to the pool
|
||||
(`default-akka.actor.default-dispatcher 18,19,20,...`).
|
||||
This however is not able to help if those too will immediately get blocked,
|
||||
and eventually the blocking operations will dominate the entire dispatcher.
|
||||
|
||||
In essence, the `Thread.sleep` operation has dominated all threads and caused anything
|
||||
executing on the default dispatcher to starve for resources (including any actor
|
||||
that you have not configured an explicit dispatcher for).
|
||||
|
||||
### Solution: Dedicated dispatcher for blocking operations
|
||||
|
||||
One of the most efficient methods of isolating the blocking behaviour such that it does not impact the rest of the system
|
||||
is to prepare and use a dedicated dispatcher for all those blocking operations.
|
||||
This technique is often referred to as as "bulk-heading" or simply "isolating blocking".
|
||||
|
||||
In `application.conf`, the dispatcher dedicated to blocking behaviour should
|
||||
be configured as follows:
|
||||
|
||||
<!--same config text for Scala & Java-->
|
||||
@@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #my-blocking-dispatcher-config }
|
||||
|
||||
A `thread-pool-executor` based dispatcher allows us to set a limit on the number of threads it will host,
|
||||
and this way we gain tight control over how at-most-how-many blocked threads will be in the system.
|
||||
|
||||
The exact size should be fine tuned depending on the workload you're expecting to run on this dispatcher
|
||||
as well as the number of cores of the machine you're running the application on.
|
||||
Usually a small number around the number of cores is a good default to start from.
|
||||
|
||||
Whenever blocking has to be done, use the above configured dispatcher
|
||||
instead of the default one:
|
||||
|
||||
Scala
|
||||
: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #separate-dispatcher }
|
||||
|
||||
Java
|
||||
: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/SeparateDispatcherFutureActor.java) { #separate-dispatcher }
|
||||
|
||||
The thread pool behaviour is shown in the below diagram.
|
||||
|
||||

|
||||
|
||||
Messages sent to `SeparateDispatcherFutureActor` and `PrintActor` are easily handled by the default dispatcher - the
|
||||
green lines, which represent the actual execution.
|
||||
|
||||
When blocking operations are run on the `my-blocking-dispatcher`,
|
||||
it uses the threads (up to the configured limit) to handle these operations.
|
||||
The sleeping in this case is nicely isolated to just this dispatcher, and the default one remains unaffected,
|
||||
allowing the rest of the application to proceed as if nothing bad was happening. After
|
||||
a certain period idleness, threads started by this dispatcher will be shut down.
|
||||
|
||||
In this case, the throughput of other actors was not impacted -
|
||||
they were still served on the default dispatcher.
|
||||
|
||||
This is the recommended way of dealing with any kind of blocking in reactive
|
||||
applications.
|
||||
|
||||
For a similar discussion specific about Akka HTTP refer to, @extref:[Handling blocking operations in Akka HTTP](akka.http:java/http/handling-blocking-operations-in-akka-http-routes.html#solution-dedicated-dispatcher-for-blocking-operations).
|
||||
|
||||
### Available solutions to blocking operations
|
||||
|
||||
The non-exhaustive list of adequate solutions to the “blocking problem”
|
||||
includes the following suggestions:
|
||||
|
||||
* Do the blocking call within an actor (or a set of actors managed by a router
|
||||
@ref:[router](routing.md), making sure to
|
||||
configure a thread pool which is either dedicated for this purpose or
|
||||
sufficiently sized.
|
||||
* Do the blocking call within a `Future`, ensuring an upper bound on
|
||||
the number of such calls at any point in time (submitting an unbounded
|
||||
number of tasks of this nature will exhaust your memory or thread limits).
|
||||
* Do the blocking call within a `Future`, providing a thread pool with
|
||||
an upper limit on the number of threads which is appropriate for the
|
||||
hardware on which the application runs, as explained in detail in this section.
|
||||
* Dedicate a single thread to manage a set of blocking resources (e.g. a NIO
|
||||
selector driving multiple channels) and dispatch events as they occur as
|
||||
actor messages.
|
||||
|
||||
The first possibility is especially well-suited for resources which are
|
||||
single-threaded in nature, like database handles which traditionally can only
|
||||
execute one outstanding query at a time and use internal synchronization to
|
||||
ensure this. A common pattern is to create a router for N actors, each of which
|
||||
wraps a single DB connection and handles queries as sent to the router. The
|
||||
number N must then be tuned for maximum throughput, which will vary depending
|
||||
on which DBMS is deployed on what hardware.
|
||||
|
||||
@@@ note
|
||||
|
||||
Configuring thread pools is a task best delegated to Akka, simply configure
|
||||
in the `application.conf` and instantiate through an
|
||||
@ref:[`ActorSystem`](#dispatcher-lookup)
|
||||
|
||||
@@@
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -102,50 +102,6 @@ respect to fault-handling (both considering the granularity of configuration
|
|||
and the performance) and it also reduces the strain on the guardian actor,
|
||||
which is a single point of contention if over-used.
|
||||
|
||||
## Blocking Needs Careful Management
|
||||
|
||||
In some cases it is unavoidable to do blocking operations, i.e. to put a thread
|
||||
to sleep for an indeterminate time, waiting for an external event to occur.
|
||||
Examples are legacy RDBMS drivers or messaging APIs, and the underlying reason
|
||||
is typically that (network) I/O occurs under the covers. When facing this, you
|
||||
may be tempted to just wrap the blocking call inside a `Future` and work
|
||||
with that instead, but this strategy is too simple: you are quite likely to
|
||||
find bottlenecks or run out of memory or threads when the application runs
|
||||
under increased load.
|
||||
|
||||
The non-exhaustive list of adequate solutions to the “blocking problem”
|
||||
includes the following suggestions:
|
||||
|
||||
* Do the blocking call within an actor (or a set of actors managed by a router
|
||||
@ref:[router](../routing.md), making sure to
|
||||
configure a thread pool which is either dedicated for this purpose or
|
||||
sufficiently sized.
|
||||
* Do the blocking call within a `Future`, ensuring an upper bound on
|
||||
the number of such calls at any point in time (submitting an unbounded
|
||||
number of tasks of this nature will exhaust your memory or thread limits).
|
||||
* Do the blocking call within a `Future`, providing a thread pool with
|
||||
an upper limit on the number of threads which is appropriate for the
|
||||
hardware on which the application runs.
|
||||
* Dedicate a single thread to manage a set of blocking resources (e.g. a NIO
|
||||
selector driving multiple channels) and dispatch events as they occur as
|
||||
actor messages.
|
||||
|
||||
The first possibility is especially well-suited for resources which are
|
||||
single-threaded in nature, like database handles which traditionally can only
|
||||
execute one outstanding query at a time and use internal synchronization to
|
||||
ensure this. A common pattern is to create a router for N actors, each of which
|
||||
wraps a single DB connection and handles queries as sent to the router. The
|
||||
number N must then be tuned for maximum throughput, which will vary depending
|
||||
on which DBMS is deployed on what hardware.
|
||||
|
||||
@@@ note
|
||||
|
||||
Configuring thread pools is a task best delegated to Akka, simply configure
|
||||
in the `application.conf` and instantiate through an
|
||||
@ref:[`ActorSystem`](../dispatchers.md#dispatcher-lookup)
|
||||
|
||||
@@@
|
||||
|
||||
## What you should not concern yourself with
|
||||
|
||||
An actor system manages the resources it is configured to use in order to run
|
||||
|
|
|
|||
22
akka-docs/src/test/java/jdocs/actor/BlockingActor.java
Normal file
22
akka-docs/src/test/java/jdocs/actor/BlockingActor.java
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.actor;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
|
||||
// #blocking-in-actor
|
||||
class BlockingActor extends AbstractActor {
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(Integer.class, i -> {
|
||||
Thread.sleep(5000); //block for 5 seconds, representing blocking I/O, etc
|
||||
System.out.println("Blocking operation finished: " + i);
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #blocking-in-actor
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package jdocs.actor;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
|
||||
public class BlockingDispatcherTest {
|
||||
public static void main(String args[]) {
|
||||
ActorSystem system = ActorSystem.create("BlockingDispatcherTest");
|
||||
|
||||
try {
|
||||
// #blocking-main
|
||||
ActorRef actor1 = system.actorOf(Props.create(BlockingFutureActor.class));
|
||||
ActorRef actor2 = system.actorOf(Props.create(PrintActor.class));
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
actor1.tell(i, ActorRef.noSender());
|
||||
actor2.tell(i, ActorRef.noSender());
|
||||
}
|
||||
// #blocking-main
|
||||
Thread.sleep(5000 * 6);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
//swallow the exception
|
||||
} finally {
|
||||
system.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
30
akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java
Normal file
30
akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.actor;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.dispatch.Futures;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
// #blocking-in-future
|
||||
class BlockingFutureActor extends AbstractActor {
|
||||
ExecutionContext ec = getContext().dispatcher();
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(Integer.class, i -> {
|
||||
System.out.println("Calling blocking Future: " + i);
|
||||
Future<Integer> f = Futures.future(() -> {
|
||||
Thread.sleep(5000);
|
||||
System.out.println("Blocking future finished: " + i);
|
||||
return i;
|
||||
}, ec);
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #blocking-in-future
|
||||
20
akka-docs/src/test/java/jdocs/actor/PrintActor.java
Normal file
20
akka-docs/src/test/java/jdocs/actor/PrintActor.java
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.actor;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
|
||||
// #print-actor
|
||||
class PrintActor extends AbstractActor {
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(Integer.class, i -> {
|
||||
System.out.println("PrintActor: " + i);
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #print-actor
|
||||
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.actor;
|
||||
|
||||
import akka.actor.AbstractActor;
|
||||
import akka.dispatch.Futures;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
// #separate-dispatcher
|
||||
class SeparateDispatcherFutureActor extends AbstractActor {
|
||||
ExecutionContext ec = getContext().getSystem().dispatchers().lookup("my-blocking-dispatcher");
|
||||
|
||||
@Override
|
||||
public Receive createReceive() {
|
||||
return receiveBuilder()
|
||||
.match(Integer.class, i -> {
|
||||
System.out.println("Calling blocking Future on separate dispatcher: " + i);
|
||||
Future<Integer> f = Futures.future(() -> {
|
||||
Thread.sleep(5000);
|
||||
System.out.println("Blocking future finished: " + i);
|
||||
return i;
|
||||
}, ec);
|
||||
})
|
||||
.build();
|
||||
}
|
||||
}
|
||||
// #separate-dispatcher
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdocs.actor;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
class SeparateDispatcherTest {
|
||||
public static void main(String args[]) {
|
||||
Config config = ConfigFactory.parseString(
|
||||
"my-blocking-dispatcher {\n" +
|
||||
" type = Dispatcher\n" +
|
||||
" executor = \"thread-pool-executor\"\n" +
|
||||
" thread-pool-executor {\n" +
|
||||
" fixed-pool-size = 16\n" +
|
||||
" }\n" +
|
||||
" throughput = 1\n" +
|
||||
"}\n"
|
||||
);
|
||||
|
||||
ActorSystem system = ActorSystem.create("BlockingDispatcherTest", config);
|
||||
|
||||
try {
|
||||
// #separate-dispatcher-main
|
||||
ActorRef actor1 = system.actorOf(Props.create(SeparateDispatcherFutureActor.class));
|
||||
ActorRef actor2 = system.actorOf(Props.create(PrintActor.class));
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
actor1.tell(i, ActorRef.noSender());
|
||||
actor2.tell(i, ActorRef.noSender());
|
||||
}
|
||||
// #separate-dispatcher-main
|
||||
Thread.sleep(5000 * 6);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
//swallow the exception
|
||||
} finally {
|
||||
system.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package docs.actor
|
||||
|
||||
import akka.actor.{ Actor, ActorSystem, Props }
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
|
||||
import scala.concurrent.{ ExecutionContext, Future }
|
||||
|
||||
// #blocking-in-actor
|
||||
class BlockingActor extends Actor {
|
||||
def receive = {
|
||||
case i: Int =>
|
||||
Thread.sleep(5000) //block for 5 seconds, representing blocking I/O, etc
|
||||
println(s"Blocking operation finished: ${i}")
|
||||
}
|
||||
}
|
||||
// #blocking-in-actor
|
||||
|
||||
// #blocking-in-future
|
||||
class BlockingFutureActor extends Actor {
|
||||
implicit val executionContext: ExecutionContext = context.dispatcher
|
||||
|
||||
def receive = {
|
||||
case i: Int =>
|
||||
println(s"Calling blocking Future: ${i}")
|
||||
Future {
|
||||
Thread.sleep(5000) //block for 5 seconds
|
||||
println(s"Blocking future finished ${i}")
|
||||
}
|
||||
}
|
||||
}
|
||||
// #blocking-in-future
|
||||
|
||||
// #separate-dispatcher
|
||||
class SeparateDispatcherFutureActor extends Actor {
|
||||
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("my-blocking-dispatcher")
|
||||
|
||||
def receive = {
|
||||
case i: Int =>
|
||||
println(s"Calling blocking Future: ${i}")
|
||||
Future {
|
||||
Thread.sleep(5000) //block for 5 seconds
|
||||
println(s"Blocking future finished ${i}")
|
||||
}
|
||||
}
|
||||
}
|
||||
// #separate-dispatcher
|
||||
|
||||
// #print-actor
|
||||
class PrintActor extends Actor {
|
||||
def receive = {
|
||||
case i: Int =>
|
||||
println(s"PrintActor: ${i}")
|
||||
}
|
||||
}
|
||||
// #print-actor
|
||||
|
||||
object BlockingDispatcherSample {
|
||||
def main(args: Array[String]) = {
|
||||
val system = ActorSystem("BlockingDispatcherSample")
|
||||
|
||||
try {
|
||||
// #blocking-main
|
||||
val actor1 = system.actorOf(Props(new BlockingFutureActor))
|
||||
val actor2 = system.actorOf(Props(new PrintActor))
|
||||
|
||||
for (i <- 1 to 100) {
|
||||
actor1 ! i
|
||||
actor2 ! i
|
||||
}
|
||||
// #blocking-main
|
||||
} finally {
|
||||
Thread.sleep(5000 * 6)
|
||||
system.terminate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object SeparateDispatcherSample {
|
||||
def main(args: Array[String]) = {
|
||||
|
||||
val config = ConfigFactory.parseString(
|
||||
"""
|
||||
//#my-blocking-dispatcher-config
|
||||
my-blocking-dispatcher {
|
||||
type = Dispatcher
|
||||
executor = "thread-pool-executor"
|
||||
thread-pool-executor {
|
||||
fixed-pool-size = 16
|
||||
}
|
||||
throughput = 1
|
||||
}
|
||||
//#my-blocking-dispatcher-config
|
||||
"""
|
||||
)
|
||||
val system = ActorSystem("SeparateDispatcherSample", config)
|
||||
|
||||
try {
|
||||
// #separate-dispatcher-main
|
||||
val actor1 = system.actorOf(Props(new SeparateDispatcherFutureActor))
|
||||
val actor2 = system.actorOf(Props(new PrintActor))
|
||||
|
||||
for (i <- 1 to 100) {
|
||||
actor1 ! i
|
||||
actor2 ! i
|
||||
}
|
||||
// #separate-dispatcher-main
|
||||
} finally {
|
||||
Thread.sleep(5000 * 6)
|
||||
system.terminate()
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue