Cleaned up code and verified tests
This commit is contained in:
parent
d9fe236b2e
commit
eec2c409f1
4 changed files with 20 additions and 10 deletions
|
|
@ -123,9 +123,10 @@ object Dispatchers extends Logging {
|
||||||
* Creates of obtains a dispatcher from a ConfigMap according to the format below
|
* Creates of obtains a dispatcher from a ConfigMap according to the format below
|
||||||
*
|
*
|
||||||
* default-dispatcher {
|
* default-dispatcher {
|
||||||
* type = "Default" # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven,
|
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
||||||
|
* # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven,
|
||||||
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
||||||
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt, Default
|
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
|
||||||
* keep-alive-ms = 60000 # Keep alive time for threads
|
* keep-alive-ms = 60000 # Keep alive time for threads
|
||||||
* core-pool-size = 4 # No of core threads
|
* core-pool-size = 4 # No of core threads
|
||||||
* max-pool-size = 16 # Max no of threads
|
* max-pool-size = 16 # Max no of threads
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ import ThreadPoolExecutor.CallerRunsPolicy
|
||||||
import se.scalablesolutions.akka.actor.IllegalActorStateException
|
import se.scalablesolutions.akka.actor.IllegalActorStateException
|
||||||
import se.scalablesolutions.akka.util.{Logger, Logging}
|
import se.scalablesolutions.akka.util.{Logger, Logging}
|
||||||
|
|
||||||
trait ThreadPoolBuilder {
|
trait ThreadPoolBuilder extends Logging {
|
||||||
val name: String
|
val name: String
|
||||||
|
|
||||||
private val NR_START_THREADS = 16
|
private val NR_START_THREADS = 16
|
||||||
|
|
@ -34,6 +34,15 @@ trait ThreadPoolBuilder {
|
||||||
def buildThreadPool(): Unit = synchronized {
|
def buildThreadPool(): Unit = synchronized {
|
||||||
ensureNotActive
|
ensureNotActive
|
||||||
inProcessOfBuilding = false
|
inProcessOfBuilding = false
|
||||||
|
|
||||||
|
log.debug("Creating a %s with config [core-pool:%d,max-pool:%d,timeout:%d,allowCoreTimeout:%s,rejectPolicy:%s]",
|
||||||
|
getClass.getName,
|
||||||
|
threadPoolBuilder.getCorePoolSize,
|
||||||
|
threadPoolBuilder.getMaximumPoolSize,
|
||||||
|
threadPoolBuilder.getKeepAliveTime(MILLISECONDS),
|
||||||
|
threadPoolBuilder.allowsCoreThreadTimeOut,
|
||||||
|
threadPoolBuilder.getRejectedExecutionHandler.getClass.getSimpleName)
|
||||||
|
|
||||||
if (boundedExecutorBound > 0) {
|
if (boundedExecutorBound > 0) {
|
||||||
val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound)
|
val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound)
|
||||||
boundedExecutorBound = -1
|
boundedExecutorBound = -1
|
||||||
|
|
@ -108,7 +117,7 @@ trait ThreadPoolBuilder {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
Logger(getClass).info("Tried to configure an already started ThreadPoolBuilder")
|
log.warning("Tried to configure an already started ThreadPoolBuilder of type [%s]",getClass.getName)
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,8 +35,7 @@ object DispatchersSpec {
|
||||||
"GlobalReactorBasedSingleThreadEventDriven" -> instance(globalReactorBasedSingleThreadEventDrivenDispatcher),
|
"GlobalReactorBasedSingleThreadEventDriven" -> instance(globalReactorBasedSingleThreadEventDrivenDispatcher),
|
||||||
"GlobalReactorBasedThreadPoolEventDriven" -> instance(globalReactorBasedThreadPoolEventDrivenDispatcher),
|
"GlobalReactorBasedThreadPoolEventDriven" -> instance(globalReactorBasedThreadPoolEventDrivenDispatcher),
|
||||||
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher),
|
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher),
|
||||||
"GlobalHawt" -> instance(globalHawtDispatcher),
|
"GlobalHawt" -> instance(globalHawtDispatcher)
|
||||||
"Default" -> instance(globalExecutorBasedEventDrivenDispatcher)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def validTypes = typesAndValidators.keys.toList
|
def validTypes = typesAndValidators.keys.toList
|
||||||
|
|
@ -68,7 +67,7 @@ class DispatchersSpec extends JUnitSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def defaultingToDefaultWhileLoadingTheDefaultShouldWork {
|
@Test def defaultingToDefaultWhileLoadingTheDefaultShouldWork {
|
||||||
assert(from(Config.fromMap(Map(tipe -> "Default"))).getOrElse(defaultGlobalDispatcher) == defaultGlobalDispatcher)
|
assert(from(Config.fromMap(Map())).getOrElse(defaultGlobalDispatcher) == defaultGlobalDispatcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,9 +22,10 @@ akka {
|
||||||
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
|
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
|
||||||
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher
|
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher
|
||||||
default-dispatcher {
|
default-dispatcher {
|
||||||
type = "Default" # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven,
|
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
||||||
|
# ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
|
||||||
# ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
# ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
|
||||||
# GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt, Default
|
# GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt
|
||||||
keep-alive-ms = 60000 # Keep alive time for threads
|
keep-alive-ms = 60000 # Keep alive time for threads
|
||||||
core-pool-size = 4 # No of core threads
|
core-pool-size = 4 # No of core threads
|
||||||
max-pool-size = 16 # Max no of threads
|
max-pool-size = 16 # Max no of threads
|
||||||
|
|
@ -61,7 +62,7 @@ akka {
|
||||||
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
|
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
|
||||||
|
|
||||||
comet-dispatcher {
|
comet-dispatcher {
|
||||||
type = "Default"
|
#type = "Hawt" //uses the default dispatcher is commented out
|
||||||
}
|
}
|
||||||
|
|
||||||
#maxInactiveActivity = 60000 #Atmosphere CometSupport maxInactiveActivity
|
#maxInactiveActivity = 60000 #Atmosphere CometSupport maxInactiveActivity
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue