From fd69201728097cb45e1620ebbacea3b98947ed91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Sat, 21 Aug 2010 10:45:00 +0200 Subject: [PATCH] Added option to use a blocking mailbox with custom capacity --- .../src/main/scala/dispatch/Dispatchers.scala | 3 +- .../ExecutorBasedEventDrivenDispatcher.scala | 10 +++-- ...sedEventDrivenWorkStealingDispatcher.scala | 7 ++-- .../scala/dispatch/ThreadPoolBuilder.scala | 8 ++++ config/akka-reference.conf | 37 +++++++++++-------- 5 files changed, 42 insertions(+), 23 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/Dispatchers.scala b/akka-core/src/main/scala/dispatch/Dispatchers.scala index 82c51c57de..1f31253178 100644 --- a/akka-core/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-core/src/main/scala/dispatch/Dispatchers.scala @@ -143,7 +143,7 @@ object Dispatchers extends Logging { * Throws: IllegalArgumentException if the value of "type" is not valid */ def from(cfg: ConfigMap): Option[MessageDispatcher] = { - lazy val name = cfg.getString("name",UUID.newUuid.toString) + lazy val name = cfg.getString("name", UUID.newUuid.toString) val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map { case "ReactorBasedSingleThreadEventDriven" => newReactorBasedSingleThreadEventDrivenDispatcher(name) @@ -167,6 +167,7 @@ object Dispatchers extends Logging { cfg.getDouble("max-pool-size-factor").foreach(builder.setMaxPoolSizeFromFactor(_)) cfg.getInt("executor-bounds").foreach(builder.setExecutorBounds(_)) cfg.getBool("allow-core-timeout").foreach(builder.setAllowCoreThreadTimeout(_)) + cfg.getInt("mailbox-capacity").foreach(builder.setMailboxCapacity(_)) cfg.getString("rejection-policy").map({ case "abort" => new AbortPolicy() diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 1f03c1eba2..f4c3cf036a 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -5,7 +5,9 @@ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} -import jsr166x.ConcurrentLinkedDeque + +import java.util.Queue +import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} /** * Default settings are: @@ -76,14 +78,14 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[ConcurrentLinkedDeque[MessageInvocation]] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Queue[MessageInvocation]] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size override def register(actorRef: ActorRef) = { - // The actor will need a ConcurrentLinkedDeque based mailbox if( actorRef.mailbox eq null ) { - actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() + if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedQueue[MessageInvocation]() + else actorRef.mailbox = new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) } super.register(actorRef) } diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index b9ff5d92f4..5c00c6838d 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -5,9 +5,9 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.CopyOnWriteArrayList +import jsr166x.{Deque, ConcurrentLinkedDeque, LinkedBlockingDeque} import se.scalablesolutions.akka.actor.{Actor, ActorRef, IllegalActorStateException} -import jsr166x.ConcurrentLinkedDeque /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -49,7 +49,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** * @return the mailbox associated with the actor */ - private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[ConcurrentLinkedDeque[MessageInvocation]] + private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Deque[MessageInvocation]] override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size @@ -182,7 +182,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess verifyActorsAreOfSameType(actorRef) // The actor will need a ConcurrentLinkedDeque based mailbox if( actorRef.mailbox == null ) { - actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() + if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]() + else actorRef.mailbox = new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) } pooledActors.add(actorRef) super.register(actorRef) diff --git a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala index 7cc96400a8..a8cc9c3c30 100644 --- a/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala +++ b/akka-core/src/main/scala/dispatch/ThreadPoolBuilder.scala @@ -22,6 +22,7 @@ trait ThreadPoolBuilder extends Logging { private var threadPoolBuilder: ThreadPoolExecutor = _ private var boundedExecutorBound = -1 + protected var mailboxCapacity = -1 @volatile private var inProcessOfBuilding = false private var blockingQueue: BlockingQueue[Runnable] = _ @@ -154,6 +155,13 @@ trait ThreadPoolBuilder extends Logging { this.boundedExecutorBound = bounds } + /** + * Sets the mailbox capacity, -1 is unbounded + */ + def setMailboxCapacity(capacity: Int): Unit = synchronized { + this.mailboxCapacity = capacity + } + protected def procs(multiplier: Double): Int = (Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 827ec7b5ad..e741fccd23 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -18,22 +18,29 @@ akka { "sample.security.Boot"] actor { - timeout = 5 # default timeout for future based invocations - serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability - throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher + timeout = 5 # Default timeout for future based invocations + serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability + throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher default-dispatcher { type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable - # ReactorBasedSingleThreadEventDriven, (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, - # ReactorBasedThreadPoolEventDriven, Hawt, GlobalReactorBasedSingleThreadEventDriven, - # GlobalReactorBasedThreadPoolEventDriven, GlobalExecutorBasedEventDriven, GlobalHawt - keep-alive-ms = 60000 # Keep alive time for threads - core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) - max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) - executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded - allow-core-timeout = on # Allow core threads to time out + # ReactorBasedSingleThreadEventDriven, + # ExecutorBasedEventDrivenWorkStealing, + # ExecutorBasedEventDriven, + # ReactorBasedThreadPoolEventDriven, + # Hawt, + # GlobalReactorBasedSingleThreadEventDriven, + # GlobalReactorBasedThreadPoolEventDriven, + # GlobalExecutorBasedEventDriven, GlobalHawt + keep-alive-ms = 60000 # Keep alive time for threads + core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) + max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) + executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded + allow-core-timeout = on # Allow core threads to time out rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard - throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher - aggregate = off # Aggregate on/off for HawtDispatchers + throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher + aggregate = off # Aggregate on/off for HawtDispatchers + mailbox-capacity = 100 # If negative (or zero) then an unbounded mailbox is used (default) + # If positive then a bounded mailbox is used and the capacity is set to the number specificed } } @@ -56,10 +63,10 @@ akka { service = on hostname = "localhost" port = 9998 - filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use + filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use resource_packages = ["sample.rest.scala", "sample.rest.java", - "sample.security"] # List with all resource packages for your Jersey services + "sample.security"] # List with all resource packages for your Jersey services authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) comet-dispatcher {