From b6444b12a5776598dd51f522e8065ced787e6025 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 13 Aug 2010 13:13:27 +0200 Subject: [PATCH] Added tests are fixed some bugs --- .../src/main/scala/dispatch/Dispatchers.scala | 97 +++++++++++-------- .../scala/dispatch/ThreadPoolBuilder.scala | 57 ++++++----- .../test/scala/dispatch/DispatchersSpec.scala | 70 +++++++++++++ config/akka-reference.conf | 20 ++-- 4 files changed, 176 insertions(+), 68 deletions(-) create mode 100644 akka-core/src/test/scala/dispatch/DispatchersSpec.scala diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index 06fd4e9699..79861086fc 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -46,8 +46,9 @@ import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, D object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) - - val defaultGlobalDispatcher = fromConfig("akka.actor.default-dispatcher",globalExecutorBasedEventDrivenDispatcher) + lazy val defaultGlobalDispatcher = { + config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) + } object globalHawtDispatcher extends HawtDispatcher @@ -111,23 +112,40 @@ object Dispatchers extends Logging { */ def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor) - def fromConfig(identifier: String, defaultDispatcher: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = { - config.getConfigMap(identifier).map(from).getOrElse(defaultDispatcher) - } + /* + * Creates of obtains a dispatcher from a ConfigMap according to the format below + * + * default-dispatcher { + * type = "Default" # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven, + * # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, + * # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt, Default + * keep-alive-ms = 60000 # Keep alive time for threads + * core-pool-size = 4 # No of core threads + * max-pool-size = 16 # Max no of threads + * allow-core-timeout = on # Allow core threads to time out + * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard + * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher + * aggregate = off # Aggregate on/off for HawtDispatchers + * } + * ex: from(config.getConfigMap(identifier).get) + * + * Gotcha: Only configures the dispatcher if possible + * Returns: None if "type" isn't specified in the config + * Throws: IllegalArgumentException if the value of "type" is not valid + */ + def from(cfg: ConfigMap): Option[MessageDispatcher] = { + lazy val name = cfg.getString("name",UUID.newUuid.toString) - def from(map: ConfigMap): MessageDispatcher = { - lazy val name = map.getString("name",UUID.newUuid.toString) - - val dispatcher = map.getString("type").map({ + val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name) case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name) - case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,map.getInt("throughput",THROUGHPUT)) + case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT)) case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name) - case "Hawt" => newHawtDispatcher(map.getBool("aggregate").getOrElse(true)) + case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher @@ -137,37 +155,38 @@ object Dispatchers extends Logging { case "GlobalHawt" => globalHawtDispatcher - case "Default" => defaultGlobalDispatcher - + case "Default" => globalExecutorBasedEventDrivenDispatcher + case unknown => throw new IllegalArgumentException("Unknown dispatcher type %s" format unknown) - - }).get - - if(dispatcher.isInstanceOf[ThreadPoolBuilder]) { - val configurable = dispatcher.asInstanceOf[ThreadPoolBuilder] - - map.getInt("keep-alive-ms").foreach(configurable.setKeepAliveTimeInMillis(_)) - - map.getInt("core-pool-size").foreach(configurable.setCorePoolSize(_)) - - map.getInt("max-pool-size").foreach(configurable.setMaxPoolSize(_)) - - map.getString("rejection-policy").map(_ match { - - case "abort" => new AbortPolicy() - - case "caller-runs" => new CallerRunsPolicy() - - case "discard-oldest" => new DiscardOldestPolicy() - - case "discard" => new DiscardPolicy() - - case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) - - }).foreach(configurable.setRejectionPolicy(_)) } - log.info("Dispatchers.from: %s:%s for %s",dispatcher.getClass.getName,dispatcher,map.getString("type").getOrElse("")) + dispatcher foreach { + case d: ThreadPoolBuilder => d.configureIfPossible( builder => { + + cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_)) + + cfg.getInt("core-pool-size").foreach(builder.setCorePoolSize(_)) + + cfg.getInt("max-pool-size").foreach(builder.setMaxPoolSize(_)) + + cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_)) + + cfg.getString("rejection-policy").map({ + + case "abort" => new AbortPolicy() + + case "caller-runs" => new CallerRunsPolicy() + + case "discard-oldest" => new DiscardOldestPolicy() + + case "discard" => new DiscardPolicy() + + case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) + + }).foreach(builder.setRejectionPolicy(_)) + }) + case _ => + } dispatcher } diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index 6f8da64073..9439080010 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -10,7 +10,7 @@ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy import se.scalablesolutions.akka.actor.IllegalActorStateException -import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.{Logger, Logging} trait ThreadPoolBuilder { val name: String @@ -22,7 +22,7 @@ trait ThreadPoolBuilder { private var threadPoolBuilder: ThreadPoolExecutor = _ private var boundedExecutorBound = -1 - private var inProcessOfBuilding = false + @volatile private var inProcessOfBuilding = false private var blockingQueue: BlockingQueue[Runnable] = _ private lazy val threadFactory = new MonitorableThreadFactory(name) @@ -34,7 +34,6 @@ trait ThreadPoolBuilder { def buildThreadPool(): Unit = synchronized { ensureNotActive inProcessOfBuilding = false - threadPoolBuilder.allowCoreThreadTimeOut(true) if (boundedExecutorBound > 0) { val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound) boundedExecutorBound = -1 @@ -103,46 +102,58 @@ trait ThreadPoolBuilder { this } + def configureIfPossible(f: (ThreadPoolBuilder) => Unit): Boolean = synchronized { + if(inProcessOfBuilding) { + f(this) + true + } + else { + Logger(getClass).info("Tried to configure an already started ThreadPoolBuilder") + false + } + } + /** * Default is 16. */ - def setCorePoolSize(size: Int): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyInConstructionPhase - threadPoolBuilder.setCorePoolSize(size) - this - } + def setCorePoolSize(size: Int): ThreadPoolBuilder = + setThreadPoolExecutorProperty(_.setCorePoolSize(size)) /** * Default is 128. */ - def setMaxPoolSize(size: Int): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyInConstructionPhase - threadPoolBuilder.setMaximumPoolSize(size) - this - } + def setMaxPoolSize(size: Int): ThreadPoolBuilder = + setThreadPoolExecutorProperty(_.setMaximumPoolSize(size)) /** * Default is 60000 (one minute). */ - def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyInConstructionPhase - threadPoolBuilder.setKeepAliveTime(time, MILLISECONDS) - this - } + def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder = + setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS)) /** * Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded. */ - def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolBuilder = synchronized { + def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolBuilder = + setThreadPoolExecutorProperty(_.setRejectedExecutionHandler(policy)) + + /** + * Default false, set to true to conserve thread for potentially unused dispatchers + */ + def setAllowCoreThreadTimeout(allow: Boolean) = + setThreadPoolExecutorProperty(_.allowCoreThreadTimeOut(allow)) + + /** + * Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded. + */ + protected def setThreadPoolExecutorProperty(f: (ThreadPoolExecutor) => Unit): ThreadPoolBuilder = synchronized { ensureNotActive verifyInConstructionPhase - threadPoolBuilder.setRejectedExecutionHandler(policy) + f(threadPoolBuilder) this } + protected def verifyNotInConstructionPhase = { if (inProcessOfBuilding) throw new IllegalActorStateException("Is already in the process of building a thread pool") inProcessOfBuilding = true diff --git a/akka-core/src/test/scala/dispatch/DispatchersSpec.scala b/akka-core/src/test/scala/dispatch/DispatchersSpec.scala new file mode 100644 index 0000000000..a0eb9c6889 --- /dev/null +++ b/akka-core/src/test/scala/dispatch/DispatchersSpec.scala @@ -0,0 +1,70 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.actor.dispatch + +import java.util.concurrent.{CountDownLatch, TimeUnit} +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +import net.lag.configgy.Config +import scala.reflect.{Manifest} +import se.scalablesolutions.akka.dispatch._ + +object DispatchersSpec { + import Dispatchers._ + // + val tipe = "type" + val keepalivems = "keep-alive-ms" + val corepoolsize = "core-pool-size" + val maxpoolsize = "max-pool-size" + val allowcoretimeout = "allow-core-timeout" + val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard + val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher + val aggregate = "aggregate" // Aggregate on/off for HawtDispatchers + + def instance(dispatcher: MessageDispatcher): (MessageDispatcher) => Boolean = _ == dispatcher + def ofType[T <: MessageDispatcher : Manifest]: (MessageDispatcher) => Boolean = _.getClass == manifest[T].erasure + + def typesAndValidators: Map[String,(MessageDispatcher) => Boolean] = Map( + "ReactorBasedSingleThreadEventDriven" -> ofType[ReactorBasedSingleThreadEventDrivenDispatcher], + "ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher], + "ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher], + "ReactorBasedThreadPoolEventDriven" -> ofType[ReactorBasedThreadPoolEventDrivenDispatcher], + "Hawt" -> ofType[HawtDispatcher], + "GlobalReactorBasedSingleThreadEventDriven" -> instance(globalReactorBasedSingleThreadEventDrivenDispatcher), + "GlobalReactorBasedThreadPoolEventDriven" -> instance(globalReactorBasedThreadPoolEventDrivenDispatcher), + "GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher), + "GlobalHawt" -> instance(globalHawtDispatcher), + "Default" -> instance(globalExecutorBasedEventDrivenDispatcher) + ) + + def validTypes = typesAndValidators.keys.toList + + lazy val allDispatchers: Map[String,Option[MessageDispatcher]] = { + validTypes.map(t => (t,from(Config.fromMap(Map(tipe -> t))))).toMap + } +} + +class DispatchersSpec extends JUnitSuite { + + import Dispatchers._ + import DispatchersSpec._ + + @Test def shouldYieldNoneIfTypeIsMissing { + assert(from(Config.fromMap(Map())) === None) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldThrowIllegalArgumentExceptionIfTypeDoesntExist { + from(Config.fromMap(Map(tipe -> "typedoesntexist"))) + } + + @Test def shouldGetTheCorrectTypesOfDispatchers { + //It can create/obtain all defined types + assert(allDispatchers.values.forall(_.isDefined)) + //All created/obtained dispatchers are of the expeced type/instance + assert(typesAndValidators.forall( tuple => tuple._2(allDispatchers(tuple._1).get) )) + } + +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 7e9c1b13bb..7b05c4e7de 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -18,11 +18,20 @@ akka { "sample.security.Boot"] actor { - timeout = 5 # default timeout for future based invocations - serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability - throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher + timeout = 5 # default timeout for future based invocations + serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability + throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher default-dispatcher { - type = "Default" + type = "Default" # ReactorBasedSingleThreadEventDriven, ExecutorBasedEventDrivenWorkStealing, ExecutorBasedEventDriven, + # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, + # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt, Default + keep-alive-ms = 60000 # Keep alive time for threads + core-pool-size = 4 # No of core threads + max-pool-size = 16 # Max no of threads + allow-core-timeout = on # Allow core threads to time out + rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard + throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher + aggregate = off # Aggregate on/off for HawtDispatchers } } @@ -52,8 +61,7 @@ akka { authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) comet-dispatcher { - type = "Hawt" - aggregate = on + type = "Default" } #maxInactiveActivity = 60000 #Atmosphere CometSupport maxInactiveActivity