Added support for throughput deadlines

This commit is contained in:
Viktor Klang 2010-09-15 15:10:47 +02:00
parent e976457b11
commit 496a8b691b
3 changed files with 120 additions and 32 deletions

View file

@ -65,12 +65,13 @@ import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue,
class ExecutorBasedEventDrivenDispatcher(
_name: String,
val throughput: Int = Dispatchers.THROUGHPUT,
val throughputDeadlineMs: Int = Dispatchers.THROUGHPUT_DEADLINE_MS,
mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,MailboxConfig(capacity,None,false))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String, throughput: Int, throughputDeadlineMs: Int, capacity: Int) = this(_name,throughput,throughputDeadlineMs,MailboxConfig(capacity,None,false))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_MS, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_MS,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
//FIXME remove this from ThreadPoolBuilder
mailboxCapacity = mailboxConfig.capacity
@ -102,24 +103,28 @@ class ExecutorBasedEventDrivenDispatcher(
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
val throttle = throughput > 0
var processedMessages = 0
var nextMessage = self.dequeue
if (nextMessage ne null) {
do {
nextMessage.invoke
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && throughputDeadlineMs > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
if(throttle) { //Will be elided when false
processedMessages += 1
if (processedMessages >= throughput) //If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
}
while (nextMessage ne null)
}
do {
nextMessage.invoke
false
if(throttle) { //Will be elided when false
processedMessages += 1
if ((processedMessages >= throughput)
|| (isDeadlineEnabled && (System.currentTimeMillis - started) >= throughputDeadlineMs)) //If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
}
while (nextMessage ne null)
}
false
}
}