Added support for pool factors and executor bounds

This commit is contained in:
Viktor Klang 2010-08-13 23:06:52 +02:00
parent e37608fef7
commit d6d233298f
4 changed files with 59 additions and 49 deletions

View file

@ -124,16 +124,17 @@ object Dispatchers extends Logging {
*
* default-dispatcher {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
* # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven,
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
* keep-alive-ms = 60000 # Keep alive time for threads
* core-pool-size = 4 # No of core threads
* max-pool-size = 16 # Max no of threads
* allow-core-timeout = on # Allow core threads to time out
* # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
* keep-alive-ms = 60000 # Keep alive time for threads
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
* executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
* allow-core-timeout = on # Allow core threads to time out
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
* aggregate = off # Aggregate on/off for HawtDispatchers
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
* aggregate = off # Aggregate on/off for HawtDispatchers
* }
* ex: from(config.getConfigMap(identifier).get)
*
@ -146,21 +147,13 @@ object Dispatchers extends Logging {
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name)
case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT))
case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name)
case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type %s" format unknown)
@ -170,25 +163,17 @@ object Dispatchers extends Logging {
case d: ThreadPoolBuilder => d.configureIfPossible( builder => {
cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_))
cfg.getInt("core-pool-size").foreach(builder.setCorePoolSize(_))
cfg.getInt("max-pool-size").foreach(builder.setMaxPoolSize(_))
cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_))
cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_))
cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_))
cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_))
cfg.getString("rejection-policy").map({
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
}).foreach(builder.setRejectionPolicy(_))
})
case _ =>

View file

@ -134,8 +134,31 @@ trait ThreadPoolBuilder extends Logging {
def setMaxPoolSize(size: Int): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setMaximumPoolSize(size))
/**
* Default is 60000 (one minute).
* Sets the core pool size to (availableProcessors * multipliers).ceil.toInt
*/
def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setCorePoolSize(procs(multiplier)))
/**
* Sets the max pool size to (availableProcessors * multipliers).ceil.toInt
*/
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setMaximumPoolSize(procs(multiplier)))
/**
* Sets the bound, -1 is unbounded
*/
def setExecutorBounds(bounds: Int): Unit = synchronized {
this.boundedExecutorBound = bounds
}
protected def procs(multiplier: Double): Int =
(Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
/**
* Default is 60000 (one minute).
*/
def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS))

View file

@ -14,14 +14,15 @@ import se.scalablesolutions.akka.dispatch._
object DispatchersSpec {
import Dispatchers._
//
val tipe = "type"
val keepalivems = "keep-alive-ms"
val corepoolsize = "core-pool-size"
val maxpoolsize = "max-pool-size"
val allowcoretimeout = "allow-core-timeout"
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher
val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers
val tipe = "type"
val keepalivems = "keep-alive-ms"
val corepoolsizefactor = "core-pool-size-factor"
val maxpoolsizefactor = "max-pool-size-factor"
val executorbounds = "executor-bounds"
val allowcoretimeout = "allow-core-timeout"
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher
val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers
def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure

View file

@ -18,21 +18,22 @@ akka {
"sample.security.Boot"]
actor {
timeout = 5 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher
timeout = 5 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher
default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
# ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
# ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
# GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
keep-alive-ms = 60000 # Keep alive time for threads
core-pool-size = 4 # No of core threads
max-pool-size = 16 # Max no of threads
allow-core-timeout = on # Allow core threads to time out
# ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
# ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
# GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
keep-alive-ms = 60000 # Keep alive time for threads
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
allow-core-timeout = on # Allow core threads to time out
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
aggregate = off # Aggregate on/off for HawtDispatchers
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
aggregate = off # Aggregate on/off for HawtDispatchers
}
}