Added option to use a blocking mailbox with custom capacity

This commit is contained in:
Jonas Bonér 2010-08-21 10:45:00 +02:00
parent 60d16d91e0
commit fd69201728
5 changed files with 42 additions and 23 deletions

View file

@ -143,7 +143,7 @@ object Dispatchers extends Logging {
* Throws: IllegalArgumentException if the value of "type" is not valid * Throws: IllegalArgumentException if the value of "type" is not valid
*/ */
def from(cfg: ConfigMap): Option[MessageDispatcher] = { def from(cfg: ConfigMap): Option[MessageDispatcher] = {
lazy val name = cfg.getString("name",UUID.newUuid.toString) lazy val name = cfg.getString("name", UUID.newUuid.toString)
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name) case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name)
@ -167,6 +167,7 @@ object Dispatchers extends Logging {
cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_)) cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_))
cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_)) cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_))
cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_)) cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_))
cfg.getInt("mailbox-capacity").foreach(builder.setMailboxCapacity(_))
cfg.getString("rejection-policy").map({ cfg.getString("rejection-policy").map({
case "abort" => new AbortPolicy() case "abort" => new AbortPolicy()

View file

@ -5,7 +5,9 @@
package se.scalablesolutions.akka.dispatch package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import jsr166x.ConcurrentLinkedDeque
import java.util.Queue
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
/** /**
* Default settings are: * Default settings are:
@ -76,14 +78,14 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
/** /**
* @return the mailbox associated with the actor * @return the mailbox associated with the actor
*/ */
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[ConcurrentLinkedDeque[MessageInvocation]] private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Queue[MessageInvocation]]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def register(actorRef: ActorRef) = { override def register(actorRef: ActorRef) = {
// The actor will need a ConcurrentLinkedDeque based mailbox
if( actorRef.mailbox eq null ) { if( actorRef.mailbox eq null ) {
actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedQueue[MessageInvocation]()
else actorRef.mailbox = new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
} }
super.register(actorRef) super.register(actorRef)
} }

View file

@ -5,9 +5,9 @@
package se.scalablesolutions.akka.dispatch package se.scalablesolutions.akka.dispatch
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque}
import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException} import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException}
import jsr166x.ConcurrentLinkedDeque
/** /**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -49,7 +49,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/** /**
* @return the mailbox associated with the actor * @return the mailbox associated with the actor
*/ */
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[ConcurrentLinkedDeque[MessageInvocation]] private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation]]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
@ -182,7 +182,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
verifyActorsAreOfSameType(actorRef) verifyActorsAreOfSameType(actorRef)
// The actor will need a ConcurrentLinkedDeque based mailbox // The actor will need a ConcurrentLinkedDeque based mailbox
if( actorRef.mailbox == null ) { if( actorRef.mailbox == null ) {
actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]()
else actorRef.mailbox = new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
} }
pooledActors.add(actorRef) pooledActors.add(actorRef)
super.register(actorRef) super.register(actorRef)

View file

@ -22,6 +22,7 @@ trait ThreadPoolBuilder extends Logging {
private var threadPoolBuilder: ThreadPoolExecutor = _ private var threadPoolBuilder: ThreadPoolExecutor = _
private var boundedExecutorBound = -1 private var boundedExecutorBound = -1
protected var mailboxCapacity = -1
@volatile private var inProcessOfBuilding = false @volatile private var inProcessOfBuilding = false
private var blockingQueue: BlockingQueue[Runnable] = _ private var blockingQueue: BlockingQueue[Runnable] = _
@ -154,6 +155,13 @@ trait ThreadPoolBuilder extends Logging {
this.boundedExecutorBound = bounds this.boundedExecutorBound = bounds
} }
/**
* Sets the mailbox capacity, -1 is unbounded
*/
def setMailboxCapacity(capacity: Int): Unit = synchronized {
this.mailboxCapacity = capacity
}
protected def procs(multiplier: Double): Int = protected def procs(multiplier: Double): Int =
(Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt

View file

@ -18,22 +18,29 @@ akka {
"sample.security.Boot"] "sample.security.Boot"]
actor { actor {
timeout = 5 # default timeout for future based invocations timeout = 5 # Default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher
default-dispatcher { default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
# ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, # ReactorBasedSingleThreadEventDriven,
# ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, # ExecutorBasedEventDrivenWorkStealing,
# GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt # ExecutorBasedEventDriven,
keep-alive-ms = 60000 # Keep alive time for threads # ReactorBasedThreadPoolEventDriven,
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) # Hawt,
max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) # GlobalReactorBasedSingleThreadEventDriven,
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded # GlobalReactorBasedThreadPoolEventDriven,
allow-core-timeout = on # Allow core threads to time out # GlobalExecutorBasedEventDriven, GlobalHawt
keep-alive-ms = 60000 # Keep alive time for threads
core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
allow-core-timeout = on # Allow core threads to time out
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher
aggregate = off # Aggregate on/off for HawtDispatchers aggregate = off # Aggregate on/off for HawtDispatchers
mailbox-capacity = 100 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set to the number specificed
} }
} }
@ -56,10 +63,10 @@ akka {
service = on service = on
hostname = "localhost" hostname = "localhost"
port = 9998 port = 9998
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
resource_packages = ["sample.rest.scala", resource_packages = ["sample.rest.scala",
"sample.rest.java", "sample.rest.java",
"sample.security"] # List with all resource packages for your Jersey services "sample.security"] # List with all resource packages for your Jersey services
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
comet-dispatcher { comet-dispatcher {