Restructuring how executors are configured and making sure people can plug in their own

This commit is contained in:
Viktor Klang 2012-01-30 13:44:56 +01:00
parent 1a122986a1
commit c1dd4463b9
7 changed files with 180 additions and 128 deletions

View file

@ -87,70 +87,65 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def
new ThreadPoolExecutorServiceFactory(threadFactory)
}
trait DispatcherBuilder {
def build: MessageDispatcher
}
object ThreadPoolConfigDispatcherBuilder {
def conf_?[T](opt: Option[T])(fun: (T) ThreadPoolConfigDispatcherBuilder ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) ThreadPoolConfigDispatcherBuilder] = opt map fun
object ThreadPoolConfigBuilder {
def conf_?[T](opt: Option[T])(fun: (T) ThreadPoolConfigBuilder ThreadPoolConfigBuilder): Option[(ThreadPoolConfigBuilder) ThreadPoolConfigBuilder] = opt map fun
}
/**
* A DSL to configure and create a MessageDispatcher with a ThreadPoolExecutor
*/
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
import ThreadPoolConfig._
def build: MessageDispatcher = dispatcherFactory(config)
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory))
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigBuilder =
withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = linkedBlockingQueue()))
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity)))
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = synchronousQueue(fair)))
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))
def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
def setCorePoolSize(size: Int): ThreadPoolConfigBuilder =
if (config.maxPoolSize < size)
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
else
this.copy(config = config.copy(corePoolSize = size))
def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
def setMaxPoolSize(size: Int): ThreadPoolConfigBuilder =
if (config.corePoolSize > size)
this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
else
this.copy(config = config.copy(maxPoolSize = size))
def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder =
def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder =
setCorePoolSize(scaledPoolSize(min, multiplier, max))
def setMaxPoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder =
def setMaxPoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder =
setMaxPoolSize(scaledPoolSize(min, multiplier, max))
def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigBuilder =
setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))
def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
def setKeepAliveTime(time: Duration): ThreadPoolConfigBuilder =
this.copy(config = config.copy(threadTimeout = time))
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(allowCorePoolTimeout = allow))
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory))
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c))
def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder = fs.foldLeft(this)((c, f) f.map(_(c)).getOrElse(c))
}
object MonitorableThreadFactory {