diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index 803fd700cc..f33d3b1b24 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -177,23 +177,8 @@ object Dispatchers extends Logging { def from(cfg: ConfigMap): Option[MessageDispatcher] = { lazy val name = cfg.getString("name", UUID.newUuid.toString) - val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { - case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name) - case "ExecutorBasedEventDrivenWorkStealing" => newExecutorBasedEventDrivenWorkStealingDispatcher(name) - case "ExecutorBasedEventDriven" => newExecutorBasedEventDrivenDispatcher(name,cfg.getInt("throughput",THROUGHPUT)) - case "ReactorBasedThreadPoolEventDriven" => newReactorBasedThreadPoolEventDrivenDispatcher(name) - case "Hawt" => newHawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) - case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher - case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher - case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher - case "GlobalHawt" => globalHawtDispatcher - - case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) - } - - dispatcher foreach { - case d: ThreadPoolBuilder => d.configureIfPossible( builder => { - + def threadPoolConfig(b: ThreadPoolBuilder) { + b.configureIfPossible( builder => { cfg.getInt("keep-alive-ms").foreach(builder.setKeepAliveTimeInMillis(_)) cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_)) cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_)) @@ -209,7 +194,20 @@ object Dispatchers extends Logging { case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) }).foreach(builder.setRejectionPolicy(_)) }) - case _ => + } + + val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { + case "ReactorBasedSingleThreadEventDriven" => new ReactorBasedSingleThreadEventDrivenDispatcher(name) + case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig) + case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),MAILBOX_CAPACITY,threadPoolConfig) + case "ReactorBasedThreadPoolEventDriven" => new ReactorBasedThreadPoolEventDrivenDispatcher(name,threadPoolConfig) + case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) + case "GlobalReactorBasedSingleThreadEventDriven" => globalReactorBasedSingleThreadEventDrivenDispatcher + case "GlobalReactorBasedThreadPoolEventDriven" => globalReactorBasedThreadPoolEventDrivenDispatcher + case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher + case "GlobalHawt" => globalHawtDispatcher + + case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) } dispatcher diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 5f8469eb84..c3ecf5ded7 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -65,10 +65,12 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} class ExecutorBasedEventDrivenDispatcher( _name: String, throughput: Int = Dispatchers.THROUGHPUT, - capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder { + capacity: Int = Dispatchers.MAILBOX_CAPACITY, + config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage - def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage + def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage + mailboxCapacity = capacity @@ -163,5 +165,9 @@ class ExecutorBasedEventDrivenDispatcher( override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]" // FIXME: should we have an unbounded queue and not bounded as default ???? - private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool + private[akka] def init = { + withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + config(this) + buildThreadPool + } } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index f9409e91fb..9b1097213e 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -31,7 +31,11 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateExcept */ class ExecutorBasedEventDrivenWorkStealingDispatcher( _name: String, - capacity: Int = Dispatchers.MAILBOX_CAPACITY) extends MessageDispatcher with ThreadPoolBuilder { + capacity: Int = Dispatchers.MAILBOX_CAPACITY, + config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { + + def this(_name: String, capacity: Int) = this(_name,capacity, _ => ()) + mailboxCapacity = capacity @volatile private var active: Boolean = false @@ -180,7 +184,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( override def toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" - private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool + private[akka] def init = { + withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + config(this) + buildThreadPool + } protected override def createMailbox(actorRef: ActorRef): AnyRef = { if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation] diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala index 1b01e90298..684f737c07 100644 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala @@ -62,16 +62,18 @@ import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} * * @author Jonas Bonér */ -class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) +class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String,config: (ThreadPoolBuilder) => Unit) extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name) with ThreadPoolBuilder { + def this(_name: String) = this(_name,_ => ()) + private var fair = true private val busyActors = new HashSet[AnyRef] private val messageDemultiplexer = new Demultiplexer(queue) // build default thread pool - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool + init def start = if (!active) { log.debug("Starting up %s", toString) @@ -165,4 +167,10 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) def wakeUp = messageQueue.interrupt } + + private[akka] def init = { + withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + config(this) + buildThreadPool + } } diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index 9fe47d5415..9657ad3fe4 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -56,7 +56,6 @@ trait ThreadPoolBuilder extends Logging { def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized { ensureNotActive verifyNotInConstructionPhase - inProcessOfBuilding = false blockingQueue = queue threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue) this