diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index 55e819a2c8..4a9c3c0917 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -52,7 +52,7 @@ object Dispatchers extends Logging { val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT) val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt - val MAILBOX_TYPE = if (MAILBOX_CAPACITY < 0) UnboundedMailbox() else BoundedMailbox() + val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 0) UnboundedMailbox() else BoundedMailbox() lazy val defaultGlobalDispatcher = { config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) @@ -60,13 +60,7 @@ object Dispatchers extends Logging { object globalHawtDispatcher extends HawtDispatcher - object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher( - "global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) { - override def register(actor: ActorRef) = { - if (isShutdown) init - super.register(actor) - } - } + object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) /** * Creates an event-driven dispatcher based on the excellent HawtDispatch library. @@ -172,23 +166,27 @@ object Dispatchers extends Logging { def from(cfg: ConfigMap): Option[MessageDispatcher] = { lazy val name = cfg.getString("name", newUuid.toString) - def threadPoolConfig(b: ThreadPoolBuilder) { - b.configureIfPossible( builder => { - cfg.getInt("keep-alive-time").foreach(time => builder.setKeepAliveTimeInMillis(Duration(time, TIME_UNIT).toMillis.toInt)) - cfg.getDouble("core-pool-size-factor").foreach(builder.setCorePoolSizeFromFactor(_)) - cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_)) - cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_)) - cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_)) - cfg.getInt("mailbox-capacity").foreach(builder.setMailboxCapacity(_)) + def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { + val builder = ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()) //Create a new builder + //Creates a transformation from builder to builder, if the option isDefined + def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): + Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun - cfg.getString("rejection-policy").map({ + //Apply the following options to the config if they are present in the cfg + List( + conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), + conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), + conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)), + conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)), + conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)), + conf_?(cfg getString "rejection-policy" map { case "abort" => new AbortPolicy() case "caller-runs" => new CallerRunsPolicy() case "discard-oldest" => new DiscardOldestPolicy() case "discard" => new DiscardPolicy() - case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) - }).foreach(builder.setRejectionPolicy(_)) - }) + case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) + })(policy => _.setRejectionPolicy(policy)) + ).foldLeft(builder)( (c,f) => f.map( _(c) ).getOrElse(c)) //Returns the builder with all the specified options set } lazy val mailboxType: MailboxType = { @@ -200,15 +198,17 @@ object Dispatchers extends Logging { cfg.getString("type") map { case "ExecutorBasedEventDriven" => - new ExecutorBasedEventDrivenDispatcher( + configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher( name, cfg.getInt("throughput", THROUGHPUT), cfg.getInt("throughput-deadline", THROUGHPUT_DEADLINE_TIME_MILLIS), mailboxType, - threadPoolConfig) + threadPoolConfig)).build - case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType, threadPoolConfig) - case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true)) + case "ExecutorBasedEventDrivenWorkStealing" => + configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build + + case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true)) case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher case "GlobalHawt" => globalHawtDispatcher case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown) diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 213e68b863..4e4a1f9c59 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -8,8 +8,9 @@ import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule import java.util.Queue -import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} import se.scalablesolutions.akka.util.Switch +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent. {ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} /** * Default settings are: @@ -69,11 +70,11 @@ class ExecutorBasedEventDrivenDispatcher( val throughput: Int = Dispatchers.THROUGHPUT, val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, _mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: (ThreadPoolBuilder) => Unit = _ => ()) - extends MessageDispatcher with ThreadPoolBuilder { + val config: ThreadPoolConfig = ThreadPoolConfig()) + extends MessageDispatcher { def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = - this(_name, throughput, throughputDeadlineTime, mailboxType, _ => ()) // Needed for Java API usage + this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage def this(_name: String, throughput: Int, mailboxType: MailboxType) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage @@ -84,14 +85,12 @@ class ExecutorBasedEventDrivenDispatcher( def this(_name: String) = this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + val name = "akka:event-driven:dispatcher:" + _name val mailboxType = Some(_mailboxType) private[akka] val active = new Switch(false) - - val name = "akka:event-driven:dispatcher:" + _name - - //Initialize - init + private[akka] val threadFactory = new MonitorableThreadFactory(name) + private[akka] val executorService = new AtomicReference[ExecutorService](null) def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) @@ -99,6 +98,8 @@ class ExecutorBasedEventDrivenDispatcher( registerForExecution(mbox) } + def isShutdown = active.isOff + /** * @return the mailbox associated with the actor */ @@ -112,8 +113,7 @@ class ExecutorBasedEventDrivenDispatcher( } case BoundedMailbox(blocking, capacity, pushTimeOut) => - val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity - new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox { + new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox { def dispatcher = ExecutorBasedEventDrivenDispatcher.this } } @@ -131,25 +131,28 @@ class ExecutorBasedEventDrivenDispatcher( case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported") } - def start = active switchOn { + def start: Unit = if (active.isOff) active switchOn { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) + if (executorService.get() eq null) { + val newExecutor = config.createExecutorService(threadFactory) + if (!executorService.compareAndSet(null,newExecutor)) + log.error("Thought the ExecutorService was missing but appeared out of nowhere!") + } } - def shutdown = active switchOff { - log.debug("Shutting down %s", toString) - executor.shutdownNow - uuids.clear - } - - def ensureNotActive(): Unit = if (active.isOn) { - throw new IllegalActorStateException( - "Can't build a new thread pool for a dispatcher that is already up and running") + def shutdown: Unit = if (active.isOn) active switchOff { + val old = executorService.getAndSet(null) + if (old ne null) { + log.debug("Shutting down %s", toString) + old.shutdownNow() + uuids.clear + } } private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) { try { - executor execute mbox + executorService.get() execute mbox } catch { case e: RejectedExecutionException => mbox.dispatcherLock.unlock() @@ -171,13 +174,6 @@ class ExecutorBasedEventDrivenDispatcher( mbox.suspended.switchOff registerForExecution(mbox) } - - // FIXME: should we have an unbounded queue and not bounded as default ???? - private[akka] def init { - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - config(this) - buildThreadPool - } } /** diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 2bcc7c489f..f44bd60c50 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -33,11 +33,11 @@ import se.scalablesolutions.akka.util.Switch class ExecutorBasedEventDrivenWorkStealingDispatcher( _name: String, _mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { + config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher with ThreadPoolBuilder { - def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType, _ => ()) + def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig()) - def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE, _ => ()) + def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig()) val mailboxType = Some(_mailboxType) @@ -54,7 +54,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( @volatile private var lastThiefIndex = 0 val name = "akka:event-driven-work-stealing:dispatcher:" + _name - init /** * @return the mailbox associated with the actor @@ -195,12 +194,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( override val toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" - private[akka] def init = { - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - config(this) - buildThreadPool - } - def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match { case UnboundedMailbox(blocking) => // FIXME make use of 'blocking' in work stealer ConcurrentLinkedDeque new ConcurrentLinkedDeque[MessageInvocation] with MessageQueue with Runnable { diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 86e4552151..88d978b311 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -69,6 +69,7 @@ trait MessageDispatcher extends MailboxFactory with Logging { def shutdown: Unit def register(actorRef: ActorRef) { + start if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) uuids add actorRef.uuid } diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 8c2229c6b3..247399818f 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -37,10 +37,6 @@ class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxTy } object ThreadBasedDispatcher { - val oneThread: (ThreadPoolBuilder) => Unit = b => { - b setCorePoolSize 1 - b setMaxPoolSize 1 - b setAllowCoreThreadTimeout true - } + val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1) } diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index 1e5e257349..e65d47796e 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -10,7 +10,131 @@ import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy import se.scalablesolutions.akka.actor.IllegalActorStateException -import se.scalablesolutions.akka.util.{Logger, Logging} +import se.scalablesolutions.akka.util. {Duration, Logger, Logging} + +object ThreadPoolConfig { + type Bounds = Int + type FlowHandler = Either[RejectedExecutionHandler,Bounds] + type QueueFactory = () => BlockingQueue[Runnable] + + val defaultAllowCoreThreadTimeout: Boolean = false + val defaultCorePoolSize: Int = 16 + val defaultMaxPoolSize: Int = 128 + val defaultTimeout: Duration = Duration(60000L,TimeUnit.MILLISECONDS) + def defaultFlowHandler: FlowHandler = flowHandler(new CallerRunsPolicy) + + def flowHandler(rejectionHandler: RejectedExecutionHandler): FlowHandler = Left(rejectionHandler) + def flowHandler(bounds: Int): FlowHandler = Right(bounds) + + def fixedPoolSize(size: Int): Int = size + def scaledPoolSize(multiplier: Double): Int = + (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt + + def arrayBlockingQueue(capacity: Int, fair: Boolean): QueueFactory = + () => new ArrayBlockingQueue[Runnable](capacity,fair) + + def synchronousQueue(fair: Boolean): QueueFactory = + () => new SynchronousQueue[Runnable](fair) + + def linkedBlockingQueue(): QueueFactory = + () => new LinkedBlockingQueue[Runnable]() + + def linkedBlockingQueue(capacity: Int): QueueFactory = + () => new LinkedBlockingQueue[Runnable](capacity) + + def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory = + () => queue + + def reusableQueue(queueFactory: QueueFactory): QueueFactory = { + val queue = queueFactory() + () => queue + } +} + +case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, + corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, + maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, + threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, + flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler, + queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) { + + final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = { + flowHandler match { + case Left(rejectHandler) => + val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, rejectHandler) + service.allowCoreThreadTimeOut(allowCorePoolTimeout) + service + case Right(bounds) => + val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory) + service.allowCoreThreadTimeOut(allowCorePoolTimeout) + new BoundedExecutorDecorator(service,bounds) + } + } +} + +trait DispatcherBuilder { + def build: MessageDispatcher +} + +case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder { + import ThreadPoolConfig._ + def build = dispatcherFactory(config) + + //TODO remove this, for backwards compat only + def buildThreadPool = build + + def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bounds: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(flowHandler = flowHandler(bounds), queueFactory = linkedBlockingQueue())) + + def withNewThreadPoolWithCustomBlockingQueue(newQueueFactory: QueueFactory): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(flowHandler = defaultFlowHandler, queueFactory = newQueueFactory)) + + def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolConfigDispatcherBuilder = + withNewThreadPoolWithCustomBlockingQueue(reusableQueue(queue)) + + def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = linkedBlockingQueue(), flowHandler = defaultFlowHandler)) + + def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = linkedBlockingQueue(capacity), flowHandler = defaultFlowHandler)) + + def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = synchronousQueue(fair), flowHandler = defaultFlowHandler)) + + def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(queueFactory = arrayBlockingQueue(capacity,fair), flowHandler = defaultFlowHandler)) + + def setCorePoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(corePoolSize = size)) + + def setMaxPoolSize(size: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(maxPoolSize = size)) + + def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder = + setCorePoolSize(scaledPoolSize(multiplier)) + + def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolConfigDispatcherBuilder = + setMaxPoolSize(scaledPoolSize(multiplier)) + + def setExecutorBounds(bounds: Int): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(flowHandler = flowHandler(bounds))) + + def setKeepAliveTimeInMillis(time: Long): ThreadPoolConfigDispatcherBuilder = + setKeepAliveTime(Duration(time,TimeUnit.MILLISECONDS)) + + def setKeepAliveTime(time: Duration): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(threadTimeout = time)) + + def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolConfigDispatcherBuilder = + setFlowHandler(flowHandler(policy)) + + def setFlowHandler(newFlowHandler: FlowHandler): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(flowHandler = newFlowHandler)) + + def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = + this.copy(config = config.copy(allowCorePoolTimeout = allow)) +} + trait ThreadPoolBuilder extends Logging { val name: String @@ -26,13 +150,13 @@ trait ThreadPoolBuilder extends Logging { @volatile private var inProcessOfBuilding = false private var blockingQueue: BlockingQueue[Runnable] = _ - private lazy val threadFactory = new MonitorableThreadFactory(name) + protected lazy val threadFactory = new MonitorableThreadFactory(name) - protected var executor: ExecutorService = _ + @volatile var executor: ExecutorService = _ def isShutdown = executor.isShutdown - def buildThreadPool(): Unit = synchronized { + def buildThreadPool(): ExecutorService = synchronized { ensureNotActive inProcessOfBuilding = false @@ -51,6 +175,7 @@ trait ThreadPoolBuilder extends Logging { } else { executor = threadPoolBuilder } + executor } def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized { @@ -204,104 +329,105 @@ trait ThreadPoolBuilder extends Logging { } def ensureNotActive(): Unit +} - /** - * @author Jonas Bonér - */ - class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging { - protected val semaphore = new Semaphore(bound) - def execute(command: Runnable) = { - semaphore.acquire - try { - executor.execute(new Runnable() { - def run = { - try { - command.run - } finally { - semaphore.release - } - } - }) - } catch { - case e: RejectedExecutionException => - semaphore.release - case e => - log.error(e,"Unexpected exception") - throw e - } - } +/** + * @author Jonas Bonér + */ +class MonitorableThreadFactory(val name: String) extends ThreadFactory { + protected val counter = new AtomicLong - // Delegating methods for the ExecutorService interface - def shutdown = executor.shutdown + def newThread(runnable: Runnable) = + new MonitorableThread(runnable, name) +// new Thread(runnable, name + "-" + counter.getAndIncrement) +} - def shutdownNow = executor.shutdownNow +/** + * @author Jonas Bonér + */ +object MonitorableThread { + val DEFAULT_NAME = "MonitorableThread" + val created = new AtomicInteger + val alive = new AtomicInteger + @volatile var debugLifecycle = false +} - def isShutdown = executor.isShutdown +// FIXME fix the issues with using the monitoring in MonitorableThread - def isTerminated = executor.isTerminated +/** + * @author Jonas Bonér + */ +class MonitorableThread(runnable: Runnable, name: String) + extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging { - def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit) + setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + def uncaughtException(thread: Thread, cause: Throwable) = + log.error(cause, "UNCAUGHT in thread [%s]", thread.getName) + }) - def submit[T](callable: Callable[T]) = executor.submit(callable) - - def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t) - - def submit(runnable: Runnable) = executor.submit(runnable) - - def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables) - - def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit) - - def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables) - - def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) - } - - /** - * @author Jonas Bonér - */ - class MonitorableThreadFactory(val name: String) extends ThreadFactory { - protected val counter = new AtomicLong - - def newThread(runnable: Runnable) = - new MonitorableThread(runnable, name) - // new Thread(runnable, name + "-" + counter.getAndIncrement) - } - - /** - * @author Jonas Bonér - */ - object MonitorableThread { - val DEFAULT_NAME = "MonitorableThread" - val created = new AtomicInteger - val alive = new AtomicInteger - @volatile var debugLifecycle = false - } - - // FIXME fix the issues with using the monitoring in MonitorableThread - - /** - * @author Jonas Bonér - */ - class MonitorableThread(runnable: Runnable, name: String) - extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) with Logging { - - setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - def uncaughtException(thread: Thread, cause: Throwable) = - log.error(cause, "UNCAUGHT in thread [%s]", thread.getName) - }) - - override def run = { - val debug = MonitorableThread.debugLifecycle - log.debug("Created thread %s", getName) - try { - MonitorableThread.alive.incrementAndGet - super.run - } finally { - MonitorableThread.alive.decrementAndGet - log.debug("Exiting thread %s", getName) - } + override def run = { + val debug = MonitorableThread.debugLifecycle + log.debug("Created thread %s", getName) + try { + MonitorableThread.alive.incrementAndGet + super.run + } finally { + MonitorableThread.alive.decrementAndGet + log.debug("Exiting thread %s", getName) } } } + +/** + * @author Jonas Bonér + */ +class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging { + protected val semaphore = new Semaphore(bound) + + def execute(command: Runnable) = { + semaphore.acquire + try { + executor.execute(new Runnable() { + def run = { + try { + command.run + } finally { + semaphore.release + } + } + }) + } catch { + case e: RejectedExecutionException => + semaphore.release + case e => + log.error(e,"Unexpected exception") + throw e + } + } + + // Delegating methods for the ExecutorService interface + def shutdown = executor.shutdown + + def shutdownNow = executor.shutdownNow + + def isShutdown = executor.isShutdown + + def isTerminated = executor.isTerminated + + def awaitTermination(l: Long, timeUnit: TimeUnit) = executor.awaitTermination(l, timeUnit) + + def submit[T](callable: Callable[T]) = executor.submit(callable) + + def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t) + + def submit(runnable: Runnable) = executor.submit(runnable) + + def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables) + + def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit) + + def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables) + + def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) +} diff --git a/akka-actor/src/main/scala/util/LockUtil.scala b/akka-actor/src/main/scala/util/LockUtil.scala index 7c37267157..909713194b 100644 --- a/akka-actor/src/main/scala/util/LockUtil.scala +++ b/akka-actor/src/main/scala/util/LockUtil.scala @@ -5,7 +5,7 @@ package se.scalablesolutions.akka.util import java.util.concurrent.locks.{ReentrantReadWriteLock, ReentrantLock} -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic. {AtomicBoolean} /** * @author Jonas Bonér @@ -119,7 +119,7 @@ class SimpleLock { class Switch(startAsOn: Boolean = false) { private val switch = new AtomicBoolean(startAsOn) - protected def transcend(from: Boolean,action: => Unit): Boolean = { + protected def transcend(from: Boolean,action: => Unit): Boolean = synchronized { if (switch.compareAndSet(from,!from)) { try { action @@ -135,8 +135,8 @@ class Switch(startAsOn: Boolean = false) { def switchOff(action: => Unit): Boolean = transcend(from = true, action) def switchOn(action: => Unit): Boolean = transcend(from = false,action) - def switchOff: Boolean = switch.compareAndSet(true,false) - def switchOn: Boolean = switch.compareAndSet(false,true) + def switchOff: Boolean = synchronized { switch.compareAndSet(true,false) } + def switchOn: Boolean = synchronized { switch.compareAndSet(false,true) } def ifOnYield[T](action: => T): Option[T] = { if (switch.get) diff --git a/akka-spring/src/main/scala/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/DispatcherFactoryBean.scala index 893b44e24d..d273e75011 100644 --- a/akka-spring/src/main/scala/DispatcherFactoryBean.scala +++ b/akka-spring/src/main/scala/DispatcherFactoryBean.scala @@ -8,9 +8,9 @@ import se.scalablesolutions.akka.config.Supervision._ import AkkaSpringConfigurationTags._ import reflect.BeanProperty import se.scalablesolutions.akka.actor.ActorRef -import se.scalablesolutions.akka.dispatch.{ThreadPoolBuilder, Dispatchers, MessageDispatcher} import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ThreadPoolExecutor.{DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy, AbortPolicy} +import se.scalablesolutions.akka.dispatch._ /** * Reusable factory method for dispatchers. @@ -23,56 +23,54 @@ object DispatcherFactoryBean { * @param actorRef actorRef needed for thread based dispatcher */ def createNewInstance(properties: DispatcherProperties, actorRef: Option[ActorRef] = None): MessageDispatcher = { + + def configThreadPool(): ThreadPoolConfig = { + val poolCfg = ThreadPoolConfig() + if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) { + properties.threadPool.queue match { + case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness) + case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if properties.threadPool.capacity > -1 => threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(properties.threadPool.capacity) + case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if properties.threadPool.capacity <= 0 => threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => threadPoolBuilder.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(properties.threadPool.bound) + case VAL_SYNCHRONOUS_QUEUE => threadPoolBuilder.withNewThreadPoolWithSynchronousQueueWithFairness(properties.threadPool.fairness) + case _ => throw new IllegalArgumentException("unknown queue type") + } + + if (properties.threadPool.corePoolSize > -1) + threadPoolBuilder.setCorePoolSize(properties.threadPool.corePoolSize) + + if (properties.threadPool.maxPoolSize > -1) + threadPoolBuilder.setMaxPoolSize(properties.threadPool.maxPoolSize) + + if (properties.threadPool.keepAlive > -1) + threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive) + + if (properties.threadPool.mailboxCapacity > -1) + threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity) + + if ((properties.threadPool.rejectionPolicy ne null) && (!properties.threadPool.rejectionPolicy.isEmpty)) { + val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match { + case "abort-policy" => new AbortPolicy() + case "caller-runs-policy" => new CallerRunsPolicy() + case "discard-oldest-policy" => new DiscardOldestPolicy() + case "discard-policy" => new DiscardPolicy() + case _ => throw new IllegalArgumentException("Unknown rejection-policy '" + properties.threadPool.rejectionPolicy + "'") + } + threadPoolBuilder.setRejectionPolicy(policy) + } + } else poolCfg + } + var dispatcher = properties.dispatcherType match { - case EXECUTOR_BASED_EVENT_DRIVEN => Dispatchers.newExecutorBasedEventDrivenDispatcher(properties.name) + case EXECUTOR_BASED_EVENT_DRIVEN => new ExecutorBasedEventDrivenDispatcher(properties.name, config = configThreadPool) case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(properties.name) - case THREAD_BASED => if (!actorRef.isDefined) { - throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.") - } else { - Dispatchers.newThreadBasedDispatcher(actorRef.get) - } + case THREAD_BASED if actorRef.isEmpty => throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.") + case THREAD_BASED if actorRef.isDefined => Dispatchers.newThreadBasedDispatcher(actorRef.get) case HAWT => Dispatchers.newHawtDispatcher(properties.aggregate) case _ => throw new IllegalArgumentException("unknown dispatcher type") } - // build threadpool - if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) { - var threadPoolBuilder = dispatcher.asInstanceOf[ThreadPoolBuilder] - threadPoolBuilder = properties.threadPool.queue match { - case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness) - case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE => if (properties.threadPool.capacity > -1) - threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(properties.threadPool.capacity) - else - threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => threadPoolBuilder.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(properties.threadPool.bound) - case VAL_SYNCHRONOUS_QUEUE => threadPoolBuilder.withNewThreadPoolWithSynchronousQueueWithFairness(properties.threadPool.fairness) - case _ => throw new IllegalArgumentException("unknown queue type") - } - if (properties.threadPool.corePoolSize > -1) { - threadPoolBuilder.setCorePoolSize(properties.threadPool.corePoolSize) - } - if (properties.threadPool.maxPoolSize > -1) { - threadPoolBuilder.setMaxPoolSize(properties.threadPool.maxPoolSize) - } - if (properties.threadPool.keepAlive > -1) { - threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive) - } - if (properties.threadPool.mailboxCapacity > -1) { - threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity) - } - if ((properties.threadPool.rejectionPolicy ne null) && (!properties.threadPool.rejectionPolicy.isEmpty)) { - val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match { - case "abort-policy" => new AbortPolicy() - case "caller-runs-policy" => new CallerRunsPolicy() - case "discard-oldest-policy" => new DiscardOldestPolicy() - case "discard-policy" => new DiscardPolicy() - case _ => throw new IllegalArgumentException("Unknown rejection-policy '" + properties.threadPool.rejectionPolicy + "'") - } - threadPoolBuilder.setRejectionPolicy(policy) - } - threadPoolBuilder.asInstanceOf[MessageDispatcher] - } else { - dispatcher - } + + dispatcher } }