2009-12-11 16:37:44 +01:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dispatch
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-03-01 22:03:17 +01:00
|
|
|
import java.util.Collection
|
2011-11-15 10:25:05 +01:00
|
|
|
import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger }
|
2011-03-02 00:14:45 +01:00
|
|
|
import akka.util.Duration
|
2011-10-27 12:23:01 +02:00
|
|
|
import akka.event.Logging.{ Warning, Error }
|
2011-11-10 20:08:00 +01:00
|
|
|
import akka.actor.ActorSystem
|
2011-11-15 10:25:05 +01:00
|
|
|
import java.util.concurrent._
|
2011-11-14 16:03:26 +01:00
|
|
|
import akka.event.EventStream
|
2011-12-01 17:03:30 +01:00
|
|
|
import concurrent.forkjoin.ForkJoinPool._
|
|
|
|
|
import concurrent.forkjoin.{ ForkJoinTask, ForkJoinWorkerThread, ForkJoinPool }
|
|
|
|
|
import concurrent.forkjoin.ForkJoinTask._
|
2010-10-22 17:50:48 +02:00
|
|
|
|
|
|
|
|
/**
 * Type aliases, default values and queue-factory helpers used when
 * describing a thread pool with [[ThreadPoolConfig]].
 */
object ThreadPoolConfig {
  type Bounds = Int
  type FlowHandler = Either[SaneRejectedExecutionHandler, Bounds]

  /** Produces the work queue handed to a thread pool. */
  type QueueFactory = () => BlockingQueue[Runnable]

  val defaultAllowCoreThreadTimeout: Boolean = false
  val defaultCorePoolSize: Int = 16
  val defaultMaxPoolSize: Int = 128

  /** Default thread timeout: 60000 milliseconds. */
  val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)

  /** Returns `size` unchanged. */
  def fixedPoolSize(size: Int): Int = size

  /**
   * Pool size relative to the machine: the number of available processors
   * times `multiplier`, rounded up to the next whole number.
   */
  def scaledPoolSize(multiplier: Double): Int = {
    val processors = Runtime.getRuntime.availableProcessors
    math.ceil(processors * multiplier).toInt
  }

  /** Factory creating a new `ArrayBlockingQueue` with the given capacity and fairness on every call. */
  def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory = { () =>
    new ArrayBlockingQueue[Runnable](capacity, fair)
  }

  /** Factory creating a new `SynchronousQueue` with the given fairness on every call. */
  def synchronousQueue(fair: Boolean): QueueFactory = { () =>
    new SynchronousQueue[Runnable](fair)
  }

  /** Factory creating a new `LinkedBlockingQueue` without an explicit capacity on every call. */
  def linkedBlockingQueue(): QueueFactory = { () =>
    new LinkedBlockingQueue[Runnable]()
  }

  /** Factory creating a new `LinkedBlockingQueue` with the given capacity on every call. */
  def linkedBlockingQueue(capacity: Int): QueueFactory = { () =>
    new LinkedBlockingQueue[Runnable](capacity)
  }

  /** Factory that hands out the very same `queue` instance on every call. */
  def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory = { () => queue }

  /**
   * Invokes `queueFactory` once, right away, and returns a factory that
   * hands out that single queue instance on every call.
   */
  def reusableQueue(queueFactory: QueueFactory): QueueFactory = {
    val shared = queueFactory()
    () => shared
  }
}
|
|
|
|
|
|
2011-10-12 18:21:12 +02:00
|
|
|
/**
 * Function0 without the fun stuff (mostly for the sake of the Java API side of things):
 * a plain factory with a single method that yields an `ExecutorService`.
 */
trait ExecutorServiceFactory {

  /** Returns the `ExecutorService` produced by this factory. */
  def createExecutorService: ExecutorService
}
|
|
|
|
|
|
2011-10-12 18:21:12 +02:00
|
|
|
/**
 * Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired.
 */
trait ExecutorServiceFactoryProvider {

  /**
   * Returns an [[ExecutorServiceFactory]] for the given `name`.
   *
   * @param name name made available to the implementation; how it is used is up to the implementation
   */
  def createExecutorServiceFactory(name: String): ExecutorServiceFactory
}
|
|
|
|
|
|
2011-10-12 18:21:12 +02:00
|
|
|
/**
 * A small configuration DSL to create ThreadPoolExecutors that can be provided as an
 * ExecutorServiceFactoryProvider to Dispatcher.
 *
 * @param allowCorePoolTimeout passed to `ThreadPoolExecutor.allowCoreThreadTimeOut`
 * @param corePoolSize         core pool size passed to the `ThreadPoolExecutor` constructor
 * @param maxPoolSize          maximum pool size passed to the `ThreadPoolExecutor` constructor
 * @param threadTimeout        its length and unit are passed as the keep-alive time of the executor
 * @param queueFactory         invoked once per created executor to obtain its work queue
 */
case class ThreadPoolConfig(
  allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
  corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
  maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
  threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
  queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue())
  extends ExecutorServiceFactoryProvider {

  /**
   * Builds a factory of `ThreadPoolExecutor`s configured from this instance.
   * All executors created by one returned factory share a single
   * `MonitorableThreadFactory` constructed with `name`.
   */
  final def createExecutorServiceFactory(name: String): ExecutorServiceFactory =
    new ExecutorServiceFactory {
      val threadFactory = new MonitorableThreadFactory(name)

      def createExecutorService: ExecutorService = {
        val pool = new ThreadPoolExecutor(
          corePoolSize,
          maxPoolSize,
          threadTimeout.length,
          threadTimeout.unit,
          queueFactory(),
          threadFactory,
          new SaneRejectedExecutionHandler)
        pool.allowCoreThreadTimeOut(allowCorePoolTimeout)
        pool
      }
    }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Something that can produce a `MessageDispatcher`.
 */
trait DispatcherBuilder {

  /** Returns the dispatcher described by this builder. */
  def build: MessageDispatcher
}
|
|
|
|
|
|
2010-10-24 00:36:56 +02:00
|
|
|
object ThreadPoolConfigDispatcherBuilder {

  /**
   * Turns an optional value into an optional builder transformation:
   * `Some(v)` becomes `Some(fun(v))`, `None` stays `None`.
   * The result has the shape accepted by `ThreadPoolConfigDispatcherBuilder.configure`.
   */
  def conf_?[T](opt: Option[T])(
    fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] =
    opt.map(fun)
}
|
|
|
|
|
|
2011-10-12 18:21:12 +02:00
|
|
|
/**
 * A DSL to configure and create a MessageDispatcher with a ThreadPoolExecutor.
 *
 * Every `with...`/`set...` method leaves this builder untouched and returns
 * a copy whose `config` carries the requested change.
 *
 * @param dispatcherFactory applied to `config` by `build` to create the dispatcher
 * @param config            the thread pool configuration accumulated so far
 */
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
  import ThreadPoolConfig._

  /** Copy of this builder that carries `updated` as its configuration. */
  private def reconfigured(updated: ThreadPoolConfig): ThreadPoolConfigDispatcherBuilder =
    copy(config = updated)

  /** Applies `dispatcherFactory` to the current `config`. */
  def build: MessageDispatcher = dispatcherFactory(config)

  /** Uses `newQueueFactory` to create the work queue. */
  def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(queueFactory = newQueueFactory))

  /** Uses the given `queue` instance itself as the work queue. */
  def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
    withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))

  /** Uses a `LinkedBlockingQueue` created without an explicit capacity. */
  def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(queueFactory = linkedBlockingQueue()))

  /** Uses a `LinkedBlockingQueue` with the given capacity. */
  def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(queueFactory = linkedBlockingQueue(capacity)))

  /** Uses a `SynchronousQueue` with the given fairness. */
  def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(queueFactory = synchronousQueue(fair)))

  /** Uses an `ArrayBlockingQueue` with the given capacity and fairness. */
  def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))

  /** Sets the core pool size. */
  def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(corePoolSize = size))

  /** Sets the maximum pool size. */
  def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(maxPoolSize = size))

  /** Sets the core pool size to `scaledPoolSize(multiplier)`. */
  def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
    setCorePoolSize(scaledPoolSize(multiplier))

  /** Sets the maximum pool size to `scaledPoolSize(multiplier)`. */
  def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
    setMaxPoolSize(scaledPoolSize(multiplier))

  /** Sets the thread timeout to `time` milliseconds. */
  def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
    setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))

  /** Sets the thread timeout. */
  def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(threadTimeout = time))

  /** Sets whether core threads are allowed to time out. */
  def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(allowCorePoolTimeout = allow))

  /** Uses `newQueueFactory` to create the work queue. */
  def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
    reconfigured(config.copy(queueFactory = newQueueFactory))

  /**
   * Applies the given optional transformations from left to right,
   * starting with this builder; `None` entries are skipped.
   */
  def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder =
    fs.foldLeft(this) { (builder, maybeStep) =>
      maybeStep match {
        case Some(step) => step(builder)
        case None       => builder
      }
    }
}
|
2010-05-21 20:08:49 +02:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
/**
 * A `ThreadFactory` that creates [[MonitorableThread]]s.
 *
 * @param name     passed as the name argument of every created `MonitorableThread`
 * @param daemonic daemon flag set on every created thread
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class MonitorableThreadFactory(val name: String, val daemonic: Boolean = false) extends ThreadFactory {
  // Not read anywhere in this class; kept because it is visible to subclasses.
  protected val counter = new AtomicLong

  /** Creates a `MonitorableThread` for `runnable` and applies the `daemonic` flag to it. */
  def newThread(runnable: Runnable): MonitorableThread = {
    val thread = new MonitorableThread(runnable, name)
    thread.setDaemon(daemonic)
    thread
  }
}
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
/**
 * Global counters shared by all [[MonitorableThread]] instances.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object MonitorableThread {
  val DEFAULT_NAME = "MonitorableThread".intern

  // FIXME use MonitorableThread.created and MonitorableThread.alive in monitoring
  /** Incremented once for every `MonitorableThread` that is constructed. */
  val created = new AtomicInteger

  /** Number of `MonitorableThread`s currently inside `run`. */
  val alive = new AtomicInteger

  /**
   * Handler that silently discards the uncaught exception.
   * Used when no handler is passed to the `MonitorableThread` constructor.
   */
  val doNothing: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
    def uncaughtException(thread: Thread, cause: Throwable): Unit = ()
  }
}

/**
 * A thread that maintains the counters in the `MonitorableThread` object.
 * The thread is named `name + "-" + n`, where `n` is the value of
 * `MonitorableThread.created` after incrementing it for this instance.
 *
 * @param runnable         the work executed by the thread
 * @param name             prefix of the thread name
 * @param exceptionHandler installed as this thread's uncaught exception handler;
 *                         defaults to `MonitorableThread.doNothing`, which discards
 *                         the exception
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class MonitorableThread(
  runnable: Runnable,
  name: String,
  exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThread.doNothing)
  extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {

  setUncaughtExceptionHandler(exceptionHandler)

  /** Runs the wrapped runnable while counting this thread in `MonitorableThread.alive`. */
  override def run(): Unit = {
    // Increment before entering the try so the decrement below only ever
    // balances an increment that actually happened.
    MonitorableThread.alive.incrementAndGet
    try {
      super.run()
    } finally {
      MonitorableThread.alive.decrementAndGet
    }
  }
}
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2011-10-12 18:21:12 +02:00
|
|
|
/**
 * An `ExecutorService` that forwards every call to the `ExecutorService`
 * returned by `executor`. Note that `executor` is a method, so it is
 * evaluated anew for each forwarded call.
 */
trait ExecutorServiceDelegate extends ExecutorService {

  /** The `ExecutorService` all calls are forwarded to. */
  def executor: ExecutorService

  def execute(command: Runnable): Unit = executor.execute(command)

  def shutdown(): Unit = executor.shutdown()

  def shutdownNow(): java.util.List[Runnable] = executor.shutdownNow()

  def isShutdown: Boolean = executor.isShutdown

  def isTerminated: Boolean = executor.isTerminated

  def awaitTermination(l: Long, timeUnit: TimeUnit): Boolean =
    executor.awaitTermination(l, timeUnit)

  def submit[T](callable: Callable[T]): java.util.concurrent.Future[T] =
    executor.submit(callable)

