Added support for pool factors and executor bounds
This commit is contained in:
parent
854d7a4512
commit
b7e9bc4f8f
4 changed files with 59 additions and 49 deletions
|
|
@ -124,16 +124,17 @@ object Dispatchers extends Logging {
|
||||||
*
|
*
|
||||||
* default-dispatcher {
|
* default-dispatcher {
|
||||||
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
||||||
* # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven,
|
* # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
|
||||||
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
||||||
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
|
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
|
||||||
* keep-alive-ms = 60000 # Keep alive time for threads
|
* keep-alive-ms = 60000 # Keep alive time for threads
|
||||||
* core-pool-size = 4 # No of core threads
|
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
|
||||||
* max-pool-size = 16 # Max no of threads
|
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
|
||||||
* allow-core-timeout = on # Allow core threads to time out
|
* executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
||||||
|
* allow-core-timeout = on # Allow core threads to time out
|
||||||
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
||||||
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
|
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
|
||||||
* aggregate = off # Aggregate on/off for HawtDispatchers
|
* aggregate = off # Aggregate on/off for HawtDispatchers
|
||||||
* }
|
* }
|
||||||
* ex: from(config.getConfigMap(identifier).get)
|
* ex: from(config.getConfigMap(identifier).get)
|
||||||
*
|
*
|
||||||
|
|
@ -146,21 +147,13 @@ object Dispatchers extends Logging {
|
||||||
|
|
||||||
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
|
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
|
||||||
case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name)
|
case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name)
|
||||||
|
|
||||||
case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name)
|
case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name)
|
||||||
|
|
||||||
case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT))
|
case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT))
|
||||||
|
|
||||||
case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name)
|
case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name)
|
||||||
|
|
||||||
case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
|
case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
|
||||||
|
|
||||||
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
|
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
|
||||||
|
|
||||||
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
|
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
|
||||||
|
|
||||||
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
|
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
|
||||||
|
|
||||||
case "GlobalHawt" => globalHawtDispatcher
|
case "GlobalHawt" => globalHawtDispatcher
|
||||||
|
|
||||||
case unknown => throw new IllegalArgumentException("Unknown dispatcher type %s" format unknown)
|
case unknown => throw new IllegalArgumentException("Unknown dispatcher type %s" format unknown)
|
||||||
|
|
@ -170,25 +163,17 @@ object Dispatchers extends Logging {
|
||||||
case d: ThreadPoolBuilder => d.configureIfPossible( builder => {
|
case d: ThreadPoolBuilder => d.configureIfPossible( builder => {
|
||||||
|
|
||||||
cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_))
|
cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_))
|
||||||
|
cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_))
|
||||||
cfg.getInt("core-pool-size").foreach(builder.setCorePoolSize(_))
|
cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_))
|
||||||
|
cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_))
|
||||||
cfg.getInt("max-pool-size").foreach(builder.setMaxPoolSize(_))
|
|
||||||
|
|
||||||
cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_))
|
cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_))
|
||||||
|
|
||||||
cfg.getString("rejection-policy").map({
|
cfg.getString("rejection-policy").map({
|
||||||
|
|
||||||
case "abort" => new AbortPolicy()
|
case "abort" => new AbortPolicy()
|
||||||
|
|
||||||
case "caller-runs" => new CallerRunsPolicy()
|
case "caller-runs" => new CallerRunsPolicy()
|
||||||
|
|
||||||
case "discard-oldest" => new DiscardOldestPolicy()
|
case "discard-oldest" => new DiscardOldestPolicy()
|
||||||
|
|
||||||
case "discard" => new DiscardPolicy()
|
case "discard" => new DiscardPolicy()
|
||||||
|
|
||||||
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
|
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
|
||||||
|
|
||||||
}).foreach(builder.setRejectionPolicy(_))
|
}).foreach(builder.setRejectionPolicy(_))
|
||||||
})
|
})
|
||||||
case _ =>
|
case _ =>
|
||||||
|
|
|
||||||
|
|
@ -134,8 +134,31 @@ trait ThreadPoolBuilder extends Logging {
|
||||||
def setMaxPoolSize(size: Int): ThreadPoolBuilder =
|
def setMaxPoolSize(size: Int): ThreadPoolBuilder =
|
||||||
setThreadPoolExecutorProperty(_.setMaximumPoolSize(size))
|
setThreadPoolExecutorProperty(_.setMaximumPoolSize(size))
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default is 60000 (one minute).
|
* Sets the core pool size to (availableProcessors * multipliers).ceil.toInt
|
||||||
|
*/
|
||||||
|
def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
|
||||||
|
setThreadPoolExecutorProperty(_.setCorePoolSize(procs(multiplier)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the max pool size to (availableProcessors * multipliers).ceil.toInt
|
||||||
|
*/
|
||||||
|
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
|
||||||
|
setThreadPoolExecutorProperty(_.setMaximumPoolSize(procs(multiplier)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the bound, -1 is unbounded
|
||||||
|
*/
|
||||||
|
def setExecutorBounds(bounds: Int): Unit = synchronized {
|
||||||
|
this.boundedExecutorBound = bounds
|
||||||
|
}
|
||||||
|
|
||||||
|
protected def procs(multiplier: Double): Int =
|
||||||
|
(Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default is 60000 (one minute).
|
||||||
*/
|
*/
|
||||||
def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder =
|
def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder =
|
||||||
setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS))
|
setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS))
|
||||||
|
|
|
||||||
|
|
@ -14,14 +14,15 @@ import se.scalablesolutions.akka.dispatch._
|
||||||
object DispatchersSpec {
|
object DispatchersSpec {
|
||||||
import Dispatchers._
|
import Dispatchers._
|
||||||
//
|
//
|
||||||
val tipe = "type"
|
val tipe = "type"
|
||||||
val keepalivems = "keep-alive-ms"
|
val keepalivems = "keep-alive-ms"
|
||||||
val corepoolsize = "core-pool-size"
|
val corepoolsizefactor = "core-pool-size-factor"
|
||||||
val maxpoolsize = "max-pool-size"
|
val maxpoolsizefactor = "max-pool-size-factor"
|
||||||
val allowcoretimeout = "allow-core-timeout"
|
val executorbounds = "executor-bounds"
|
||||||
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
|
val allowcoretimeout = "allow-core-timeout"
|
||||||
val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher
|
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
|
||||||
val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers
|
val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher
|
||||||
|
val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers
|
||||||
|
|
||||||
def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher
|
def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher
|
||||||
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure
|
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure
|
||||||
|
|
|
||||||
|
|
@ -18,21 +18,22 @@ akka {
|
||||||
"sample.security.Boot"]
|
"sample.security.Boot"]
|
||||||
|
|
||||||
actor {
|
actor {
|
||||||
timeout = 5 # default timeout for future based invocations
|
timeout = 5 # default timeout for future based invocations
|
||||||
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
|
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
|
||||||
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher
|
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher
|
||||||
default-dispatcher {
|
default-dispatcher {
|
||||||
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
||||||
# ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
|
# ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
|
||||||
# ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
# ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
||||||
# GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
|
# GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
|
||||||
keep-alive-ms = 60000 # Keep alive time for threads
|
keep-alive-ms = 60000 # Keep alive time for threads
|
||||||
core-pool-size = 4 # No of core threads
|
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
|
||||||
max-pool-size = 16 # Max no of threads
|
max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
|
||||||
allow-core-timeout = on # Allow core threads to time out
|
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
||||||
|
allow-core-timeout = on # Allow core threads to time out
|
||||||
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
||||||
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
|
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
|
||||||
aggregate = off # Aggregate on/off for HawtDispatchers
|
aggregate = off # Aggregate on/off for HawtDispatchers
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue