add min/max bounds on absolute number of threads for dispatcher

The reason for the previously failing test case was that Jenkins has
only two (logical) cores and the test config sets the
core-pool-size-factor to 2, meaning four threads. This is not enough to
have four Futures waiting (as per the test) and one actor which actually
does the work (the guardian in this case). Hence, make it so that
core-pool-size-min gives the absolute minimum and set the default of
that to eight.

While doing so I cleaned up the MessageDispatcherConfigurator to not use
HOF since now configuration items are not optional anymore, yielding a
flow which is much more readily understandable.
This commit is contained in:
Roland 2011-12-06 20:33:25 +01:00
parent f7f36ac98c
commit 831b3272ad
2 changed files with 23 additions and 14 deletions

View file

@ -310,22 +310,26 @@ abstract class MessageDispatcherConfigurator() {
settings: Settings,
createDispatcher: (ThreadPoolConfig) MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
import scala.math.{ min, max }
//Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
conf_?(Some(config getMilliseconds "keep-alive-time"))(time _.setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))),
conf_?(Some(config getDouble "core-pool-size-factor"))(factor _.setCorePoolSizeFromFactor(factor)),
conf_?(Some(config getDouble "max-pool-size-factor"))(factor _.setMaxPoolSizeFromFactor(factor)),
conf_?(Some(config getBoolean "allow-core-timeout"))(allow _.setAllowCoreThreadTimeout(allow)),
conf_?(Some(config getInt "task-queue-size") flatMap {
case size if size > 0
Some(config getString "task-queue-type") map {
case "array" ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
case "" | "linked" ThreadPoolConfig.linkedBlockingQueue(size)
case x throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
}
case _ None
})(queueFactory _.setQueueFactory(queueFactory)))
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig())
.setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS))
.setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
.setCorePoolSize(min(max(ThreadPoolConfig.scaledPoolSize(config getDouble "core-pool-size-factor"),
config getInt "core-pool-size-min"), config getInt "core-pool-size-max"))
.setMaxPoolSize(min(max(ThreadPoolConfig.scaledPoolSize(config getDouble "max-pool-size-factor"),
config getInt "max-pool-size-min"), config getInt "max-pool-size-max"))
.configure(
conf_?(Some(config getInt "task-queue-size") flatMap {
case size if size > 0
Some(config getString "task-queue-type") map {
case "array" ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
case "" | "linked" ThreadPoolConfig.linkedBlockingQueue(size)
case x throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
}
case _ None
})(queueFactory _.setQueueFactory(queueFactory)))
}
}