diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 185c28ff5b..ccbb9edc94 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -54,6 +54,10 @@ import se.scalablesolutions.akka.actor.ActorRef * the {@link se.scalablesolutions.akka.dispatch.Dispatchers} factory object. * * @author Jonas Bonér + * @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the + * mailbox, without checking the mailboxes of other actors. Zero or negative means the dispatcher + * always continues until the mailbox is empty. + * Larger values (or zero or negative) increase througput, smaller values increase fairness */ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispatchers.THROUGHPUT) extends MessageDispatcher with ThreadPoolBuilder { def this(_name: String) = this(_name, Dispatchers.THROUGHPUT) // Needed for Java API usage @@ -70,30 +74,20 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat def run = { var lockAcquiredOnce = false var finishedBeforeMailboxEmpty = false - // this do-while loop is required to prevent missing new messages between the end of the inner while - // loop and releasing the lock val lock = receiver.dispatcherLock val mailbox = receiver.mailbox + // this do-while loop is required to prevent missing new messages between the end of the inner while + // loop and releasing the lock do { if (lock.tryLock) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. lockAcquiredOnce = true try { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - var i = 0 - var messageInvocation = mailbox.poll - while (messageInvocation != null) { - messageInvocation.invoke - i += 1 - if (i < throughput) - messageInvocation = mailbox.poll - else { - finishedBeforeMailboxEmpty = !mailbox.isEmpty - messageInvocation = null - } - } + finishedBeforeMailboxEmpty = processMailbox(receiver) } finally { lock.unlock - if (finishedBeforeMailboxEmpty) dispatch(receiver) + if (finishedBeforeMailboxEmpty) + dispatch(receiver) } } } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) @@ -101,6 +95,30 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") + + /** + * Process the messages in the mailbox of the given actor. + * + * @return true if the processing finished before the mailbox was empty, due to the throughput constraint + */ + def processMailbox(receiver: ActorRef): Boolean = { + var processedMessages = 0 + var messageInvocation = receiver.mailbox.poll + while (messageInvocation != null) { + messageInvocation.invoke + processedMessages += 1 + // check if we simply continue with other messages, or reached the throughput limit + if (throughput <= 0 || processedMessages < throughput) + messageInvocation = receiver.mailbox.poll + else { + return !receiver.mailbox.isEmpty + messageInvocation = null + } + } + + return false + } + def start = if (!active) { log.debug("Starting ExecutorBasedEventDrivenDispatcher [%s]", name) log.debug("Throughput for %s = %d", name, throughput)