pekko/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala

263 lines
10 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.dispatch
import java.util.Collection
import java.util.concurrent._
import atomic.{ AtomicLong, AtomicInteger }
import ThreadPoolExecutor.CallerRunsPolicy
import akka.util.Duration
import akka.event.EventHandler
import concurrent.forkjoin.{ ForkJoinWorkerThread, ForkJoinTask, ForkJoinPool }
2010-10-22 17:50:48 +02:00
/**
 * Defaults and small factory helpers used when assembling a [[ThreadPoolConfig]].
 */
object ThreadPoolConfig {
  /** Upper bound handed to a bounded executor. */
  type Bounds = Int

  /** Either a rejection handler (`Left`) or an executor bound (`Right`). */
  type FlowHandler = Either[RejectedExecutionHandler, Bounds]

  /** Zero-argument function yielding the work queue for a pool. */
  type QueueFactory = () => BlockingQueue[Runnable]

  val defaultAllowCoreThreadTimeout: Boolean = false
  val defaultCorePoolSize: Int = 16
  val defaultMaxPoolSize: Int = 128
  val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)

  /** Default flow handling: a fresh `CallerRunsPolicy` on every call. */
  def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy)

  /** Wraps a rejection handler as a `Left` flow handler. */
  def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler)

  /** Wraps an executor bound as a `Right` flow handler. */
  def flowHandler(bounds: Int): FlowHandler = Right(bounds)

  /** Identity; exists so call sites can state their intent. */
  def fixedPoolSize(size: Int): Int = size

  /** Number of available processors times `multiplier`, rounded up. */
  def scaledPoolSize(multiplier: Double): Int =
    (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt

  /** Factory creating a new `ArrayBlockingQueue` on each invocation. */
  def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory =
    () => new ArrayBlockingQueue[Runnable](capacity, fair)

  /** Factory creating a new `SynchronousQueue` on each invocation. */
  def synchronousQueue(fair: Boolean): QueueFactory =
    () => new SynchronousQueue[Runnable](fair)

  /** Factory creating a new `LinkedBlockingQueue` (no explicit capacity) on each invocation. */
  def linkedBlockingQueue(): QueueFactory =
    () => new LinkedBlockingQueue[Runnable]()

  /** Factory creating a new `LinkedBlockingQueue` with the given capacity on each invocation. */
  def linkedBlockingQueue(capacity: Int): QueueFactory =
    () => new LinkedBlockingQueue[Runnable](capacity)

  /** Factory that always hands back the very same `queue` instance. */
  def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory =
    () => queue

  /**
   * Invokes `queueFactory` exactly once, right here, and returns a factory
   * that keeps handing back that single queue.
   */
  def reusableQueue(queueFactory: QueueFactory): QueueFactory = {
    val queue = queueFactory()
    () => queue
  }
}
/**
 * Something that can produce an `ExecutorService` on demand.
 */
trait ExecutorServiceFactory {

  /**
   * @return an executor service; whether each call yields a new instance is
   *         up to the implementation
   */
  def createExecutorService: ExecutorService
}
/**
 * Supplies an [[ExecutorServiceFactory]] for a given name.
 */
trait ExecutorServiceFactoryProvider {

  /**
   * @param name name passed on to the produced factory
   * @return a factory of executor services
   */
  def createExecutorServiceFactory(name: String): ExecutorServiceFactory
}
2010-10-22 17:50:48 +02:00
/**
 * Immutable description of a `ThreadPoolExecutor`, able to produce factories
 * that build executors from it.
 *
 * @param allowCorePoolTimeout forwarded to `ThreadPoolExecutor.allowCoreThreadTimeOut`
 * @param corePoolSize         core pool size of the created executor
 * @param maxPoolSize          maximum pool size of the created executor
 * @param threadTimeout        keep-alive time of the created executor
 * @param flowHandler          `Left(handler)` installs the rejection handler on the pool;
 *                             `Right(bounds)` wraps the pool in a [[BoundedExecutorDecorator]]
 * @param queueFactory         invoked once per created executor to obtain its work queue
 */
