Merge remote branch 'upstream/master'

This commit is contained in:
David Greco 2010-09-15 15:33:49 +02:00
commit f7ae1ff431
4 changed files with 122 additions and 32 deletions

View file

@ -44,9 +44,10 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID}
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_CONFIG = MailboxConfig(
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val THROUGHPUT_DEADLINE_MS = config.getInt("akka.actor.throughput-deadline-ms",-1)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_CONFIG = MailboxConfig(
capacity = Dispatchers.MAILBOX_CAPACITY,
pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)),
blockingDequeue = false
@ -58,7 +59,7 @@ object Dispatchers extends Logging {
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_CONFIG) {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,THROUGHPUT_DEADLINE_MS,MAILBOX_CONFIG) {
override def register(actor: ActorRef) = {
if (isShutdown) init
super.register(actor)
@ -116,14 +117,14 @@ object Dispatchers extends Logging {
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int) = new ExecutorBasedEventDrivenDispatcher(name, throughput, mailboxCapacity)
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxCapacity: Int) = new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxCapacity)
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int, pushTimeOut: Duration) = new ExecutorBasedEventDrivenDispatcher(name, throughput, MailboxConfig(mailboxCapacity,Some(pushTimeOut),false))
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxCapacity: Int, pushTimeOut: Duration) = new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, MailboxConfig(mailboxCapacity,Some(pushTimeOut),false))
/**
@ -198,13 +199,28 @@ object Dispatchers extends Logging {
}
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),mailboxBounds,threadPoolConfig)
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case "ExecutorBasedEventDrivenWorkStealing" =>
new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
case "ExecutorBasedEventDriven" =>
new ExecutorBasedEventDrivenDispatcher(
name,
cfg.getInt("throughput",THROUGHPUT),
cfg.getInt("throughput-deadline-ms",THROUGHPUT_DEADLINE_MS),
mailboxBounds,
threadPoolConfig)
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
case "Hawt" =>
new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalExecutorBasedEventDriven" =>
globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" =>
globalHawtDispatcher
case unknown =>
throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
dispatcher

View file

@ -65,12 +65,13 @@ import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue,
class ExecutorBasedEventDrivenDispatcher(
_name: String,
val throughput: Int = Dispatchers.THROUGHPUT,
val throughputDeadlineMs: Int = Dispatchers.THROUGHPUT_DEADLINE_MS,
mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,MailboxConfig(capacity,None,false))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String, throughput: Int, throughputDeadlineMs: Int, capacity: Int) = this(_name,throughput,throughputDeadlineMs,MailboxConfig(capacity,None,false))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_MS, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_MS,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
//FIXME remove this from ThreadPoolBuilder
mailboxCapacity = mailboxConfig.capacity
@ -102,24 +103,28 @@ class ExecutorBasedEventDrivenDispatcher(
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
val throttle = throughput > 0
var processedMessages = 0
var nextMessage = self.dequeue
if (nextMessage ne null) {
do {
nextMessage.invoke
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && throughputDeadlineMs > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
if(throttle) { //Will be elided when false
processedMessages += 1
if (processedMessages >= throughput) //If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
}
while (nextMessage ne null)
}
do {
nextMessage.invoke
false
if(throttle) { //Will be elided when false
processedMessages += 1
if ((processedMessages >= throughput)
|| (isDeadlineEnabled && (System.currentTimeMillis - started) >= throughputDeadlineMs)) //If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
}
while (nextMessage ne null)
}
false
}
}

View file

@ -3,9 +3,10 @@ package se.scalablesolutions.akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.dispatch.{Dispatchers,ExecutorBasedEventDrivenDispatcher}
import se.scalablesolutions.akka.actor.Actor
import Actor._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
object ExecutorBasedEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
@ -65,4 +66,70 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
}
actor.stop
}
@Test def shouldRespectThroughput {
val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_CONFIG, (e) => {
e.setCorePoolSize(1)
})
val works = new AtomicBoolean(true)
val latch = new CountDownLatch(100)
val start = new CountDownLatch(1)
val fastOne = actorOf(
new Actor {
self.dispatcher = throughputDispatcher
def receive = { case "sabotage" => works.set(false) }
}).start
val slowOne = actorOf(
new Actor {
self.dispatcher = throughputDispatcher
def receive = {
case "hogexecutor" => start.await
case "ping" => if (works.get) latch.countDown
}
}).start
slowOne ! "hogexecutor"
(1 to 100) foreach { _ => slowOne ! "ping"}
fastOne ! "sabotage"
start.countDown
val result = latch.await(3,TimeUnit.SECONDS)
fastOne.stop
slowOne.stop
throughputDispatcher.shutdown
assert(result === true)
}
@Test def shouldRespectThroughputDeadline {
val deadlineMs = 100
val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_CONFIG, (e) => {
e.setCorePoolSize(1)
})
val works = new AtomicBoolean(true)
val latch = new CountDownLatch(1)
val start = new CountDownLatch(1)
val fastOne = actorOf(
new Actor {
self.dispatcher = throughputDispatcher
def receive = { case "ping" => if(works.get) latch.countDown; self.stop }
}).start
val slowOne = actorOf(
new Actor {
self.dispatcher = throughputDispatcher
def receive = {
case "hogexecutor" => start.await
case "ping" => works.set(false); self.stop
}
}).start
slowOne ! "hogexecutor"
slowOne ! "ping"
fastOne ! "ping"
Thread.sleep(deadlineMs)
start.countDown
assert(latch.await(10,TimeUnit.SECONDS) === true)
}
}

View file

@ -25,6 +25,7 @@ akka {
# - TypedActor: methods with non-void return type
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
throughput-deadline-ms = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
@ -44,6 +45,7 @@ akka {
allow-core-timeout = on # Allow core threads to time out
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
throughput-deadline-ms = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
aggregate = off # Aggregate on/off for HawtDispatchers
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property