  def submit[T](runnable: Runnable, t: T): java.util.concurrent.Future[T] =
    executor.submit(runnable, t)

  def submit(runnable: Runnable): java.util.concurrent.Future[_] =
    executor.submit(runnable)

  def invokeAll[T](callables: Collection[_ <: Callable[T]]): java.util.List[java.util.concurrent.Future[T]] =
    executor.invokeAll(callables)

  def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit): java.util.List[java.util.concurrent.Future[T]] =
    executor.invokeAll(callables, l, timeUnit)

  def invokeAny[T](callables: Collection[_ <: Callable[T]]): T =
    executor.invokeAny(callables)

  def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit): T =
    executor.invokeAny(callables, l, timeUnit)
}
|
2011-11-15 10:25:05 +01:00
|
|
|
|
|
|
|
|
/**
 * Rejection policy that runs a rejected task directly on the thread that
 * invoked the handler, unless the pool has been shut down, in which case a
 * `RejectedExecutionException` is thrown.
 */
class SaneRejectedExecutionHandler extends RejectedExecutionHandler {

  /**
   * @throws RejectedExecutionException with message "Shutdown" if `threadPoolExecutor` is shut down
   */
  def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor): Unit = {
    val poolIsOpen = !threadPoolExecutor.isShutdown
    if (poolIsOpen) {
      runnable.run()
    } else {
      throw new RejectedExecutionException("Shutdown")
    }
  }
}
|
2011-12-02 17:29:47 +01:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Commented out pending discussion with Doug Lea
|
|
|
|
|
*
|
|
|
|
|
* case class ForkJoinPoolConfig(targetParallelism: Int = Runtime.getRuntime.availableProcessors()) extends ExecutorServiceFactoryProvider {
|
|
|
|
|
* final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
|
|
|
|
|
* def createExecutorService: ExecutorService = {
|
|
|
|
|
* new ForkJoinPool(targetParallelism) with ExecutorService {
|
|
|
|
|
* setAsyncMode(true)
|
|
|
|
|
* setMaintainsParallelism(true)
|
|
|
|
|
*
|
|
|
|
|
* override final def execute(r: Runnable) {
|
|
|
|
|
* r match {
|
|
|
|
|
* case fjmbox: FJMailbox ⇒
|
|
|
|
|
* //fjmbox.fjTask.reinitialize()
|
|
|
|
|
* Thread.currentThread match {
|
|
|
|
|
* case fjwt: ForkJoinWorkerThread if fjwt.getPool eq this ⇒
|
|
|
|
|
* fjmbox.fjTask.fork() //We should do fjwt.pushTask(fjmbox.fjTask) but it's package protected
|
|
|
|
|
* case _ ⇒ super.execute[Unit](fjmbox.fjTask)
|
|
|
|
|
* }
|
|
|
|
|
* case _ ⇒
|
|
|
|
|
* super.execute(r)
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
*
|
|
|
|
|
* import java.util.{ Collection ⇒ JCollection }
|
|
|
|
|
*
|
|
|
|
|
* def invokeAny[T](callables: JCollection[_ <: Callable[T]]) =
|
|
|
|
|
* throw new UnsupportedOperationException("invokeAny. NOT!")
|
|
|
|
|
*
|
|
|
|
|
* def invokeAny[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
|
|
|
|
|
* throw new UnsupportedOperationException("invokeAny. NOT!")
|
|
|
|
|
*
|
|
|
|
|
* def invokeAll[T](callables: JCollection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) =
|
|
|
|
|
* throw new UnsupportedOperationException("invokeAll. NOT!")
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
*
|
|
|
|
|
* trait FJMailbox { self: Mailbox ⇒
|
|
|
|
|
* final val fjTask = new ForkJoinTask[Unit] with Runnable {
|
|
|
|
|
* private[this] var result: Unit = ()
|
|
|
|
|
* final def getRawResult() = result
|
|
|
|
|
* final def setRawResult(v: Unit) { result = v }
|
|
|
|
|
* final def exec() = { self.run(); true }
|
|
|
|
|
* final def run() { invoke() }
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
*
|
|
|
|
|
*/
|