diff --git a/akka-docs/build.sbt b/akka-docs/build.sbt index d2f1f6de15..fe1a68beeb 100644 --- a/akka-docs/build.sbt +++ b/akka-docs/build.sbt @@ -19,6 +19,7 @@ name in (Compile, paradox) := "Akka" paradoxProperties ++= Map( "akka.canonical.base_url" -> "http://doc.akka.io/docs/akka/current", "github.base_url" -> GitHub.url(version.value), // for links like this: @github[#1](#1) or @github[83986f9](83986f9) + "extref.akka.http.base_url" -> "http://doc.akka.io/docs/akka-http/current", "extref.wikipedia.base_url" -> "https://en.wikipedia.org/wiki/%s", "extref.github.base_url" -> (GitHub.url(version.value) + "/%s"), // for links to our sources "extref.samples.base_url" -> "https://github.com/akka/akka-samples/tree/2.5/%s", diff --git a/akka-docs/src/main/paradox/images/dispatcher-behaviour-on-bad-code.png b/akka-docs/src/main/paradox/images/dispatcher-behaviour-on-bad-code.png new file mode 100644 index 0000000000..8cfa3b8a8c Binary files /dev/null and b/akka-docs/src/main/paradox/images/dispatcher-behaviour-on-bad-code.png differ diff --git a/akka-docs/src/main/paradox/images/dispatcher-behaviour-on-good-code.png b/akka-docs/src/main/paradox/images/dispatcher-behaviour-on-good-code.png new file mode 100644 index 0000000000..c764b2f737 Binary files /dev/null and b/akka-docs/src/main/paradox/images/dispatcher-behaviour-on-good-code.png differ diff --git a/akka-docs/src/main/paradox/java/dispatchers.md b/akka-docs/src/main/paradox/java/dispatchers.md deleted file mode 100644 index d5c12ee0de..0000000000 --- a/akka-docs/src/main/paradox/java/dispatchers.md +++ /dev/null @@ -1,142 +0,0 @@ -# Dispatchers - -An Akka `MessageDispatcher` is what makes Akka Actors "tick", it is the engine of the machine so to speak. -All `MessageDispatcher` implementations are also an `ExecutionContext`, which means that they can be used -to execute arbitrary code, for instance @ref:[Futures](futures.md). - -## Default dispatcher - -Every `ActorSystem` will have a default dispatcher that will be used in case nothing else is configured for an `Actor`. -The default dispatcher can be configured, and is by default a `Dispatcher` with the specified `default-executor`. -If an ActorSystem is created with an ExecutionContext passed in, this ExecutionContext will be used as the default executor for all -dispatchers in this ActorSystem. If no ExecutionContext is given, it will fallback to the executor specified in -`akka.actor.default-dispatcher.default-executor.fallback`. By default this is a "fork-join-executor", which -gives excellent performance in most cases. - - -## Looking up a Dispatcher - -Dispatchers implement the `ExecutionContext` interface and can thus be used to run `Future` invocations etc. - -@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #lookup } - -## Setting the dispatcher for an Actor - -So in case you want to give your `Actor` a different dispatcher than the default, you need to do two things, of which the first is -is to configure the dispatcher: - -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-dispatcher-config } - -@@@ note - -Note that the `parallelism-max` does not set the upper bound on the total number of threads -allocated by the ForkJoinPool. It is a setting specifically talking about the number of *hot* -threads the pool keep running in order to reduce the latency of handling a new incoming task. -You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/jdocs/api/java/util/concurrent/ForkJoinPool.html). - -@@@ - -Another example that uses the "thread-pool-executor": - -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #fixed-pool-size-dispatcher-config } - -@@@ note - -The thread pool executor dispatcher is implemented using by a `java.util.concurrent.ThreadPoolExecutor`. -You can read more about it in the JDK's [ThreadPoolExecutor documentation](https://docs.oracle.com/javase/8/jdocs/api/java/util/concurrent/ThreadPoolExecutor.html). - -@@@ - -For more options, see the default-dispatcher section of the @ref:[configuration](general/configuration.md). - -Then you create the actor as usual and define the dispatcher in the deployment configuration. - -@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-dispatcher-in-config } - -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #dispatcher-deployment-config } - -An alternative to the deployment configuration is to define the dispatcher in code. -If you define the `dispatcher` in the deployment configuration then this value will be used instead -of programmatically provided parameter. - -@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-dispatcher-in-code } - -@@@ note - -The dispatcher you specify in `withDispatcher` and the `dispatcher` property in the deployment -configuration is in fact a path into your configuration. -So in this example it's a top-level section, but you could for instance put it as a sub-section, -where you'd use periods to denote sub-sections, like this: `"foo.bar.my-dispatcher"` - -@@@ - -## Types of dispatchers - -There are 3 different types of message dispatchers: - -* **Dispatcher** - - This is an event-based dispatcher that binds a set of Actors to a thread - pool. It is the default dispatcher used if one is not specified. - - * Sharability: Unlimited - * Mailboxes: Any, creates one per Actor - * Use cases: Default dispatcher, Bulkheading - * Driven by: `java.util.concurrent.ExecutorService`. - Specify using "executor" using "fork-join-executor", "thread-pool-executor" or the FQCN of - an `akka.dispatcher.ExecutorServiceConfigurator`. - -* **PinnedDispatcher** - - This dispatcher dedicates a unique thread for each actor using it; i.e. - each actor will have its own thread pool with only one thread in the pool. - - * Sharability: None - * Mailboxes: Any, creates one per Actor - * Use cases: Bulkheading - * Driven by: Any `akka.dispatch.ThreadPoolExecutorConfigurator`. - By default a "thread-pool-executor". - -* **CallingThreadDispatcher** - - This dispatcher runs invocations on the current thread only. This - dispatcher does not create any new threads, but it can be used from - different threads concurrently for the same actor. - See @ref:[CallingThreadDispatcher](testing.md#callingthreaddispatcher) - for details and restrictions. - - * Sharability: Unlimited - * Mailboxes: Any, creates one per Actor per Thread (on demand) - * Use cases: Testing - * Driven by: The calling thread (duh) - -### More dispatcher configuration examples - -Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO: - -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #fixed-pool-size-dispatcher-config } - -And then using it: - -@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-fixed-pool-size-dispatcher } - -Another example that uses the thread pool based on the number of cores (e.g. for CPU bound tasks) - -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-thread-pool-dispatcher-config } - -Configuring a `PinnedDispatcher`: - -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-pinned-dispatcher-config } - -And then using it: - -@@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-pinned-dispatcher } - -Note that `thread-pool-executor` configuration as per the above `my-thread-pool-dispatcher` example is -NOT applicable. This is because every actor will have its own thread pool when using `PinnedDispatcher`, -and that pool will have only one thread. - -Note that it's not guaranteed that the *same* thread is used over time, since the core pool timeout -is used for `PinnedDispatcher` to keep resource usage down in case of idle actors. To use the same -thread all the time you need to add `thread-pool-executor.allow-core-timeout=off` to the -configuration of the `PinnedDispatcher`. diff --git a/akka-docs/src/main/paradox/java/dispatchers.md b/akka-docs/src/main/paradox/java/dispatchers.md new file mode 120000 index 0000000000..4ffd76c4a9 --- /dev/null +++ b/akka-docs/src/main/paradox/java/dispatchers.md @@ -0,0 +1 @@ +../scala/dispatchers.md \ No newline at end of file diff --git a/akka-docs/src/main/paradox/scala/dispatchers.md b/akka-docs/src/main/paradox/scala/dispatchers.md index fd1e5fca88..9dbb583ef2 100644 --- a/akka-docs/src/main/paradox/scala/dispatchers.md +++ b/akka-docs/src/main/paradox/scala/dispatchers.md @@ -18,13 +18,17 @@ gives excellent performance in most cases. Dispatchers implement the `ExecutionContext` interface and can thus be used to run `Future` invocations etc. -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #lookup } +Scala +: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #lookup } +Java +: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #lookup } ## Setting the dispatcher for an Actor -So in case you want to give your `Actor` a different dispatcher than the default, you need to do two things, of which the first +So in case you want to give your `Actor` a different dispatcher than the default, you need to do two things, of which the first is to configure the dispatcher: + @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-dispatcher-config } @@@ note @@ -32,12 +36,13 @@ is to configure the dispatcher: Note that the `parallelism-max` does not set the upper bound on the total number of threads allocated by the ForkJoinPool. It is a setting specifically talking about the number of *hot* threads the pool keep running in order to reduce the latency of handling a new incoming task. -You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html). +You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html)] @@@ Another example that uses the "thread-pool-executor": + @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #fixed-pool-size-dispatcher-config } @@@ note @@ -51,15 +56,24 @@ For more options, see the default-dispatcher section of the @ref:[configuration] Then you create the actor as usual and define the dispatcher in the deployment configuration. -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-dispatcher-in-config } +Scala +: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-dispatcher-in-config } -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #dispatcher-deployment-config } +Java +: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-dispatcher-in-config } + + +@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #dispatcher-deployment-config } An alternative to the deployment configuration is to define the dispatcher in code. If you define the `dispatcher` in the deployment configuration then this value will be used instead of programmatically provided parameter. -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-dispatcher-in-code } +Scala +: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-dispatcher-in-code } + +Java +: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-dispatcher-in-code } @@@ note @@ -76,7 +90,7 @@ There are 3 different types of message dispatchers: * **Dispatcher** - This is an event-based dispatcher that binds a set of Actors to a thread + This is an event-based dispatcher that binds a set of Actors to a thread pool. It is the default dispatcher used if one is not specified. * Sharability: Unlimited @@ -96,7 +110,7 @@ There are 3 different types of message dispatchers: * Use cases: Bulkheading * Driven by: Any `akka.dispatch.ThreadPoolExecutorConfigurator`. By default a "thread-pool-executor". - + * **CallingThreadDispatcher** This dispatcher runs invocations on the current thread only. This @@ -118,19 +132,29 @@ Configuring a dispatcher with fixed thread pool size, e.g. for actors that perfo And then using it: -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-fixed-pool-size-dispatcher } +Scala +: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-fixed-pool-size-dispatcher } + +Java +: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-fixed-pool-size-dispatcher } Another example that uses the thread pool based on the number of cores (e.g. for CPU bound tasks) -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-thread-pool-dispatcher-config } + +@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) {#my-thread-pool-dispatcher-config } Configuring a `PinnedDispatcher`: -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #my-pinned-dispatcher-config } + +@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) {#my-pinned-dispatcher-config } And then using it: -@@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-pinned-dispatcher } +Scala +: @@snip [DispatcherDocSpec.scala]($code$/scala/docs/dispatcher/DispatcherDocSpec.scala) { #defining-pinned-dispatcher } + +Java +: @@snip [DispatcherDocTest.java]($code$/java/jdocs/dispatcher/DispatcherDocTest.java) { #defining-pinned-dispatcher } Note that `thread-pool-executor` configuration as per the above `my-thread-pool-dispatcher` example is NOT applicable. This is because every actor will have its own thread pool when using `PinnedDispatcher`, @@ -140,3 +164,222 @@ Note that it's not guaranteed that the *same* thread is used over time, since th is used for `PinnedDispatcher` to keep resource usage down in case of idle actors. To use the same thread all the time you need to add `thread-pool-executor.allow-core-timeout=off` to the configuration of the `PinnedDispatcher`. + +## Blocking Needs Careful Management + +In some cases it is unavoidable to do blocking operations, i.e. to put a thread +to sleep for an indeterminate time, waiting for an external event to occur. +Examples are legacy RDBMS drivers or messaging APIs, and the underlying reason +is typically that (network) I/O occurs under the covers. + + +Scala +: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #blocking-in-actor } + +Java +: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/BlockingActor.java) { #blocking-in-actor } + + +When facing this, you +may be tempted to just wrap the blocking call inside a `Future` and work +with that instead, but this strategy is too simple: you are quite likely to +find bottlenecks or run out of memory or threads when the application runs +under increased load. + +Scala +: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #blocking-in-future } + +Java +: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java) { #blocking-in-future } + + +### Problem: Blocking on default dispatcher + +The key here is this line: + +@@@ div { .group-scala } + +```scala +implicit val executionContext: ExecutionContext = context.dispatcher +``` + +@@@ + +@@@ div { .group-java } + +```java +ExecutionContext ec = getContext().dispatcher(); +``` + +@@@ + +Using @scala[`context.dispatcher`] @java[`getContext().dispatcher()`] as the dispatcher on which the blocking `Future` +executes can be a problem, since this dispatcher is by default used for all other actor processing +unless you @ref:[set up a separate dispatcher for the actor](dispatchers.md#setting-the-dispatcher-for-an-actor). + +If all of the available threads are blocked, then all the actors on the same dispatcher will starve for threads and +will not be able to process incoming messages. + +@@@ note + +Blocking APIs should also be avoided if possible. Try to find or build Reactive APIs, +such that blocking is minimised, or moved over to dedicated dispatchers. + +Often when integrating with existing libraries or systems it is not possible to +avoid blocking APIs. The following solution explains how to handle blocking +operations properly. + +Note that the same hints apply to managing blocking operations anywhere in Akka, +including Streams, Http and other reactive libraries built on top of it. + +@@@ + +Let's set up an application with the above `BlockingFutureActor` and the following `PrintActor`. + +Scala +: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #print-actor } + +Java +: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/PrintActor.java) { #print-actor } + + +Scala +: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #blocking-main } + +Java +: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/BlockingDispatcherTest.java) { #blocking-main } + + +Here the app is sending 100 messages to `BlockingFutureActor` and `PrintActor` and large numbers +of `akka.actor.default-dispatcher` threads are handling requests. When you run the above code, +you will likely to see the entire application gets stuck somewhere like this: + +``` +> PrintActor: 44 +> PrintActor: 45 +``` + +`PrintActor` is considered non-blocking, however it is not able to proceed with handling the remaining messages, +since all the threads are occupied and blocked by the other blocking actor - thus leading to thread starvation. + +In the thread state diagrams below the colours have the following meaning: + + * Turquoise - Sleeping state + * Orange - Waiting state + * Green - Runnable state + +The thread information was recorded using the YourKit profiler, however any good JVM profiler +has this feature (including the free and bundled with the Oracle JDK VisualVM, as well as Oracle Flight Recorder). + +The orange portion of the thread shows that it is idle. Idle threads are fine - +they're ready to accept new work. However, large amount of turquoise (blocked, or sleeping as in our example) threads +is very bad and leads to thread starvation. + +@@@ note + +If you own a Lightbend subscription you can use the commercial [Thread Starvation Detector](http://developer.lightbend.com/docs/akka-commercial-addons/current/starvation-detector.html) +which will issue warning log statements if it detects any of your dispatchers suffering from starvation and other. +It is a helpful first step to identify the problem is occurring in a production system, +and then you can apply the proposed solutions as explained below. + +@@@ + +![dispatcher-behaviour-on-bad-code.png](../images/dispatcher-behaviour-on-bad-code.png) + +In the above example we put the code under load by sending hundreds of messages to the blocking actor +which causes threads of the default dispatcher to be blocked. +The fork join pool based dispatcher in Akka then attempts to compensate for this blocking by adding more threads to the pool +(`default-akka.actor.default-dispatcher 18,19,20,...`). +This however is not able to help if those too will immediately get blocked, +and eventually the blocking operations will dominate the entire dispatcher. + +In essence, the `Thread.sleep` operation has dominated all threads and caused anything +executing on the default dispatcher to starve for resources (including any actor +that you have not configured an explicit dispatcher for). + +### Solution: Dedicated dispatcher for blocking operations + +One of the most efficient methods of isolating the blocking behaviour such that it does not impact the rest of the system +is to prepare and use a dedicated dispatcher for all those blocking operations. +This technique is often referred to as as "bulk-heading" or simply "isolating blocking". + +In `application.conf`, the dispatcher dedicated to blocking behaviour should +be configured as follows: + + +@@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #my-blocking-dispatcher-config } + +A `thread-pool-executor` based dispatcher allows us to set a limit on the number of threads it will host, +and this way we gain tight control over how at-most-how-many blocked threads will be in the system. + +The exact size should be fine tuned depending on the workload you're expecting to run on this dispatcher +as well as the number of cores of the machine you're running the application on. +Usually a small number around the number of cores is a good default to start from. + +Whenever blocking has to be done, use the above configured dispatcher +instead of the default one: + +Scala +: @@snip [BlockingDispatcherSample.scala]($akka$/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala) { #separate-dispatcher } + +Java +: @@snip [BlockingDispatcherSample.java]($akka$/akka-docs/src/test/java/jdocs/actor/SeparateDispatcherFutureActor.java) { #separate-dispatcher } + +The thread pool behaviour is shown in the below diagram. + +![dispatcher-behaviour-on-good-code.png](../images/dispatcher-behaviour-on-good-code.png) + +Messages sent to `SeparateDispatcherFutureActor` and `PrintActor` are easily handled by the default dispatcher - the +green lines, which represent the actual execution. + +When blocking operations are run on the `my-blocking-dispatcher`, +it uses the threads (up to the configured limit) to handle these operations. +The sleeping in this case is nicely isolated to just this dispatcher, and the default one remains unaffected, +allowing the rest of the application to proceed as if nothing bad was happening. After +a certain period idleness, threads started by this dispatcher will be shut down. + +In this case, the throughput of other actors was not impacted - +they were still served on the default dispatcher. + +This is the recommended way of dealing with any kind of blocking in reactive +applications. + +For a similar discussion specific about Akka HTTP refer to, @extref:[Handling blocking operations in Akka HTTP](akka.http:java/http/handling-blocking-operations-in-akka-http-routes.html#solution-dedicated-dispatcher-for-blocking-operations). + +### Available solutions to blocking operations + +The non-exhaustive list of adequate solutions to the “blocking problem” +includes the following suggestions: + + * Do the blocking call within an actor (or a set of actors managed by a router +@ref:[router](routing.md), making sure to +configure a thread pool which is either dedicated for this purpose or +sufficiently sized. + * Do the blocking call within a `Future`, ensuring an upper bound on +the number of such calls at any point in time (submitting an unbounded +number of tasks of this nature will exhaust your memory or thread limits). + * Do the blocking call within a `Future`, providing a thread pool with +an upper limit on the number of threads which is appropriate for the +hardware on which the application runs, as explained in detail in this section. + * Dedicate a single thread to manage a set of blocking resources (e.g. a NIO +selector driving multiple channels) and dispatch events as they occur as +actor messages. + +The first possibility is especially well-suited for resources which are +single-threaded in nature, like database handles which traditionally can only +execute one outstanding query at a time and use internal synchronization to +ensure this. A common pattern is to create a router for N actors, each of which +wraps a single DB connection and handles queries as sent to the router. The +number N must then be tuned for maximum throughput, which will vary depending +on which DBMS is deployed on what hardware. + +@@@ note + +Configuring thread pools is a task best delegated to Akka, simply configure +in the `application.conf` and instantiate through an +@ref:[`ActorSystem`](#dispatcher-lookup) + +@@@ + + + diff --git a/akka-docs/src/main/paradox/scala/general/actor-systems.md b/akka-docs/src/main/paradox/scala/general/actor-systems.md index 7666158e6b..a44cf51a13 100644 --- a/akka-docs/src/main/paradox/scala/general/actor-systems.md +++ b/akka-docs/src/main/paradox/scala/general/actor-systems.md @@ -102,50 +102,6 @@ respect to fault-handling (both considering the granularity of configuration and the performance) and it also reduces the strain on the guardian actor, which is a single point of contention if over-used. -## Blocking Needs Careful Management - -In some cases it is unavoidable to do blocking operations, i.e. to put a thread -to sleep for an indeterminate time, waiting for an external event to occur. -Examples are legacy RDBMS drivers or messaging APIs, and the underlying reason -is typically that (network) I/O occurs under the covers. When facing this, you -may be tempted to just wrap the blocking call inside a `Future` and work -with that instead, but this strategy is too simple: you are quite likely to -find bottlenecks or run out of memory or threads when the application runs -under increased load. - -The non-exhaustive list of adequate solutions to the “blocking problem” -includes the following suggestions: - - * Do the blocking call within an actor (or a set of actors managed by a router -@ref:[router](../routing.md), making sure to -configure a thread pool which is either dedicated for this purpose or -sufficiently sized. - * Do the blocking call within a `Future`, ensuring an upper bound on -the number of such calls at any point in time (submitting an unbounded -number of tasks of this nature will exhaust your memory or thread limits). - * Do the blocking call within a `Future`, providing a thread pool with -an upper limit on the number of threads which is appropriate for the -hardware on which the application runs. - * Dedicate a single thread to manage a set of blocking resources (e.g. a NIO -selector driving multiple channels) and dispatch events as they occur as -actor messages. - -The first possibility is especially well-suited for resources which are -single-threaded in nature, like database handles which traditionally can only -execute one outstanding query at a time and use internal synchronization to -ensure this. A common pattern is to create a router for N actors, each of which -wraps a single DB connection and handles queries as sent to the router. The -number N must then be tuned for maximum throughput, which will vary depending -on which DBMS is deployed on what hardware. - -@@@ note - -Configuring thread pools is a task best delegated to Akka, simply configure -in the `application.conf` and instantiate through an -@ref:[`ActorSystem`](../dispatchers.md#dispatcher-lookup) - -@@@ - ## What you should not concern yourself with An actor system manages the resources it is configured to use in order to run diff --git a/akka-docs/src/test/java/jdocs/actor/BlockingActor.java b/akka-docs/src/test/java/jdocs/actor/BlockingActor.java new file mode 100644 index 0000000000..92a20a50be --- /dev/null +++ b/akka-docs/src/test/java/jdocs/actor/BlockingActor.java @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package jdocs.actor; + +import akka.actor.AbstractActor; + +// #blocking-in-actor +class BlockingActor extends AbstractActor { + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(Integer.class, i -> { + Thread.sleep(5000); //block for 5 seconds, representing blocking I/O, etc + System.out.println("Blocking operation finished: " + i); + }) + .build(); + } +} +// #blocking-in-actor \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/actor/BlockingDispatcherTest.java b/akka-docs/src/test/java/jdocs/actor/BlockingDispatcherTest.java new file mode 100644 index 0000000000..0c7c30e3d2 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/actor/BlockingDispatcherTest.java @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package jdocs.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +public class BlockingDispatcherTest { + public static void main(String args[]) { + ActorSystem system = ActorSystem.create("BlockingDispatcherTest"); + + try { + // #blocking-main + ActorRef actor1 = system.actorOf(Props.create(BlockingFutureActor.class)); + ActorRef actor2 = system.actorOf(Props.create(PrintActor.class)); + + for (int i = 0; i < 100; i++) { + actor1.tell(i, ActorRef.noSender()); + actor2.tell(i, ActorRef.noSender()); + } + // #blocking-main + Thread.sleep(5000 * 6); + + } catch (InterruptedException e) { + //swallow the exception + } finally { + system.terminate(); + } + } +} diff --git a/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java b/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java new file mode 100644 index 0000000000..e5f26c93bb --- /dev/null +++ b/akka-docs/src/test/java/jdocs/actor/BlockingFutureActor.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package jdocs.actor; + +import akka.actor.AbstractActor; +import akka.dispatch.Futures; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; + +// #blocking-in-future +class BlockingFutureActor extends AbstractActor { + ExecutionContext ec = getContext().dispatcher(); + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(Integer.class, i -> { + System.out.println("Calling blocking Future: " + i); + Future f = Futures.future(() -> { + Thread.sleep(5000); + System.out.println("Blocking future finished: " + i); + return i; + }, ec); + }) + .build(); + } +} +// #blocking-in-future \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/actor/PrintActor.java b/akka-docs/src/test/java/jdocs/actor/PrintActor.java new file mode 100644 index 0000000000..1bdf14f975 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/actor/PrintActor.java @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package jdocs.actor; + +import akka.actor.AbstractActor; + +// #print-actor +class PrintActor extends AbstractActor { + @Override + public Receive createReceive() { + return receiveBuilder() + .match(Integer.class, i -> { + System.out.println("PrintActor: " + i); + }) + .build(); + } +} +// #print-actor diff --git a/akka-docs/src/test/java/jdocs/actor/SeparateDispatcherFutureActor.java b/akka-docs/src/test/java/jdocs/actor/SeparateDispatcherFutureActor.java new file mode 100644 index 0000000000..b0003cf732 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/actor/SeparateDispatcherFutureActor.java @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package jdocs.actor; + +import akka.actor.AbstractActor; +import akka.dispatch.Futures; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; + +// #separate-dispatcher +class SeparateDispatcherFutureActor extends AbstractActor { + ExecutionContext ec = getContext().getSystem().dispatchers().lookup("my-blocking-dispatcher"); + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(Integer.class, i -> { + System.out.println("Calling blocking Future on separate dispatcher: " + i); + Future f = Futures.future(() -> { + Thread.sleep(5000); + System.out.println("Blocking future finished: " + i); + return i; + }, ec); + }) + .build(); + } +} +// #separate-dispatcher \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/actor/SeparateDispatcherTest.java b/akka-docs/src/test/java/jdocs/actor/SeparateDispatcherTest.java new file mode 100644 index 0000000000..d02e87b629 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/actor/SeparateDispatcherTest.java @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package jdocs.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +class SeparateDispatcherTest { + public static void main(String args[]) { + Config config = ConfigFactory.parseString( + "my-blocking-dispatcher {\n" + + " type = Dispatcher\n" + + " executor = \"thread-pool-executor\"\n" + + " thread-pool-executor {\n" + + " fixed-pool-size = 16\n" + + " }\n" + + " throughput = 1\n" + + "}\n" + ); + + ActorSystem system = ActorSystem.create("BlockingDispatcherTest", config); + + try { + // #separate-dispatcher-main + ActorRef actor1 = system.actorOf(Props.create(SeparateDispatcherFutureActor.class)); + ActorRef actor2 = system.actorOf(Props.create(PrintActor.class)); + + for (int i = 0; i < 100; i++) { + actor1.tell(i, ActorRef.noSender()); + actor2.tell(i, ActorRef.noSender()); + } + // #separate-dispatcher-main + Thread.sleep(5000 * 6); + + } catch (InterruptedException e) { + //swallow the exception + } finally { + system.terminate(); + } + } +} diff --git a/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala b/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala new file mode 100644 index 0000000000..a57e2350f8 --- /dev/null +++ b/akka-docs/src/test/scala/docs/actor/BlockingDispatcherSample.scala @@ -0,0 +1,115 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package docs.actor + +import akka.actor.{ Actor, ActorSystem, Props } +import com.typesafe.config.{ Config, ConfigFactory } + +import scala.concurrent.{ ExecutionContext, Future } + +// #blocking-in-actor +class BlockingActor extends Actor { + def receive = { + case i: Int => + Thread.sleep(5000) //block for 5 seconds, representing blocking I/O, etc + println(s"Blocking operation finished: ${i}") + } +} +// #blocking-in-actor + +// #blocking-in-future +class BlockingFutureActor extends Actor { + implicit val executionContext: ExecutionContext = context.dispatcher + + def receive = { + case i: Int => + println(s"Calling blocking Future: ${i}") + Future { + Thread.sleep(5000) //block for 5 seconds + println(s"Blocking future finished ${i}") + } + } +} +// #blocking-in-future + +// #separate-dispatcher +class SeparateDispatcherFutureActor extends Actor { + implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("my-blocking-dispatcher") + + def receive = { + case i: Int => + println(s"Calling blocking Future: ${i}") + Future { + Thread.sleep(5000) //block for 5 seconds + println(s"Blocking future finished ${i}") + } + } +} +// #separate-dispatcher + +// #print-actor +class PrintActor extends Actor { + def receive = { + case i: Int => + println(s"PrintActor: ${i}") + } +} +// #print-actor + +object BlockingDispatcherSample { + def main(args: Array[String]) = { + val system = ActorSystem("BlockingDispatcherSample") + + try { + // #blocking-main + val actor1 = system.actorOf(Props(new BlockingFutureActor)) + val actor2 = system.actorOf(Props(new PrintActor)) + + for (i <- 1 to 100) { + actor1 ! i + actor2 ! i + } + // #blocking-main + } finally { + Thread.sleep(5000 * 6) + system.terminate() + } + } +} + +object SeparateDispatcherSample { + def main(args: Array[String]) = { + + val config = ConfigFactory.parseString( + """ + //#my-blocking-dispatcher-config + my-blocking-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 16 + } + throughput = 1 + } + //#my-blocking-dispatcher-config + """ + ) + val system = ActorSystem("SeparateDispatcherSample", config) + + try { + // #separate-dispatcher-main + val actor1 = system.actorOf(Props(new SeparateDispatcherFutureActor)) + val actor2 = system.actorOf(Props(new PrintActor)) + + for (i <- 1 to 100) { + actor1 ! i + actor2 ! i + } + // #separate-dispatcher-main + } finally { + Thread.sleep(5000 * 6) + system.terminate() + } + } +}