case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
                            corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
                            maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
                            threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
                            flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
                            queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue())
  extends ExecutorServiceFactoryProvider {

  /**
   * Builds a factory whose executors run on threads created by a
   * [[MonitorableThreadFactory]] carrying `name`.
   */
  final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
    // One thread factory per executor-service factory, shared by every executor it creates.
    val threadFactory = new MonitorableThreadFactory(name)

    def createExecutorService: ExecutorService = flowHandler match {
      case Left(rejectHandler) =>
        val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
        service.allowCoreThreadTimeOut(allowCorePoolTimeout)
        service
      case Right(bounds) =>
        // No explicit rejection handler here: submissions are throttled by the decorator instead.
        val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory)
        service.allowCoreThreadTimeOut(allowCorePoolTimeout)
        new BoundedExecutorDecorator(service, bounds)
    }
  }
}
/**
 * Builder abstraction whose end product is a `MessageDispatcher`.
 */
trait DispatcherBuilder {

  /** @return the dispatcher described by this builder */
  def build: MessageDispatcher
}
object ThreadPoolConfigDispatcherBuilder {

  /**
   * Lifts an optional setting into an optional builder transformation.
   *
   * @param opt the optional value
   * @param fun curried function turning the value into a builder-to-builder step
   * @return `Some(step)` when `opt` is defined, otherwise `None`
   */
  def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] =
    opt map fun
}
/**
 * Immutable builder: every method returns a copy carrying an adjusted
 * [[ThreadPoolConfig]]; `build` hands the final config to `dispatcherFactory`.
 *
 * @param dispatcherFactory turns the accumulated config into a dispatcher
 * @param config            the configuration accumulated so far
 */
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
  import ThreadPoolConfig._

  /** Applies `dispatcherFactory` to the current config. */
  def build: MessageDispatcher = dispatcherFactory(config)

  // --- Queue / flow-handler presets. Each one replaces BOTH the queue factory and the flow handler. ---

  def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue()))

  def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory))

  /** Same as the `QueueFactory` overload, with a factory that always returns `queue`. */
  def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
    withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))

  def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler))

  def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler))

  def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler))

  def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair), flowHandler = defaultFlowHandler))

  // --- Single-field adjustments. ---

  def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(corePoolSize = size))

  def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(maxPoolSize = size))

  /** Core pool size derived from the processor count via `scaledPoolSize`. */
  def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
    setCorePoolSize(scaledPoolSize(multiplier))

  /** Maximum pool size derived from the processor count via `scaledPoolSize`. */
  def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
    setMaxPoolSize(scaledPoolSize(multiplier))

  def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(flowHandler = flowHandler(bounds)))

  def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
    setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))

  def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(threadTimeout = time))

  def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder =
    setFlowHandler(flowHandler(policy))

  def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(flowHandler = newFlowHandler))

  def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
    this.copy(config = config.copy(allowCorePoolTimeout = allow))

  /**
   * Applies the given optional transformations left to right; `None` entries
   * leave the builder unchanged.
   */
  def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder =
    fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c))
}
2010-05-21 20:08:49 +02:00
2010-10-22 17:50:48 +02:00
/**
 * `ThreadFactory` producing [[MonitorableThread]] instances that share a common name prefix.
 *
 * @param name prefix passed to every created thread
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
  // Not read anywhere in this class; kept because it is visible to subclasses.
  protected val counter = new AtomicLong

  /** Creates a new, not yet started, thread around `runnable`. */
  def newThread(runnable: Runnable): MonitorableThread = new MonitorableThread(runnable, name)
}
2010-10-22 17:50:48 +02:00
/**
 * Process-wide counters and constants for [[MonitorableThread]].
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
object MonitorableThread {
  val DEFAULT_NAME: String = "MonitorableThread".intern()

  // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring

  /** Incremented once for every constructed thread. */
  val created: AtomicInteger = new AtomicInteger()

  /** Incremented when a thread's `run` starts and decremented when it ends. */
  val alive: AtomicInteger = new AtomicInteger()
}
2010-10-22 17:50:48 +02:00
/**
 * Thread that maintains the `created` / `alive` counters of the
 * `MonitorableThread` companion and reports uncaught exceptions.
 *
 * The thread name is `name`, a dash, and the running `created` count.
 *
 * @param runnable the work to run
 * @param name     name prefix
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class MonitorableThread(runnable: Runnable, name: String)
  extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {

  setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    // Report the failure instead of dropping it silently, so that a task
    // killing its worker thread leaves a trace.
    def uncaughtException(thread: Thread, cause: Throwable): Unit =
      EventHandler.error(cause, thread, "Uncaught exception in thread [" + thread.getName + "]")
  })

  /** Runs the wrapped runnable while keeping the `alive` counter accurate. */
  override def run(): Unit = {
    try {
      MonitorableThread.alive.incrementAndGet()
      super.run()
    } finally {
      // Executed on normal completion and on failure alike.
      MonitorableThread.alive.decrementAndGet()
    }
  }
}
2010-10-22 17:50:48 +02:00
/**
 * Executor decorator that caps the number of submitted-but-unfinished
 * commands at `bound` by means of a semaphore; `execute` blocks the caller
 * while no permit is available.
 *
 * @param executor the decorated executor
 * @param bound    number of semaphore permits
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate {
  protected val semaphore = new Semaphore(bound)

  /**
   * Acquires a permit, then hands `command` to the underlying executor. The
   * permit is returned when the command finishes, or immediately when the
   * hand-over fails.
   *
   * A `RejectedExecutionException` from the underlying executor is logged as
   * a warning and swallowed; any other throwable is logged and rethrown.
   */
  override def execute(command: Runnable): Unit = {
    semaphore.acquire()

    // Guarantees exactly one release per acquire. Without it a failure other
    // than a rejection leaked the permit; and when the wrapper ran (and threw)
    // on the submitting thread, both the wrapper's finally and the catch
    // below would be in play for the same permit.
    val permitReturned = new java.util.concurrent.atomic.AtomicBoolean(false)
    def returnPermit(): Unit =
      if (permitReturned.compareAndSet(false, true)) semaphore.release()

    try {
      executor.execute(new Runnable() {
        def run(): Unit = {
          try {
            command.run()
          } finally {
            returnPermit()
          }
        }
      })
    } catch {
      case e: RejectedExecutionException =>
        returnPermit()
        EventHandler.warning(this, e.toString)
      case e: Throwable =>
        returnPermit()
        EventHandler.error(e, this, e.getMessage)
        throw e
    }
  }
}
2011-02-28 22:54:32 +01:00
/**
 * `ExecutorService` that forwards every operation, unchanged, to `executor`.
 * Implementors only need to supply `executor`.
 */
