From c1dd4463b940187dd49f92f5c566dbe7f687363d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 30 Jan 2012 13:44:56 +0100 Subject: [PATCH] Restructuring how executors are configured and making sure people can plug in their own --- .../akka/actor/dispatch/ActorModelSpec.scala | 36 +++++----- .../actor/dispatch/DispatcherActorSpec.scala | 14 ++-- .../test/scala/akka/config/ConfigSpec.scala | 60 ++++++++++------- akka-actor/src/main/resources/reference.conf | 65 ++++++++++++------- .../akka/dispatch/AbstractDispatcher.scala | 37 +++++++++-- .../scala/akka/dispatch/Dispatchers.scala | 55 ++++++++-------- .../akka/dispatch/ThreadPoolBuilder.scala | 41 +++++------- 7 files changed, 180 insertions(+), 128 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 57c3567c4e..45e1954486 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -448,16 +448,14 @@ object DispatcherModelSpec { class MessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { - private val instance: MessageDispatcher = { - configureThreadPool(config, - threadPoolConfig ⇒ new Dispatcher(prerequisites, - config.getString("id"), - config.getInt("throughput"), - Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, - threadPoolConfig, - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor) - } + private val instance: MessageDispatcher = + new Dispatcher(prerequisites, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor override def dispatcher(): MessageDispatcher = instance } @@ -522,16 +520,14 @@ object BalancingDispatcherModelSpec { class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { - private val instance: MessageDispatcher = { - configureThreadPool(config, - threadPoolConfig ⇒ new BalancingDispatcher(prerequisites, - config.getString("id"), - config.getInt("throughput"), - Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, - threadPoolConfig, - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor) - } + private val instance: MessageDispatcher = + new BalancingDispatcher(prerequisites, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor override def dispatcher(): MessageDispatcher = instance } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index 2dce8346db..4b3dd4a5b3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -16,14 +16,20 @@ object DispatcherActorSpec { } test-throughput-dispatcher { throughput = 101 - core-pool-size-min = 1 - core-pool-size-max = 1 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 1 + core-pool-size-max = 1 + } } test-throughput-deadline-dispatcher { throughput = 2 throughput-deadline-time = 100 milliseconds - core-pool-size-min = 1 - core-pool-size-max = 1 + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 1 + core-pool-size-max = 1 + } } """ diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 67c7a51b60..a29ee517a3 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -18,35 +18,49 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { val settings = system.settings val config = settings.config - import config._ - getString("akka.version") must equal("2.0-SNAPSHOT") - settings.ConfigVersion must equal("2.0-SNAPSHOT") + { + import config._ - getBoolean("akka.daemonic") must equal(false) + getString("akka.version") must equal("2.0-SNAPSHOT") + settings.ConfigVersion must equal("2.0-SNAPSHOT") - getString("akka.actor.default-dispatcher.type") must equal("Dispatcher") - getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000) - getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(3.0) - getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(3.0) - getInt("akka.actor.default-dispatcher.task-queue-size") must equal(-1) - getString("akka.actor.default-dispatcher.task-queue-type") must equal("linked") - getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true) - getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) - getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) - getString("akka.actor.default-dispatcher.mailboxType") must be("") - getMilliseconds("akka.actor.default-dispatcher.shutdown-timeout") must equal(1 * 1000) - getInt("akka.actor.default-dispatcher.throughput") must equal(5) - getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0) + getBoolean("akka.daemonic") must equal(false) + getBoolean("akka.actor.serialize-messages") must equal(false) + settings.SerializeAllMessages must equal(false) - getBoolean("akka.actor.serialize-messages") must equal(false) - settings.SerializeAllMessages must equal(false) + getInt("akka.scheduler.ticksPerWheel") must equal(512) + settings.SchedulerTicksPerWheel must equal(512) - getInt("akka.scheduler.ticksPerWheel") must equal(512) - settings.SchedulerTicksPerWheel must equal(512) + getMilliseconds("akka.scheduler.tickDuration") must equal(100) + settings.SchedulerTickDuration must equal(100 millis) + } - getMilliseconds("akka.scheduler.tickDuration") must equal(100) - settings.SchedulerTickDuration must equal(100 millis) + { + val c = config.getConfig("akka.actor.default-dispatcher") + + { + c.getString("type") must equal("Dispatcher") + c.getString("executor") must equal("thread-pool-executor") + c.getInt("mailbox-capacity") must equal(-1) + c.getMilliseconds("mailbox-push-timeout-time") must equal(10 * 1000) + c.getString("mailboxType") must be("") + c.getMilliseconds("shutdown-timeout") must equal(1 * 1000) + c.getInt("throughput") must equal(5) + c.getMilliseconds("throughput-deadline-time") must equal(0) + } + + { + val pool = c.getConfig("thread-pool-executor") + import pool._ + getMilliseconds("keep-alive-time") must equal(60 * 1000) + getDouble("core-pool-size-factor") must equal(3.0) + getDouble("max-pool-size-factor") must equal(3.0) + getInt("task-queue-size") must equal(-1) + getString("task-queue-type") must equal("linked") + getBoolean("allow-core-timeout") must equal(true) + } + } } } } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 999c4286c2..ffaedde045 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -158,37 +158,58 @@ akka { # parameters type = "Dispatcher" - # Keep alive time for threads - keep-alive-time = 60s + #Which kind of ExecutorService to use for this dispatcher + #Valid options: "thread-pool-executor" requires a "thread-pool-executor" section + # "fork-join-executor" requires a "fork-join-executor" section + # A FQCN of a class extending ExecutorServiceConfigurator + executor = "thread-pool-executor" - # minimum number of threads to cap factor-based core number to - core-pool-size-min = 8 + # This will be used if you have set "executor = "thread-pool-executor"" + thread-pool-executor { + # Keep alive time for threads + keep-alive-time = 60s - # No of core threads ... ceil(available processors * factor) - core-pool-size-factor = 3.0 + # minimum number of threads to cap factor-based core number to + core-pool-size-min = 8 - # maximum number of threads to cap factor-based number to - core-pool-size-max = 64 + # No of core threads ... ceil(available processors * factor) + core-pool-size-factor = 3.0 - # Hint: max-pool-size is only used for bounded task queues - # minimum number of threads to cap factor-based max number to - max-pool-size-min = 8 + # maximum number of threads to cap factor-based number to + core-pool-size-max = 64 - # Max no of threads ... ceil(available processors * factor) - max-pool-size-factor = 3.0 + # Hint: max-pool-size is only used for bounded task queues + # minimum number of threads to cap factor-based max number to + max-pool-size-min = 8 - # maximum number of threads to cap factor-based max number to - max-pool-size-max = 64 + # Max no of threads ... ceil(available processors * factor) + max-pool-size-factor = 3.0 - # Specifies the bounded capacity of the task queue (< 1 == unbounded) - task-queue-size = -1 + # maximum number of threads to cap factor-based max number to + max-pool-size-max = 64 - # Specifies which type of task queue will be used, can be "array" or - # "linked" (default) - task-queue-type = "linked" + # Specifies the bounded capacity of the task queue (< 1 == unbounded) + task-queue-size = -1 - # Allow core threads to time out - allow-core-timeout = on + # Specifies which type of task queue will be used, can be "array" or + # "linked" (default) + task-queue-type = "linked" + + # Allow core threads to time out + allow-core-timeout = on + } + + # This will be used if you have set "executor = "fork-join-executor"" + fork-join-executor { + # minimum number of threads to cap factor-based parallelism number to + parallelism-min = 8 + + # Parallelism (threads) ... ceil(available processors * factor) + parallelism-factor = 3.0 + + # maximum number of threads to cap factor-based parallelism number to + parallelism-max = 64 + } # How long time the dispatcher will wait for new actors until it shuts down shutdown-timeout = 1s diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index bb4e3e42e7..77b272d1a5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -292,6 +292,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext protected[akka] def shutdown(): Unit } +abstract class ExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceFactoryProvider + /** * Base class to be used for hooking in new dispatchers into Dispatchers. */ @@ -333,14 +335,32 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit } } - def configureThreadPool( - config: Config, - createDispatcher: ⇒ (ExecutorServiceFactoryProvider) ⇒ MessageDispatcher): MessageDispatcher = { - import ThreadPoolConfigDispatcherBuilder.conf_? + def configureExecutor(): ExecutorServiceConfigurator = { + config.getString("executor") match { + case null | "" ⇒ throw new IllegalArgumentException("""Missing "executor" in config file for dispatcher [%s]""".format(config.getString("id"))) + case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) + //case "fork-join-executor" => new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) + case fqcn ⇒ + val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites]) + ReflectiveAccess.createInstance[ExecutorServiceConfigurator](fqcn, constructorSignature, Array[AnyRef](config, prerequisites)) match { + case Right(instance) ⇒ instance + case Left(exception) ⇒ + throw new IllegalArgumentException( + ("Cannot instantiate ExecutorServiceConfigurator (\"executor = [%s]\"), defined in [%s], " + + "make sure it has an accessible constructor with a [%s,%s] signature") + .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception) + } + } + } +} - //Apply the following options to the config if they are present in the config +class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) { + import ThreadPoolConfigBuilder.conf_? - ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()) + val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config + + def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = { + ThreadPoolConfigBuilder(ThreadPoolConfig()) .setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS)) .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout") .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max") @@ -354,6 +374,9 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit case x ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x) } case _ ⇒ None - })(queueFactory ⇒ _.setQueueFactory(queueFactory))).build + })(queueFactory ⇒ _.setQueueFactory(queueFactory))) } + + def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory = + threadPoolConfig.createExecutorServiceFactory(name, threadFactory) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 29ddc6c495..fd58346955 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -156,15 +156,14 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { - private val instance = - configureThreadPool(config, - threadPoolConfig ⇒ new Dispatcher(prerequisites, - config.getString("id"), - config.getInt("throughput"), - Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, - threadPoolConfig, - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))) + private val instance = new Dispatcher( + prerequisites, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) /** * Returns the same dispatcher instance for each invocation @@ -180,14 +179,13 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { - private val instance = - configureThreadPool(config, - threadPoolConfig ⇒ new BalancingDispatcher(prerequisites, - config.getString("id"), - config.getInt("throughput"), - Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), - mailboxType, threadPoolConfig, - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))) + private val instance = new BalancingDispatcher( + prerequisites, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) /** * Returns the same dispatcher instance for each invocation @@ -203,23 +201,22 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { - def createPinnedDispatcherWith(tpc: ThreadPoolConfig): PinnedDispatcher = - new PinnedDispatcher( - prerequisites, null, config.getString("id"), mailboxType, - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), tpc) - - /** - * Creates new dispatcher for each invocation. - */ - override def dispatcher(): MessageDispatcher = configureThreadPool(config, { - case t: ThreadPoolConfig ⇒ createPinnedDispatcherWith(t) + val threadPoolConfig: ThreadPoolConfig = configureExecutor() match { + case e: ThreadPoolExecutorConfigurator ⇒ e.threadPoolConfig case other ⇒ prerequisites.eventStream.publish( Warning("PinnedDispatcherConfigurator", this.getClass, "PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format( config.getString("id")))) - createPinnedDispatcherWith(ThreadPoolConfig()) - }) + ThreadPoolConfig() + } + /** + * Creates new dispatcher for each invocation. + */ + override def dispatcher(): MessageDispatcher = + new PinnedDispatcher( + prerequisites, null, config.getString("id"), mailboxType, + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig) } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 687c3d8191..e9430340fa 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -87,70 +87,65 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def new ThreadPoolExecutorServiceFactory(threadFactory) } -trait DispatcherBuilder { - def build: MessageDispatcher -} - -object ThreadPoolConfigDispatcherBuilder { - def conf_?[T](opt: Option[T])(fun: (T) ⇒ ThreadPoolConfigDispatcherBuilder ⇒ ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) ⇒ ThreadPoolConfigDispatcherBuilder] = opt map fun +object ThreadPoolConfigBuilder { + def conf_?[T](opt: Option[T])(fun: (T) ⇒ ThreadPoolConfigBuilder ⇒ ThreadPoolConfigBuilder): Option[(ThreadPoolConfigBuilder) ⇒ ThreadPoolConfigBuilder] = opt map fun } /** * A DSL to configure and create a MessageDispatcher with a ThreadPoolExecutor */ -case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) ⇒ MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder { +case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) { import ThreadPoolConfig._ - def build: MessageDispatcher = dispatcherFactory(config) - def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = + def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = newQueueFactory)) - def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder = + def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigBuilder = withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue)) - def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder = + def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = linkedBlockingQueue())) - def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder = + def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity))) - def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder = + def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = synchronousQueue(fair))) - def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder = + def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair))) - def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = + def setCorePoolSize(size: Int): ThreadPoolConfigBuilder = if (config.maxPoolSize < size) this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size)) else this.copy(config = config.copy(corePoolSize = size)) - def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = + def setMaxPoolSize(size: Int): ThreadPoolConfigBuilder = if (config.corePoolSize > size) this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size)) else this.copy(config = config.copy(maxPoolSize = size)) - def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder = + def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder = setCorePoolSize(scaledPoolSize(min, multiplier, max)) - def setMaxPoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder = + def setMaxPoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigBuilder = setMaxPoolSize(scaledPoolSize(min, multiplier, max)) - def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder = + def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigBuilder = setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS)) - def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder = + def setKeepAliveTime(time: Duration): ThreadPoolConfigBuilder = this.copy(config = config.copy(threadTimeout = time)) - def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = + def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigBuilder = this.copy(config = config.copy(allowCorePoolTimeout = allow)) - def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = + def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = newQueueFactory)) - def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) ⇒ f.map(_(c)).getOrElse(c)) + def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder = fs.foldLeft(this)((c, f) ⇒ f.map(_(c)).getOrElse(c)) } object MonitorableThreadFactory {