diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index 5055126b52..82c51c57de 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -124,16 +124,17 @@ object Dispatchers extends Logging { * * default-dispatcher { * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable - * # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven, - * # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, - * # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt - * keep-alive-ms = 60000 # Keep alive time for threads - * core-pool-size = 4 # No of core threads - * max-pool-size = 16 # Max no of threads - * allow-core-timeout = on # Allow core threads to time out + * # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, + * # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, + * # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt + * keep-alive-ms = 60000 # Keep alive time for threads + * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) + * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) + * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded + * allow-core-timeout = on # Allow core threads to time out * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard - * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher - * aggregate = off # Aggregate on/off for HawtDispatchers + * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher + * aggregate = off # Aggregate on/off for HawtDispatchers * } * ex: from(config.getConfigMap(identifier).get) * @@ -146,21 +147,13 @@ object Dispatchers extends Logging { val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name) - case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name) - case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT)) - case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name) - case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) - case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher - case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher - case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher - case "GlobalHawt" => globalHawtDispatcher case unknown => throw new IllegalArgumentException("Unknown dispatcher type %s" format unknown) @@ -170,25 +163,17 @@ object Dispatchers extends Logging { case d: ThreadPoolBuilder => d.configureIfPossible( builder => { cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_)) - - cfg.getInt("core-pool-size").foreach(builder.setCorePoolSize(_)) - - cfg.getInt("max-pool-size").foreach(builder.setMaxPoolSize(_)) - + cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_)) + cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_)) + cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_)) cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_)) cfg.getString("rejection-policy").map({ - case "abort" => new AbortPolicy() - case "caller-runs" => new CallerRunsPolicy() - case "discard-oldest" => new DiscardOldestPolicy() - case "discard" => new DiscardPolicy() - case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) - }).foreach(builder.setRejectionPolicy(_)) }) case _ => diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index 340726a222..7cc96400a8 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -134,8 +134,31 @@ trait ThreadPoolBuilder extends Logging { def setMaxPoolSize(size: Int): ThreadPoolBuilder = setThreadPoolExecutorProperty(_.setMaximumPoolSize(size)) + /** - * Default is 60000 (one minute). + * Sets the core pool size to (availableProcessors * multipliers).ceil.toInt + */ + def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder = + setThreadPoolExecutorProperty(_.setCorePoolSize(procs(multiplier))) + + /** + * Sets the max pool size to (availableProcessors * multipliers).ceil.toInt + */ + def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder = + setThreadPoolExecutorProperty(_.setMaximumPoolSize(procs(multiplier))) + + /** + * Sets the bound, -1 is unbounded + */ + def setExecutorBounds(bounds: Int): Unit = synchronized { + this.boundedExecutorBound = bounds + } + + protected def procs(multiplier: Double): Int = + (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt + + /** + * Default is 60000 (one minute). */ def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder = setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS)) diff --git a/akka-core/src/test/scala/dispatch/DispatchersSpec.scala b/akka-core/src/test/scala/dispatch/DispatchersSpec.scala index 3b99cddfc7..bb548b9251 100644 --- a/akka-core/src/test/scala/dispatch/DispatchersSpec.scala +++ b/akka-core/src/test/scala/dispatch/DispatchersSpec.scala @@ -14,14 +14,15 @@ import se.scalablesolutions.akka.dispatch._ object DispatchersSpec { import Dispatchers._ // - val tipe = "type" - val keepalivems = "keep-alive-ms" - val corepoolsize = "core-pool-size" - val maxpoolsize = "max-pool-size" - val allowcoretimeout = "allow-core-timeout" - val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard - val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher - val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers + val tipe = "type" + val keepalivems = "keep-alive-ms" + val corepoolsizefactor = "core-pool-size-factor" + val maxpoolsizefactor = "max-pool-size-factor" + val executorbounds = "executor-bounds" + val allowcoretimeout = "allow-core-timeout" + val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard + val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher + val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure diff --git a/config/akka-reference.conf b/config/akka-reference.conf index bd1fdd0c02..d79383dcf6 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -18,21 +18,22 @@ akka { "sample.security.Boot"] actor { - timeout = 5 # default timeout for future based invocations - serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability - throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher + timeout = 5 # default timeout for future based invocations + serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability + throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher default-dispatcher { type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable - # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, - # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, - # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt - keep-alive-ms = 60000 # Keep alive time for threads - core-pool-size = 4 # No of core threads - max-pool-size = 16 # Max no of threads - allow-core-timeout = on # Allow core threads to time out + # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, + # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, + # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt + keep-alive-ms = 60000 # Keep alive time for threads + core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) + max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) + executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded + allow-core-timeout = on # Allow core threads to time out rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard - throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher - aggregate = off # Aggregate on/off for HawtDispatchers + throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher + aggregate = off # Aggregate on/off for HawtDispatchers } }