This commit is contained in:
Viktor Klang 2010-10-22 17:50:48 +02:00
parent 4a8b93392c
commit a0b8d6cc43
8 changed files with 322 additions and 212 deletions

View file

@ -52,7 +52,7 @@ object Dispatchers extends Logging {
val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt
val MAILBOX_TYPE = if (MAILBOX_CAPACITY < 0) UnboundedMailbox() else BoundedMailbox()
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 0) UnboundedMailbox() else BoundedMailbox()
lazy val defaultGlobalDispatcher = {
config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
@ -60,13 +60,7 @@ object Dispatchers extends Logging {
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher(
"global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) {
override def register(actor: ActorRef) = {
if (isShutdown) init
super.register(actor)
}
}
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE)
/**
* Creates an event-driven dispatcher based on the excellent HawtDispatch library.
@ -172,23 +166,27 @@ object Dispatchers extends Logging {
def from(cfg: ConfigMap): Option[MessageDispatcher] = {
lazy val name = cfg.getString("name", newUuid.toString)
def threadPoolConfig(b: ThreadPoolBuilder) {
b.configureIfPossible( builder => {
cfg.getInt("keep-alive-time").foreach(time => builder.setKeepAliveTimeInMillis(Duration(time, TIME_UNIT).toMillis.toInt))
cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_))
cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_))
cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_))
cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_))
cfg.getInt("mailbox-capacity").foreach(builder.setMailboxCapacity(_))
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
val builder = ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()) //Create a new builder
//Creates a transformation from builder to builder, if the option isDefined
def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder):
Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun
cfg.getString("rejection-policy").map({
//Apply the following options to the config if they are present in the cfg
List(
conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
conf_?(cfg getString "rejection-policy" map {
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
}).foreach(builder.setRejectionPolicy(_))
})
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
})(policy => _.setRejectionPolicy(policy))
).foldLeft(builder)( (c,f) => f.map( _(c) ).getOrElse(c)) //Returns the builder with all the specified options set
}
lazy val mailboxType: MailboxType = {
@ -200,15 +198,17 @@ object Dispatchers extends Logging {
cfg.getString("type") map {
case "ExecutorBasedEventDriven" =>
new ExecutorBasedEventDrivenDispatcher(
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
name,
cfg.getInt("throughput", THROUGHPUT),
cfg.getInt("throughput-deadline", THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType,
threadPoolConfig)
threadPoolConfig)).build
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType, threadPoolConfig)
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "ExecutorBasedEventDrivenWorkStealing" =>
configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true))
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)

View file

@ -8,8 +8,9 @@ import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import java.util.Queue
import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.util.Switch
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent. {ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* Default settings are:
@ -69,11 +70,11 @@ class ExecutorBasedEventDrivenDispatcher(
val throughput: Int = Dispatchers.THROUGHPUT,
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: (ThreadPoolBuilder) => Unit = _ => ())
extends MessageDispatcher with ThreadPoolBuilder {
val config: ThreadPoolConfig = ThreadPoolConfig())
extends MessageDispatcher {
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(_name, throughput, throughputDeadlineTime, mailboxType, _ => ()) // Needed for Java API usage
this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
def this(_name: String, throughput: Int, mailboxType: MailboxType) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
@ -84,14 +85,12 @@ class ExecutorBasedEventDrivenDispatcher(
def this(_name: String) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
val name = "akka:event-driven:dispatcher:" + _name
val mailboxType = Some(_mailboxType)
private[akka] val active = new Switch(false)
val name = "akka:event-driven:dispatcher:" + _name
//Initialize
init
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](null)
def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
@ -99,6 +98,8 @@ class ExecutorBasedEventDrivenDispatcher(
registerForExecution(mbox)
}
def isShutdown = active.isOff
/**
* @return the mailbox associated with the actor
*/
@ -112,8 +113,7 @@ class ExecutorBasedEventDrivenDispatcher(
}
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity
new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox {
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox {
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
}
@ -131,25 +131,28 @@ class ExecutorBasedEventDrivenDispatcher(
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
def start = active switchOn {
def start: Unit = if (active.isOff) active switchOn {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
if (executorService.get() eq null) {
val newExecutor = config.createExecutorService(threadFactory)
if (!executorService.compareAndSet(null,newExecutor))
log.error("Thought the ExecutorService was missing but appeared out of nowhere!")
}
}
def shutdown = active switchOff {
log.debug("Shutting down %s", toString)
executor.shutdownNow
uuids.clear
}
def ensureNotActive(): Unit = if (active.isOn) {
throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
def shutdown: Unit = if (active.isOn) active switchOff {
val old = executorService.getAndSet(null)
if (old ne null) {
log.debug("Shutting down %s", toString)
old.shutdownNow()
uuids.clear
}
}
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) {
try {
executor execute mbox
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
mbox.dispatcherLock.unlock()
@ -171,13 +174,6 @@ class ExecutorBasedEventDrivenDispatcher(
mbox.suspended.switchOff
registerForExecution(mbox)
}
// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
}
/**

View file

@ -33,11 +33,11 @@ import se.scalablesolutions.akka.util.Switch
class ExecutorBasedEventDrivenWorkStealingDispatcher(
_name: String,
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType, _ => ())
def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig())
def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE, _ => ())
def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig())
val mailboxType = Some(_mailboxType)
@ -54,7 +54,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
@volatile private var lastThiefIndex = 0
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
init
/**
* @return the mailbox associated with the actor
@ -195,12 +194,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match {
case UnboundedMailbox(blocking) => // FIXME make use of 'blocking' in work stealer ConcurrentLinkedDeque
new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable {

View file

@ -69,6 +69,7 @@ trait MessageDispatcher extends MailboxFactory with Logging {
def shutdown: Unit
def register(actorRef: ActorRef) {
start
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
uuids add actorRef.uuid
}

View file

@ -37,10 +37,6 @@ class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxTy
}
object ThreadBasedDispatcher {
val oneThread: (ThreadPoolBuilder) => Unit = b => {
b setCorePoolSize 1
b setMaxPoolSize 1
b setAllowCoreThreadTimeout true
}
val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)
}

View file

@ -10,7 +10,131 @@ import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import se.scalablesolutions.akka.actor.IllegalActorStateException
import se.scalablesolutions.akka.util.{Logger, Logging}
import se.scalablesolutions.akka.util. {Duration, Logger, Logging}
object ThreadPoolConfig {
type Bounds = Int
type FlowHandler = Either[RejectedExecutionHandler,Bounds]
type QueueFactory = () => BlockingQueue[Runnable]
val defaultAllowCoreThreadTimeout: Boolean = false
val defaultCorePoolSize: Int = 16
val defaultMaxPoolSize: Int = 128
val defaultTimeout: Duration = Duration(60000L,TimeUnit.MILLISECONDS)
def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy)
def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler)
def flowHandler(bounds: Int): FlowHandler = Right(bounds)
def fixedPoolSize(size: Int): Int = size
def scaledPoolSize(multiplier: Double): Int =
(Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory =
() => new ArrayBlockingQueue[Runnable](capacity,fair)
def synchronousQueue(fair: Boolean): QueueFactory =
() => new SynchronousQueue[Runnable](fair)
def linkedBlockingQueue(): QueueFactory =
() => new LinkedBlockingQueue[Runnable]()
def linkedBlockingQueue(capacity: Int): QueueFactory =
() => new LinkedBlockingQueue[Runnable](capacity)
def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory =
() => queue
def reusableQueue(queueFactory: QueueFactory): QueueFactory = {
val queue = queueFactory()
() => queue
}
}
case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
flowHandler match {
case Left(rejectHandler) =>
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler)
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
service
case Right(bounds) =>
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory)
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
new BoundedExecutorDecorator(service,bounds)
}
}
}
trait DispatcherBuilder {
def build: MessageDispatcher
}
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
import ThreadPoolConfig._
def build = dispatcherFactory(config)
//TODO remove this, for backwards compat only
def buildThreadPool = build
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue()))
def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory))
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder =
withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue))
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler))
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler))
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler))
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity,fair), flowHandler = defaultFlowHandler))
def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(corePoolSize = size))
def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(maxPoolSize = size))
def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
setCorePoolSize(scaledPoolSize(multiplier))
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder =
setMaxPoolSize(scaledPoolSize(multiplier))
def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(flowHandler = flowHandler(bounds)))
def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder =
setKeepAliveTime(Duration(time,TimeUnit.MILLISECONDS))
def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(threadTimeout = time))
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder =
setFlowHandler(flowHandler(policy))
def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(flowHandler = newFlowHandler))
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
this.copy(config = config.copy(allowCorePoolTimeout = allow))
}
trait ThreadPoolBuilder extends Logging {
val name: String
@ -26,13 +150,13 @@ trait ThreadPoolBuilder extends Logging {
@volatile private var inProcessOfBuilding = false
private var blockingQueue: BlockingQueue[Runnable] = _
private lazy val threadFactory = new MonitorableThreadFactory(name)
protected lazy val threadFactory = new MonitorableThreadFactory(name)
protected var executor: ExecutorService = _
@volatile var executor: ExecutorService = _
def isShutdown = executor.isShutdown
def buildThreadPool(): Unit = synchronized {
def buildThreadPool(): ExecutorService = synchronized {
ensureNotActive
inProcessOfBuilding = false
@ -51,6 +175,7 @@ trait ThreadPoolBuilder extends Logging {
} else {
executor = threadPoolBuilder
}
executor
}
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized {
@ -204,104 +329,105 @@ trait ThreadPoolBuilder extends Logging {
}
def ensureNotActive(): Unit
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging {
protected val semaphore = new Semaphore(bound)
def execute(command: Runnable) = {
semaphore.acquire
try {
executor.execute(new Runnable() {
def run = {
try {
command.run
} finally {
semaphore.release
}
}
})
} catch {
case e: RejectedExecutionException =>
semaphore.release
case e =>
log.error(e,"Unexpected exception")
throw e
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
protected val counter = new AtomicLong
// Delegating methods for the ExecutorService interface
def shutdown = executor.shutdown
def newThread(runnable: Runnable) =
new MonitorableThread(runnable, name)
// new Thread(runnable, name + "-" + counter.getAndIncrement)
}
def shutdownNow = executor.shutdownNow
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
val created = new AtomicInteger
val alive = new AtomicInteger
@volatile var debugLifecycle = false
}
def isShutdown = executor.isShutdown
// FIXME fix the issues with using the monitoring in MonitorableThread
def isTerminated = executor.isTerminated
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThread(runnable: Runnable, name: String)
extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging {
def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable) =
log.error(cause, "UNCAUGHT in thread [%s]", thread.getName)
})
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
protected val counter = new AtomicLong
def newThread(runnable: Runnable) =
new MonitorableThread(runnable, name)
// new Thread(runnable, name + "-" + counter.getAndIncrement)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object MonitorableThread {
val DEFAULT_NAME = "MonitorableThread"
val created = new AtomicInteger
val alive = new AtomicInteger
@volatile var debugLifecycle = false
}
// FIXME fix the issues with using the monitoring in MonitorableThread
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MonitorableThread(runnable: Runnable, name: String)
extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging {
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable) =
log.error(cause, "UNCAUGHT in thread [%s]", thread.getName)
})
override def run = {
val debug = MonitorableThread.debugLifecycle
log.debug("Created thread %s", getName)
try {
MonitorableThread.alive.incrementAndGet
super.run
} finally {
MonitorableThread.alive.decrementAndGet
log.debug("Exiting thread %s", getName)
}
override def run = {
val debug = MonitorableThread.debugLifecycle
log.debug("Created thread %s", getName)
try {
MonitorableThread.alive.incrementAndGet
super.run
} finally {
MonitorableThread.alive.decrementAndGet
log.debug("Exiting thread %s", getName)
}
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging {
protected val semaphore = new Semaphore(bound)
def execute(command: Runnable) = {
semaphore.acquire
try {
executor.execute(new Runnable() {
def run = {
try {
command.run
} finally {
semaphore.release
}
}
})
} catch {
case e: RejectedExecutionException =>
semaphore.release
case e =>
log.error(e,"Unexpected exception")
throw e
}
}
// Delegating methods for the ExecutorService interface
def shutdown = executor.shutdown
def shutdownNow = executor.shutdownNow
def isShutdown = executor.isShutdown
def isTerminated = executor.isTerminated
def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit)
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}

View file

@ -5,7 +5,7 @@
package se.scalablesolutions.akka.util
import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic. {AtomicBoolean}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -119,7 +119,7 @@ class SimpleLock {
class Switch(startAsOn: Boolean = false) {
private val switch = new AtomicBoolean(startAsOn)
protected def transcend(from: Boolean,action: => Unit): Boolean = {
protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized {
if (switch.compareAndSet(from,!from)) {
try {
action
@ -135,8 +135,8 @@ class Switch(startAsOn: Boolean = false) {
def switchOff(action: => Unit): Boolean = transcend(from = true, action)
def switchOn(action: => Unit): Boolean = transcend(from = false,action)
def switchOff: Boolean = switch.compareAndSet(true,false)
def switchOn: Boolean = switch.compareAndSet(false,true)
def switchOff: Boolean = synchronized { switch.compareAndSet(true,false) }
def switchOn: Boolean = synchronized { switch.compareAndSet(false,true) }
def ifOnYield[T](action: => T): Option[T] = {
if (switch.get)

View file

@ -8,9 +8,9 @@ import se.scalablesolutions.akka.config.Supervision._
import AkkaSpringConfigurationTags._
import reflect.BeanProperty
import se.scalablesolutions.akka.actor.ActorRef
import se.scalablesolutions.akka.dispatch.{ThreadPoolBuilder, Dispatchers, MessageDispatcher}
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.ThreadPoolExecutor.{DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy, AbortPolicy}
import se.scalablesolutions.akka.dispatch._
/**
* Reusable factory method for dispatchers.
@ -23,56 +23,54 @@ object DispatcherFactoryBean {
* @param actorRef actorRef needed for thread based dispatcher
*/
def createNewInstance(properties: DispatcherProperties, actorRef: Option[ActorRef] = None): MessageDispatcher = {
def configThreadPool(): ThreadPoolConfig = {
val poolCfg = ThreadPoolConfig()
if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) {
properties.threadPool.queue match {
case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness)
case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if properties.threadPool.capacity > -1 => threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(properties.threadPool.capacity)
case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if properties.threadPool.capacity <= 0 => threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => threadPoolBuilder.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(properties.threadPool.bound)
case VAL_SYNCHRONOUS_QUEUE => threadPoolBuilder.withNewThreadPoolWithSynchronousQueueWithFairness(properties.threadPool.fairness)
case _ => throw new IllegalArgumentException("unknown queue type")
}
if (properties.threadPool.corePoolSize > -1)
threadPoolBuilder.setCorePoolSize(properties.threadPool.corePoolSize)
if (properties.threadPool.maxPoolSize > -1)
threadPoolBuilder.setMaxPoolSize(properties.threadPool.maxPoolSize)
if (properties.threadPool.keepAlive > -1)
threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive)
if (properties.threadPool.mailboxCapacity > -1)
threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity)
if ((properties.threadPool.rejectionPolicy ne null) && (!properties.threadPool.rejectionPolicy.isEmpty)) {
val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match {
case "abort-policy" => new AbortPolicy()
case "caller-runs-policy" => new CallerRunsPolicy()
case "discard-oldest-policy" => new DiscardOldestPolicy()
case "discard-policy" => new DiscardPolicy()
case _ => throw new IllegalArgumentException("Unknown rejection-policy '" + properties.threadPool.rejectionPolicy + "'")
}
threadPoolBuilder.setRejectionPolicy(policy)
}
} else poolCfg
}
var dispatcher = properties.dispatcherType match {
case EXECUTOR_BASED_EVENT_DRIVEN => Dispatchers.newExecutorBasedEventDrivenDispatcher(properties.name)
case EXECUTOR_BASED_EVENT_DRIVEN => new ExecutorBasedEventDrivenDispatcher(properties.name, config = configThreadPool)
case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(properties.name)
case THREAD_BASED => if (!actorRef.isDefined) {
throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.")
} else {
Dispatchers.newThreadBasedDispatcher(actorRef.get)
}
case THREAD_BASED if actorRef.isEmpty => throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.")
case THREAD_BASED if actorRef.isDefined => Dispatchers.newThreadBasedDispatcher(actorRef.get)
case HAWT => Dispatchers.newHawtDispatcher(properties.aggregate)
case _ => throw new IllegalArgumentException("unknown dispatcher type")
}
// build threadpool
if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) {
var threadPoolBuilder = dispatcher.asInstanceOf[ThreadPoolBuilder]
threadPoolBuilder = properties.threadPool.queue match {
case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness)
case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE => if (properties.threadPool.capacity > -1)
threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(properties.threadPool.capacity)
else
threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => threadPoolBuilder.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(properties.threadPool.bound)
case VAL_SYNCHRONOUS_QUEUE => threadPoolBuilder.withNewThreadPoolWithSynchronousQueueWithFairness(properties.threadPool.fairness)
case _ => throw new IllegalArgumentException("unknown queue type")
}
if (properties.threadPool.corePoolSize > -1) {
threadPoolBuilder.setCorePoolSize(properties.threadPool.corePoolSize)
}
if (properties.threadPool.maxPoolSize > -1) {
threadPoolBuilder.setMaxPoolSize(properties.threadPool.maxPoolSize)
}
if (properties.threadPool.keepAlive > -1) {
threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive)
}
if (properties.threadPool.mailboxCapacity > -1) {
threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity)
}
if ((properties.threadPool.rejectionPolicy ne null) && (!properties.threadPool.rejectionPolicy.isEmpty)) {
val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match {
case "abort-policy" => new AbortPolicy()
case "caller-runs-policy" => new CallerRunsPolicy()
case "discard-oldest-policy" => new DiscardOldestPolicy()
case "discard-policy" => new DiscardPolicy()
case _ => throw new IllegalArgumentException("Unknown rejection-policy '" + properties.threadPool.rejectionPolicy + "'")
}
threadPoolBuilder.setRejectionPolicy(policy)
}
threadPoolBuilder.asInstanceOf[MessageDispatcher]
} else {
dispatcher
}
dispatcher
}
}