From b76c02d136076bba1472251e0f623a34faa456ea Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 24 Aug 2011 11:23:12 +0200 Subject: [PATCH 1/2] #1139 - Added akka.actor.DefaultBootableActorLoaderService for the Java API --- .../main/scala/akka/actor/BootableActorLoaderService.scala | 5 +++++ akka-docs/scala/http.rst | 3 +++ 2 files changed, 8 insertions(+) diff --git a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala index 7cf4fbaf4e..7da0ee2490 100644 --- a/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala +++ b/akka-actor/src/main/scala/akka/actor/BootableActorLoaderService.scala @@ -60,3 +60,8 @@ trait BootableActorLoaderService extends Bootable { Actor.registry.local.shutdownAll } } + +/** + * Java API for the default JAX-RS/Mist Initializer + */ + class DefaultBootableActorLoaderService extends BootableActorLoaderService diff --git a/akka-docs/scala/http.rst b/akka-docs/scala/http.rst index 44448b14fc..1cf0b930ae 100644 --- a/akka-docs/scala/http.rst +++ b/akka-docs/scala/http.rst @@ -40,6 +40,9 @@ If you deploy Akka in a JEE container, don't forget to create an Akka initializa // loader.boot(true, new BootableActorLoaderService {}) // If you don't need akka-remote } +For Java users, it's currently only possible to use BootableActorLoaderService, but you'll need to use: akka.actor.DefaultBootableActorLoaderService + + Then you just declare it in your web.xml: .. code-block:: xml From a909bf8d084aeafb65822da13aefb7c5fa970696 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 24 Aug 2011 12:41:21 +0200 Subject: [PATCH 2/2] #1140 - Adding support for specifying ArrayBlockingQueue or LinkedBlockingQueue in akka.conf and in the builder --- .../src/test/scala/akka/config/ConfigSpec.scala | 2 ++ .../main/scala/akka/dispatch/MessageHandling.scala | 11 ++++++++++- .../main/scala/akka/dispatch/ThreadPoolBuilder.scala | 3 +++ config/akka-reference.conf | 2 ++ 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 3aa42858c8..1e6a70c016 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -25,6 +25,8 @@ class ConfigSpec extends WordSpec with MustMatchers { getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0)) getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0)) getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1)) + getInt("akka.actor.default-dispatcher.task-queue-size") must equal(Some(-1)) + getString("akka.actor.default-dispatcher.task-queue-type") must equal(Some("linked")) getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true)) getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs")) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(Some(-1)) diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 4081bf5c57..5bfba2ed78 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -281,12 +281,21 @@ abstract class MessageDispatcherConfigurator { conf_?(config getDouble "max-pool-size-factor")(factor ⇒ _.setMaxPoolSizeFromFactor(factor)), conf_?(config getInt "executor-bounds")(bounds ⇒ _.setExecutorBounds(bounds)), conf_?(config getBool "allow-core-timeout")(allow ⇒ _.setAllowCoreThreadTimeout(allow)), + conf_?(config getInt "task-queue-size" flatMap { + case size if size > 0 => + config getString "task-queue-type" map { + case "array" => ThreadPoolConfig.arrayBlockingQueue(size, false) //TODO config fairness? + case "" | "linked" => ThreadPoolConfig.linkedBlockingQueue(size) + case x => throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x) + } + case _ => None + })(queueFactory => _.setQueueFactory(queueFactory)), conf_?(config getString "rejection-policy" map { case "abort" ⇒ new AbortPolicy() case "caller-runs" ⇒ new CallerRunsPolicy() case "discard-oldest" ⇒ new DiscardOldestPolicy() case "discard" ⇒ new DiscardPolicy() - case x ⇒ throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) + case x ⇒ throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy [abort|caller-runs|discard-oldest|discard]!" format x) })(policy ⇒ _.setRejectionPolicy(policy))) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index c0b5dbc540..314796d61b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -144,6 +144,9 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(allowCorePoolTimeout = allow)) + def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = newQueueFactory)) + def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) ⇒ f.map(_(c)).getOrElse(c)) } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 1e461a958a..bce4dc7c51 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -89,6 +89,8 @@ akka { core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor) max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor) executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded + task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded) + task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default) allow-core-timeout = on # Allow core threads to time out rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness