#1140 - Adding support for specifying ArrayBlockingQueue or LinkedBlockingQueue in akka.conf and in the builder

This commit is contained in:
Viktor Klang 2011-08-24 12:41:21 +02:00
parent b76c02d136
commit a909bf8d08
4 changed files with 17 additions and 1 deletions

View file

@ -25,6 +25,8 @@ class ConfigSpec extends WordSpec with MustMatchers {
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0)) getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0))
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0)) getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0))
getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1)) getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1))
getInt("akka.actor.default-dispatcher.task-queue-size") must equal(Some(-1))
getString("akka.actor.default-dispatcher.task-queue-type") must equal(Some("linked"))
getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true)) getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true))
getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs")) getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs"))
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(Some(-1)) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(Some(-1))

View file

@ -281,12 +281,21 @@ abstract class MessageDispatcherConfigurator {
conf_?(config getDouble "max-pool-size-factor")(factor _.setMaxPoolSizeFromFactor(factor)), conf_?(config getDouble "max-pool-size-factor")(factor _.setMaxPoolSizeFromFactor(factor)),
conf_?(config getInt "executor-bounds")(bounds _.setExecutorBounds(bounds)), conf_?(config getInt "executor-bounds")(bounds _.setExecutorBounds(bounds)),
conf_?(config getBool "allow-core-timeout")(allow _.setAllowCoreThreadTimeout(allow)), conf_?(config getBool "allow-core-timeout")(allow _.setAllowCoreThreadTimeout(allow)),
conf_?(config getInt "task-queue-size" flatMap {
case size if size > 0 =>
config getString "task-queue-type" map {
case "array" => ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness?
case "" | "linked" => ThreadPoolConfig.linkedBlockingQueue(size)
case x => throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
}
case _ => None
})(queueFactory => _.setQueueFactory(queueFactory)),
conf_?(config getString "rejection-policy" map { conf_?(config getString "rejection-policy" map {
case "abort" new AbortPolicy() case "abort" new AbortPolicy()
case "caller-runs" new CallerRunsPolicy() case "caller-runs" new CallerRunsPolicy()
case "discard-oldest" new DiscardOldestPolicy() case "discard-oldest" new DiscardOldestPolicy()
case "discard" new DiscardPolicy() case "discard" new DiscardPolicy()
case x throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) case x throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy [abort|caller-runs|discard-oldest|discard]!" format x)
})(policy _.setRejectionPolicy(policy))) })(policy _.setRejectionPolicy(policy)))
} }
} }

View file

@ -144,6 +144,9 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(allowCorePoolTimeout = allow)) this.copy(config = config.copy(allowCorePoolTimeout = allow))
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory))
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c)) def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c))
} }

View file

@ -89,6 +89,8 @@ akka {
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor) max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor)
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded)
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
allow-core-timeout = on # Allow core threads to time out allow-core-timeout = on # Allow core threads to time out
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness