From 465c29107d04445a52a35b3ba4097b6ab4aa2e09 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 30 Jan 2012 16:34:25 +0100 Subject: [PATCH] Migrating tests to use the new config for dispatchers --- .../scala/akka/actor/ConsistencySpec.scala | 15 ++++++---- .../actor/LocalActorRefProviderSpec.scala | 7 +++-- .../scala/akka/actor/TypedActorSpec.scala | 11 ++++--- .../workbench/BenchmarkConfig.scala | 28 +++++++++++++----- .../routing/ConfiguredLocalRoutingSpec.scala | 7 +++-- akka-actor/src/main/resources/reference.conf | 14 ++++----- .../akka/dispatch/AbstractDispatcher.scala | 9 +++--- .../akka/dispatch/ThreadPoolBuilder.scala | 11 +++---- .../docs/dispatcher/DispatcherDocSpec.scala | 29 ++++++++++++++----- .../test/scala/akka/testkit/AkkaSpec.scala | 15 ++++++---- .../transactor/CoordinatedIncrementSpec.scala | 7 +++-- 11 files changed, 96 insertions(+), 57 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala index 981ce89ef6..6f8639f4a4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ConsistencySpec.scala @@ -9,12 +9,15 @@ object ConsistencySpec { consistency-dispatcher { throughput = 1 keep-alive-time = 1 ms - core-pool-size-min = 10 - core-pool-size-max = 10 - max-pool-size-min = 10 - max-pool-size-max = 10 - task-queue-type = array - task-queue-size = 7 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 10 + core-pool-size-max = 10 + max-pool-size-min = 10 + max-pool-size-max = 10 + task-queue-type = array + task-queue-size = 7 + } } """ class CacheMisaligned(var value: Long, var padding1: Long, var padding2: Long, var padding3: Int) //Vars, no final fences diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index 82cd08fa77..5ebd8ff565 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -14,8 +14,11 @@ object LocalActorRefProviderSpec { akka { actor { default-dispatcher { - core-pool-size-min = 16 - core-pool-size-max = 16 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 16 + core-pool-size-max = 16 + } } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 49b37cc506..b83fe78338 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -25,10 +25,13 @@ object TypedActorSpec { val config = """ pooled-dispatcher { type = BalancingDispatcher - core-pool-size-min = 60 - core-pool-size-max = 60 - max-pool-size-min = 60 - max-pool-size-max = 60 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 60 + core-pool-size-max = 60 + max-pool-size-min = 60 + max-pool-size-max = 60 + } } """ diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala index 11ed21c9aa..65294d014a 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchmarkConfig.scala @@ -21,19 +21,28 @@ object BenchmarkConfig { useDummyOrderbook = false client-dispatcher { - core-pool-size-min = ${benchmark.maxClients} - core-pool-size-max = ${benchmark.maxClients} + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = ${benchmark.maxClients} + core-pool-size-max = ${benchmark.maxClients} + } } destination-dispatcher { - core-pool-size-min = ${benchmark.maxClients} - core-pool-size-max = ${benchmark.maxClients} + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = ${benchmark.maxClients} + core-pool-size-max = ${benchmark.maxClients} + } } high-throughput-dispatcher { throughput = 10000 - core-pool-size-min = ${benchmark.maxClients} - core-pool-size-max = ${benchmark.maxClients} + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = ${benchmark.maxClients} + core-pool-size-max = ${benchmark.maxClients} + } } pinned-dispatcher { @@ -42,8 +51,11 @@ object BenchmarkConfig { latency-dispatcher { throughput = 1 - core-pool-size-min = ${benchmark.maxClients} - core-pool-size-max = ${benchmark.maxClients} + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = ${benchmark.maxClients} + core-pool-size-max = ${benchmark.maxClients} + } } } """) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index f2707e042c..62800b8830 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -13,8 +13,11 @@ object ConfiguredLocalRoutingSpec { akka { actor { default-dispatcher { - core-pool-size-min = 8 - core-pool-size-max = 16 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 8 + core-pool-size-max = 16 + } } } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index ffaedde045..74f7b5b245 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -158,8 +158,8 @@ akka { # parameters type = "Dispatcher" - #Which kind of ExecutorService to use for this dispatcher - #Valid options: "thread-pool-executor" requires a "thread-pool-executor" section + # Which kind of ExecutorService to use for this dispatcher + # Valid options: "thread-pool-executor" requires a "thread-pool-executor" section # "fork-join-executor" requires a "fork-join-executor" section # A FQCN of a class extending ExecutorServiceConfigurator executor = "thread-pool-executor" @@ -169,13 +169,13 @@ akka { # Keep alive time for threads keep-alive-time = 60s - # minimum number of threads to cap factor-based core number to + # Min number of threads to cap factor-based core number to core-pool-size-min = 8 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 3.0 - # maximum number of threads to cap factor-based number to + # Max number of threads to cap factor-based number to core-pool-size-max = 64 # Hint: max-pool-size is only used for bounded task queues @@ -185,7 +185,7 @@ akka { # Max no of threads ... ceil(available processors * factor) max-pool-size-factor = 3.0 - # maximum number of threads to cap factor-based max number to + # Max number of threads to cap factor-based max number to max-pool-size-max = 64 # Specifies the bounded capacity of the task queue (< 1 == unbounded) @@ -201,13 +201,13 @@ akka { # This will be used if you have set "executor = "fork-join-executor"" fork-join-executor { - # minimum number of threads to cap factor-based parallelism number to + # Min number of threads to cap factor-based parallelism number to parallelism-min = 8 # Parallelism (threads) ... ceil(available processors * factor) parallelism-factor = 3.0 - # maximum number of threads to cap factor-based parallelism number to + # Max number of threads to cap factor-based parallelism number to parallelism-max = 64 } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 66dd0385c9..e3e312b720 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -338,17 +338,16 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit def configureExecutor(): ExecutorServiceConfigurator = { config.getString("executor") match { - case null | "" ⇒ throw new IllegalArgumentException("""Missing "executor" in config file for dispatcher [%s]""".format(config.getString("id"))) - case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) - case "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) + case null | "" | "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) + case "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case fqcn ⇒ val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites]) ReflectiveAccess.createInstance[ExecutorServiceConfigurator](fqcn, constructorSignature, Array[AnyRef](config, prerequisites)) match { case Right(instance) ⇒ instance case Left(exception) ⇒ throw new IllegalArgumentException( - ("Cannot instantiate ExecutorServiceConfigurator (\"executor = [%s]\"), defined in [%s], " + - "make sure it has an accessible constructor with a [%s,%s] signature") + ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s], + make sure it has an accessible constructor with a [%s,%s] signature""") .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 369f3cdaf2..4612fdca1f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -160,14 +160,11 @@ case class MonitorableThreadFactory(name: String, extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { protected val counter = new AtomicLong - def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { - val t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool) - t.setDaemon(daemonic) - t - } + def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)) - def newThread(runnable: Runnable) = { - val t = new Thread(runnable, name + counter.incrementAndGet()) + def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + counter.incrementAndGet())) + + protected def wire[T <: Thread](t: T): T = { t.setUncaughtExceptionHandler(exceptionHandler) t.setDaemon(daemonic) contextClassLoader foreach (t.setContextClassLoader(_)) diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index d0e0945fe8..0df4e3ca5b 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -22,12 +22,17 @@ object DispatcherDocSpec { my-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher - # minimum number of threads to cap factor-based core number to - core-pool-size-min = 2 - # No of core threads ... ceil(available processors * factor) - core-pool-size-factor = 2.0 - # maximum number of threads to cap factor-based number to - core-pool-size-max = 10 + # What kind of ExecutionService to use + executor = "thread-pool-executor" + # Configuration for the thread pool + thread-pool-executor { + # minimum number of threads to cap factor-based core number to + core-pool-size-min = 2 + # No of core threads ... ceil(available processors * factor) + core-pool-size-factor = 2.0 + # maximum number of threads to cap factor-based number to + core-pool-size-max = 10 + } # Throughput defines the number of messages that are processed in a batch before the # thread is returned to the pool. Set to 1 for as fair as possible. throughput = 100 @@ -37,8 +42,11 @@ object DispatcherDocSpec { //#my-bounded-config my-dispatcher-bounded-queue { type = Dispatcher - core-pool-size-factor = 8.0 - max-pool-size-factor = 16.0 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-factor = 8.0 + max-pool-size-factor = 16.0 + } # Specifies the bounded capacity of the mailbox queue mailbox-capacity = 100 throughput = 3 @@ -48,6 +56,11 @@ object DispatcherDocSpec { //#my-balancing-config my-balancing-dispatcher { type = BalancingDispatcher + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-factor = 8.0 + max-pool-size-factor = 16.0 + } } //#my-balancing-config diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index c8db05b171..20f7e8b16a 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -29,12 +29,15 @@ object AkkaSpec { stdout-loglevel = "WARNING" actor { default-dispatcher { - core-pool-size-factor = 2 - core-pool-size-min = 8 - core-pool-size-max = 8 - max-pool-size-factor = 2 - max-pool-size-min = 8 - max-pool-size-max = 8 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-factor = 2 + core-pool-size-min = 8 + core-pool-size-max = 8 + max-pool-size-factor = 2 + max-pool-size-min = 8 + max-pool-size-max = 8 + } } } } diff --git a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala index 265d4a9eaf..9c019a56a5 100644 --- a/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala +++ b/akka-transactor/src/test/scala/akka/transactor/CoordinatedIncrementSpec.scala @@ -20,8 +20,11 @@ object CoordinatedIncrement { akka { actor { default-dispatcher { - core-pool-size-min = 5 - core-pool-size-max = 16 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 5 + core-pool-size-max = 16 + } } } }