Removing bounded executors since they have probably never been used, also, removing possibility to specify own RejectedExecutionHandler since Akka needs to know what to do there anyway. Implementing a sane version of CallerRuns
This commit is contained in:
parent
13647b2b61
commit
727c7de58d
8 changed files with 21 additions and 85 deletions
|
|
@ -5,26 +5,21 @@
|
|||
package akka.dispatch
|
||||
|
||||
import java.util.Collection
|
||||
import java.util.concurrent._
|
||||
import atomic.{ AtomicLong, AtomicInteger }
|
||||
import java.util.concurrent.atomic.{ AtomicLong, AtomicInteger }
|
||||
import akka.util.Duration
|
||||
import akka.event.Logging.{ Warning, Error }
|
||||
import akka.actor.ActorSystem
|
||||
import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy }
|
||||
import java.util.concurrent._
|
||||
|
||||
object ThreadPoolConfig {
|
||||
type Bounds = Int
|
||||
type FlowHandler = Either[RejectedExecutionHandler, Bounds]
|
||||
type FlowHandler = Either[SaneRejectedExecutionHandler, Bounds]
|
||||
type QueueFactory = () ⇒ BlockingQueue[Runnable]
|
||||
|
||||
val defaultAllowCoreThreadTimeout: Boolean = false
|
||||
val defaultCorePoolSize: Int = 16
|
||||
val defaultMaxPoolSize: Int = 128
|
||||
val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)
|
||||
def defaultFlowHandler: FlowHandler = flowHandler(new AbortPolicy)
|
||||
|
||||
def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler)
|
||||
def flowHandler(bounds: Int): FlowHandler = Right(bounds)
|
||||
|
||||
def fixedPoolSize(size: Int): Int = size
|
||||
def scaledPoolSize(multiplier: Double): Int =
|
||||
|
|
@ -73,20 +68,14 @@ case class ThreadPoolConfig(app: ActorSystem,
|
|||
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
|
||||
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
|
||||
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
|
||||
flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
|
||||
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue())
|
||||
extends ExecutorServiceFactoryProvider {
|
||||
final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ExecutorServiceFactory {
|
||||
val threadFactory = new MonitorableThreadFactory(name)
|
||||
def createExecutorService: ExecutorService = flowHandler match {
|
||||
case Left(rejectHandler) ⇒
|
||||
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
|
||||
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
|
||||
service
|
||||
case Right(bounds) ⇒
|
||||
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory)
|
||||
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
|
||||
new BoundedExecutorDecorator(app, service, bounds)
|
||||
def createExecutorService: ExecutorService = {
|
||||
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, new SaneRejectedExecutionHandler)
|
||||
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
|
||||
service
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -106,26 +95,23 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
|
|||
import ThreadPoolConfig._
|
||||
def build = dispatcherFactory(config)
|
||||
|
||||
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue()))
|
||||
|
||||
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory))
|
||||
this.copy(config = config.copy(queueFactory = newQueueFactory))
|
||||
|
||||
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
|
||||
withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))
|
||||
|
||||
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler))
|
||||
this.copy(config = config.copy(queueFactory = linkedBlockingQueue()))
|
||||
|
||||
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler))
|
||||
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity)))
|
||||
|
||||
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler))
|
||||
this.copy(config = config.copy(queueFactory = synchronousQueue(fair)))
|
||||
|
||||
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair), flowHandler = defaultFlowHandler))
|
||||
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity, fair)))
|
||||
|
||||
def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(corePoolSize = size))
|
||||
|
|
@ -139,21 +125,12 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
|
|||
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
|
||||
setMaxPoolSize(scaledPoolSize(multiplier))
|
||||
|
||||
def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(flowHandler = flowHandler(bounds)))
|
||||
|
||||
def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
|
||||
setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))
|
||||
|
||||
def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(threadTimeout = time))
|
||||
|
||||
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder =
|
||||
setFlowHandler(flowHandler(policy))
|
||||
|
||||
def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(flowHandler = newFlowHandler))
|
||||
|
||||
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
|
||||
this.copy(config = config.copy(allowCorePoolTimeout = allow))
|
||||
|
||||
|
|
@ -207,35 +184,6 @@ class MonitorableThread(runnable: Runnable, name: String)
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class BoundedExecutorDecorator(val app: ActorSystem, val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate {
|
||||
protected val semaphore = new Semaphore(bound)
|
||||
|
||||
override def execute(command: Runnable) = {
|
||||
semaphore.acquire
|
||||
try {
|
||||
executor.execute(new Runnable() {
|
||||
def run = {
|
||||
try {
|
||||
command.run
|
||||
} finally {
|
||||
semaphore.release
|
||||
}
|
||||
}
|
||||
})
|
||||
} catch {
|
||||
case e: RejectedExecutionException ⇒
|
||||
app.eventStream.publish(Warning(this, e.toString))
|
||||
semaphore.release
|
||||
case e: Throwable ⇒
|
||||
app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* As the name says
|
||||
*/
|
||||
|
|
@ -269,3 +217,10 @@ trait ExecutorServiceDelegate extends ExecutorService {
|
|||
|
||||
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
|
||||
}
|
||||
|
||||
class SaneRejectedExecutionHandler extends RejectedExecutionHandler {
|
||||
def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor): Unit = {
|
||||
if (threadPoolExecutor.isShutdown) throw new RejectedExecutionException("Shutdown")
|
||||
else runnable.run()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue