2009-12-11 16:37:44 +01:00
|
|
|
/**
|
2009-12-27 16:01:53 +01:00
|
|
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package se.scalablesolutions.akka.dispatch
|
|
|
|
|
|
2010-03-01 22:03:17 +01:00
|
|
|
import java.util.Collection
|
2009-12-11 16:37:44 +01:00
|
|
|
import java.util.concurrent._
|
|
|
|
|
import atomic.{AtomicLong, AtomicInteger}
|
|
|
|
|
import ThreadPoolExecutor.CallerRunsPolicy
|
|
|
|
|
|
2010-07-02 11:14:49 +02:00
|
|
|
import se.scalablesolutions.akka.actor.IllegalActorStateException
|
2010-10-22 17:50:48 +02:00
|
|
|
import se.scalablesolutions.akka.util. {Duration, Logger, Logging}
|
|
|
|
|
|
|
|
|
|
object ThreadPoolConfig {
|
|
|
|
|
type Bounds = Int
|
|
|
|
|
type FlowHandler = Either[RejectedExecutionHandler,Bounds]
|
|
|
|
|
type QueueFactory = () => BlockingQueue[Runnable]
|
|
|
|
|
|
|
|
|
|
val defaultAllowCoreThreadTimeout: Boolean = false
|
|
|
|
|
val defaultCorePoolSize: Int = 16
|
|
|
|
|
val defaultMaxPoolSize: Int = 128
|
|
|
|
|
val defaultTimeout: Duration = Duration(60000L,TimeUnit.MILLISECONDS)
|
|
|
|
|
def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy)
|
|
|
|
|
|
|
|
|
|
def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler)
|
|
|
|
|
def flowHandler(bounds: Int): FlowHandler = Right(bounds)
|
|
|
|
|
|
|
|
|
|
def fixedPoolSize(size: Int): Int = size
|
|
|
|
|
def scaledPoolSize(multiplier: Double): Int =
|
|
|
|
|
(Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
|
|
|
|
|
|
|
|
|
|
def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory =
|
|
|
|
|
() => new ArrayBlockingQueue[Runnable](capacity,fair)
|
|
|
|
|
|
|
|
|
|
def synchronousQueue(fair: Boolean): QueueFactory =
|
|
|
|
|
() => new SynchronousQueue[Runnable](fair)
|
|
|
|
|
|
|
|
|
|
def linkedBlockingQueue(): QueueFactory =
|
|
|
|
|
() => new LinkedBlockingQueue[Runnable]()
|
|
|
|
|
|
|
|
|
|
def linkedBlockingQueue(capacity: Int): QueueFactory =
|
|
|
|
|
() => new LinkedBlockingQueue[Runnable](capacity)
|
|
|
|
|
|
|
|
|
|
def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory =
|
|
|
|
|
() => queue
|
|
|
|
|
|
|
|
|
|
def reusableQueue(queueFactory: QueueFactory): QueueFactory = {
|
|
|
|
|
val queue = queueFactory()
|
|
|
|
|
() => queue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
|
|
|
|
|
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
|
|
|
|
|
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
|
|
|
|
|
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
|
|
|
|
|
flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
|
|
|
|
|
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
|
|
|
|
|
|
|
|
|
|
final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
|
|
|
|
|
flowHandler match {
|
|
|
|
|
case Left(rejectHandler) =>
|
|
|
|
|
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
|
|
|
|
|
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
|
|
|
|
|
service
|
|
|
|
|
case Right(bounds) =>
|
|
|
|
|
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory)
|
|
|
|
|
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
|
|
|
|
|
new BoundedExecutorDecorator(service,bounds)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait DispatcherBuilder {
|
|
|
|
|
def build: MessageDispatcher
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
|
|
|
|
|
import ThreadPoolConfig._
|
|
|
|
|
def build = dispatcherFactory(config)
|
|
|
|
|
|
|
|
|
|
//TODO remove this, for backwards compat only
|
|
|
|
|
def buildThreadPool = build
|
|
|
|
|
|
|
|
|
|
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue()))
|
|
|
|
|
|
|
|
|
|
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory))
|
|
|
|
|
|
|
|
|
|
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))
|
|
|
|
|
|
|
|
|
|
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler))
|
|
|
|
|
|
|
|
|
|
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler))
|
|
|
|
|
|
|
|
|
|
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler))
|
|
|
|
|
|
|
|
|
|
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity,fair), flowHandler = defaultFlowHandler))
|
|
|
|
|
|
|
|
|
|
def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(corePoolSize = size))
|
|
|
|
|
|
|
|
|
|
def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(maxPoolSize = size))
|
|
|
|
|
|
|
|
|
|
def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
setCorePoolSize(scaledPoolSize(multiplier))
|
|
|
|
|
|
|
|
|
|
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
setMaxPoolSize(scaledPoolSize(multiplier))
|
|
|
|
|
|
|
|
|
|
def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(flowHandler = flowHandler(bounds)))
|
|
|
|
|
|
|
|
|
|
def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
setKeepAliveTime(Duration(time,TimeUnit.MILLISECONDS))
|
|
|
|
|
|
|
|
|
|
def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(threadTimeout = time))
|
|
|
|
|
|
|
|
|
|
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
setFlowHandler(flowHandler(policy))
|
|
|
|
|
|
|
|
|
|
def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(flowHandler = newFlowHandler))
|
|
|
|
|
|
|
|
|
|
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
|
|
|
|
|
this.copy(config = config.copy(allowCorePoolTimeout = allow))
|
|
|
|
|
}
|
|
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-08-13 14:43:02 +02:00
|
|
|
trait ThreadPoolBuilder extends Logging {
|
2009-12-11 16:37:44 +01:00
|
|
|
val name: String
|
2009-12-15 23:25:10 +01:00
|
|
|
|
2010-05-16 10:59:06 +02:00
|
|
|
private val NR_START_THREADS = 16
|
2009-12-11 16:37:44 +01:00
|
|
|
private val NR_MAX_THREADS = 128
|
|
|
|
|
private val KEEP_ALIVE_TIME = 60000L // default is one minute
|
|
|
|
|
private val MILLISECONDS = TimeUnit.MILLISECONDS
|
2010-05-21 20:08:49 +02:00
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
private var threadPoolBuilder: ThreadPoolExecutor = _
|
|
|
|
|
private var boundedExecutorBound = -1
|
2010-08-21 10:45:00 +02:00
|
|
|
protected var mailboxCapacity = -1
|
2010-08-13 13:13:27 +02:00
|
|
|
@volatile private var inProcessOfBuilding = false
|
2009-12-11 16:37:44 +01:00
|
|
|
private var blockingQueue: BlockingQueue[Runnable] = _
|
|
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
protected lazy val threadFactory = new MonitorableThreadFactory(name)
|
2009-12-15 23:25:10 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
@volatile var executor: ExecutorService = _
|
2010-09-27 11:59:28 +02:00
|
|
|
|
2010-01-02 08:40:09 +01:00
|
|
|
def isShutdown = executor.isShutdown
|
|
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def buildThreadPool(): ExecutorService = synchronized {
|
2009-12-11 16:37:44 +01:00
|
|
|
ensureNotActive
|
|
|
|
|
inProcessOfBuilding = false
|
2010-08-13 14:43:02 +02:00
|
|
|
|
|
|
|
|
log.debug("Creating a %s with config [core-pool:%d,max-pool:%d,timeout:%d,allowCoreTimeout:%s,rejectPolicy:%s]",
|
|
|
|
|
getClass.getName,
|
|
|
|
|
threadPoolBuilder.getCorePoolSize,
|
|
|
|
|
threadPoolBuilder.getMaximumPoolSize,
|
|
|
|
|
threadPoolBuilder.getKeepAliveTime(MILLISECONDS),
|
|
|
|
|
threadPoolBuilder.allowsCoreThreadTimeOut,
|
|
|
|
|
threadPoolBuilder.getRejectedExecutionHandler.getClass.getSimpleName)
|
|
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
if (boundedExecutorBound > 0) {
|
|
|
|
|
val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound)
|
2010-09-07 10:12:26 +02:00
|
|
|
boundedExecutorBound = -1 //Why is this here?
|
2009-12-11 16:37:44 +01:00
|
|
|
executor = boundedExecutor
|
|
|
|
|
} else {
|
|
|
|
|
executor = threadPoolBuilder
|
|
|
|
|
}
|
2010-10-22 17:50:48 +02:00
|
|
|
executor
|
2009-12-11 16:37:44 +01:00
|
|
|
}
|
|
|
|
|
|
2009-12-15 23:25:10 +01:00
|
|
|
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized {
|
2009-12-11 16:37:44 +01:00
|
|
|
ensureNotActive
|
|
|
|
|
verifyNotInConstructionPhase
|
|
|
|
|
blockingQueue = queue
|
|
|
|
|
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
|
|
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2009-12-13 12:29:18 +01:00
|
|
|
* Creates a new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeded.
|
2009-12-11 16:37:44 +01:00
|
|
|
* <p/>
|
|
|
|
|
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed.
|
|
|
|
|
*/
|
2009-12-15 23:25:10 +01:00
|
|
|
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized {
|
2009-12-11 16:37:44 +01:00
|
|
|
ensureNotActive
|
|
|
|
|
verifyNotInConstructionPhase
|
|
|
|
|
blockingQueue = new LinkedBlockingQueue[Runnable]
|
|
|
|
|
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory)
|
|
|
|
|
boundedExecutorBound = bound
|
|
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
2009-12-15 23:25:10 +01:00
|
|
|
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized {
|
2009-12-11 16:37:44 +01:00
|
|
|
ensureNotActive
|
|
|
|
|
verifyNotInConstructionPhase
|
2009-12-15 23:25:10 +01:00
|
|
|
blockingQueue = new LinkedBlockingQueue[Runnable]
|
2009-12-11 16:37:44 +01:00
|
|
|
threadPoolBuilder = new ThreadPoolExecutor(
|
|
|
|
|
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
|
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
2009-12-15 23:25:10 +01:00
|
|
|
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized {
|
2009-12-11 16:37:44 +01:00
|
|
|
ensureNotActive
|
|
|
|
|
verifyNotInConstructionPhase
|
2009-12-15 23:25:10 +01:00
|
|
|
blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
|
2009-12-11 16:37:44 +01:00
|
|
|
threadPoolBuilder = new ThreadPoolExecutor(
|
|
|
|
|
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
|
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolBuilder = synchronized {
|
|
|
|
|
ensureNotActive
|
|
|
|
|
verifyNotInConstructionPhase
|
|
|
|
|
blockingQueue = new SynchronousQueue[Runnable](fair)
|
|
|
|
|
threadPoolBuilder = new ThreadPoolExecutor(
|
|
|
|
|
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
|
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolBuilder = synchronized {
|
|
|
|
|
ensureNotActive
|
|
|
|
|
verifyNotInConstructionPhase
|
|
|
|
|
blockingQueue = new ArrayBlockingQueue[Runnable](capacity, fair)
|
|
|
|
|
threadPoolBuilder = new ThreadPoolExecutor(
|
|
|
|
|
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
|
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
2010-08-13 13:13:27 +02:00
|
|
|
def configureIfPossible(f: (ThreadPoolBuilder) => Unit): Boolean = synchronized {
|
|
|
|
|
if(inProcessOfBuilding) {
|
|
|
|
|
f(this)
|
|
|
|
|
true
|
|
|
|
|
}
|
|
|
|
|
else {
|
2010-08-13 14:43:02 +02:00
|
|
|
log.warning("Tried to configure an already started ThreadPoolBuilder of type [%s]",getClass.getName)
|
2010-08-13 13:13:27 +02:00
|
|
|
false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
/**
|
|
|
|
|
* Default is 16.
|
|
|
|
|
*/
|
2010-08-13 13:13:27 +02:00
|
|
|
def setCorePoolSize(size: Int): ThreadPoolBuilder =
|
|
|
|
|
setThreadPoolExecutorProperty(_.setCorePoolSize(size))
|
2009-12-11 16:37:44 +01:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Default is 128.
|
|
|
|
|
*/
|
2010-08-13 13:13:27 +02:00
|
|
|
def setMaxPoolSize(size: Int): ThreadPoolBuilder =
|
|
|
|
|
setThreadPoolExecutorProperty(_.setMaximumPoolSize(size))
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-08-13 23:06:52 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sets the core pool size to (availableProcessors * multipliers).ceil.toInt
|
|
|
|
|
*/
|
|
|
|
|
def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
|
|
|
|
|
setThreadPoolExecutorProperty(_.setCorePoolSize(procs(multiplier)))
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sets the max pool size to (availableProcessors * multipliers).ceil.toInt
|
|
|
|
|
*/
|
|
|
|
|
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
|
|
|
|
|
setThreadPoolExecutorProperty(_.setMaximumPoolSize(procs(multiplier)))
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sets the bound, -1 is unbounded
|
|
|
|
|
*/
|
|
|
|
|
def setExecutorBounds(bounds: Int): Unit = synchronized {
|
|
|
|
|
this.boundedExecutorBound = bounds
|
|
|
|
|
}
|
2010-08-21 16:13:16 +02:00
|
|
|
|
2010-08-21 10:45:00 +02:00
|
|
|
/**
|
|
|
|
|
* Sets the mailbox capacity, -1 is unbounded
|
|
|
|
|
*/
|
|
|
|
|
def setMailboxCapacity(capacity: Int): Unit = synchronized {
|
|
|
|
|
this.mailboxCapacity = capacity
|
|
|
|
|
}
|
|
|
|
|
|
2010-08-13 23:06:52 +02:00
|
|
|
protected def procs(multiplier: Double): Int =
|
|
|
|
|
(Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
|
|
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
/**
|
2010-08-13 23:06:52 +02:00
|
|
|
* Default is 60000 (one minute).
|
2009-12-11 16:37:44 +01:00
|
|
|
*/
|
2010-08-13 13:13:27 +02:00
|
|
|
def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder =
|
|
|
|
|
setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS))
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
|
|
|
|
|
*/
|
|
|
|
|
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolBuilder =
|
|
|
|
|
setThreadPoolExecutorProperty(_.setRejectedExecutionHandler(policy))
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Default false, set to true to conserve thread for potentially unused dispatchers
|
|
|
|
|
*/
|
|
|
|
|
def setAllowCoreThreadTimeout(allow: Boolean) =
|
|
|
|
|
setThreadPoolExecutorProperty(_.allowCoreThreadTimeOut(allow))
|
2009-12-11 16:37:44 +01:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
|
|
|
|
|
*/
|
2010-08-13 13:13:27 +02:00
|
|
|
protected def setThreadPoolExecutorProperty(f: (ThreadPoolExecutor) => Unit): ThreadPoolBuilder = synchronized {
|
2009-12-11 16:37:44 +01:00
|
|
|
ensureNotActive
|
|
|
|
|
verifyInConstructionPhase
|
2010-08-13 13:13:27 +02:00
|
|
|
f(threadPoolBuilder)
|
2009-12-11 16:37:44 +01:00
|
|
|
this
|
|
|
|
|
}
|
|
|
|
|
|
2010-08-13 13:13:27 +02:00
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
protected def verifyNotInConstructionPhase = {
|
2010-07-02 11:14:49 +02:00
|
|
|
if (inProcessOfBuilding) throw new IllegalActorStateException("Is already in the process of building a thread pool")
|
2009-12-11 16:37:44 +01:00
|
|
|
inProcessOfBuilding = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected def verifyInConstructionPhase = {
|
2010-07-02 11:14:49 +02:00
|
|
|
if (!inProcessOfBuilding) throw new IllegalActorStateException(
|
2009-12-11 16:37:44 +01:00
|
|
|
"Is not in the process of building a thread pool, start building one by invoking one of the 'newThreadPool*' methods")
|
|
|
|
|
}
|
|
|
|
|
|
2010-07-02 21:57:44 +02:00
|
|
|
def ensureNotActive(): Unit
|
2010-10-22 17:50:48 +02:00
|
|
|
}
|
2010-05-21 20:08:49 +02:00
|
|
|
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
/**
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
|
|
|
|
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
|
|
|
|
|
protected val counter = new AtomicLong
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def newThread(runnable: Runnable) =
|
|
|
|
|
new MonitorableThread(runnable, name)
|
|
|
|
|
// new Thread(runnable, name + "-" + counter.getAndIncrement)
|
|
|
|
|
}
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
/**
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
|
|
|
|
object MonitorableThread {
|
|
|
|
|
val DEFAULT_NAME = "MonitorableThread"
|
|
|
|
|
val created = new AtomicInteger
|
|
|
|
|
val alive = new AtomicInteger
|
|
|
|
|
@volatile var debugLifecycle = false
|
|
|
|
|
}
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
// FIXME fix the issues with using the monitoring in MonitorableThread
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
/**
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
|
|
|
|
class MonitorableThread(runnable: Runnable, name: String)
|
|
|
|
|
extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging {
|
|
|
|
|
|
|
|
|
|
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
|
|
|
|
def uncaughtException(thread: Thread, cause: Throwable) =
|
|
|
|
|
log.error(cause, "UNCAUGHT in thread [%s]", thread.getName)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
override def run = {
|
|
|
|
|
val debug = MonitorableThread.debugLifecycle
|
|
|
|
|
log.debug("Created thread %s", getName)
|
|
|
|
|
try {
|
|
|
|
|
MonitorableThread.alive.incrementAndGet
|
|
|
|
|
super.run
|
|
|
|
|
} finally {
|
|
|
|
|
MonitorableThread.alive.decrementAndGet
|
|
|
|
|
log.debug("Exiting thread %s", getName)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
/**
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
|
|
|
|
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging {
|
|
|
|
|
protected val semaphore = new Semaphore(bound)
|
|
|
|
|
|
|
|
|
|
def execute(command: Runnable) = {
|
|
|
|
|
semaphore.acquire
|
|
|
|
|
try {
|
|
|
|
|
executor.execute(new Runnable() {
|
|
|
|
|
def run = {
|
|
|
|
|
try {
|
|
|
|
|
command.run
|
|
|
|
|
} finally {
|
|
|
|
|
semaphore.release
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
} catch {
|
|
|
|
|
case e: RejectedExecutionException =>
|
|
|
|
|
semaphore.release
|
|
|
|
|
case e =>
|
|
|
|
|
log.error(e,"Unexpected exception")
|
|
|
|
|
throw e
|
|
|
|
|
}
|
|
|
|
|
}
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
// Delegating methods for the ExecutorService interface
|
|
|
|
|
def shutdown = executor.shutdown
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def shutdownNow = executor.shutdownNow
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def isShutdown = executor.isShutdown
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def isTerminated = executor.isTerminated
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def submit[T](callable: Callable[T]) = executor.submit(callable)
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def submit(runnable: Runnable) = executor.submit(runnable)
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
|
2009-12-11 16:37:44 +01:00
|
|
|
|
2010-10-22 17:50:48 +02:00
|
|
|
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
|
|
|
|
|
|
|
|
|
|
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
|
2010-05-21 20:08:49 +02:00
|
|
|
}
|