=act #19201 improve configuration of thread-pool-executor

* The old implementation would cap the pool size (both corePoolSize
  and maximumPoolSize) to max-pool-size, which is very confusing
  because maximumPoolSize is only used when the task queue is bounded.
* As a result, configuring core-pool-size-min and core-pool-size-max
  was not enough, because the result could still be capped by the
  default max-pool-size.
* The new behavior is simply that maximumPoolSize is adjusted to not be
  less than corePoolSize, but otherwise the config properties match the
  underlying ThreadPoolExecutor implementation.
* Added a convenience fixed-pool-size property (see the sketch below).
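
For illustration (not part of the commit itself), a minimal sketch of the new style, with a hypothetical dispatcher name and system name:

// A minimal, self-contained sketch (hypothetical names): a fixed-size pool
// is now a single property instead of pinning core-pool-size-min/max
// (and previously also max-pool-size-min/max).
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object FixedPoolSizeSketch extends App {
  val config = ConfigFactory.parseString("""
    blocking-io-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        # sets both corePoolSize and maximumPoolSize of the ThreadPoolExecutor
        fixed-pool-size = 16
      }
    }
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("sketch", config)
  // looking up the dispatcher forces it to be created from the config above
  println(system.dispatchers.lookup("blocking-io-dispatcher"))
  system.terminate()
}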
Patrik Nordwall 2015-12-17 09:40:03 +01:00
parent a99fee96df
commit a1c3dbe307
19 changed files with 119 additions and 66 deletions


@@ -22,8 +22,7 @@ object LocalActorRefProviderSpec {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
- core-pool-size-min = 16
- core-pool-size-max = 16
+ fixed-pool-size = 16
}
}
}


@@ -29,10 +29,7 @@ object TypedActorSpec {
type = "akka.dispatch.BalancingDispatcherConfigurator"
executor = "thread-pool-executor"
thread-pool-executor {
- core-pool-size-min = 60
- core-pool-size-max = 60
- max-pool-size-min = 60
- max-pool-size-max = 60
+ fixed-pool-size = 60
}
}
"""


@@ -20,8 +20,7 @@ object DispatcherActorSpec {
throughput = 101
executor = "thread-pool-executor"
thread-pool-executor {
- core-pool-size-min = 1
- core-pool-size-max = 1
+ fixed-pool-size = 1
}
}
test-throughput-deadline-dispatcher {
@@ -29,8 +28,7 @@ object DispatcherActorSpec {
throughput-deadline-time = 100 milliseconds
executor = "thread-pool-executor"
thread-pool-executor {
- core-pool-size-min = 1
- core-pool-size-max = 1
+ fixed-pool-size = 1
}
}


@@ -110,6 +110,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
getInt("task-queue-size") should ===(-1)
getString("task-queue-type") should ===("linked")
getBoolean("allow-core-timeout") should ===(true)
+ getString("fixed-pool-size") should ===("off")
}
// Debug config


@@ -333,6 +333,7 @@ akka {
}
# This will be used if you have set "executor = "fork-join-executor""
+ # Underlying thread pool implementation is scala.concurrent.forkjoin.ForkJoinPool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 8
@@ -351,32 +352,41 @@
}
# This will be used if you have set "executor = "thread-pool-executor""
+ # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
thread-pool-executor {
# Keep alive time for threads
keep-alive-time = 60s
+ # Define a fixed thread pool size with this property. The corePoolSize
+ # and the maximumPoolSize of the ThreadPoolExecutor will be set to this
+ # value, if it is defined. Then the other pool-size properties will not
+ # be used.
+ fixed-pool-size = off
- # Min number of threads to cap factor-based core number to
+ # Min number of threads to cap factor-based corePoolSize number to
core-pool-size-min = 8
- # The core pool size factor is used to determine thread pool core size
- # using the following formula: ceil(available processors * factor).
+ # The core-pool-size-factor is used to determine corePoolSize of the
+ # ThreadPoolExecutor using the following formula:
+ # ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 3.0
- # Max number of threads to cap factor-based number to
+ # Max number of threads to cap factor-based corePoolSize number to
core-pool-size-max = 64
- # Minimum number of threads to cap factor-based max number to
- # (if using a bounded task queue)
+ # Minimum number of threads to cap factor-based maximumPoolSize number to
max-pool-size-min = 8
- # Max no of threads (if using a bounded task queue) is determined by
- # calculating: ceil(available processors * factor)
+ # The max-pool-size-factor is used to determine maximumPoolSize of the
+ # ThreadPoolExecutor using the following formula:
+ # ceil(available processors * factor)
+ # The maximumPoolSize will not be less than corePoolSize.
+ # It is only used if using a bounded task queue.
max-pool-size-factor = 3.0
- # Max number of threads to cap factor-based max number to
- # (if using a bounded task queue)
+ # Max number of threads to cap factor-based maximumPoolSize number to
max-pool-size-max = 64
# Specifies the bounded capacity of the task queue (< 1 == unbounded)
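
For illustration (not part of the diff): a sketch of the factor-based sizing these comments describe, mirroring the scaledPoolSize helper used by the builder code later in this commit; the processor count is machine-dependent.

// Sketch of factor-based pool sizing: bound ceil(processors * factor) by the
// configured min and max (mirrors akka.dispatch.ThreadPoolConfig.scaledPoolSize).
object ScaledPoolSizeSketch extends App {
  def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int =
    math.min(math.max(math.ceil(Runtime.getRuntime.availableProcessors * multiplier).toInt, floor), ceiling)

  // with the defaults above (core-pool-size-min = 8, core-pool-size-factor = 3.0,
  // core-pool-size-max = 64) on an 8-core machine: ceil(8 * 3.0) = 24, which
  // already lies within [8, 64], so corePoolSize = 24
  println(scaledPoolSize(8, 3.0, 64))
}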


@@ -345,21 +345,27 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
import akka.util.Helpers.ConfigOps
- ThreadPoolConfigBuilder(ThreadPoolConfig())
- .setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
- .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
- .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
- .setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")
- .configure(
- Some(config getInt "task-queue-size") flatMap {
- case size if size > 0 ⇒
- Some(config getString "task-queue-type") map {
- case "array" ⇒ ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
- case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(size)
- case x ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
- } map { qf ⇒ (q: ThreadPoolConfigBuilder) ⇒ q.setQueueFactory(qf) }
- case _ ⇒ None
- })
+ val builder =
+ ThreadPoolConfigBuilder(ThreadPoolConfig())
+ .setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
+ .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
+ .configure(
+ Some(config getInt "task-queue-size") flatMap {
+ case size if size > 0 ⇒
+ Some(config getString "task-queue-type") map {
+ case "array" ⇒ ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
+ case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(size)
+ case x ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
+ } map { qf ⇒ (q: ThreadPoolConfigBuilder) ⇒ q.setQueueFactory(qf) }
+ case _ ⇒ None
+ })
+ if (config.getString("fixed-pool-size") == "off")
+ builder
+ .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
+ .setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")
+ else
+ builder.setFixedPoolSize(config.getInt("fixed-pool-size"))
}
def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
@@ -435,9 +441,10 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
}
val asyncMode = config.getString("task-peeking-mode") match {
- case "FIFO" ⇒ true
- case "LIFO" ⇒ false
- case unsupported ⇒ throw new IllegalArgumentException(s"""Cannot instantiate ForkJoinExecutorServiceFactory. "task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
+ case "FIFO" ⇒ true
+ case "LIFO" ⇒ false
+ case unsupported ⇒ throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. " +
+   """"task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
}
new ForkJoinExecutorServiceFactory(


@@ -123,17 +123,14 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))
+ def setFixedPoolSize(size: Int): ThreadPoolConfigBuilder =
+   this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
def setCorePoolSize(size: Int): ThreadPoolConfigBuilder =
-   if (config.maxPoolSize < size)
-     this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
-   else
-     this.copy(config = config.copy(corePoolSize = size))
+   this.copy(config = config.copy(corePoolSize = size, maxPoolSize = math.max(size, config.maxPoolSize)))
def setMaxPoolSize(size: Int): ThreadPoolConfigBuilder =
-   if (config.corePoolSize > size)
-     this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
-   else
-     this.copy(config = config.copy(maxPoolSize = size))
+   this.copy(config = config.copy(maxPoolSize = math.max(size, config.corePoolSize)))
def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder =
setCorePoolSize(scaledPoolSize(min, multiplier, max))
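
A small sketch (using the ThreadPoolConfig and ThreadPoolConfigBuilder API shown above) of the new setter semantics: maximumPoolSize is adjusted upward rather than silently capping the other value.

// Sketch of the new invariant: maxPoolSize never ends up below corePoolSize.
import akka.dispatch.{ ThreadPoolConfig, ThreadPoolConfigBuilder }

object PoolSizeInvariantSketch extends App {
  val b = ThreadPoolConfigBuilder(ThreadPoolConfig())
    .setCorePoolSize(16)
    .setMaxPoolSize(4) // old code capped both to 4; now maxPoolSize = max(4, 16) = 16

  println((b.config.corePoolSize, b.config.maxPoolSize)) // (16, 16)

  // the new convenience property sets both sizes at once
  val fixed = ThreadPoolConfigBuilder(ThreadPoolConfig()).setFixedPoolSize(8)
  println((fixed.config.corePoolSize, fixed.config.maxPoolSize)) // (8, 8)
}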


@@ -29,8 +29,7 @@ object StartupWithOneThreadSpec {
akka.actor.default-dispatcher {
executor = thread-pool-executor
thread-pool-executor {
- core-pool-size-min = 1
- core-pool-size-max = 1
+ fixed-pool-size = 1
}
}
"""


@@ -349,11 +349,11 @@ Inspecting cluster sharding state
---------------------------------
Two requests to inspect the cluster state are available:
- `ClusterShard.getShardRegionStateInstance` which will return a `ClusterShard.ShardRegionState` that contains
- the `ShardId`s running in a Region and what `EntityId`s are alive for each of them.
+ ``ClusterShard.getShardRegionStateInstance`` which will return a ``ClusterShard.ShardRegionState`` that contains
+ the identifiers of the shards running in a Region and what entities are alive for each of them.
- `ClusterShard.getClusterShardingStatsInstance` which will query all the regions in the cluster and return
- a `ClusterShard.ClusterShardingStats` containing the `ShardId`s running in each region and a count
+ ``ClusterShard.getClusterShardingStatsInstance`` which will query all the regions in the cluster and return
+ a ``ClusterShard.ClusterShardingStats`` containing the identifiers of the shards running in each region and a count
of entities that are alive in each shard.
of entities that are alive in each shard.
The purpose of these messages is testing and monitoring, they are not provided to give access to


@@ -66,6 +66,15 @@ public class DispatcherDocTest {
//#defining-dispatcher-in-code
}
+
+ @SuppressWarnings("unused")
+ @Test
+ public void defineFixedPoolSizeDispatcher() {
+   //#defining-fixed-pool-size-dispatcher
+   ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class)
+     .withDispatcher("blocking-io-dispatcher"));
+   //#defining-fixed-pool-size-dispatcher
+ }
@SuppressWarnings("unused")
@Test
public void definePinnedDispatcher() {


@@ -116,6 +116,15 @@ There are 3 different types of message dispatchers:
More dispatcher configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+ Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO:
+
+ .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#fixed-pool-size-dispatcher-config
+
+ And then using it:
+
+ .. includecode:: ../java/code/docs/dispatcher/DispatcherDocTest.java#defining-fixed-pool-size-dispatcher
+
Configuring a ``PinnedDispatcher``:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config


@@ -272,7 +272,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will
.. includecode:: code/docs/persistence/LambdaPersistenceDocTest.java#nested-persist-persist-caller
First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed,
- the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
+ the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor.
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
is extended until all nested ``persist`` callbacks have been handled.


@@ -275,7 +275,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will
.. includecode:: code/docs/persistence/PersistenceDocTest.java#nested-persist-persist-caller
First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed,
- the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
+ the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor.
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
is extended until all nested ``persist`` callbacks have been handled.


@@ -647,9 +647,9 @@ The memory usage is O(n) where n is the number of sizes you allow, i.e. upperBou
Pool with ``OptimalSizeExploringResizer`` defined in configuration:
- .. includecode:: code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool
+ .. includecode:: ../scala/code/docs/routing/RouterDocSpec.scala#config-optimal-size-exploring-resize-pool
- .. includecode:: code/docs/routing/RouterDocTest.java#optimal-size-exploring-resize-pool
+ .. includecode:: code/docs/jrouting/RouterDocTest.java#optimal-size-exploring-resize-pool
Several more configuration options are available and described in ``akka.actor.deployment.default.optimal-size-exploring-resizer``
section of the reference :ref:`configuration`.


@@ -673,7 +673,6 @@ Be careful to not do any operations on the ``Future[Terminated]`` using the ``sy
as ``ExecutionContext`` as it will be shut down with the ``ActorSystem``, instead use for example
the Scala standard library context from ``scala.concurrent.ExecutionContext.global``.
// import system.dispatcher <- this would not work
import scala.concurrent.ExecutionContext.Implicits.global


@@ -350,11 +350,11 @@ Inspecting cluster sharding state
---------------------------------
Two requests to inspect the cluster state are available:
- `ClusterShard.GetShardRegionState` which will return a `ClusterShard.ShardRegionState` that contains
- the `ShardId`s running in a Region and what `EntityId`s are alive for each of them.
+ ``ClusterShard.GetShardRegionState`` which will return a ``ClusterShard.ShardRegionState`` that contains
+ the identifiers of the shards running in a Region and what entities are alive for each of them.
- `ClusterShard.GetClusterShardingStats` which will query all the regions in the cluster and return
- a `ClusterShard.ClusterShardingStats` containing the `ShardId`s running in each region and a count
+ ``ClusterShard.GetClusterShardingStats`` which will query all the regions in the cluster and return
+ a ``ClusterShard.ClusterShardingStats`` containing the identifiers of the shards running in each region and a count
of entities that are alive in each shard.
of entities that are alive in each shard.
The purpose of these messages is testing and monitoring, they are not provided to give access to


@@ -93,6 +93,17 @@ object DispatcherDocSpec {
}
//#my-thread-pool-dispatcher-config
+
+ //#fixed-pool-size-dispatcher-config
+ blocking-io-dispatcher {
+   type = Dispatcher
+   executor = "thread-pool-executor"
+   thread-pool-executor {
+     fixed-pool-size = 32
+   }
+   throughput = 1
+ }
+ //#fixed-pool-size-dispatcher-config
//#my-pinned-dispatcher-config
my-pinned-dispatcher {
executor = "thread-pool-executor"
@@ -268,11 +279,19 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
val dispatcher = system.dispatchers.lookup("my-dispatcher-bounded-queue")
}
+ "defining fixed-pool-size dispatcher" in {
+   val context = system
+   //#defining-fixed-pool-size-dispatcher
+   val myActor =
+     context.actorOf(Props[MyActor].withDispatcher("blocking-io-dispatcher"), "myactor2")
+   //#defining-fixed-pool-size-dispatcher
+ }
"defining pinned dispatcher" in {
val context = system
//#defining-pinned-dispatcher
val myActor =
-   context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2")
+   context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor3")
//#defining-pinned-dispatcher
}


@@ -139,6 +139,15 @@ There are 3 different types of message dispatchers:
More dispatcher configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+ Configuring a dispatcher with fixed thread pool size, e.g. for actors that perform blocking IO:
+
+ .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#fixed-pool-size-dispatcher-config
+
+ And then using it:
+
+ .. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#defining-fixed-pool-size-dispatcher
+
Configuring a ``PinnedDispatcher``:
.. includecode:: ../scala/code/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config


@@ -260,7 +260,7 @@ When sending two commands to this ``PersistentActor``, the persist handlers will
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#nested-persist-persist-caller
First the "outer layer" of persist calls is issued and their callbacks are applied. After these have successfully completed,
- the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
+ the inner callbacks will be invoked (once the events they are persisting have been confirmed to be persisted by the journal).
Only after all these handlers have been successfully invoked will the next command be delivered to the persistent Actor.
In other words, the stashing of incoming commands that is guaranteed by initially calling ``persist()`` on the outer layer
is extended until all nested ``persist`` callbacks have been handled.
@@ -577,9 +577,9 @@ received.
.. note::
-   At-least-once delivery implies that original message sending order is not always preserved,
-   and the destination may receive duplicate messages.
-   Semantics do not match those of a normal :class:`ActorRef` send operation:
+   At-least-once delivery implies that original message sending order is not always preserved,
+   and the destination may receive duplicate messages.
+   Semantics do not match those of a normal :class:`ActorRef` send operation:
* it is not at-most-once delivery