Adding support for ForkJoin dispatcher as FJDispatcher

This commit is contained in:
Viktor Klang 2011-07-01 22:13:56 +02:00
parent 99b625571d
commit fc51bc4864
4 changed files with 144 additions and 21 deletions

View file

@ -11,6 +11,7 @@ import ThreadPoolExecutor.CallerRunsPolicy
import akka.util.Duration
import akka.event.EventHandler
import concurrent.forkjoin.{ ForkJoinWorkerThread, ForkJoinTask, ForkJoinPool }
object ThreadPoolConfig {
type Bounds = Int
@ -51,18 +52,24 @@ object ThreadPoolConfig {
}
}
trait ExecutorServiceFactory {
def createExecutorService: ExecutorService
}
trait ExecutorServiceFactoryProvider {
def createExecutorServiceFactory(name: String): ExecutorServiceFactory
}
case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService =
new LazyExecutorServiceWrapper(createExecutorService(threadFactory))
final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
flowHandler match {
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue())
extends ExecutorServiceFactoryProvider {
final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
val threadFactory = new MonitorableThreadFactory(name)
def createExecutorService: ExecutorService = flowHandler match {
case Left(rejectHandler)
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
service.allowCoreThreadTimeOut(allowCorePoolTimeout)