trait ExecutorServiceDelegate extends ExecutorService {

  /** The executor service every call is forwarded to. */
  def executor: ExecutorService

  def execute(command: Runnable): Unit = executor.execute(command)

  def shutdown(): Unit = executor.shutdown()

  def shutdownNow(): java.util.List[Runnable] = executor.shutdownNow()

  def isShutdown: Boolean = executor.isShutdown

  def isTerminated: Boolean = executor.isTerminated

  def awaitTermination(l: Long, timeUnit: TimeUnit): Boolean = executor.awaitTermination(l, timeUnit)

  // java.util.concurrent.Future is spelled out in full so the name cannot be
  // confused with any other Future type visible from this package.
  def submit[T](callable: Callable[T]): java.util.concurrent.Future[T] = executor.submit(callable)

  def submit[T](runnable: Runnable, t: T): java.util.concurrent.Future[T] = executor.submit(runnable, t)

  def submit(runnable: Runnable): java.util.concurrent.Future[_] = executor.submit(runnable)

  def invokeAll[T](callables: Collection[_ <: Callable[T]]): java.util.List[java.util.concurrent.Future[T]] =
    executor.invokeAll(callables)

  def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit): java.util.List[java.util.concurrent.Future[T]] =
    executor.invokeAll(callables, l, timeUnit)

  def invokeAny[T](callables: Collection[_ <: Callable[T]]): T = executor.invokeAny(callables)

  def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit): T =
    executor.invokeAny(callables, l, timeUnit)
}
/**
 * Delegate whose underlying executor is obtained from `createExecutor` the
 * first time `executor` is accessed, and cached afterwards.
 */
trait LazyExecutorService extends ExecutorServiceDelegate {

  /** Supplies the executor to delegate to. */
  def createExecutor: ExecutorService

  lazy val executor: ExecutorService = createExecutor
}
/**
 * [[LazyExecutorService]] backed by a by-name expression.
 *
 * @param executorFactory expression producing the executor; being by-name, it
 *                        is not evaluated at construction time but only when
 *                        `createExecutor` is invoked
 */
class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService {
  def createExecutor: ExecutorService = executorFactory
}