pekko/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala

218 lines
8.8 KiB
Scala
Raw Normal View History

/**
 * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.dispatch
import java.util.Collection
2012-01-25 15:38:04 +01:00
import java.util.concurrent.atomic.AtomicLong
import akka.util.Duration
import java.util.concurrent._
2010-10-22 17:50:48 +02:00
/**
 * Default values and work-queue factory helpers for [[ThreadPoolConfig]].
 */
object ThreadPoolConfig {
  /** A zero-argument function that yields the work queue handed to a thread pool. */
  type QueueFactory = () => BlockingQueue[Runnable]

  val defaultAllowCoreThreadTimeout: Boolean = false
  val defaultCorePoolSize: Int = 16
  val defaultMaxPoolSize: Int = 128
  val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)
  val defaultRejectionPolicy: RejectedExecutionHandler = new SaneRejectedExecutionHandler()

  /**
   * Computes a pool size from the number of available processors.
   *
   * The processor count is multiplied by `multiplier` and rounded up, then raised to at least
   * `floor` and finally capped at `ceiling`. Because the cap is applied last, `ceiling` wins
   * if it is smaller than `floor`.
   */
  def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = {
    import scala.math.{ min, max }
    min(max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling)
  }

  /** Factory producing a new `ArrayBlockingQueue` with the given capacity and fairness on every call. */
  def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory =
    () => new ArrayBlockingQueue[Runnable](capacity, fair)

  /** Factory producing a new `SynchronousQueue` with the given fairness on every call. */
  def synchronousQueue(fair: Boolean): QueueFactory =
    () => new SynchronousQueue[Runnable](fair)

  /** Factory producing a new `LinkedBlockingQueue` (no explicit capacity) on every call. */
  def linkedBlockingQueue(): QueueFactory =
    () => new LinkedBlockingQueue[Runnable]()

  /** Factory producing a new `LinkedBlockingQueue` with the given capacity on every call. */
  def linkedBlockingQueue(capacity: Int): QueueFactory =
    () => new LinkedBlockingQueue[Runnable](capacity)

  /** Factory that always hands out the supplied `queue` instance. */
  def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory =
    () => queue

  /**
   * Invokes `queueFactory` exactly once, right away, and returns a factory that always hands
   * out that single queue instance.
   */
  def reusableQueue(queueFactory: QueueFactory): QueueFactory = {
    val queue = queueFactory()
    () => queue
  }
}
/**
 * Factory for `ExecutorService` instances.
 *
 * Function0 without the fun stuff (mostly for the sake of the Java API side of things),
 * i.e. a plain trait standing in for a `() => ExecutorService` function.
 */
trait ExecutorServiceFactory {
// Yields an ExecutorService; whether each call returns a fresh instance is up to the implementation.
def createExecutorService: ExecutorService
}
/**
 * Generic way to specify an ExecutorService to a Dispatcher: given a name and a ThreadFactory,
 * an implementation returns the ExecutorServiceFactory to use. Implementations may use the
 * given name if desired.
 */
trait ExecutorServiceFactoryProvider {
// name: a name implementations may apply to what they create; threadFactory: the ThreadFactory made available to the resulting factory.
def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory
}
/**
 * A small configuration DSL to create ThreadPoolExecutors that can be provided as an
 * ExecutorServiceFactoryProvider to Dispatcher.
 *
 * @param allowCorePoolTimeout passed to `ThreadPoolExecutor.allowCoreThreadTimeOut` on each created pool
 * @param corePoolSize         core pool size passed to the `ThreadPoolExecutor` constructor
 * @param maxPoolSize          maximum pool size passed to the `ThreadPoolExecutor` constructor
 * @param threadTimeout        keep-alive time (length and unit) passed to the `ThreadPoolExecutor` constructor
 * @param queueFactory         invoked once per created pool to obtain its work queue
 * @param rejectionPolicy      handler passed to the `ThreadPoolExecutor` constructor
 */
case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
                            corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
                            maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
                            threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
                            queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
                            rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy)
  extends ExecutorServiceFactoryProvider {

  /**
   * Creates a `ThreadPoolExecutor` from the enclosing configuration, using the given
   * `threadFactory` to create the pool's threads.
   */
  class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory {
    def createExecutorService: ExecutorService = {
      val service = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        threadTimeout.length,
        threadTimeout.unit,
        queueFactory(),
        threadFactory,
        rejectionPolicy)
      // Core-thread timeout is not a constructor argument, so it is applied after construction.
      service.allowCoreThreadTimeOut(allowCorePoolTimeout)
      service
    }
  }

  /**
   * Returns a factory that builds thread pools from this configuration.
   * Note that `name` is accepted for interface compatibility but is not used here.
   */
  final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
    new ThreadPoolExecutorServiceFactory(threadFactory)
}
/**
 * Something that can produce a MessageDispatcher.
 */
trait DispatcherBuilder {
// Produces the MessageDispatcher described by this builder.
def build: MessageDispatcher
}
object ThreadPoolConfigDispatcherBuilder {
  /**
   * Turns an optional setting into an optional builder transformation.
   *
   * When `opt` is defined, `fun` is applied to its value and the resulting
   * builder-to-builder function is returned in a `Some`; otherwise the result is `None`.
   * The result has the shape accepted by `ThreadPoolConfigDispatcherBuilder.configure`.
   */
  def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] =
    opt map fun
}
/**
 * A DSL to configure and create a MessageDispatcher with a ThreadPoolExecutor.
 *
 * Every configuration method returns a new builder (via `copy`); the receiver is left unchanged.
 *
 * @param dispatcherFactory function that turns the final ThreadPoolConfig into a MessageDispatcher
 * @param config            the thread pool configuration accumulated so far
 */
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
  import ThreadPoolConfig._

  /** Applies `dispatcherFactory` to the current `config`. */
  def build: MessageDispatcher = dispatcherFactory(config)

  /** Uses `newQueueFactory` to obtain the work queue. */
  def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = newQueueFactory))

  /** Uses the given `queue` instance itself as the work queue (wrapped with `reusableQueue`). */
  def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
    withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))

  /** Uses a `LinkedBlockingQueue` created without an explicit capacity. */
  def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = linkedBlockingQueue()))

  /** Uses a `LinkedBlockingQueue` with the given capacity. */
  def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity)))

  /** Uses a `SynchronousQueue` with the given fairness setting. */
  def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = synchronousQueue(fair)))

  /** Uses an `ArrayBlockingQueue` with the given capacity and fairness setting. */
  def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))

  /**
   * Sets the core pool size. If `size` exceeds the current maximum pool size, the maximum
   * is raised to `size` as well.
   */
  def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
    if (config.maxPoolSize < size)
      this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
    else
      this.copy(config = config.copy(corePoolSize = size))

  /**
   * Sets the maximum pool size. If the current core pool size exceeds `size`, the core
   * size is lowered to `size` as well.
   */
  def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
    if (config.corePoolSize > size)
      this.copy(config = config.copy(corePoolSize = size, maxPoolSize = size))
    else
      this.copy(config = config.copy(maxPoolSize = size))

  /** Sets the core pool size to `scaledPoolSize(min, multiplier, max)`. */
  def setCorePoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder =
    setCorePoolSize(scaledPoolSize(min, multiplier, max))

  /** Sets the maximum pool size to `scaledPoolSize(min, multiplier, max)`. */
  def setMaxPoolSizeFromFactor(min: Int, multiplier: Double, max: Int): ThreadPoolConfigDispatcherBuilder =
    setMaxPoolSize(scaledPoolSize(min, multiplier, max))

  /** Sets the thread keep-alive time, given in milliseconds. */
  def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
    setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))

  /** Sets the thread keep-alive time. */
  def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(threadTimeout = time))

  /** Sets whether core threads are allowed to time out. */
  def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(allowCorePoolTimeout = allow))

  /** Sets the factory used to obtain the work queue. */
  def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = newQueueFactory))

  /**
   * Applies the given optional transformations in order, starting from this builder.
   * `None` entries leave the builder as it is.
   */
  def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder =
    fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c))
}
2010-05-21 20:08:49 +02:00
object MonitorableThreadFactory {
  /** An uncaught-exception handler that ignores whatever it is given. */
  val doNothing: Thread.UncaughtExceptionHandler =
    new Thread.UncaughtExceptionHandler() {
      def uncaughtException(thread: Thread, cause: Throwable): Unit = ()
    }
}

