Switching to fork join as default dispatcher and adding tests for it
This commit is contained in:
parent
3fb62e1d9c
commit
3d226cb8ef
3 changed files with 35 additions and 21 deletions
|
|
@ -39,9 +39,11 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
|
|||
{
|
||||
val c = config.getConfig("akka.actor.default-dispatcher")
|
||||
|
||||
//General dispatcher config
|
||||
|
||||
{
|
||||
c.getString("type") must equal("Dispatcher")
|
||||
c.getString("executor") must equal("thread-pool-executor")
|
||||
c.getString("executor") must equal("fork-join-executor")
|
||||
c.getInt("mailbox-capacity") must equal(-1)
|
||||
c.getMilliseconds("mailbox-push-timeout-time") must equal(10 * 1000)
|
||||
c.getString("mailboxType") must be("")
|
||||
|
|
@ -50,6 +52,17 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
|
|||
c.getMilliseconds("throughput-deadline-time") must equal(0)
|
||||
}
|
||||
|
||||
//Fork join executor config
|
||||
|
||||
{
|
||||
val pool = c.getConfig("fork-join-executor")
|
||||
pool.getInt("parallelism-min") must equal(8)
|
||||
pool.getDouble("parallelism-factor") must equal(3.0)
|
||||
pool.getInt("parallelism-max") must equal(64)
|
||||
}
|
||||
|
||||
//Thread pool executor config
|
||||
|
||||
{
|
||||
val pool = c.getConfig("thread-pool-executor")
|
||||
import pool._
|
||||
|
|
|
|||
|
|
@ -159,10 +159,24 @@ akka {
|
|||
type = "Dispatcher"
|
||||
|
||||
# Which kind of ExecutorService to use for this dispatcher
|
||||
# Valid options: "thread-pool-executor" requires a "thread-pool-executor" section
|
||||
# Valid options:
|
||||
# "fork-join-executor" requires a "fork-join-executor" section
|
||||
# "thread-pool-executor" requires a "thread-pool-executor" section
|
||||
# or
|
||||
# A FQCN of a class extending ExecutorServiceConfigurator
|
||||
executor = "thread-pool-executor"
|
||||
executor = "fork-join-executor"
|
||||
|
||||
# This will be used if you have set "executor = "fork-join-executor""
|
||||
fork-join-executor {
|
||||
# Min number of threads to cap factor-based parallelism number to
|
||||
parallelism-min = 8
|
||||
|
||||
# Parallelism (threads) ... ceil(available processors * factor)
|
||||
parallelism-factor = 3.0
|
||||
|
||||
# Max number of threads to cap factor-based parallelism number to
|
||||
parallelism-max = 64
|
||||
}
|
||||
|
||||
# This will be used if you have set "executor = "thread-pool-executor""
|
||||
thread-pool-executor {
|
||||
|
|
@ -199,18 +213,6 @@ akka {
|
|||
allow-core-timeout = on
|
||||
}
|
||||
|
||||
# This will be used if you have set "executor = "fork-join-executor""
|
||||
fork-join-executor {
|
||||
# Min number of threads to cap factor-based parallelism number to
|
||||
parallelism-min = 8
|
||||
|
||||
# Parallelism (threads) ... ceil(available processors * factor)
|
||||
parallelism-factor = 3.0
|
||||
|
||||
# Max number of threads to cap factor-based parallelism number to
|
||||
parallelism-max = 64
|
||||
}
|
||||
|
||||
# How long time the dispatcher will wait for new actors until it shuts down
|
||||
shutdown-timeout = 1s
|
||||
|
||||
|
|
|
|||
|
|
@ -338,17 +338,16 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
|||
|
||||
def configureExecutor(): ExecutorServiceConfigurator = {
|
||||
config.getString("executor") match {
|
||||
case null | "" | "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
|
||||
case "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
|
||||
case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
|
||||
case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
|
||||
case fqcn ⇒
|
||||
val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites])
|
||||
ReflectiveAccess.createInstance[ExecutorServiceConfigurator](fqcn, constructorSignature, Array[AnyRef](config, prerequisites)) match {
|
||||
case Right(instance) ⇒ instance
|
||||
case Left(exception) ⇒
|
||||
throw new IllegalArgumentException(
|
||||
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
|
||||
case Left(exception) ⇒ throw new IllegalArgumentException(
|
||||
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
|
||||
make sure it has an accessible constructor with a [%s,%s] signature""")
|
||||
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
|
||||
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue