Tests green, config basically in place, need to work on start/stop semantics and countdowns
This commit is contained in:
parent
4478474681
commit
c241703a01
10 changed files with 161 additions and 308 deletions
|
|
@ -100,7 +100,8 @@ object Dispatchers extends Logging {
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenDispatcher(name: String) = new ExecutorBasedEventDrivenDispatcher(name)
|
def newExecutorBasedEventDrivenDispatcher(name: String) =
|
||||||
|
ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenDispatcher(name,config),ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||||
|
|
@ -108,7 +109,8 @@ object Dispatchers extends Logging {
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
||||||
new ExecutorBasedEventDrivenDispatcher(name, throughput, mailboxType)
|
ThreadPoolConfigDispatcherBuilder(config =>
|
||||||
|
new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config),ThreadPoolConfig())
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -117,14 +119,16 @@ object Dispatchers extends Logging {
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
|
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
|
||||||
new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType)
|
ThreadPoolConfigDispatcherBuilder(config =>
|
||||||
|
new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config),ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) = new ExecutorBasedEventDrivenWorkStealingDispatcher(name)
|
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String): ThreadPoolConfigDispatcherBuilder =
|
||||||
|
newExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_TYPE)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher with work stealing (TODO: better doc) serving multiple (millions) of actors through a thread pool.
|
||||||
|
|
@ -132,14 +136,14 @@ object Dispatchers extends Logging {
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) =
|
def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, mailboxType: MailboxType) =
|
||||||
new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType = mailboxType)
|
ThreadPoolConfigDispatcherBuilder(config => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,mailboxType,config),ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility function that tries to load the specified dispatcher config from the akka.conf
|
* Utility function that tries to load the specified dispatcher config from the akka.conf
|
||||||
* or else use the supplied default dispatcher
|
* or else use the supplied default dispatcher
|
||||||
*/
|
*/
|
||||||
def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
|
def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
|
||||||
config.getConfigMap(key).flatMap(from).getOrElse(default)
|
config getConfigMap key flatMap from getOrElse default
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Creates of obtains a dispatcher from a ConfigMap according to the format below
|
* Creates of obtains a dispatcher from a ConfigMap according to the format below
|
||||||
|
|
@ -167,13 +171,10 @@ object Dispatchers extends Logging {
|
||||||
lazy val name = cfg.getString("name", newUuid.toString)
|
lazy val name = cfg.getString("name", newUuid.toString)
|
||||||
|
|
||||||
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
||||||
val builder = ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()) //Create a new builder
|
import ThreadPoolConfigDispatcherBuilder.conf_?
|
||||||
//Creates a transformation from builder to builder, if the option isDefined
|
|
||||||
def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder):
|
|
||||||
Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun
|
|
||||||
|
|
||||||
//Apply the following options to the config if they are present in the cfg
|
//Apply the following options to the config if they are present in the cfg
|
||||||
List(
|
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
|
||||||
conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
|
conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
|
||||||
conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
|
conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
|
||||||
conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
|
conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
|
||||||
|
|
@ -185,8 +186,7 @@ object Dispatchers extends Logging {
|
||||||
case "discard-oldest" => new DiscardOldestPolicy()
|
case "discard-oldest" => new DiscardOldestPolicy()
|
||||||
case "discard" => new DiscardPolicy()
|
case "discard" => new DiscardPolicy()
|
||||||
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
|
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
|
||||||
})(policy => _.setRejectionPolicy(policy))
|
})(policy => _.setRejectionPolicy(policy)))
|
||||||
).foldLeft(builder)( (c,f) => f.map( _(c) ).getOrElse(c)) //Returns the builder with all the specified options set
|
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy val mailboxType: MailboxType = {
|
lazy val mailboxType: MailboxType = {
|
||||||
|
|
|
||||||
|
|
@ -82,6 +82,9 @@ class ExecutorBasedEventDrivenDispatcher(
|
||||||
def this(_name: String, throughput: Int) =
|
def this(_name: String, throughput: Int) =
|
||||||
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
||||||
|
|
||||||
|
def this(_name: String, _config: ThreadPoolConfig) =
|
||||||
|
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
|
||||||
|
|
||||||
def this(_name: String) =
|
def this(_name: String) =
|
||||||
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
||||||
|
|
||||||
|
|
@ -90,7 +93,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
||||||
|
|
||||||
private[akka] val active = new Switch(false)
|
private[akka] val active = new Switch(false)
|
||||||
private[akka] val threadFactory = new MonitorableThreadFactory(name)
|
private[akka] val threadFactory = new MonitorableThreadFactory(name)
|
||||||
private[akka] val executorService = new AtomicReference[ExecutorService](null)
|
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
|
||||||
|
|
||||||
def dispatch(invocation: MessageInvocation) = {
|
def dispatch(invocation: MessageInvocation) = {
|
||||||
val mbox = getMailbox(invocation.receiver)
|
val mbox = getMailbox(invocation.receiver)
|
||||||
|
|
@ -133,15 +136,10 @@ class ExecutorBasedEventDrivenDispatcher(
|
||||||
|
|
||||||
def start: Unit = if (active.isOff) active switchOn {
|
def start: Unit = if (active.isOff) active switchOn {
|
||||||
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
|
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
|
||||||
if (executorService.get() eq null) {
|
|
||||||
val newExecutor = config.createExecutorService(threadFactory)
|
|
||||||
if (!executorService.compareAndSet(null,newExecutor))
|
|
||||||
log.error("Thought the ExecutorService was missing but appeared out of nowhere!")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def shutdown: Unit = if (active.isOn) active switchOff {
|
def shutdown: Unit = if (active.isOn) active switchOff {
|
||||||
val old = executorService.getAndSet(null)
|
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
|
||||||
if (old ne null) {
|
if (old ne null) {
|
||||||
log.debug("Shutting down %s", toString)
|
log.debug("Shutting down %s", toString)
|
||||||
old.shutdownNow()
|
old.shutdownNow()
|
||||||
|
|
|
||||||
|
|
@ -4,11 +4,12 @@
|
||||||
|
|
||||||
package se.scalablesolutions.akka.dispatch
|
package se.scalablesolutions.akka.dispatch
|
||||||
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
|
||||||
import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque}
|
import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque}
|
||||||
|
|
||||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException}
|
import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException}
|
||||||
import se.scalablesolutions.akka.util.Switch
|
import se.scalablesolutions.akka.util.Switch
|
||||||
|
import java.util.concurrent. {ExecutorService, CopyOnWriteArrayList}
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
|
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
|
||||||
|
|
@ -33,27 +34,28 @@ import se.scalablesolutions.akka.util.Switch
|
||||||
class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||||
_name: String,
|
_name: String,
|
||||||
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
||||||
config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher with ThreadPoolBuilder {
|
config: ThreadPoolConfig = ThreadPoolConfig()) extends MessageDispatcher {
|
||||||
|
|
||||||
def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig())
|
def this(_name: String, mailboxType: MailboxType) = this(_name, mailboxType,ThreadPoolConfig())
|
||||||
|
|
||||||
def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig())
|
def this(_name: String) = this(_name, Dispatchers.MAILBOX_TYPE,ThreadPoolConfig())
|
||||||
|
|
||||||
val mailboxType = Some(_mailboxType)
|
//implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor
|
||||||
|
|
||||||
|
val mailboxType = Some(_mailboxType)
|
||||||
|
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
|
||||||
private val active = new Switch(false)
|
private val active = new Switch(false)
|
||||||
|
|
||||||
implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor
|
|
||||||
|
|
||||||
/** Type of the actors registered in this dispatcher. */
|
/** Type of the actors registered in this dispatcher. */
|
||||||
private var actorType: Option[Class[_]] = None
|
@volatile private var actorType: Option[Class[_]] = None
|
||||||
|
|
||||||
private val pooledActors = new CopyOnWriteArrayList[ActorRef]
|
private val pooledActors = new CopyOnWriteArrayList[ActorRef]
|
||||||
|
private[akka] val threadFactory = new MonitorableThreadFactory(name)
|
||||||
|
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
|
||||||
|
|
||||||
/** The index in the pooled actors list which was last used to steal work */
|
/** The index in the pooled actors list which was last used to steal work */
|
||||||
@volatile private var lastThiefIndex = 0
|
@volatile private var lastThiefIndex = 0
|
||||||
|
|
||||||
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the mailbox associated with the actor
|
* @return the mailbox associated with the actor
|
||||||
|
|
@ -65,7 +67,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||||
def dispatch(invocation: MessageInvocation) = if (active.isOn) {
|
def dispatch(invocation: MessageInvocation) = if (active.isOn) {
|
||||||
val mbox = getMailbox(invocation.receiver)
|
val mbox = getMailbox(invocation.receiver)
|
||||||
mbox enqueue invocation
|
mbox enqueue invocation
|
||||||
executor execute mbox
|
executorService.get() execute mbox
|
||||||
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
|
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -92,6 +94,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||||
mailboxWasProcessed
|
mailboxWasProcessed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def isShutdown = active.isOff
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the messages in the mailbox of the given actor.
|
* Process the messages in the mailbox of the given actor.
|
||||||
* @return
|
* @return
|
||||||
|
|
@ -160,10 +164,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||||
private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
|
private def donateMessage(receiver: ActorRef, thief: ActorRef): Boolean = {
|
||||||
val donated = getMailbox(receiver).pollLast
|
val donated = getMailbox(receiver).pollLast
|
||||||
if (donated ne null) {
|
if (donated ne null) {
|
||||||
if (donated.senderFuture.isDefined) thief.self.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
|
if (donated.senderFuture.isDefined) thief.postMessageToMailboxAndCreateFutureResultWithTimeout[Any](
|
||||||
donated.message, receiver.timeout, donated.sender, donated.senderFuture)
|
donated.message, receiver.timeout, donated.sender, donated.senderFuture)
|
||||||
else if (donated.sender.isDefined) thief.self.postMessageToMailbox(donated.message, donated.sender)
|
else if (donated.sender.isDefined) thief.postMessageToMailbox(donated.message, donated.sender)
|
||||||
else thief.self.postMessageToMailbox(donated.message, None)
|
else thief.postMessageToMailbox(donated.message, None)
|
||||||
true
|
true
|
||||||
} else false
|
} else false
|
||||||
}
|
}
|
||||||
|
|
@ -172,11 +176,14 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||||
log.debug("Starting up %s",toString)
|
log.debug("Starting up %s",toString)
|
||||||
}
|
}
|
||||||
|
|
||||||
def shutdown = active switchOff {
|
def shutdown: Unit = if (active.isOn) active switchOff {
|
||||||
|
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
|
||||||
|
if (old ne null) {
|
||||||
log.debug("Shutting down %s", toString)
|
log.debug("Shutting down %s", toString)
|
||||||
executor.shutdownNow
|
old.shutdownNow()
|
||||||
uuids.clear
|
uuids.clear
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def suspend(actorRef: ActorRef) {
|
def suspend(actorRef: ActorRef) {
|
||||||
|
|
@ -186,7 +193,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||||
def resume(actorRef: ActorRef) {
|
def resume(actorRef: ActorRef) {
|
||||||
val mbox = getMailbox(actorRef)
|
val mbox = getMailbox(actorRef)
|
||||||
mbox.suspended.switchOff
|
mbox.suspended.switchOff
|
||||||
executor execute mbox
|
executorService.get() execute mbox
|
||||||
}
|
}
|
||||||
|
|
||||||
def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException(
|
def ensureNotActive(): Unit = if (active.isOn) throw new IllegalActorStateException(
|
||||||
|
|
@ -208,8 +215,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
|
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
|
||||||
val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity
|
new LinkedBlockingDeque[MessageInvocation](capacity) with MessageQueue with Runnable {
|
||||||
new LinkedBlockingDeque[MessageInvocation](cap) with MessageQueue with Runnable {
|
|
||||||
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
|
def enqueue(handle: MessageInvocation): Unit = this.add(handle)
|
||||||
|
|
||||||
def dequeue: MessageInvocation = this.poll()
|
def dequeue: MessageInvocation = this.poll()
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,9 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def
|
||||||
flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
|
flowHandler: ThreadPoolConfig.FlowHandler = ThreadPoolConfig.defaultFlowHandler,
|
||||||
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
|
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue()) {
|
||||||
|
|
||||||
|
final def createLazyExecutorService(threadFactory: ThreadFactory): ExecutorService =
|
||||||
|
new LazyExecutorServiceWrapper(createExecutorService(threadFactory))
|
||||||
|
|
||||||
final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
|
final def createExecutorService(threadFactory: ThreadFactory): ExecutorService = {
|
||||||
flowHandler match {
|
flowHandler match {
|
||||||
case Left(rejectHandler) =>
|
case Left(rejectHandler) =>
|
||||||
|
|
@ -76,6 +79,11 @@ trait DispatcherBuilder {
|
||||||
def build: MessageDispatcher
|
def build: MessageDispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object ThreadPoolConfigDispatcherBuilder {
|
||||||
|
def conf_?[T](opt: Option[T])(fun: (T) => ThreadPoolConfigDispatcherBuilder => ThreadPoolConfigDispatcherBuilder):
|
||||||
|
Option[(ThreadPoolConfigDispatcherBuilder) => ThreadPoolConfigDispatcherBuilder] = opt map fun
|
||||||
|
}
|
||||||
|
|
||||||
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
|
case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfig) => MessageDispatcher, config: ThreadPoolConfig) extends DispatcherBuilder {
|
||||||
import ThreadPoolConfig._
|
import ThreadPoolConfig._
|
||||||
def build = dispatcherFactory(config)
|
def build = dispatcherFactory(config)
|
||||||
|
|
@ -133,214 +141,18 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
|
||||||
|
|
||||||
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
|
def setAllowCoreThreadTimeout(allow: Boolean): ThreadPoolConfigDispatcherBuilder =
|
||||||
this.copy(config = config.copy(allowCorePoolTimeout = allow))
|
this.copy(config = config.copy(allowCorePoolTimeout = allow))
|
||||||
|
|
||||||
|
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder,ThreadPoolConfigDispatcherBuilder]]*):
|
||||||
|
ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)( (c,f) => f.map( _(c) ).getOrElse(c))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
trait ThreadPoolBuilder extends Logging {
|
|
||||||
val name: String
|
|
||||||
|
|
||||||
private val NR_START_THREADS = 16
|
|
||||||
private val NR_MAX_THREADS = 128
|
|
||||||
private val KEEP_ALIVE_TIME = 60000L // default is one minute
|
|
||||||
private val MILLISECONDS = TimeUnit.MILLISECONDS
|
|
||||||
|
|
||||||
private var threadPoolBuilder: ThreadPoolExecutor = _
|
|
||||||
private var boundedExecutorBound = -1
|
|
||||||
protected var mailboxCapacity = -1
|
|
||||||
@volatile private var inProcessOfBuilding = false
|
|
||||||
private var blockingQueue: BlockingQueue[Runnable] = _
|
|
||||||
|
|
||||||
protected lazy val threadFactory = new MonitorableThreadFactory(name)
|
|
||||||
|
|
||||||
@volatile var executor: ExecutorService = _
|
|
||||||
|
|
||||||
def isShutdown = executor.isShutdown
|
|
||||||
|
|
||||||
def buildThreadPool(): ExecutorService = synchronized {
|
|
||||||
ensureNotActive
|
|
||||||
inProcessOfBuilding = false
|
|
||||||
|
|
||||||
log.debug("Creating a %s with config [core-pool:%d,max-pool:%d,timeout:%d,allowCoreTimeout:%s,rejectPolicy:%s]",
|
|
||||||
getClass.getName,
|
|
||||||
threadPoolBuilder.getCorePoolSize,
|
|
||||||
threadPoolBuilder.getMaximumPoolSize,
|
|
||||||
threadPoolBuilder.getKeepAliveTime(MILLISECONDS),
|
|
||||||
threadPoolBuilder.allowsCoreThreadTimeOut,
|
|
||||||
threadPoolBuilder.getRejectedExecutionHandler.getClass.getSimpleName)
|
|
||||||
|
|
||||||
if (boundedExecutorBound > 0) {
|
|
||||||
val boundedExecutor = new BoundedExecutorDecorator(threadPoolBuilder, boundedExecutorBound)
|
|
||||||
boundedExecutorBound = -1 //Why is this here?
|
|
||||||
executor = boundedExecutor
|
|
||||||
} else {
|
|
||||||
executor = threadPoolBuilder
|
|
||||||
}
|
|
||||||
executor
|
|
||||||
}
|
|
||||||
|
|
||||||
def withNewThreadPoolWithCustomBlockingQueue(queue: BlockingQueue[Runnable]): ThreadPoolBuilder = synchronized {
|
|
||||||
ensureNotActive
|
|
||||||
verifyNotInConstructionPhase
|
|
||||||
blockingQueue = queue
|
|
||||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
|
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new thread pool in which the number of tasks in the pending queue is bounded. Will block when exceeded.
|
|
||||||
* <p/>
|
|
||||||
* The 'bound' variable should specify the number equal to the size of the thread pool PLUS the number of queued tasks that should be followed.
|
|
||||||
*/
|
|
||||||
def withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(bound: Int): ThreadPoolBuilder = synchronized {
|
|
||||||
ensureNotActive
|
|
||||||
verifyNotInConstructionPhase
|
|
||||||
blockingQueue = new LinkedBlockingQueue[Runnable]
|
|
||||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory)
|
|
||||||
boundedExecutorBound = bound
|
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: ThreadPoolBuilder = synchronized {
|
|
||||||
ensureNotActive
|
|
||||||
verifyNotInConstructionPhase
|
|
||||||
blockingQueue = new LinkedBlockingQueue[Runnable]
|
|
||||||
threadPoolBuilder = new ThreadPoolExecutor(
|
|
||||||
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): ThreadPoolBuilder = synchronized {
|
|
||||||
ensureNotActive
|
|
||||||
verifyNotInConstructionPhase
|
|
||||||
blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
|
|
||||||
threadPoolBuilder = new ThreadPoolExecutor(
|
|
||||||
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): ThreadPoolBuilder = synchronized {
|
|
||||||
ensureNotActive
|
|
||||||
verifyNotInConstructionPhase
|
|
||||||
blockingQueue = new SynchronousQueue[Runnable](fair)
|
|
||||||
threadPoolBuilder = new ThreadPoolExecutor(
|
|
||||||
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): ThreadPoolBuilder = synchronized {
|
|
||||||
ensureNotActive
|
|
||||||
verifyNotInConstructionPhase
|
|
||||||
blockingQueue = new ArrayBlockingQueue[Runnable](capacity, fair)
|
|
||||||
threadPoolBuilder = new ThreadPoolExecutor(
|
|
||||||
NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
def configureIfPossible(f: (ThreadPoolBuilder) => Unit): Boolean = synchronized {
|
|
||||||
if(inProcessOfBuilding) {
|
|
||||||
f(this)
|
|
||||||
true
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
log.warning("Tried to configure an already started ThreadPoolBuilder of type [%s]",getClass.getName)
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Default is 16.
|
|
||||||
*/
|
|
||||||
def setCorePoolSize(size: Int): ThreadPoolBuilder =
|
|
||||||
setThreadPoolExecutorProperty(_.setCorePoolSize(size))
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Default is 128.
|
|
||||||
*/
|
|
||||||
def setMaxPoolSize(size: Int): ThreadPoolBuilder =
|
|
||||||
setThreadPoolExecutorProperty(_.setMaximumPoolSize(size))
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the core pool size to (availableProcessors * multipliers).ceil.toInt
|
|
||||||
*/
|
|
||||||
def setCorePoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
|
|
||||||
setThreadPoolExecutorProperty(_.setCorePoolSize(procs(multiplier)))
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the max pool size to (availableProcessors * multipliers).ceil.toInt
|
|
||||||
*/
|
|
||||||
def setMaxPoolSizeFromFactor(multiplier: Double): ThreadPoolBuilder =
|
|
||||||
setThreadPoolExecutorProperty(_.setMaximumPoolSize(procs(multiplier)))
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the bound, -1 is unbounded
|
|
||||||
*/
|
|
||||||
def setExecutorBounds(bounds: Int): Unit = synchronized {
|
|
||||||
this.boundedExecutorBound = bounds
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the mailbox capacity, -1 is unbounded
|
|
||||||
*/
|
|
||||||
def setMailboxCapacity(capacity: Int): Unit = synchronized {
|
|
||||||
this.mailboxCapacity = capacity
|
|
||||||
}
|
|
||||||
|
|
||||||
protected def procs(multiplier: Double): Int =
|
|
||||||
(Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Default is 60000 (one minute).
|
|
||||||
*/
|
|
||||||
def setKeepAliveTimeInMillis(time: Long): ThreadPoolBuilder =
|
|
||||||
setThreadPoolExecutorProperty(_.setKeepAliveTime(time, MILLISECONDS))
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
|
|
||||||
*/
|
|
||||||
def setRejectionPolicy(policy: RejectedExecutionHandler): ThreadPoolBuilder =
|
|
||||||
setThreadPoolExecutorProperty(_.setRejectedExecutionHandler(policy))
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Default false, set to true to conserve thread for potentially unused dispatchers
|
|
||||||
*/
|
|
||||||
def setAllowCoreThreadTimeout(allow: Boolean) =
|
|
||||||
setThreadPoolExecutorProperty(_.allowCoreThreadTimeOut(allow))
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Default ThreadPoolExecutor.CallerRunsPolicy. To allow graceful backing off when pool is overloaded.
|
|
||||||
*/
|
|
||||||
protected def setThreadPoolExecutorProperty(f: (ThreadPoolExecutor) => Unit): ThreadPoolBuilder = synchronized {
|
|
||||||
ensureNotActive
|
|
||||||
verifyInConstructionPhase
|
|
||||||
f(threadPoolBuilder)
|
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
protected def verifyNotInConstructionPhase = {
|
|
||||||
if (inProcessOfBuilding) throw new IllegalActorStateException("Is already in the process of building a thread pool")
|
|
||||||
inProcessOfBuilding = true
|
|
||||||
}
|
|
||||||
|
|
||||||
protected def verifyInConstructionPhase = {
|
|
||||||
if (!inProcessOfBuilding) throw new IllegalActorStateException(
|
|
||||||
"Is not in the process of building a thread pool, start building one by invoking one of the 'newThreadPool*' methods")
|
|
||||||
}
|
|
||||||
|
|
||||||
def ensureNotActive(): Unit
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
|
class MonitorableThreadFactory(val name: String) extends ThreadFactory {
|
||||||
protected val counter = new AtomicLong
|
protected val counter = new AtomicLong
|
||||||
|
|
||||||
def newThread(runnable: Runnable) =
|
def newThread(runnable: Runnable) = new MonitorableThread(runnable, name)
|
||||||
new MonitorableThread(runnable, name)
|
|
||||||
// new Thread(runnable, name + "-" + counter.getAndIncrement)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -382,10 +194,10 @@ class MonitorableThread(runnable: Runnable, name: String)
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorService with Logging {
|
class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate with Logging {
|
||||||
protected val semaphore = new Semaphore(bound)
|
protected val semaphore = new Semaphore(bound)
|
||||||
|
|
||||||
def execute(command: Runnable) = {
|
override def execute(command: Runnable) = {
|
||||||
semaphore.acquire
|
semaphore.acquire
|
||||||
try {
|
try {
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
|
|
@ -405,8 +217,14 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait ExecutorServiceDelegate extends ExecutorService {
|
||||||
|
|
||||||
|
def executor: ExecutorService
|
||||||
|
|
||||||
|
def execute(command: Runnable) = executor.execute(command)
|
||||||
|
|
||||||
// Delegating methods for the ExecutorService interface
|
|
||||||
def shutdown = executor.shutdown
|
def shutdown = executor.shutdown
|
||||||
|
|
||||||
def shutdownNow = executor.shutdownNow
|
def shutdownNow = executor.shutdownNow
|
||||||
|
|
@ -431,3 +249,14 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
|
||||||
|
|
||||||
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
|
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trait LazyExecutorService extends ExecutorServiceDelegate {
|
||||||
|
|
||||||
|
def createExecutor: ExecutorService
|
||||||
|
|
||||||
|
lazy val executor = createExecutor
|
||||||
|
}
|
||||||
|
|
||||||
|
class LazyExecutorServiceWrapper(executorFactory: => ExecutorService) extends LazyExecutorService {
|
||||||
|
def createExecutor = executorFactory
|
||||||
|
}
|
||||||
|
|
@ -36,7 +36,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||||
}).start
|
}).start
|
||||||
|
|
||||||
val actor3 = Actor.actorOf(new Actor {
|
val actor3 = Actor.actorOf(new Actor {
|
||||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test")
|
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test").build
|
||||||
override def postRestart(cause: Throwable) {countDownLatch.countDown}
|
override def postRestart(cause: Throwable) {countDownLatch.countDown}
|
||||||
|
|
||||||
protected def receive = {
|
protected def receive = {
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
|
||||||
|
|
||||||
object ExecutorBasedEventDrivenDispatcherActorSpec {
|
object ExecutorBasedEventDrivenDispatcherActorSpec {
|
||||||
class TestActor extends Actor {
|
class TestActor extends Actor {
|
||||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString)
|
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build
|
||||||
def receive = {
|
def receive = {
|
||||||
case "Hello" =>
|
case "Hello" =>
|
||||||
self.reply("World")
|
self.reply("World")
|
||||||
|
|
@ -23,7 +23,7 @@ object ExecutorBasedEventDrivenDispatcherActorSpec {
|
||||||
val oneWay = new CountDownLatch(1)
|
val oneWay = new CountDownLatch(1)
|
||||||
}
|
}
|
||||||
class OneWayTestActor extends Actor {
|
class OneWayTestActor extends Actor {
|
||||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString)
|
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build
|
||||||
def receive = {
|
def receive = {
|
||||||
case "OneWay" => OneWayTestActor.oneWay.countDown
|
case "OneWay" => OneWayTestActor.oneWay.countDown
|
||||||
}
|
}
|
||||||
|
|
@ -68,9 +68,10 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def shouldRespectThroughput {
|
@Test def shouldRespectThroughput {
|
||||||
val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_TYPE, (e) => {
|
val throughputDispatcher = Dispatchers.
|
||||||
e.setCorePoolSize(1)
|
newExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_TYPE).
|
||||||
})
|
setCorePoolSize(1).
|
||||||
|
build
|
||||||
|
|
||||||
val works = new AtomicBoolean(true)
|
val works = new AtomicBoolean(true)
|
||||||
val latch = new CountDownLatch(100)
|
val latch = new CountDownLatch(100)
|
||||||
|
|
@ -103,10 +104,10 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
|
||||||
|
|
||||||
@Test def shouldRespectThroughputDeadline {
|
@Test def shouldRespectThroughputDeadline {
|
||||||
val deadlineMs = 100
|
val deadlineMs = 100
|
||||||
val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_TYPE, (e) => {
|
val throughputDispatcher = Dispatchers.
|
||||||
e.setCorePoolSize(1)
|
newExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_TYPE).
|
||||||
})
|
setCorePoolSize(1).
|
||||||
|
build
|
||||||
val works = new AtomicBoolean(true)
|
val works = new AtomicBoolean(true)
|
||||||
val latch = new CountDownLatch(1)
|
val latch = new CountDownLatch(1)
|
||||||
val start = new CountDownLatch(1)
|
val start = new CountDownLatch(1)
|
||||||
|
|
|
||||||
|
|
@ -11,9 +11,9 @@ import Actor._
|
||||||
import se.scalablesolutions.akka.dispatch.{MessageQueue, Dispatchers}
|
import se.scalablesolutions.akka.dispatch.{MessageQueue, Dispatchers}
|
||||||
|
|
||||||
object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
|
object ExecutorBasedEventDrivenWorkStealingDispatcherSpec {
|
||||||
val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
|
val delayableActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build
|
||||||
val sharedActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
|
val sharedActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build
|
||||||
val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher")
|
val parentActorDispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher").build
|
||||||
|
|
||||||
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
|
class DelayableActor(name: String, delay: Int, finishedCounter: CountDownLatch) extends Actor {
|
||||||
self.dispatcher = delayableActorDispatcher
|
self.dispatcher = delayableActorDispatcher
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import se.scalablesolutions.akka.actor.ActorRef
|
||||||
import java.util.concurrent.RejectedExecutionHandler
|
import java.util.concurrent.RejectedExecutionHandler
|
||||||
import java.util.concurrent.ThreadPoolExecutor.{DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy, AbortPolicy}
|
import java.util.concurrent.ThreadPoolExecutor.{DiscardPolicy, DiscardOldestPolicy, CallerRunsPolicy, AbortPolicy}
|
||||||
import se.scalablesolutions.akka.dispatch._
|
import se.scalablesolutions.akka.dispatch._
|
||||||
|
import se.scalablesolutions.akka.util.Duration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reusable factory method for dispatchers.
|
* Reusable factory method for dispatchers.
|
||||||
|
|
@ -24,53 +25,66 @@ object DispatcherFactoryBean {
|
||||||
*/
|
*/
|
||||||
def createNewInstance(properties: DispatcherProperties, actorRef: Option[ActorRef] = None): MessageDispatcher = {
|
def createNewInstance(properties: DispatcherProperties, actorRef: Option[ActorRef] = None): MessageDispatcher = {
|
||||||
|
|
||||||
def configThreadPool(): ThreadPoolConfig = {
|
//Creates a ThreadPoolConfigDispatcherBuilder and applies the configuration to it
|
||||||
val poolCfg = ThreadPoolConfig()
|
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
||||||
if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) {
|
if ((properties.threadPool ne null) && (properties.threadPool.queue ne null)) {
|
||||||
properties.threadPool.queue match {
|
import ThreadPoolConfigDispatcherBuilder.conf_?
|
||||||
case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE => threadPoolBuilder.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(properties.threadPool.capacity, properties.threadPool.fairness)
|
import properties._
|
||||||
case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if properties.threadPool.capacity > -1 => threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(properties.threadPool.capacity)
|
val queueDef = Some(threadPool.queue)
|
||||||
case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if properties.threadPool.capacity <= 0 => threadPoolBuilder.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
val corePoolSize = if (threadPool.corePoolSize > -1) Some(threadPool.corePoolSize) else None
|
||||||
case VAL_BOUNDED_LINKED_BLOCKING_QUEUE => threadPoolBuilder.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(properties.threadPool.bound)
|
val maxPoolSize = if (threadPool.maxPoolSize > -1) Some(threadPool.maxPoolSize) else None
|
||||||
case VAL_SYNCHRONOUS_QUEUE => threadPoolBuilder.withNewThreadPoolWithSynchronousQueueWithFairness(properties.threadPool.fairness)
|
val keepAlive = if (threadPool.keepAlive > -1) Some(threadPool.keepAlive) else None
|
||||||
case _ => throw new IllegalArgumentException("unknown queue type")
|
val executorBounds = if (threadPool.bound > -1) Some(threadPool.bound) else None
|
||||||
|
val flowHandler = threadPool.rejectionPolicy match {
|
||||||
|
case null | "" => None
|
||||||
|
case "abort-policy" => Some(new AbortPolicy())
|
||||||
|
case "caller-runs-policy" => Some(new CallerRunsPolicy())
|
||||||
|
case "discard-oldest-policy" => Some(new DiscardOldestPolicy())
|
||||||
|
case "discard-policy" => Some(new DiscardPolicy())
|
||||||
|
case x => throw new IllegalArgumentException("Unknown rejection-policy '" + x + "'")
|
||||||
}
|
}
|
||||||
|
|
||||||
if (properties.threadPool.corePoolSize > -1)
|
//Apply the following options to the config if they are present in the cfg
|
||||||
threadPoolBuilder.setCorePoolSize(properties.threadPool.corePoolSize)
|
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
|
||||||
|
conf_?(queueDef )(definition => definition match {
|
||||||
if (properties.threadPool.maxPoolSize > -1)
|
case VAL_BOUNDED_ARRAY_BLOCKING_QUEUE =>
|
||||||
threadPoolBuilder.setMaxPoolSize(properties.threadPool.maxPoolSize)
|
_.withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(threadPool.capacity,threadPool.fairness)
|
||||||
|
case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if threadPool.capacity > -1 =>
|
||||||
if (properties.threadPool.keepAlive > -1)
|
_.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(threadPool.capacity)
|
||||||
threadPoolBuilder.setKeepAliveTimeInMillis(properties.threadPool.keepAlive)
|
case VAL_UNBOUNDED_LINKED_BLOCKING_QUEUE if threadPool.capacity <= 0 =>
|
||||||
|
_.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
||||||
if (properties.threadPool.mailboxCapacity > -1)
|
case VAL_BOUNDED_LINKED_BLOCKING_QUEUE =>
|
||||||
threadPoolBuilder.setMailboxCapacity(properties.threadPool.mailboxCapacity)
|
_.withNewBoundedThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity(threadPool.bound)
|
||||||
|
case VAL_SYNCHRONOUS_QUEUE =>
|
||||||
if ((properties.threadPool.rejectionPolicy ne null) && (!properties.threadPool.rejectionPolicy.isEmpty)) {
|
_.withNewThreadPoolWithSynchronousQueueWithFairness(threadPool.fairness)
|
||||||
val policy: RejectedExecutionHandler = properties.threadPool.rejectionPolicy match {
|
case unknown =>
|
||||||
case "abort-policy" => new AbortPolicy()
|
throw new IllegalArgumentException("Unknown queue type " + unknown)
|
||||||
case "caller-runs-policy" => new CallerRunsPolicy()
|
}),
|
||||||
case "discard-oldest-policy" => new DiscardOldestPolicy()
|
conf_?(keepAlive )(time => _.setKeepAliveTimeInMillis(time)),
|
||||||
case "discard-policy" => new DiscardPolicy()
|
conf_?(corePoolSize )(count => _.setCorePoolSize(count)),
|
||||||
case _ => throw new IllegalArgumentException("Unknown rejection-policy '" + properties.threadPool.rejectionPolicy + "'")
|
conf_?(maxPoolSize )(count => _.setMaxPoolSize(count)),
|
||||||
|
conf_?(executorBounds)(bounds => _.setExecutorBounds(bounds)),
|
||||||
|
conf_?(flowHandler )(policy => _.setRejectionPolicy(policy)))
|
||||||
}
|
}
|
||||||
threadPoolBuilder.setRejectionPolicy(policy)
|
else
|
||||||
}
|
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig())
|
||||||
} else poolCfg
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var dispatcher = properties.dispatcherType match {
|
//Create the dispatcher
|
||||||
case EXECUTOR_BASED_EVENT_DRIVEN => new ExecutorBasedEventDrivenDispatcher(properties.name, config = configThreadPool)
|
properties.dispatcherType match {
|
||||||
case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING => Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(properties.name)
|
case EXECUTOR_BASED_EVENT_DRIVEN =>
|
||||||
case THREAD_BASED if actorRef.isEmpty => throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.")
|
configureThreadPool(poolConfig => new ExecutorBasedEventDrivenDispatcher(properties.name, poolConfig)).build
|
||||||
case THREAD_BASED if actorRef.isDefined => Dispatchers.newThreadBasedDispatcher(actorRef.get)
|
case EXECUTOR_BASED_EVENT_DRIVEN_WORK_STEALING =>
|
||||||
case HAWT => Dispatchers.newHawtDispatcher(properties.aggregate)
|
configureThreadPool(poolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(properties.name,Dispatchers.MAILBOX_TYPE,poolConfig)).build
|
||||||
case _ => throw new IllegalArgumentException("unknown dispatcher type")
|
case THREAD_BASED if actorRef.isEmpty =>
|
||||||
|
throw new IllegalArgumentException("Need an ActorRef to create a thread based dispatcher.")
|
||||||
|
case THREAD_BASED if actorRef.isDefined =>
|
||||||
|
Dispatchers.newThreadBasedDispatcher(actorRef.get)
|
||||||
|
case HAWT =>
|
||||||
|
Dispatchers.newHawtDispatcher(properties.aggregate)
|
||||||
|
case unknown =>
|
||||||
|
throw new IllegalArgumentException("Unknown dispatcher type " + unknown)
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatcher
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -130,13 +130,18 @@ class DispatcherSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
||||||
* get ThreadPoolExecutor via reflection and assert that dispatcher is correct type
|
* get ThreadPoolExecutor via reflection and assert that dispatcher is correct type
|
||||||
*/
|
*/
|
||||||
private def getThreadPoolExecutorAndAssert(dispatcher: MessageDispatcher): ThreadPoolExecutor = {
|
private def getThreadPoolExecutorAndAssert(dispatcher: MessageDispatcher): ThreadPoolExecutor = {
|
||||||
assert(dispatcher.isInstanceOf[ThreadPoolBuilder])
|
|
||||||
val pool = dispatcher.asInstanceOf[ThreadPoolBuilder]
|
def unpackExecutorService(e: ExecutorService): ExecutorService = e match {
|
||||||
val field = pool.getClass.getDeclaredField("se$scalablesolutions$akka$dispatch$ThreadPoolBuilder$$threadPoolBuilder")
|
case b: ExecutorServiceDelegate => unpackExecutorService(b.executor)
|
||||||
field.setAccessible(true)
|
case t: ThreadPoolExecutor => t
|
||||||
val executor = field.get(pool).asInstanceOf[ThreadPoolExecutor]
|
case e => throw new IllegalStateException("Illegal executor type: " + e)
|
||||||
assert(executor ne null)
|
}
|
||||||
executor;
|
|
||||||
|
unpackExecutorService(dispatcher match {
|
||||||
|
case e: ExecutorBasedEventDrivenDispatcher => e.start; e.executorService.get()
|
||||||
|
case e: ExecutorBasedEventDrivenWorkStealingDispatcher => e.start; e.executorService.get()
|
||||||
|
case x => throw new IllegalStateException("Illegal dispatcher type: " + x)
|
||||||
|
}).asInstanceOf[ThreadPoolExecutor]
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ class TypedActorGuiceConfiguratorSpec extends
|
||||||
|
|
||||||
override def beforeAll {
|
override def beforeAll {
|
||||||
Config.config
|
Config.config
|
||||||
val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test")
|
val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test").build
|
||||||
|
|
||||||
conf.addExternalGuiceModule(new AbstractModule {
|
conf.addExternalGuiceModule(new AbstractModule {
|
||||||
def configure = bind(classOf[Ext]).to(classOf[ExtImpl]).in(Scopes.SINGLETON)
|
def configure = bind(classOf[Ext]).to(classOf[ExtImpl]).in(Scopes.SINGLETON)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue