Adding support for MessageDispatcherConfigurator, which means that you can configure homegrown dispatchers in akka.conf

This commit is contained in:
Viktor Klang 2011-03-04 20:55:12 +01:00
parent fe5ead9d6b
commit c9523585d5
2 changed files with 84 additions and 51 deletions

View file

@ -7,11 +7,10 @@ package akka.dispatch
import akka.actor.{Actor, ActorRef}
import akka.actor.newUuid
import akka.config.Config._
import akka.util.{Duration}
import akka.util.{Duration,ReflectiveAccess}
import akka.config.ConfigMap
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
import java.util.concurrent.TimeUnit
/**
@ -165,6 +164,7 @@ object Dispatchers {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
* # GlobalExecutorBasedEventDriven
* # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
* keep-alive-time = 60 # Keep alive time for threads
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
@ -178,54 +178,52 @@ object Dispatchers {
* Gotcha: Only configures the dispatcher if possible
* Returns: None if "type" isn't specified in the config
* Throws: IllegalArgumentException if the value of "type" is not valid
* IllegalArgumentException if it cannot
*/
def from(cfg: ConfigMap): Option[MessageDispatcher] = {
lazy val name = cfg.getString("name", newUuid.toString)
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
//Apply the following options to the config if they are present in the cfg
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
conf_?(cfg getString "rejection-policy" map {
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
})(policy => _.setRejectionPolicy(policy)))
}
lazy val mailboxType: MailboxType = {
val capacity = cfg.getInt("mailbox-capacity", MAILBOX_CAPACITY)
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'.
if (capacity < 0) UnboundedMailbox()
else BoundedMailbox(false, capacity, Duration(cfg.getInt("mailbox-push-timeout-time", MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
}
cfg.getString("type") map {
case "ExecutorBasedEventDriven" =>
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
name,
cfg.getInt("throughput", THROUGHPUT),
cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType,
threadPoolConfig)).build
case "ExecutorBasedEventDrivenWorkStealing" =>
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
name,
cfg.getInt("throughput", THROUGHPUT),
cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType,
threadPoolConfig)).build
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
cfg.getString("type") map {
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator()
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator()
case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator
case fqn =>
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
case Some(clazz) =>
val instance = ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]())
if (instance.isEmpty)
throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn)
else
instance.get
case None =>
throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn)
}
} map {
_ configure cfg
}
}
}
object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: ConfigMap): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
}
class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: ConfigMap): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
config.getString("name", newUuid.toString),
config.getInt("throughput", Dispatchers.THROUGHPUT),
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType(config),
threadPoolConfig)).build
}
}
class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: ConfigMap): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
config.getString("name", newUuid.toString),
config.getInt("throughput", Dispatchers.THROUGHPUT),
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType(config),
threadPoolConfig)).build
}
}

View file

@ -6,8 +6,10 @@ package akka.dispatch
import java.util.concurrent._
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
import akka.util.{Switch, ReentrantGuard, HashCode, ReflectiveAccess}
import akka.config.ConfigMap
import akka.config.Config.TIME_UNIT
import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess}
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
import akka.actor._
/**
@ -162,3 +164,36 @@ trait MessageDispatcher {
*/
def mailboxSize(actorRef: ActorRef): Int
}
/**
* Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig
*/
abstract class MessageDispatcherConfigurator {
def configure(config: ConfigMap): MessageDispatcher
def mailboxType(config: ConfigMap): MailboxType = {
val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY)
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'.
if (capacity < 0) UnboundedMailbox()
else BoundedMailbox(false, capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
}
def configureThreadPool(config: ConfigMap, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
//Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
conf_?(config getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
conf_?(config getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
conf_?(config getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
conf_?(config getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
conf_?(config getString "rejection-policy" map {
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
})(policy => _.setRejectionPolicy(policy)))
}
}