diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index dbbfe3442a..bbe4b53cde 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -64,7 +64,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} */ class ExecutorBasedEventDrivenDispatcher( _name: String, - throughput: Int = Dispatchers.THROUGHPUT, + val throughput: Int = Dispatchers.THROUGHPUT, mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG, config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder { @@ -109,7 +109,7 @@ class ExecutorBasedEventDrivenDispatcher( // Only dispatch if we got the lock. Otherwise another thread is already dispatching. lockAcquiredOnce = true try { - finishedBeforeMailboxEmpty = processMailbox(receiver) + finishedBeforeMailboxEmpty = processMailbox(receiver,mailbox) } finally { lock.unlock if (finishedBeforeMailboxEmpty) dispatch(receiver) @@ -128,20 +128,24 @@ class ExecutorBasedEventDrivenDispatcher( * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ - def processMailbox(receiver: ActorRef): Boolean = { + def processMailbox(receiver: ActorRef,mailbox: MessageQueue): Boolean = { + val throttle = throughput > 0 var processedMessages = 0 - val mailbox = getMailbox(receiver) - var messageInvocation = mailbox.dequeue - while (messageInvocation != null) { - messageInvocation.invoke - processedMessages += 1 - // check if we simply continue with other messages, or reached the throughput limit - if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue - else { - messageInvocation = null - return !mailbox.isEmpty + var nextMessage = mailbox.dequeue + if (nextMessage ne null) { + do { + nextMessage.invoke + + if(throttle) { //Will be JIT:Ed away when false + processedMessages += 1 + if (processedMessages >= throughput) //If we're throttled, break out + return !mailbox.isEmpty + } + nextMessage = mailbox.dequeue } + while (nextMessage ne null) } + false }