From c9523585d5d1a234e06e53c5b34ce314088cb386 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 4 Mar 2011 20:55:12 +0100 Subject: [PATCH] Adding support for MessageDispatcherConfigurator, which means that you can configure homegrown dispatchers in akka.conf --- .../scala/akka/dispatch/Dispatchers.scala | 96 +++++++++---------- .../scala/akka/dispatch/MessageHandling.scala | 39 +++++++- 2 files changed, 84 insertions(+), 51 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 616384ae43..e41eef444b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -7,11 +7,10 @@ package akka.dispatch import akka.actor.{Actor, ActorRef} import akka.actor.newUuid import akka.config.Config._ -import akka.util.{Duration} +import akka.util.{Duration,ReflectiveAccess} import akka.config.ConfigMap -import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy} import java.util.concurrent.TimeUnit /** @@ -165,6 +164,7 @@ object Dispatchers { * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, * # GlobalExecutorBasedEventDriven + * # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor * keep-alive-time = 60 # Keep alive time for threads * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) @@ -178,54 +178,52 @@ object Dispatchers { * Gotcha: Only configures the dispatcher if possible * Returns: None if "type" isn't specified in the config * Throws: IllegalArgumentException if the value of "type" is not valid + * IllegalArgumentException if it cannot */ def from(cfg: ConfigMap): Option[MessageDispatcher] = { - lazy val name = cfg.getString("name", newUuid.toString) - - def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { - import ThreadPoolConfigDispatcherBuilder.conf_? - - //Apply the following options to the config if they are present in the cfg - ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure( - conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), - conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), - conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)), - conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)), - conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)), - conf_?(cfg getString "rejection-policy" map { - case "abort" => new AbortPolicy() - case "caller-runs" => new CallerRunsPolicy() - case "discard-oldest" => new DiscardOldestPolicy() - case "discard" => new DiscardPolicy() - case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) - })(policy => _.setRejectionPolicy(policy))) - } - - lazy val mailboxType: MailboxType = { - val capacity = cfg.getInt("mailbox-capacity", MAILBOX_CAPACITY) - // FIXME how do we read in isBlocking for mailbox? Now set to 'false'. - if (capacity < 0) UnboundedMailbox() - else BoundedMailbox(false, capacity, Duration(cfg.getInt("mailbox-push-timeout-time", MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT)) - } - - cfg.getString("type") map { - case "ExecutorBasedEventDriven" => - configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher( - name, - cfg.getInt("throughput", THROUGHPUT), - cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS), - mailboxType, - threadPoolConfig)).build - - case "ExecutorBasedEventDrivenWorkStealing" => - configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher( - name, - cfg.getInt("throughput", THROUGHPUT), - cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS), - mailboxType, - threadPoolConfig)).build - case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher - case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) - } + cfg.getString("type") map { + case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator() + case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator() + case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator + case fqn => + ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { + case Some(clazz) => + val instance = ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) + if (instance.isEmpty) + throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn) + else + instance.get + case None => + throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn) + } + } map { + _ configure cfg + } } } + +object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { + def configure(config: ConfigMap): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher +} + +class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { + def configure(config: ConfigMap): MessageDispatcher = { + configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher( + config.getString("name", newUuid.toString), + config.getInt("throughput", Dispatchers.THROUGHPUT), + config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), + mailboxType(config), + threadPoolConfig)).build + } +} + +class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator { + def configure(config: ConfigMap): MessageDispatcher = { + configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher( + config.getString("name", newUuid.toString), + config.getInt("throughput", Dispatchers.THROUGHPUT), + config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), + mailboxType(config), + threadPoolConfig)).build + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index cce4d2e871..3f0066227f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -6,8 +6,10 @@ package akka.dispatch import java.util.concurrent._ import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} - -import akka.util.{Switch, ReentrantGuard, HashCode, ReflectiveAccess} +import akka.config.ConfigMap +import akka.config.Config.TIME_UNIT +import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess} +import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy} import akka.actor._ /** @@ -162,3 +164,36 @@ trait MessageDispatcher { */ def mailboxSize(actorRef: ActorRef): Int } + +/** + * Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig + */ +abstract class MessageDispatcherConfigurator { + def configure(config: ConfigMap): MessageDispatcher + + def mailboxType(config: ConfigMap): MailboxType = { + val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY) + // FIXME how do we read in isBlocking for mailbox? Now set to 'false'. + if (capacity < 0) UnboundedMailbox() + else BoundedMailbox(false, capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT)) + } + + def configureThreadPool(config: ConfigMap, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { + import ThreadPoolConfigDispatcherBuilder.conf_? + + //Apply the following options to the config if they are present in the config + ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure( + conf_?(config getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), + conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), + conf_?(config getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)), + conf_?(config getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)), + conf_?(config getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)), + conf_?(config getString "rejection-policy" map { + case "abort" => new AbortPolicy() + case "caller-runs" => new CallerRunsPolicy() + case "discard-oldest" => new DiscardOldestPolicy() + case "discard" => new DiscardPolicy() + case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) + })(policy => _.setRejectionPolicy(policy))) + } +}