/**
 * A ThreadFactory that names its threads and installs an uncaught-exception handler on them.
 *
 * @param name             prefix of each thread name; a per-instance sequence number
 *                         (starting at 1) is appended directly, without a separator
 * @param daemonic         whether created threads are daemon threads
 * @param exceptionHandler handler installed on every created thread; defaults to a no-op
 */
case class MonitorableThreadFactory(name: String,
                                    daemonic: Boolean,
                                    exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing)
  extends ThreadFactory {

  // Source of the sequence number appended to thread names; each factory instance has its own.
  protected val counter = new AtomicLong

  /** Creates a new, not yet started thread for `runnable`, configured per this factory. */
  def newThread(runnable: Runnable): Thread = {
    val t = new Thread(runnable, name + counter.incrementAndGet())
    t.setUncaughtExceptionHandler(exceptionHandler)
    t.setDaemon(daemonic)
    t
  }
}
/**
 * An ExecutorService that forwards every operation to the ExecutorService returned by
 * `executor`. Mix it in and implement `executor`; override individual methods to change
 * their behavior.
 */
trait ExecutorServiceDelegate extends ExecutorService {

  /** The ExecutorService all calls are forwarded to; it is looked up on every call. */
  def executor: ExecutorService

  def execute(command: Runnable): Unit = executor.execute(command)

  def shutdown(): Unit = executor.shutdown()

  def shutdownNow(): java.util.List[Runnable] = executor.shutdownNow()

  def isShutdown: Boolean = executor.isShutdown

  def isTerminated: Boolean = executor.isTerminated

  def awaitTermination(l: Long, timeUnit: TimeUnit): Boolean = executor.awaitTermination(l, timeUnit)

  // java.util.concurrent.Future is written out in full so the result types cannot be confused
  // with any other type named Future that may be in scope.
  def submit[T](callable: Callable[T]): java.util.concurrent.Future[T] = executor.submit(callable)

  def submit[T](runnable: Runnable, t: T): java.util.concurrent.Future[T] = executor.submit(runnable, t)

  def submit(runnable: Runnable): java.util.concurrent.Future[_] = executor.submit(runnable)

  def invokeAll[T](callables: Collection[_ <: Callable[T]]): java.util.List[java.util.concurrent.Future[T]] =
    executor.invokeAll(callables)

  def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit): java.util.List[java.util.concurrent.Future[T]] =
    executor.invokeAll(callables, l, timeUnit)

  def invokeAny[T](callables: Collection[_ <: Callable[T]]): T = executor.invokeAny(callables)

  def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit): T =
    executor.invokeAny(callables, l, timeUnit)
}
/**
 * Akka's RejectedExecutionHandler. Like CallerRunsPolicy it runs a rejected task on the
 * thread that tried to submit it, but when the executor isShutdown it throws a
 * RejectedExecutionException instead.
 * (CallerRunsPolicy silently discards the runnable in this case, which is arguably broken)
 */
class SaneRejectedExecutionHandler extends RejectedExecutionHandler {
  def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor): Unit = {
    val stillAccepting = !threadPoolExecutor.isShutdown
    if (stillAccepting) {
      // Executor is alive but could not take the task: run it right here, on the calling thread.
      runnable.run()
    } else {
      throw new RejectedExecutionException("Shutdown")
    }
  }
}