Removing global dispatcher
This commit is contained in:
parent
90ef28f91e
commit
5138fa9764
4 changed files with 22 additions and 30 deletions
|
|
@ -20,10 +20,10 @@ class ConfigSpec extends WordSpec with MustMatchers {
|
|||
getString("akka.time-unit") must equal(Some("seconds"))
|
||||
getString("akka.version") must equal(Some("2.0-SNAPSHOT"))
|
||||
|
||||
getString("akka.actor.default-dispatcher.type") must equal(Some("GlobalDispatcher"))
|
||||
getString("akka.actor.default-dispatcher.type") must equal(Some("Dispatcher"))
|
||||
getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(Some(60))
|
||||
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(1.0))
|
||||
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(4.0))
|
||||
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(8.0))
|
||||
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(8.0))
|
||||
getInt("akka.actor.default-dispatcher.executor-bounds") must equal(Some(-1))
|
||||
getBool("akka.actor.default-dispatcher.allow-core-timeout") must equal(Some(true))
|
||||
getString("akka.actor.default-dispatcher.rejection-policy") must equal(Some("caller-runs"))
|
||||
|
|
|
|||
|
|
@ -28,8 +28,7 @@ object DispatchersSpec {
|
|||
|
||||
def typesAndValidators: Map[String, (MessageDispatcher) ⇒ Boolean] = Map(
|
||||
"BalancingDispatcher" -> ofType[BalancingDispatcher],
|
||||
"Dispatcher" -> ofType[Dispatcher],
|
||||
"GlobalDispatcher" -> instance(globalDispatcher))
|
||||
"Dispatcher" -> ofType[Dispatcher])
|
||||
|
||||
def validTypes = typesAndValidators.keys.toList
|
||||
|
||||
|
|
|
|||
|
|
@ -56,11 +56,8 @@ object Dispatchers {
|
|||
val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt
|
||||
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox()
|
||||
|
||||
lazy val defaultGlobalDispatcher = {
|
||||
config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalDispatcher)
|
||||
}
|
||||
|
||||
object globalDispatcher extends Dispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
|
||||
lazy val defaultGlobalDispatcher =
|
||||
config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MAILBOX_TYPE).build
|
||||
|
||||
/**
|
||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||
|
|
@ -171,11 +168,11 @@ object Dispatchers {
|
|||
* Creates of obtains a dispatcher from a ConfigMap according to the format below
|
||||
*
|
||||
* default-dispatcher {
|
||||
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
||||
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
|
||||
* # GlobalExecutorBasedEventDriven
|
||||
* type = "Dispatcher" # Must be one of the following
|
||||
* # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
|
||||
* # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
|
||||
* keep-alive-time = 60 # Keep alive time for threads
|
||||
* name = "MyDispatcher" # Optional, will be a generated UUID if omitted
|
||||
* keep-alive-time = 60 # Keep alive time for threads in akka.time-unit
|
||||
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
|
||||
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
|
||||
* executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
||||
|
|
@ -188,18 +185,18 @@ object Dispatchers {
|
|||
* Gotcha: Only configures the dispatcher if possible
|
||||
* Returns: None if "type" isn't specified in the config
|
||||
* Throws: IllegalArgumentException if the value of "type" is not valid
|
||||
* IllegalArgumentException if it cannot
|
||||
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
|
||||
*/
|
||||
def from(cfg: Configuration): Option[MessageDispatcher] = {
|
||||
cfg.getString("type") map {
|
||||
case "Dispatcher" ⇒ new DispatcherConfigurator()
|
||||
case "BalancingDispatcher" ⇒ new BalancingDispatcherConfigurator()
|
||||
case "GlobalDispatcher" ⇒ GlobalDispatcherConfigurator
|
||||
cfg.getString("type") flatMap {
|
||||
case "Dispatcher" ⇒ Some(new DispatcherConfigurator())
|
||||
case "BalancingDispatcher" ⇒ Some(new BalancingDispatcherConfigurator())
|
||||
case "GlobalDispatcher" ⇒ None //TODO FIXME remove this
|
||||
case fqn ⇒
|
||||
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
|
||||
case Right(clazz) ⇒
|
||||
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match {
|
||||
case Right(configurator) ⇒ configurator
|
||||
case Right(configurator) ⇒ Some(configurator)
|
||||
case Left(exception) ⇒
|
||||
throw new IllegalArgumentException(
|
||||
"Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception)
|
||||
|
|
@ -213,10 +210,6 @@ object Dispatchers {
|
|||
}
|
||||
}
|
||||
|
||||
object GlobalDispatcherConfigurator extends MessageDispatcherConfigurator {
|
||||
def configure(config: Configuration): MessageDispatcher = Dispatchers.globalDispatcher
|
||||
}
|
||||
|
||||
class DispatcherConfigurator extends MessageDispatcherConfigurator {
|
||||
def configure(config: Configuration): MessageDispatcher = {
|
||||
configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher(
|
||||
|
|
|
|||
|
|
@ -81,13 +81,13 @@ akka {
|
|||
}
|
||||
|
||||
default-dispatcher {
|
||||
type = "GlobalDispatcher" # Must be one of the following, all "Global*" are non-configurable
|
||||
# - Dispatcher
|
||||
# - BalancingDispatcher
|
||||
# - GlobalDispatcher
|
||||
type = "Dispatcher" # Must be one of the following
|
||||
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
|
||||
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
|
||||
name = "MyDispatcher" # Optional, will be a generated UUID if omitted
|
||||
keep-alive-time = 60 # Keep alive time for threads
|
||||
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
|
||||
max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
|
||||
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
|
||||
max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor)
|
||||
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
||||
allow-core-timeout = on # Allow core threads to time out
|
||||
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue