From 3d226cb8efb60228277b9df61145c3fc9c006b3d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 31 Jan 2012 10:12:45 +0100 Subject: [PATCH] Switching to fork join as default dispatcher and adding tests for it --- .../test/scala/akka/config/ConfigSpec.scala | 15 +++++++++- akka-actor/src/main/resources/reference.conf | 30 ++++++++++--------- .../akka/dispatch/AbstractDispatcher.scala | 11 ++++--- 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index a29ee517a3..ad39057d1d 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -39,9 +39,11 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { { val c = config.getConfig("akka.actor.default-dispatcher") + //General dispatcher config + { c.getString("type") must equal("Dispatcher") - c.getString("executor") must equal("thread-pool-executor") + c.getString("executor") must equal("fork-join-executor") c.getInt("mailbox-capacity") must equal(-1) c.getMilliseconds("mailbox-push-timeout-time") must equal(10 * 1000) c.getString("mailboxType") must be("") @@ -50,6 +52,17 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { c.getMilliseconds("throughput-deadline-time") must equal(0) } + //Fork join executor config + + { + val pool = c.getConfig("fork-join-executor") + pool.getInt("parallelism-min") must equal(8) + pool.getDouble("parallelism-factor") must equal(3.0) + pool.getInt("parallelism-max") must equal(64) + } + + //Thread pool executor config + { val pool = c.getConfig("thread-pool-executor") import pool._ diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 74f7b5b245..23d573e794 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -159,10 +159,24 @@ akka { type = "Dispatcher" # Which kind of ExecutorService to use for this dispatcher - # Valid options: "thread-pool-executor" requires a "thread-pool-executor" section + # Valid options: # "fork-join-executor" requires a "fork-join-executor" section + # "thread-pool-executor" requires a "thread-pool-executor" section + # or # A FQCN of a class extending ExecutorServiceConfigurator - executor = "thread-pool-executor" + executor = "fork-join-executor" + + # This will be used if you have set "executor = "fork-join-executor"" + fork-join-executor { + # Min number of threads to cap factor-based parallelism number to + parallelism-min = 8 + + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 3.0 + + # Max number of threads to cap factor-based parallelism number to + parallelism-max = 64 + } # This will be used if you have set "executor = "thread-pool-executor"" thread-pool-executor { @@ -199,18 +213,6 @@ akka { allow-core-timeout = on } - # This will be used if you have set "executor = "fork-join-executor"" - fork-join-executor { - # Min number of threads to cap factor-based parallelism number to - parallelism-min = 8 - - # Parallelism (threads) ... ceil(available processors * factor) - parallelism-factor = 3.0 - - # Max number of threads to cap factor-based parallelism number to - parallelism-max = 64 - } - # How long time the dispatcher will wait for new actors until it shuts down shutdown-timeout = 1s diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index e3e312b720..d4c8f5f560 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -338,17 +338,16 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit def configureExecutor(): ExecutorServiceConfigurator = { config.getString("executor") match { - case null | "" | "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) - case "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) + case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) + case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) case fqcn ⇒ val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites]) ReflectiveAccess.createInstance[ExecutorServiceConfigurator](fqcn, constructorSignature, Array[AnyRef](config, prerequisites)) match { case Right(instance) ⇒ instance - case Left(exception) ⇒ - throw new IllegalArgumentException( - ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s], + case Left(exception) ⇒ throw new IllegalArgumentException( + ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s], make sure it has an accessible constructor with a [%s,%s] signature""") - .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) + .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) } } }