1 entry per mailbox at most

This commit is contained in:
Viktor Klang 2010-09-11 15:24:09 +02:00
parent 94ad3f9ff2
commit c3d66ed3d7
4 changed files with 92 additions and 69 deletions

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import java.util.Queue
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* Default settings are:
@ -80,6 +80,52 @@ class ExecutorBasedEventDrivenDispatcher(
val name = "akka:event-driven:dispatcher:" + _name
init
/**
* This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox
*/
trait ExecutableMailbox { self: MessageQueue with Runnable =>
def run = {
try {
val reDispatch = processMailbox()//Returns true if we need to reschedule the processing
self.dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing
if (reDispatch)
dispatch(self)
} catch {
case e =>
dispatcherLock.unlock() //Unlock to give a chance for someone else to schedule processing
if(!self.isEmpty) //If the mailbox isn't empty, try to re-schedule processing, equivalent to reDispatch
dispatch(self)
throw e //Can't just swallow exceptions or errors
}
}
/**
* Process the messages in the mailbox
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
def processMailbox(): Boolean = {
val throttle = throughput > 0
var processedMessages = 0
var nextMessage = self.dequeue
if (nextMessage ne null) {
do {
nextMessage.invoke
if(throttle) { //Will be elided when false
processedMessages += 1
if (processedMessages >= throughput) //If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
}
while (nextMessage ne null)
}
false
}
}
def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
@ -93,56 +139,18 @@ class ExecutorBasedEventDrivenDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def createMailbox(actorRef: ActorRef): AnyRef = new DefaultMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,false) with Runnable {
def run = {
var lockAcquiredOnce = false
var finishedBeforeMailboxEmpty = false
// this do-while loop is required to prevent missing new messages between the end of the inner while
// loop and releasing the lock
do {
if (dispatcherLock.tryLock()) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
lockAcquiredOnce = true
try {
finishedBeforeMailboxEmpty = processMailbox()
} finally {
dispatcherLock.unlock()
if (finishedBeforeMailboxEmpty)
dispatch(this)
}
}
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !this.isEmpty))
}
/**
* Process the messages in the mailbox
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
def processMailbox(): Boolean = {
val throttle = throughput > 0
var processedMessages = 0
var nextMessage = this.dequeue
if (nextMessage ne null) {
do {
nextMessage.invoke
if(throttle) { //Will be JIT:Ed away when false
processedMessages += 1
if (processedMessages >= throughput) //If we're throttled, break out
return !this.isEmpty
}
nextMessage = this.dequeue
}
while (nextMessage ne null)
}
false
}
}
override def createMailbox(actorRef: ActorRef): AnyRef =
if (mailboxCapacity > 0) new DefaultBoundedMessageQueue(mailboxCapacity,mailboxConfig.pushTimeOut,blockDequeue = false) with Runnable with ExecutableMailbox
else new DefaultUnboundedMessageQueue(blockDequeue = false) with Runnable with ExecutableMailbox
def dispatch(mailbox: MessageQueue with Runnable): Unit = if (active) {
executor.execute(mailbox)
if (mailbox.dispatcherLock.tryLock()) {//Ensure that only one runnable can be in the executor pool at the same time
try {
executor execute mailbox
} catch {
case e: RejectedExecutionException => mailbox.dispatcherLock.unlock()
}
}
} else {
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
}