Fixing mailboxSize for ThreadBasedDispatcher

This commit is contained in:
Viktor Klang 2010-09-03 11:43:58 +02:00
parent 3268b1771e
commit bd798abfbd
4 changed files with 13 additions and 8 deletions

View file

@ -96,7 +96,7 @@ trait MessageDispatcher extends Logging {
/**
* Returns the size of the mailbox for the specified actor
*/
def mailboxSize(actorRef: ActorRef):Int = 0
def mailboxSize(actorRef: ActorRef):Int
/**
* Creates and returns a mailbox for the given actor

View file

@ -11,6 +11,7 @@
package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, List}
import se.scalablesolutions.akka.actor.ActorRef
class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) {
@ -39,6 +40,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
selectorThread.start
}
def mailboxSize(a: ActorRef) = 0
def isShutdown = !active
override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]"

View file

@ -7,8 +7,7 @@ package se.scalablesolutions.akka.dispatch
import java.util.concurrent.locks.ReentrantLock
import java.util.{HashSet, HashMap, LinkedList, List}
import se.scalablesolutions.akka.actor.IllegalActorStateException
import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
@ -139,6 +138,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
else nrOfBusyMessages < 100
}
def mailboxSize(a: ActorRef) = 0
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")

View file

@ -44,6 +44,8 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue]
def mailboxSize(a: ActorRef) = mailbox.size
def dispatch(invocation: MessageInvocation) = mailbox append invocation
def start = if (!active) {
@ -73,14 +75,13 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
}
trait ThreadMessageQueue extends MessageQueue { self: TransferQueue[MessageInvocation] =>
trait ThreadMessageQueue extends MessageQueue with TransferQueue[MessageInvocation] {
final def append(invocation: MessageInvocation): Unit = {
if(!self.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
if(!self.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
if(!tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer
if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
}
}
final def next: MessageInvocation = self.take
final def next: MessageInvocation = take
}