diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index 4a9c3c0917..d7592d49b7 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -100,7 +100,8 @@ object Dispatchers extends Logging { *
* Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name) + def newExecutorBasedEventDrivenDispatcher(name: String) = + ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenDispatcher(name,config),ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -108,7 +109,8 @@ object Dispatchers extends Logging { * Has a fluent builder interface for configuring its semantics. */ def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = - new ExecutorBasedEventDrivenDispatcher(name, throughput, mailboxType) + ThreadPoolConfigDispatcherBuilder(config => + new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config),ThreadPoolConfig()) /** @@ -116,30 +118,32 @@ object Dispatchers extends Logging { * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = - new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType) + def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = + ThreadPoolConfigDispatcherBuilder(config => + new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config),ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) = new ExecutorBasedEventDrivenWorkStealingDispatcher(name) + def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String): ThreadPoolConfigDispatcherBuilder = + newExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_TYPE) /** * Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) = - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType = mailboxType) + def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) = + ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,mailboxType,config),ThreadPoolConfig()) /** * Utility function that tries to load the specified dispatcher config from the akka.conf * or else use the supplied default dispatcher */ def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = - config.getConfigMap(key).flatMap(from).getOrElse(default) + config getConfigMap key flatMap from getOrElse default /* * Creates of obtains a dispatcher from a ConfigMap according to the format below @@ -167,13 +171,10 @@ object Dispatchers extends Logging { lazy val name = cfg.getString("name", newUuid.toString) def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { - val builder = ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()) //Create a new builder - //Creates a transformation from builder to builder, if the option isDefined - def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): - Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun + import ThreadPoolConfigDispatcherBuilder.conf_? //Apply the following options to the config if they are present in the cfg - List( + ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure( conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)), @@ -185,8 +186,7 @@ object Dispatchers extends Logging { case "discard-oldest" => new DiscardOldestPolicy() case "discard" => new DiscardPolicy() case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) - })(policy => _.setRejectionPolicy(policy)) - ).foldLeft(builder)( (c,f) => f.map( _(c) ).getOrElse(c)) //Returns the builder with all the specified options set + })(policy => _.setRejectionPolicy(policy))) } lazy val mailboxType: MailboxType = { diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 4e4a1f9c59..6ec5f6963e 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -82,6 +82,9 @@ class ExecutorBasedEventDrivenDispatcher( def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + def this(_name: String, _config: ThreadPoolConfig) = + this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config) + def this(_name: String) = this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage @@ -90,7 +93,7 @@ class ExecutorBasedEventDrivenDispatcher( private[akka] val active = new Switch(false) private[akka] val threadFactory = new MonitorableThreadFactory(name) - private[akka] val executorService = new AtomicReference[ExecutorService](null) + private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) @@ -133,15 +136,10 @@ class ExecutorBasedEventDrivenDispatcher( def start: Unit = if (active.isOff) active switchOn { log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) - if (executorService.get() eq null) { - val newExecutor = config.createExecutorService(threadFactory) - if (!executorService.compareAndSet(null,newExecutor)) - log.error("Thought the ExecutorService was missing but appeared out of nowhere!") - } } def shutdown: Unit = if (active.isOn) active switchOff { - val old = executorService.getAndSet(null) + val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) if (old ne null) { log.debug("Shutting down %s", toString) old.shutdownNow() diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index f44bd60c50..a9ea028210 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -4,11 +4,12 @@ package se.scalablesolutions.akka.dispatch -import java.util.concurrent.CopyOnWriteArrayList import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque} import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException} import se.scalablesolutions.akka.util.Switch +import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList} +import java.util.concurrent.atomic.AtomicReference /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -33,27 +34,28 @@ import se.scalablesolutions.akka.util.Switch class ExecutorBasedEventDrivenWorkStealingDispatcher( _name: String, _mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher with ThreadPoolBuilder { + config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher { def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig()) def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig()) + + //implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor val mailboxType = Some(_mailboxType) - + val name = "akka:event-driven-work-stealing:dispatcher:" + _name private val active = new Switch(false) - implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor - /** Type of the actors registered in this dispatcher. */ - private var actorType: Option[Class[_]] = None - + @volatile private var actorType: Option[Class[_]] = None private val pooledActors = new CopyOnWriteArrayList[ActorRef] + private[akka] val threadFactory = new MonitorableThreadFactory(name) + private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory)) /** The index in the pooled actors list which was last used to steal work */ @volatile private var lastThiefIndex = 0 - val name = "akka:event-driven-work-stealing:dispatcher:" + _name + /** * @return the mailbox associated with the actor @@ -65,7 +67,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( def dispatch(invocation: MessageInvocation) = if (active.isOn) { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation - executor execute mbox + executorService.get() execute mbox } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") /** @@ -92,6 +94,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( mailboxWasProcessed } + def isShutdown = active.isOff + /** * Process the messages in the mailbox of the given actor. * @return @@ -160,10 +164,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = { val donated = getMailbox(receiver).pollLast if (donated ne null) { - if (donated.senderFuture.isDefined) thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( + if (donated.senderFuture.isDefined) thief.postMessageToMailboxAndCreateFutureResultWithTimeout[Any]( donated.message, receiver.timeout, donated.sender, donated.senderFuture) - else if (donated.sender.isDefined) thief.self.postMessageToMailbox(donated.message, donated.sender) - else thief.self.postMessageToMailbox(donated.message, None) + else if (donated.sender.isDefined) thief.postMessageToMailbox(donated.message, donated.sender) + else thief.postMessageToMailbox(donated.message, None) true } else false } @@ -172,10 +176,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( log.debug("Starting up %s",toString) } - def shutdown = active switchOff { - log.debug("Shutting down %s", toString) - executor.shutdownNow - uuids.clear + def shutdown: Unit = if (active.isOn) active switchOff { + val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory)) + if (old ne null) { + log.debug("Shutting down %s", toString) + old.shutdownNow() + uuids.clear + } } @@ -186,7 +193,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( def resume(actorRef: ActorRef) { val mbox = getMailbox(actorRef) mbox.suspended.switchOff - executor execute mbox + executorService.get() execute mbox } def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException( @@ -207,9 +214,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( findThief(actorRef).foreach( tryDonateAndProcessMessages(actorRef,_) ) } } - case BoundedMailbox(blocking, capacity, pushTimeOut) => - val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity - new LinkedBlockingDeque[MessageInvocation](cap) with MessageQueue with Runnable { + case BoundedMailbox(blocking, capacity, pushTimeOut) => + new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable { def enqueue(handle: MessageInvocation): Unit = this.add(handle) def dequeue: MessageInvocation = this.poll() diff --git a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala index e65d47796e..ef53e81eac 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -58,6 +58,9 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler, queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) { + final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService = + new LazyExecutorServiceWrapper(createExecutorService(threadFactory)) + final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = { flowHandler match { case Left(rejectHandler) => @@ -76,6 +79,11 @@ trait DispatcherBuilder { def build: MessageDispatcher } +object ThreadPoolConfigDispatcherBuilder { + def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder): + Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun +} + case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder { import ThreadPoolConfig._ def build = dispatcherFactory(config) @@ -133,214 +141,18 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder = this.copy(config = config.copy(allowCorePoolTimeout = allow)) + + def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder,ThreadPoolConfigDispatcherBuilder]]*): + ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)( (c,f) => f.map( _(c) ).getOrElse(c)) } - -trait ThreadPoolBuilder extends Logging { - val name: String - - private val NR_START_THREADS = 16 - private val NR_MAX_THREADS = 128 - private val KEEP_ALIVE_TIME = 60000L // default is one minute - private val MILLISECONDS = TimeUnit.MILLISECONDS - - private var threadPoolBuilder: ThreadPoolExecutor = _ - private var boundedExecutorBound = -1 - protected var mailboxCapacity = -1 - @volatile private var inProcessOfBuilding = false - private var blockingQueue: BlockingQueue[Runnable] = _ - - protected lazy val threadFactory = new MonitorableThreadFactory(name) - - @volatile var executor: ExecutorService = _ - - def isShutdown = executor.isShutdown - - def buildThreadPool(): ExecutorService = synchronized { - ensureNotActive - inProcessOfBuilding = false - - log.debug("Creating a %s with config [core-pool:%d,max-pool:%d,timeout:%d,allowCoreTimeout:%s,rejectPolicy:%s]", - getClass.getName, - threadPoolBuilder.getCorePoolSize, - threadPoolBuilder.getMaximumPoolSize, - threadPoolBuilder.getKeepAliveTime(MILLISECONDS), - threadPoolBuilder.allowsCoreThreadTimeOut, - threadPoolBuilder.getRejectedExecutionHandler.getClass.getSimpleName) - - if (boundedExecutorBound > 0) { - val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound) - boundedExecutorBound = -1 //Why is this here? - executor = boundedExecutor - } else { - executor = threadPoolBuilder - } - executor - } - - def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyNotInConstructionPhase - blockingQueue = queue - threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue) - this - } - - /** - * Creates a new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeded. - * - * The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed. - */ - def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable] - threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory) - boundedExecutorBound = bound - this - } - - def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized { - ensureNotActive - verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable] - threadPoolBuilder = new ThreadPoolExecutor( - NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) - this - } - - def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyNotInConstructionPhase - blockingQueue = new LinkedBlockingQueue[Runnable](capacity) - threadPoolBuilder = new ThreadPoolExecutor( - NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) - this - } - - def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyNotInConstructionPhase - blockingQueue = new SynchronousQueue[Runnable](fair) - threadPoolBuilder = new ThreadPoolExecutor( - NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) - this - } - - def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyNotInConstructionPhase - blockingQueue = new ArrayBlockingQueue[Runnable](capacity, fair) - threadPoolBuilder = new ThreadPoolExecutor( - NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) - this - } - - def configureIfPossible(f: (ThreadPoolBuilder) => Unit): Boolean = synchronized { - if(inProcessOfBuilding) { - f(this) - true - } - else { - log.warning("Tried to configure an already started ThreadPoolBuilder of type [%s]",getClass.getName) - false - } - } - - /** - * Default is 16. - */ - def setCorePoolSize(size: Int): ThreadPoolBuilder = - setThreadPoolExecutorProperty(_.setCorePoolSize(size)) - - /** - * Default is 128. - */ - def setMaxPoolSize(size: Int): ThreadPoolBuilder = - setThreadPoolExecutorProperty(_.setMaximumPoolSize(size)) - - - /** - * Sets the core pool size to (availableProcessors * multipliers).ceil.toInt - */ - def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder = - setThreadPoolExecutorProperty(_.setCorePoolSize(procs(multiplier))) - - /** - * Sets the max pool size to (availableProcessors * multipliers).ceil.toInt - */ - def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder = - setThreadPoolExecutorProperty(_.setMaximumPoolSize(procs(multiplier))) - - /** - * Sets the bound, -1 is unbounded - */ - def setExecutorBounds(bounds: Int): Unit = synchronized { - this.boundedExecutorBound = bounds - } - - /** - * Sets the mailbox capacity, -1 is unbounded - */ - def setMailboxCapacity(capacity: Int): Unit = synchronized { - this.mailboxCapacity = capacity - } - - protected def procs(multiplier: Double): Int = - (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt - - /** - * Default is 60000 (one minute). - */ - def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder = - setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS)) - - /** - * Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded. - */ - def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolBuilder = - setThreadPoolExecutorProperty(_.setRejectedExecutionHandler(policy)) - - /** - * Default false, set to true to conserve thread for potentially unused dispatchers - */ - def setAllowCoreThreadTimeout(allow: Boolean) = - setThreadPoolExecutorProperty(_.allowCoreThreadTimeOut(allow)) - - /** - * Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded. - */ - protected def setThreadPoolExecutorProperty(f: (ThreadPoolExecutor) => Unit): ThreadPoolBuilder = synchronized { - ensureNotActive - verifyInConstructionPhase - f(threadPoolBuilder) - this - } - - - protected def verifyNotInConstructionPhase = { - if (inProcessOfBuilding) throw new IllegalActorStateException("Is already in the process of building a thread pool") - inProcessOfBuilding = true - } - - protected def verifyInConstructionPhase = { - if (!inProcessOfBuilding) throw new IllegalActorStateException( - "Is not in the process of building a thread pool, start building one by invoking one of the 'newThreadPool*' methods") - } - - def ensureNotActive(): Unit -} - - /** * @author Jonas Bonér */ class MonitorableThreadFactory(val name: String) extends ThreadFactory { protected val counter = new AtomicLong - def newThread(runnable: Runnable) = - new MonitorableThread(runnable, name) -// new Thread(runnable, name + "-" + counter.getAndIncrement) + def newThread(runnable: Runnable) = new MonitorableThread(runnable, name) } /** @@ -382,10 +194,10 @@ class MonitorableThread(runnable: Runnable, name: String) /** * @author Jonas Bonér */ -class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging { +class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate with Logging { protected val semaphore = new Semaphore(bound) - def execute(command: Runnable) = { + override def execute(command: Runnable) = { semaphore.acquire try { executor.execute(new Runnable() { @@ -405,8 +217,14 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend throw e } } +} + +trait ExecutorServiceDelegate extends ExecutorService { + + def executor: ExecutorService + + def execute(command: Runnable) = executor.execute(command) - // Delegating methods for the ExecutorService interface def shutdown = executor.shutdown def shutdownNow = executor.shutdownNow @@ -431,3 +249,14 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) } + +trait LazyExecutorService extends ExecutorServiceDelegate { + + def createExecutor: ExecutorService + + lazy val executor = createExecutor +} + +class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService { + def createExecutor = executorFactory +} \ No newline at end of file diff --git a/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala b/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala index 5fcaf13173..2ad8ac267b 100644 --- a/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala @@ -36,7 +36,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { }).start val actor3 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test") + self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test").build override def postRestart(cause: Throwable) {countDownLatch.countDown} protected def receive = { diff --git a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala index 352ade75f0..1c2670da0d 100644 --- a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala @@ -10,7 +10,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} object ExecutorBasedEventDrivenDispatcherActorSpec { class TestActor extends Actor { - self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString) + self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build def receive = { case "Hello" => self.reply("World") @@ -23,7 +23,7 @@ object ExecutorBasedEventDrivenDispatcherActorSpec { val oneWay = new CountDownLatch(1) } class OneWayTestActor extends Actor { - self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString) + self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build def receive = { case "OneWay" => OneWayTestActor.oneWay.countDown } @@ -68,9 +68,10 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { } @Test def shouldRespectThroughput { - val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_TYPE, (e) => { - e.setCorePoolSize(1) - }) + val throughputDispatcher = Dispatchers. + newExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_TYPE). + setCorePoolSize(1). + build val works = new AtomicBoolean(true) val latch = new CountDownLatch(100) @@ -103,10 +104,10 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { @Test def shouldRespectThroughputDeadline { val deadlineMs = 100 - val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_TYPE, (e) => { - e.setCorePoolSize(1) - }) - + val throughputDispatcher = Dispatchers. + newExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_TYPE). + setCorePoolSize(1). + build val works = new AtomicBoolean(true) val latch = new CountDownLatch(1) val start = new CountDownLatch(1) diff --git a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index 3285e450c6..fe45d79869 100644 --- a/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -11,9 +11,9 @@ import Actor._ import se.scalablesolutions.akka.dispatch.{MessageQueue, Dispatchers} object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { - val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") - val sharedActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") - val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") + val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build + val sharedActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build + val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor { self.dispatcher = delayableActorDispatcher diff --git a/akka-spring/src/main/scala/DispatcherFactoryBean.scala b/akka-spring/src/main/scala/DispatcherFactoryBean.scala index d273e75011..cfe0e05930 100644 --- a/akka-spring/src/main/scala/DispatcherFactoryBean.scala +++ b/akka-spring/src/main/scala/DispatcherFactoryBean.scala @@ -11,6 +11,7 @@ import se.scalablesolutions.akka.actor.ActorRef import java.util.concurrent.RejectedExecutionHandler import java.util.concurrent.ThreadPoolExecutor.{DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy, AbortPolicy} import se.scalablesolutions.akka.dispatch._ +import se.scalablesolutions.akka.util.Duration /** * Reusable factory method for dispatchers. @@ -24,53 +25,66 @@ object DispatcherFactoryBean { */ def createNewInstance(properties: DispatcherProperties, actorRef: Option[ActorRef] = None): MessageDispatcher = { - def configThreadPool(): ThreadPoolConfig = { - val poolCfg = ThreadPoolConfig() + //Creates a ThreadPoolConfigDispatcherBuilder and applies the configuration to it + def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) { - properties.threadPool.queue match { - case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness) - case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if properties.threadPool.capacity > -1 => threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(properties.threadPool.capacity) - case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if properties.threadPool.capacity <= 0 => threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity - case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => threadPoolBuilder.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(properties.threadPool.bound) - case VAL_SYNCHRONOUS_QUEUE => threadPoolBuilder.withNewThreadPoolWithSynchronousQueueWithFairness(properties.threadPool.fairness) - case _ => throw new IllegalArgumentException("unknown queue type") - } + import ThreadPoolConfigDispatcherBuilder.conf_? + import properties._ + val queueDef = Some(threadPool.queue) + val corePoolSize = if (threadPool.corePoolSize > -1) Some(threadPool.corePoolSize) else None + val maxPoolSize = if (threadPool.maxPoolSize > -1) Some(threadPool.maxPoolSize) else None + val keepAlive = if (threadPool.keepAlive > -1) Some(threadPool.keepAlive) else None + val executorBounds = if (threadPool.bound > -1) Some(threadPool.bound) else None + val flowHandler = threadPool.rejectionPolicy match { + case null | "" => None + case "abort-policy" => Some(new AbortPolicy()) + case "caller-runs-policy" => Some(new CallerRunsPolicy()) + case "discard-oldest-policy" => Some(new DiscardOldestPolicy()) + case "discard-policy" => Some(new DiscardPolicy()) + case x => throw new IllegalArgumentException("Unknown rejection-policy '" + x + "'") + } - if (properties.threadPool.corePoolSize > -1) - threadPoolBuilder.setCorePoolSize(properties.threadPool.corePoolSize) - - if (properties.threadPool.maxPoolSize > -1) - threadPoolBuilder.setMaxPoolSize(properties.threadPool.maxPoolSize) - - if (properties.threadPool.keepAlive > -1) - threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive) - - if (properties.threadPool.mailboxCapacity > -1) - threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity) - - if ((properties.threadPool.rejectionPolicy ne null) && (!properties.threadPool.rejectionPolicy.isEmpty)) { - val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match { - case "abort-policy" => new AbortPolicy() - case "caller-runs-policy" => new CallerRunsPolicy() - case "discard-oldest-policy" => new DiscardOldestPolicy() - case "discard-policy" => new DiscardPolicy() - case _ => throw new IllegalArgumentException("Unknown rejection-policy '" + properties.threadPool.rejectionPolicy + "'") - } - threadPoolBuilder.setRejectionPolicy(policy) - } - } else poolCfg + //Apply the following options to the config if they are present in the cfg + ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure( + conf_?(queueDef )(definition => definition match { + case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => + _.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(threadPool.capacity,threadPool.fairness) + case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if threadPool.capacity > -1 => + _.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(threadPool.capacity) + case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if threadPool.capacity <= 0 => + _.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity + case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => + _.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(threadPool.bound) + case VAL_SYNCHRONOUS_QUEUE => + _.withNewThreadPoolWithSynchronousQueueWithFairness(threadPool.fairness) + case unknown => + throw new IllegalArgumentException("Unknown queue type " + unknown) + }), + conf_?(keepAlive )(time => _.setKeepAliveTimeInMillis(time)), + conf_?(corePoolSize )(count => _.setCorePoolSize(count)), + conf_?(maxPoolSize )(count => _.setMaxPoolSize(count)), + conf_?(executorBounds)(bounds => _.setExecutorBounds(bounds)), + conf_?(flowHandler )(policy => _.setRejectionPolicy(policy))) + } + else + ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()) } - var dispatcher = properties.dispatcherType match { - case EXECUTOR_BASED_EVENT_DRIVEN => new ExecutorBasedEventDrivenDispatcher(properties.name, config = configThreadPool) - case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(properties.name) - case THREAD_BASED if actorRef.isEmpty => throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.") - case THREAD_BASED if actorRef.isDefined => Dispatchers.newThreadBasedDispatcher(actorRef.get) - case HAWT => Dispatchers.newHawtDispatcher(properties.aggregate) - case _ => throw new IllegalArgumentException("unknown dispatcher type") + //Create the dispatcher + properties.dispatcherType match { + case EXECUTOR_BASED_EVENT_DRIVEN => + configureThreadPool(poolConfig => new ExecutorBasedEventDrivenDispatcher(properties.name, poolConfig)).build + case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => + configureThreadPool(poolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(properties.name,Dispatchers.MAILBOX_TYPE,poolConfig)).build + case THREAD_BASED if actorRef.isEmpty => + throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.") + case THREAD_BASED if actorRef.isDefined => + Dispatchers.newThreadBasedDispatcher(actorRef.get) + case HAWT => + Dispatchers.newHawtDispatcher(properties.aggregate) + case unknown => + throw new IllegalArgumentException("Unknown dispatcher type " + unknown) } - - dispatcher } } diff --git a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala index 51c8d2bd73..a5ae40ff16 100644 --- a/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/DispatcherSpringFeatureTest.scala @@ -130,13 +130,18 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers { * get ThreadPoolExecutor via reflection and assert that dispatcher is correct type */ private def getThreadPoolExecutorAndAssert(dispatcher: MessageDispatcher): ThreadPoolExecutor = { - assert(dispatcher.isInstanceOf[ThreadPoolBuilder]) - val pool = dispatcher.asInstanceOf[ThreadPoolBuilder] - val field = pool.getClass.getDeclaredField("se$scalablesolutions$akka$dispatch$ThreadPoolBuilder$$threadPoolBuilder") - field.setAccessible(true) - val executor = field.get(pool).asInstanceOf[ThreadPoolExecutor] - assert(executor ne null) - executor; + + def unpackExecutorService(e: ExecutorService): ExecutorService = e match { + case b: ExecutorServiceDelegate => unpackExecutorService(b.executor) + case t: ThreadPoolExecutor => t + case e => throw new IllegalStateException("Illegal executor type: " + e) + } + + unpackExecutorService(dispatcher match { + case e: ExecutorBasedEventDrivenDispatcher => e.start; e.executorService.get() + case e: ExecutorBasedEventDrivenWorkStealingDispatcher => e.start; e.executorService.get() + case x => throw new IllegalStateException("Illegal dispatcher type: " + x) + }).asInstanceOf[ThreadPoolExecutor] } } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala index ea6e939386..5d2e9fd315 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorGuiceConfiguratorSpec.scala @@ -30,7 +30,7 @@ class TypedActorGuiceConfiguratorSpec extends override def beforeAll { Config.config - val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test") + val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test").build conf.addExternalGuiceModule(new AbstractModule { def configure = bind(classOf[Ext]).to(classOf[ExtImpl]).in(Scopes.SINGLETON)