ExecutorBasedEventDrivenDispatcher now works and unit tests are added

This commit is contained in:
Viktor Klang 2010-09-09 15:49:59 +02:00
parent af737972e2
commit c9ad9b5d49
3 changed files with 61 additions and 6 deletions

View file

@ -72,7 +72,7 @@ class ExecutorBasedEventDrivenDispatcher(
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
//FIXME remove this from ThreadPoolBuilder
mailboxCapacity = mailboxConfig.capacity
@volatile private var active: Boolean = false
@ -81,18 +81,18 @@ class ExecutorBasedEventDrivenDispatcher(
init
def dispatch(invocation: MessageInvocation) = {
getMailbox(invocation.receiver).add(invocation)
getMailbox(invocation.receiver) enqueue invocation
dispatch(invocation.receiver)
}
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[Queue[MessageInvocation]]
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity)
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity, blockDequeue = false)
def dispatch(receiver: ActorRef): Unit = if (active) {
@ -131,12 +131,12 @@ class ExecutorBasedEventDrivenDispatcher(
def processMailbox(receiver: ActorRef): Boolean = {
var processedMessages = 0
val mailbox = getMailbox(receiver)
var messageInvocation = mailbox.poll
var messageInvocation = mailbox.dequeue
while (messageInvocation != null) {
messageInvocation.invoke
processedMessages += 1
// check if we simply continue with other messages, or reached the throughput limit
if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.poll
if (throughput <= 0 || processedMessages < throughput) messageInvocation = mailbox.dequeue
else {
messageInvocation = null
return !mailbox.isEmpty