From a1c3dbe307691db241173352819c5d08b86704f4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 17 Dec 2015 09:40:03 +0100 Subject: [PATCH] =act #19201 improve configuration of thread-pool-executor * The old implementation would cap the pool size (both corePoolSize and maximumPoolSize) to max-pool-size, which is very confusing because maximumPoolSize is only used when the task queue is bounded. * That resulted in configuring core-pool-size-min and core-pool-size-max not being enough, because it could be capped by the default max-pool-size. * The new behavior is simply that maximumPoolSize is adjusted to not be less than corePoolSize, but otherwise the config properties match the underlying ThreadPoolExecutor implementation. * Added a convenience fixed-pool-size property. --- .../actor/LocalActorRefProviderSpec.scala | 3 +- .../scala/akka/actor/TypedActorSpec.scala | 5 +-- .../actor/dispatch/DispatcherActorSpec.scala | 6 +-- .../test/scala/akka/config/ConfigSpec.scala | 1 + akka-actor/src/main/resources/reference.conf | 30 ++++++++----- .../akka/dispatch/AbstractDispatcher.scala | 43 +++++++++++-------- .../akka/dispatch/ThreadPoolBuilder.scala | 13 +++--- .../cluster/StartupWithOneThreadSpec.scala | 3 +- akka-docs/rst/java/cluster-sharding.rst | 8 ++-- .../docs/dispatcher/DispatcherDocTest.java | 9 ++++ akka-docs/rst/java/dispatchers.rst | 9 ++++ akka-docs/rst/java/lambda-persistence.rst | 2 +- akka-docs/rst/java/persistence.rst | 2 +- akka-docs/rst/java/routing.rst | 4 +- .../project/migration-guide-2.3.x-2.4.x.rst | 1 - akka-docs/rst/scala/cluster-sharding.rst | 8 ++-- .../docs/dispatcher/DispatcherDocSpec.scala | 21 ++++++++- akka-docs/rst/scala/dispatchers.rst | 9 ++++ akka-docs/rst/scala/persistence.rst | 8 ++-- 19 files changed, 119 insertions(+), 66 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index ae2d8e5053..f06bf0707a 
100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -22,8 +22,7 @@ object LocalActorRefProviderSpec { default-dispatcher { executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 16 - core-pool-size-max = 16 + fixed-pool-size = 16 } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index bcbcb1897b..82ffd8be07 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -29,10 +29,7 @@ object TypedActorSpec { type = "akka.dispatch.BalancingDispatcherConfigurator" executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 60 - core-pool-size-max = 60 - max-pool-size-min = 60 - max-pool-size-max = 60 + fixed-pool-size = 60 } } """ diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index e80eb281f4..f30c9ad119 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -20,8 +20,7 @@ object DispatcherActorSpec { throughput = 101 executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 1 - core-pool-size-max = 1 + fixed-pool-size = 1 } } test-throughput-deadline-dispatcher { @@ -29,8 +28,7 @@ object DispatcherActorSpec { throughput-deadline-time = 100 milliseconds executor = "thread-pool-executor" thread-pool-executor { - core-pool-size-min = 1 - core-pool-size-max = 1 + fixed-pool-size = 1 } } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 7d19f4f893..b4a4c4d248 100644 --- 
a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -110,6 +110,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getInt("task-queue-size") should ===(-1) getString("task-queue-type") should ===("linked") getBoolean("allow-core-timeout") should ===(true) + getString("fixed-pool-size") should ===("off") } // Debug config diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 3fd65ce537..afea5c8d46 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -333,6 +333,7 @@ akka { } # This will be used if you have set "executor = "fork-join-executor"" + # Underlying thread pool implementation is scala.concurrent.forkjoin.ForkJoinPool fork-join-executor { # Min number of threads to cap factor-based parallelism number to parallelism-min = 8 @@ -351,32 +352,41 @@ akka { } # This will be used if you have set "executor = "thread-pool-executor"" + # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor thread-pool-executor { # Keep alive time for threads keep-alive-time = 60s + + # Define a fixed thread pool size with this property. The corePoolSize + # and the maximumPoolSize of the ThreadPoolExecutor will be set to this + # value, if it is defined. Then the other pool-size properties will not + # be used. + fixed-pool-size = off - # Min number of threads to cap factor-based core number to + # Min number of threads to cap factor-based corePoolSize number to core-pool-size-min = 8 - # The core pool size factor is used to determine thread pool core size - # using the following formula: ceil(available processors * factor). + # The core-pool-size-factor is used to determine corePoolSize of the + # ThreadPoolExecutor using the following formula: + # ceil(available processors * factor). 
# Resulting size is then bounded by the core-pool-size-min and # core-pool-size-max values. core-pool-size-factor = 3.0 - # Max number of threads to cap factor-based number to + # Max number of threads to cap factor-based corePoolSize number to core-pool-size-max = 64 - # Minimum number of threads to cap factor-based max number to - # (if using a bounded task queue) + # Minimum number of threads to cap factor-based maximumPoolSize number to max-pool-size-min = 8 - # Max no of threads (if using a bounded task queue) is determined by - # calculating: ceil(available processors * factor) + # The max-pool-size-factor is used to determine maximumPoolSize of the + # ThreadPoolExecutor using the following formula: + # ceil(available processors * factor) + # The maximumPoolSize will not be less than corePoolSize. + # It is only used if using a bounded task queue. max-pool-size-factor = 3.0 - # Max number of threads to cap factor-based max number to - # (if using a bounded task queue) + # Max number of threads to cap factor-based maximumPoolSize number to max-pool-size-max = 64 # Specifies the bounded capacity of the task queue (< 1 == unbounded) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index d77beb89f7..9a5a0d4bb4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -345,21 +345,27 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = { import akka.util.Helpers.ConfigOps - ThreadPoolConfigBuilder(ThreadPoolConfig()) - .setKeepAliveTime(config.getMillisDuration("keep-alive-time")) - .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout") - .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config 
getDouble "core-pool-size-factor", config getInt "core-pool-size-max") - .setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max") - .configure( - Some(config getInt "task-queue-size") flatMap { - case size if size > 0 ⇒ - Some(config getString "task-queue-type") map { - case "array" ⇒ ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness? - case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(size) - case x ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x) - } map { qf ⇒ (q: ThreadPoolConfigBuilder) ⇒ q.setQueueFactory(qf) } - case _ ⇒ None - }) + val builder = + ThreadPoolConfigBuilder(ThreadPoolConfig()) + .setKeepAliveTime(config.getMillisDuration("keep-alive-time")) + .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout") + .configure( + Some(config getInt "task-queue-size") flatMap { + case size if size > 0 ⇒ + Some(config getString "task-queue-type") map { + case "array" ⇒ ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness? + case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(size) + case x ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" 
format x) + } map { qf ⇒ (q: ThreadPoolConfigBuilder) ⇒ q.setQueueFactory(qf) } + case _ ⇒ None + }) + + if (config.getString("fixed-pool-size") == "off") + builder + .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max") + .setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max") + else + builder.setFixedPoolSize(config.getInt("fixed-pool-size")) } def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = @@ -435,9 +441,10 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer } val asyncMode = config.getString("task-peeking-mode") match { - case "FIFO" ⇒ true - case "LIFO" ⇒ false - case unsupported ⇒ throw new IllegalArgumentException(s"""Cannot instantiate ForkJoinExecutorServiceFactory. "task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""") + case "FIFO" ⇒ true + case "LIFO" ⇒ false + case unsupported ⇒ throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. 
" + + """"task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""") } new ForkJoinExecutorServiceFactory( diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 00b1edb6b9..ffa456e894 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -123,17 +123,14 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) { def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair))) + def setFixedPoolSize(size: Int): ThreadPoolConfigBuilder = + this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size)) + def setCorePoolSize(size: Int): ThreadPoolConfigBuilder = - if (config.maxPoolSize < size) - this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size)) - else - this.copy(config = config.copy(corePoolSize = size)) + this.copy(config = config.copy(corePoolSize = size, maxPoolSize = math.max(size, config.maxPoolSize))) def setMaxPoolSize(size: Int): ThreadPoolConfigBuilder = - if (config.corePoolSize > size) - this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size)) - else - this.copy(config = config.copy(maxPoolSize = size)) + this.copy(config = config.copy(maxPoolSize = math.max(size, config.corePoolSize))) def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder = setCorePoolSize(scaledPoolSize(min, multiplier, max)) diff --git a/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala b/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala index 9f969cd4ed..8c71db2221 100644 --- a/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala +++ 
b/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala @@ -29,8 +29,7 @@ object StartupWithOneThreadSpec { akka.actor.default-dispatcher { executor = thread-pool-executor thread-pool-executor { - core-pool-size-min = 1 - core-pool-size-max = 1 + fixed-pool-size = 1 } } """ diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst index 3b70820e27..16085429ab 100644 --- a/akka-docs/rst/java/cluster-sharding.rst +++ b/akka-docs/rst/java/cluster-sharding.rst @@ -349,11 +349,11 @@ Inspecting cluster sharding state --------------------------------- Two requests to inspect the cluster state are available: -`ClusterShard.getShardRegionStateInstance` which will return a `ClusterShard.ShardRegionState` that contains -the `ShardId`s running in a Region and what `EntityId`s are alive for each of them. +``ClusterShard.getShardRegionStateInstance`` which will return a ``ClusterShard.ShardRegionState`` that contains +the identifiers of the shards running in a Region and what entities are alive for each of them. -`ClusterShard.getClusterShardingStatsInstance` which will query all the regions in the cluster and return -a `ClusterShard.ClusterShardingStats` containing the `ShardId`s running in each region and a count +``ClusterShard.getClusterShardingStatsInstance`` which will query all the regions in the cluster and return +a ``ClusterShard.ClusterShardingStats`` containing the identifiers of the shards running in each region and a count of entities that are alive in each shard. 
The purpose of these messages is testing and monitoring, they are not provided to give access to diff --git a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java index 3756c91bdd..116788f53e 100644 --- a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java +++ b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java @@ -66,6 +66,15 @@ public class DispatcherDocTest { //#defining-dispatcher-in-code } + @SuppressWarnings("unused") + @Test + public void defineFixedPoolSizeDispatcher() { + //#defining-fixed-pool-size-dispatcher + ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class) + .withDispatcher("blocking-io-dispatcher")); + //#defining-fixed-pool-size-dispatcher + } + @SuppressWarnings("unused") @Test public void definePinnedDispatcher() { diff --git a/akka-docs/rst/java/dispatchers.rst b/akka-docs/rst/java/dispatchers.rst index f01831eac1..3d0ab07f32 100644 --- a/akka-docs/rst/java/dispatchers.rst +++ b/akka-docs/rst/java/dispatchers.rst @@ -116,6 +116,15 @@ There are 3 different types of message dispatchers: More dispatcher configuration examples ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#fixed-pool-size-dispatcher-config + +And then using it: + +.. includecode:: ../java/code/docs/dispatcher/DispatcherDocTest.java#defining-fixed-pool-size-dispatcher + + Configuring a ``PinnedDispatcher``: .. 
includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 4382d4596c..074bfaf6a9 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -272,7 +272,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will .. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persist-persist-caller First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed, - the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). +the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor. In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer is extended until all nested ``persist`` callbacks have been handled. diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 570868a7c8..ff5ec1bc27 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -275,7 +275,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will .. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persist-persist-caller First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed, - the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). +the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). 
Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor. In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer is extended until all nested ``persist`` callbacks have been handled. diff --git a/akka-docs/rst/java/routing.rst b/akka-docs/rst/java/routing.rst index 9455bc7598..071788432e 100644 --- a/akka-docs/rst/java/routing.rst +++ b/akka-docs/rst/java/routing.rst @@ -647,9 +647,9 @@ The memory usage is O(n) where n is the number of sizes you allow, i.e. upperBou Pool with ``OptimalSizeExploringResizer`` defined in configuration: -.. includecode:: code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool +.. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool -.. includecode:: code/docs/routing/RouterDocTest.java#optimal-size-exploring-resize-pool +.. includecode:: code/docs/jrouting/RouterDocTest.java#optimal-size-exploring-resize-pool Several more configuration options are available and described in ``akka.actor.deployment.default.optimal-size-exploring-resizer`` section of the reference :ref:`configuration`. diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index c5d3526d05..4dcde879c3 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -673,7 +673,6 @@ Be careful to not do any operations on the ``Future[Terminated]`` using the ``sy as ``ExecutionContext`` as it will be shut down with the ``ActorSystem``, instead use for example the Scala standard library context from ``scala.concurrent.ExecutionContext.global``. 
- // import system.dispatcher <- this would not work import scala.concurrent.ExecutionContext.Implicits.global diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 6fa8fede74..0d1b5f29d4 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -350,11 +350,11 @@ Inspecting cluster sharding state --------------------------------- Two requests to inspect the cluster state are available: -`ClusterShard.GetShardRegionState` which will return a `ClusterShard.ShardRegionState` that contains -the `ShardId`s running in a Region and what `EntityId`s are alive for each of them. +``ClusterShard.GetShardRegionState`` which will return a ``ClusterShard.ShardRegionState`` that contains +the identifiers of the shards running in a Region and what entities are alive for each of them. -`ClusterShard.GetClusterShardingStats` which will query all the regions in the cluster and return -a `ClusterShard.ClusterShardingStats` containing the `ShardId`s running in each region and a count +``ClusterShard.GetClusterShardingStats`` which will query all the regions in the cluster and return +a ``ClusterShard.ClusterShardingStats`` containing the identifiers of the shards running in each region and a count of entities that are alive in each shard. 
The purpose of these messages is testing and monitoring, they are not provided to give access to diff --git a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala index 8bee24cbfc..510e8981ff 100644 --- a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala @@ -93,6 +93,17 @@ object DispatcherDocSpec { } //#my-thread-pool-dispatcher-config + //#fixed-pool-size-dispatcher-config + blocking-io-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 32 + } + throughput = 1 + } + //#fixed-pool-size-dispatcher-config + //#my-pinned-dispatcher-config my-pinned-dispatcher { executor = "thread-pool-executor" @@ -268,11 +279,19 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { val dispatcher = system.dispatchers.lookup("my-dispatcher-bounded-queue") } + "defining fixed-pool-size dispatcher" in { + val context = system + //#defining-fixed-pool-size-dispatcher + val myActor = + context.actorOf(Props[MyActor].withDispatcher("blocking-io-dispatcher"), "myactor2") + //#defining-fixed-pool-size-dispatcher + } + "defining pinned dispatcher" in { val context = system //#defining-pinned-dispatcher val myActor = - context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2") + context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor3") //#defining-pinned-dispatcher } diff --git a/akka-docs/rst/scala/dispatchers.rst b/akka-docs/rst/scala/dispatchers.rst index cf88e47a72..ad391b9482 100644 --- a/akka-docs/rst/scala/dispatchers.rst +++ b/akka-docs/rst/scala/dispatchers.rst @@ -139,6 +139,15 @@ There are 3 different types of message dispatchers: More dispatcher configuration examples ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Configuring a dispatcher with fixed thread pool size, e.g. 
for actors that perform blocking IO: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#fixed-pool-size-dispatcher-config + +And then using it: + +.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-fixed-pool-size-dispatcher + Configuring a ``PinnedDispatcher``: .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index d211965d8a..4dc6c3ded0 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -260,7 +260,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persist-persist-caller First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed, - the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). +the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal). Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor. In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer is extended until all nested ``persist`` callbacks have been handled. @@ -577,9 +577,9 @@ received. .. note:: -At-least-once delivery implies that original message sending order is not always preserved, -and the destination may receive duplicate messages. -Semantics do not match those of a normal :class:`ActorRef` send operation: + At-least-once delivery implies that original message sending order is not always preserved, + and the destination may receive duplicate messages. 
+ Semantics do not match those of a normal :class:`ActorRef` send operation: * it is not at-most-once delivery