From a909bf8d084aeafb65822da13aefb7c5fa970696 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 24 Aug 2011 12:41:21 +0200 Subject: [PATCH] #1140 - Adding support for specifying ArrayBlockingQueue or LinkedBlockingQueue in akka.conf and in the builder --- .../src/test/scala/akka/config/ConfigSpec.scala | 2 ++ .../main/scala/akka/dispatch/MessageHandling.scala | 11 ++++++++++- .../main/scala/akka/dispatch/ThreadPoolBuilder.scala | 3 +++ config/akka-reference.conf | 2 ++ 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 3aa42858c8..1e6a70c016 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -25,6 +25,8 @@ class ConfigSpec extends WordSpec with MustMatchers { getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0)) getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0)) getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1)) + getInt("akka.actor.default-dispatcher.task-queue-size") must equal(Some(-1)) + getString("akka.actor.default-dispatcher.task-queue-type") must equal(Some("linked")) getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true)) getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs")) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(Some(-1)) diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 4081bf5c57..5bfba2ed78 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -281,12 +281,21 @@ abstract class MessageDispatcherConfigurator { conf_?(config getDouble "max-pool-size-factor")(factor ⇒ _.setMaxPoolSizeFromFactor(factor)), conf_?(config getInt "executor-bounds")(bounds ⇒ _.setExecutorBounds(bounds)), conf_?(config getBool "allow-core-timeout")(allow ⇒ _.setAllowCoreThreadTimeout(allow)), + conf_?(config getInt "task-queue-size" flatMap { + case size if size > 0 => + config getString "task-queue-type" map { + case "array" => ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness? + case "" | "linked" => ThreadPoolConfig.linkedBlockingQueue(size) + case x => throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x) + } + case _ => None + })(queueFactory => _.setQueueFactory(queueFactory)), conf_?(config getString "rejection-policy" map { case "abort" ⇒ new AbortPolicy() case "caller-runs" ⇒ new CallerRunsPolicy() case "discard-oldest" ⇒ new DiscardOldestPolicy() case "discard" ⇒ new DiscardPolicy() - case x ⇒ throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) + case x ⇒ throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy [abort|caller-runs|discard-oldest|discard]!" format x) })(policy ⇒ _.setRejectionPolicy(policy))) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index c0b5dbc540..314796d61b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -144,6 +144,9 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(allowCorePoolTimeout = allow)) + def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = newQueueFactory)) + def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) ⇒ f.map(_(c)).getOrElse(c)) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 1e461a958a..bce4dc7c51 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -89,6 +89,8 @@ akka { core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor) max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor) executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded + task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded) + task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default) allow-core-timeout = on # Allow core threads to time out rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness