Added tests are fixed some bugs

This commit is contained in:
Viktor Klang 2010-08-13 13:13:27 +02:00
parent f0ac45f374
commit b6444b12a5
4 changed files with 176 additions and 68 deletions

View file

@ -46,8 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val defaultGlobalDispatcher = fromConfig("akka.actor.default-dispatcher",globalExecutorBasedEventDrivenDispatcher)
lazy val defaultGlobalDispatcher = {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
}
object globalHawtDispatcher extends HawtDispatcher
@ -111,23 +112,40 @@ object Dispatchers extends Logging {
*/
def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)
def fromConfig(identifier: String, defaultDispatcher: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = {
config.getConfigMap(identifier).map(from).getOrElse(defaultDispatcher)
}
/*
* Creates of obtains a dispatcher from a ConfigMap according to the format below
*
* default-dispatcher {
* type = "Default" # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven,
* # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
* # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt, Default
* keep-alive-ms = 60000 # Keep alive time for threads
* core-pool-size = 4 # No of core threads
* max-pool-size = 16 # Max no of threads
* allow-core-timeout = on # Allow core threads to time out
* rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
* throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
* aggregate = off # Aggregate on/off for HawtDispatchers
* }
* ex: from(config.getConfigMap(identifier).get)
*
* Gotcha: Only configures the dispatcher if possible
* Returns: None if "type" isn't specified in the config
* Throws: IllegalArgumentException if the value of "type" is not valid
*/
def from(cfg: ConfigMap): Option[MessageDispatcher] = {
lazy val name = cfg.getString("name",UUID.newUuid.toString)
def from(map: ConfigMap): MessageDispatcher = {
lazy val name = map.getString("name",UUID.newUuid.toString)
val dispatcher = map.getString("type").map({
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name)
case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,map.getInt("throughput",THROUGHPUT))
case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT))
case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name)
case "Hawt" => newHawtDispatcher(map.getBool("aggregate").getOrElse(true))
case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
@ -137,22 +155,23 @@ object Dispatchers extends Logging {
case "GlobalHawt" => globalHawtDispatcher
case "Default" => defaultGlobalDispatcher
case "Default" => globalExecutorBasedEventDrivenDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type %s" format unknown)
}
}).get
dispatcher foreach {
case d: ThreadPoolBuilder => d.configureIfPossible( builder => {
if(dispatcher.isInstanceOf[ThreadPoolBuilder]) {
val configurable = dispatcher.asInstanceOf[ThreadPoolBuilder]
cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_))
map.getInt("keep-alive-ms").foreach(configurable.setKeepAliveTimeInMillis(_))
cfg.getInt("core-pool-size").foreach(builder.setCorePoolSize(_))
map.getInt("core-pool-size").foreach(configurable.setCorePoolSize(_))
cfg.getInt("max-pool-size").foreach(builder.setMaxPoolSize(_))
map.getInt("max-pool-size").foreach(configurable.setMaxPoolSize(_))
cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_))
map.getString("rejection-policy").map(_ match {
cfg.getString("rejection-policy").map({
case "abort" => new AbortPolicy()
@ -164,11 +183,11 @@ object Dispatchers extends Logging {
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
}).foreach(configurable.setRejectionPolicy(_))
}).foreach(builder.setRejectionPolicy(_))
})
case _ =>
}
log.info("Dispatchers.from: %s:%s for %s",dispatcher.getClass.getName,dispatcher,map.getString("type").getOrElse("<null>"))
dispatcher
}
}

View file

@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import se.scalablesolutions.akka.actor.IllegalActorStateException
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.util.{Logger, Logging}
trait ThreadPoolBuilder {
val name: String
@ -22,7 +22,7 @@ trait ThreadPoolBuilder {
private var threadPoolBuilder: ThreadPoolExecutor = _
private var boundedExecutorBound = -1
private var inProcessOfBuilding = false
@volatile private var inProcessOfBuilding = false
private var blockingQueue: BlockingQueue[Runnable] = _
private lazy val threadFactory = new MonitorableThreadFactory(name)
@ -34,7 +34,6 @@ trait ThreadPoolBuilder {
def buildThreadPool(): Unit = synchronized {
ensureNotActive
inProcessOfBuilding = false
threadPoolBuilder.allowCoreThreadTimeOut(true)
if (boundedExecutorBound > 0) {
val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound)
boundedExecutorBound = -1
@ -103,46 +102,58 @@ trait ThreadPoolBuilder {
this
}
def configureIfPossible(f: (ThreadPoolBuilder) => Unit): Boolean = synchronized {
if(inProcessOfBuilding) {
f(this)
true
}
else {
Logger(getClass).info("Tried to configure an already started ThreadPoolBuilder")
false
}
}
/**
* Default is 16.
*/
def setCorePoolSize(size: Int): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyInConstructionPhase
threadPoolBuilder.setCorePoolSize(size)
this
}
def setCorePoolSize(size: Int): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setCorePoolSize(size))
/**
* Default is 128.
*/
def setMaxPoolSize(size: Int): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyInConstructionPhase
threadPoolBuilder.setMaximumPoolSize(size)
this
}
def setMaxPoolSize(size: Int): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setMaximumPoolSize(size))
/**
* Default is 60000 (one minute).
*/
def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyInConstructionPhase
threadPoolBuilder.setKeepAliveTime(time, MILLISECONDS)
this
}
def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS))
/**
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
*/
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolBuilder = synchronized {
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolBuilder =
setThreadPoolExecutorProperty(_.setRejectedExecutionHandler(policy))
/**
* Default false, set to true to conserve thread for potentially unused dispatchers
*/
def setAllowCoreThreadTimeout(allow: Boolean) =
setThreadPoolExecutorProperty(_.allowCoreThreadTimeOut(allow))
/**
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
*/
protected def setThreadPoolExecutorProperty(f: (ThreadPoolExecutor) => Unit): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyInConstructionPhase
threadPoolBuilder.setRejectedExecutionHandler(policy)
f(threadPoolBuilder)
this
}
protected def verifyNotInConstructionPhase = {
if (inProcessOfBuilding) throw new IllegalActorStateException("Is already in the process of building a thread pool")
inProcessOfBuilding = true

View file

@ -0,0 +1,70 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import net.lag.configgy.Config
import scala.reflect.{Manifest}
import se.scalablesolutions.akka.dispatch._
object DispatchersSpec {
import Dispatchers._
//
val tipe = "type"
val keepalivems = "keep-alive-ms"
val corepoolsize = "core-pool-size"
val maxpoolsize = "max-pool-size"
val allowcoretimeout = "allow-core-timeout"
val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard
val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher
val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers
def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher
def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure
def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map(
"ReactorBasedSingleThreadEventDriven" -> ofType[ReactorBasedSingleThreadEventDrivenDispatcher],
"ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher],
"ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher],
"ReactorBasedThreadPoolEventDriven" -> ofType[ReactorBasedThreadPoolEventDrivenDispatcher],
"Hawt" -> ofType[HawtDispatcher],
"GlobalReactorBasedSingleThreadEventDriven" -> instance(globalReactorBasedSingleThreadEventDrivenDispatcher),
"GlobalReactorBasedThreadPoolEventDriven" -> instance(globalReactorBasedThreadPoolEventDrivenDispatcher),
"GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher),
"GlobalHawt" -> instance(globalHawtDispatcher),
"Default" -> instance(globalExecutorBasedEventDrivenDispatcher)
)
def validTypes = typesAndValidators.keys.toList
lazy val allDispatchers: Map[String,Option[MessageDispatcher]] = {
validTypes.map(t => (t,from(Config.fromMap(Map(tipe -> t))))).toMap
}
}
class DispatchersSpec extends JUnitSuite {
import Dispatchers._
import DispatchersSpec._
@Test def shouldYieldNoneIfTypeIsMissing {
assert(from(Config.fromMap(Map())) === None)
}
@Test(expected = classOf[IllegalArgumentException])
def shouldThrowIllegalArgumentExceptionIfTypeDoesntExist {
from(Config.fromMap(Map(tipe -> "typedoesntexist")))
}
@Test def shouldGetTheCorrectTypesOfDispatchers {
//It can create/obtain all defined types
assert(allDispatchers.values.forall(_.isDefined))
//All created/obtained dispatchers are of the expeced type/instance
assert(typesAndValidators.forall( tuple => tuple._2(allDispatchers(tuple._1).get) ))
}
}

View file

@ -20,9 +20,18 @@ akka {
actor {
timeout = 5 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher
default-dispatcher {
type = "Default"
type = "Default" # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven,
# ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven,
# GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt, Default
keep-alive-ms = 60000 # Keep alive time for threads
core-pool-size = 4 # No of core threads
max-pool-size = 16 # Max no of threads
allow-core-timeout = on # Allow core threads to time out
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
aggregate = off # Aggregate on/off for HawtDispatchers
}
}
@ -52,8 +61,7 @@ akka {
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
comet-dispatcher {
type = "Hawt"
aggregate = on
type = "Default"
}
#maxInactiveActivity = 60000 #Atmosphere CometSupport maxInactiveActivity