Migrating tests to use the new config for dispatchers

This commit is contained in:
Viktor Klang 2012-01-30 16:34:25 +01:00
parent 8bc6513911
commit 465c29107d
11 changed files with 96 additions and 57 deletions

View file

@ -9,6 +9,8 @@ object ConsistencySpec {
consistency-dispatcher {
throughput = 1
keep-alive-time = 1 ms
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 10
core-pool-size-max = 10
max-pool-size-min = 10
@ -16,6 +18,7 @@ object ConsistencySpec {
task-queue-type = array
task-queue-size = 7
}
}
"""
class CacheMisaligned(var value: Long, var padding1: Long, var padding2: Long, var padding3: Int) //Vars, no final fences

View file

@ -14,11 +14,14 @@ object LocalActorRefProviderSpec {
akka {
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 16
core-pool-size-max = 16
}
}
}
}
"""
}

View file

@ -25,11 +25,14 @@ object TypedActorSpec {
val config = """
pooled-dispatcher {
type = BalancingDispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 60
core-pool-size-max = 60
max-pool-size-min = 60
max-pool-size-max = 60
}
}
"""
class CyclicIterator[T](val items: Seq[T]) extends Iterator[T] {

View file

@ -21,20 +21,29 @@ object BenchmarkConfig {
useDummyOrderbook = false
client-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
}
}
destination-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
}
}
high-throughput-dispatcher {
throughput = 10000
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
}
}
pinned-dispatcher {
type = PinnedDispatcher
@ -42,10 +51,13 @@ object BenchmarkConfig {
latency-dispatcher {
throughput = 1
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = ${benchmark.maxClients}
core-pool-size-max = ${benchmark.maxClients}
}
}
}
""")
private val longRunningBenchmarkConfig = ConfigFactory.parseString("""
benchmark {

View file

@ -13,11 +13,14 @@ object ConfiguredLocalRoutingSpec {
akka {
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 8
core-pool-size-max = 16
}
}
}
}
"""
}

View file

@ -169,13 +169,13 @@ akka {
# Keep alive time for threads
keep-alive-time = 60s
# minimum number of threads to cap factor-based core number to
# Min number of threads to cap factor-based core number to
core-pool-size-min = 8
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 3.0
# maximum number of threads to cap factor-based number to
# Max number of threads to cap factor-based number to
core-pool-size-max = 64
# Hint: max-pool-size is only used for bounded task queues
@ -185,7 +185,7 @@ akka {
# Max no of threads ... ceil(available processors * factor)
max-pool-size-factor = 3.0
# maximum number of threads to cap factor-based max number to
# Max number of threads to cap factor-based max number to
max-pool-size-max = 64
# Specifies the bounded capacity of the task queue (< 1 == unbounded)
@ -201,13 +201,13 @@ akka {
# This will be used if you have set "executor = "fork-join-executor""
fork-join-executor {
# minimum number of threads to cap factor-based parallelism number to
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 8
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 3.0
# maximum number of threads to cap factor-based parallelism number to
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 64
}

View file

@ -338,8 +338,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
def configureExecutor(): ExecutorServiceConfigurator = {
config.getString("executor") match {
case null | "" throw new IllegalArgumentException("""Missing "executor" in config file for dispatcher [%s]""".format(config.getString("id")))
case "thread-pool-executor" new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case null | "" | "thread-pool-executor" new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case "fork-join-executor" new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case fqcn
val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites])
@ -347,8 +346,8 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
case Right(instance) instance
case Left(exception)
throw new IllegalArgumentException(
("Cannot instantiate ExecutorServiceConfigurator (\"executor = [%s]\"), defined in [%s], " +
"make sure it has an accessible constructor with a [%s,%s] signature")
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature""")
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
}
}

View file

@ -160,14 +160,11 @@ case class MonitorableThreadFactory(name: String,
extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
protected val counter = new AtomicLong
def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
val t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)
t.setDaemon(daemonic)
t
}
def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = wire(ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool))
def newThread(runnable: Runnable) = {
val t = new Thread(runnable, name + counter.incrementAndGet())
def newThread(runnable: Runnable): Thread = wire(new Thread(runnable, name + counter.incrementAndGet()))
protected def wire[T <: Thread](t: T): T = {
t.setUncaughtExceptionHandler(exceptionHandler)
t.setDaemon(daemonic)
contextClassLoader foreach (t.setContextClassLoader(_))

View file

@ -22,12 +22,17 @@ object DispatcherDocSpec {
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "thread-pool-executor"
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
}
# Throughput defines the number of messages that are processed in a batch before the
# thread is returned to the pool. Set to 1 for as fair as possible.
throughput = 100
@ -37,8 +42,11 @@ object DispatcherDocSpec {
//#my-bounded-config
my-dispatcher-bounded-queue {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-factor = 8.0
max-pool-size-factor = 16.0
}
# Specifies the bounded capacity of the mailbox queue
mailbox-capacity = 100
throughput = 3
@ -48,6 +56,11 @@ object DispatcherDocSpec {
//#my-balancing-config
my-balancing-dispatcher {
type = BalancingDispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-factor = 8.0
max-pool-size-factor = 16.0
}
}
//#my-balancing-config

View file

@ -29,6 +29,8 @@ object AkkaSpec {
stdout-loglevel = "WARNING"
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-factor = 2
core-pool-size-min = 8
core-pool-size-max = 8
@ -38,6 +40,7 @@ object AkkaSpec {
}
}
}
}
""")
def mapToConfig(map: Map[String, Any]): Config = {

View file

@ -20,11 +20,14 @@ object CoordinatedIncrement {
akka {
actor {
default-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 5
core-pool-size-max = 16
}
}
}
}
"""
case class Increment(friends: Seq[ActorRef])