Cleanup and refactored code a bit and added javadoc to explain througput parameter in detail.

This commit is contained in:
Jan Van Besien 2010-06-02 16:56:36 +02:00
parent c268c899d6
commit 94d61d0e2a

View file

@ -54,6 +54,10 @@ import se.scalablesolutions.akka.actor.ActorRef
* the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object. * the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
* mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher
* always continues until the mailbox is empty.
* Larger values (or zero or negative) increase througput, smaller values increase fairness
*/ */
class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispatchers.THROUGHPUT) extends MessageDispatcher with ThreadPoolBuilder { class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispatchers.THROUGHPUT) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage
@ -70,30 +74,20 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
def run = { def run = {
var lockAcquiredOnce = false var lockAcquiredOnce = false
var finishedBeforeMailboxEmpty = false var finishedBeforeMailboxEmpty = false
// this do-while loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
val lock = receiver.dispatcherLock val lock = receiver.dispatcherLock
val mailbox = receiver.mailbox val mailbox = receiver.mailbox
// this do-while loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
do { do {
if (lock.tryLock) { if (lock.tryLock) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
lockAcquiredOnce = true lockAcquiredOnce = true
try { try {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching. finishedBeforeMailboxEmpty = processMailbox(receiver)
var i = 0
var messageInvocation = mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
i += 1
if (i < throughput)
messageInvocation = mailbox.poll
else {
finishedBeforeMailboxEmpty = !mailbox.isEmpty
messageInvocation = null
}
}
} finally { } finally {
lock.unlock lock.unlock
if (finishedBeforeMailboxEmpty) dispatch(receiver) if (finishedBeforeMailboxEmpty)
dispatch(receiver)
} }
} }
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))
@ -101,6 +95,30 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
}) })
} else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started")
/**
* Process the messages in the mailbox of the given actor.
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
def processMailbox(receiver: ActorRef): Boolean = {
var processedMessages = 0
var messageInvocation = receiver.mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke
processedMessages += 1
// check if we simply continue with other messages, or reached the throughput limit
if (throughput <= 0 || processedMessages < throughput)
messageInvocation = receiver.mailbox.poll
else {
return !receiver.mailbox.isEmpty
messageInvocation = null
}
}
return false
}
def start = if (!active) { def start = if (!active) {
log.debug("Starting ExecutorBasedEventDrivenDispatcher [%s]", name) log.debug("Starting ExecutorBasedEventDrivenDispatcher [%s]", name)
log.debug("Throughput for %s = %d", name, throughput) log.debug("Throughput for %s = %d", name, throughput)