Fixing Dispatcher config bug #422

This commit is contained in:
Viktor Klang 2010-09-04 12:00:12 +02:00
parent 2286e96470
commit 930a3afa7d
5 changed files with 45 additions and 26 deletions

View file

@ -177,23 +177,8 @@ object Dispatchers extends Logging {
def from(cfg: ConfigMap): Option[MessageDispatcher] = {
lazy val name = cfg.getString("name", UUID.newUuid.toString)
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name)
case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT))
case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name)
case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
dispatcher foreach {
case d: ThreadPoolBuilder => d.configureIfPossible( builder => {
def threadPoolConfig(b: ThreadPoolBuilder) {
b.configureIfPossible( builder => {
cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_))
cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_))
cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_))
@ -209,7 +194,20 @@ object Dispatchers extends Logging {
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
}).foreach(builder.setRejectionPolicy(_))
})
case _ =>
}
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name)
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),MAILBOX_CAPACITY,threadPoolConfig)
case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig)
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher
case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
dispatcher

View file

@ -65,10 +65,12 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
class ExecutorBasedEventDrivenDispatcher(
_name: String,
throughput: Int = Dispatchers.THROUGHPUT,
capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder {
capacity: Int = Dispatchers.MAILBOX_CAPACITY,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
mailboxCapacity = capacity
@ -163,5 +165,9 @@ class ExecutorBasedEventDrivenDispatcher(
override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]"
// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
}

View file

@ -31,7 +31,11 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateExcept
*/
class ExecutorBasedEventDrivenWorkStealingDispatcher(
_name: String,
capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder {
capacity: Int = Dispatchers.MAILBOX_CAPACITY,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, capacity: Int) = this(_name,capacity, _ => ())
mailboxCapacity = capacity
@volatile private var active: Boolean = false
@ -180,7 +184,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override def toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
protected override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation]

View file

@ -62,16 +62,18 @@ import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String,config: (ThreadPoolBuilder) => Unit)
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name)
with ThreadPoolBuilder {
def this(_name: String) = this(_name,_ => ())
private var fair = true
private val busyActors = new HashSet[AnyRef]
private val messageDemultiplexer = new Demultiplexer(queue)
// build default thread pool
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
init
def start = if (!active) {
log.debug("Starting up %s", toString)
@ -165,4 +167,10 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
def wakeUp = messageQueue.interrupt
}
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
}

View file

@ -56,7 +56,6 @@ trait ThreadPoolBuilder extends Logging {
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized {
ensureNotActive
verifyNotInConstructionPhase
inProcessOfBuilding = false
blockingQueue = queue
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
this