Added support hook for persistent mailboxes + cleanup and optimizations

This commit is contained in:
Viktor Klang 2010-09-01 16:33:56 +02:00
parent 176770c077
commit e1ed8a6734
8 changed files with 43 additions and 43 deletions

View file

@ -12,22 +12,11 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher {
@volatile protected var active: Boolean = false
protected val queue = new ReactiveMessageQueue(name)
protected val messageInvokers = new HashMap[ActorRef, ActorRef]
protected var selectorThread: Thread = _
protected val guard = new Object
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
override def register(actorRef: ActorRef) = synchronized {
messageInvokers.put(actorRef, actorRef)
super.register(actorRef)
}
override def unregister(actorRef: ActorRef) = synchronized {
messageInvokers.remove(actorRef)
super.unregister(actorRef)
}
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false

View file

@ -89,12 +89,9 @@ class ExecutorBasedEventDrivenDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def register(actorRef: ActorRef) = {
if (actorRef.mailbox eq null ) {
if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedQueue[MessageInvocation]
else actorRef.mailbox = new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
}
super.register(actorRef)
override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity <= 0) new ConcurrentLinkedQueue[MessageInvocation]
else new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
}
def dispatch(receiver: ActorRef): Unit = if (active) {

View file

@ -182,13 +182,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
protected override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation]
else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
}
override def register(actorRef: ActorRef) = {
verifyActorsAreOfSameType(actorRef)
// The actor will need a ConcurrentLinkedDeque based mailbox
if (actorRef.mailbox == null) {
if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation]
else actorRef.mailbox = new LinkedBlockingDeque[MessageInvocation](mailboxCapacity)
}
pooledActors.add(actorRef)
super.register(actorRef)
}

View file

@ -185,16 +185,10 @@ class HawtDispatcher(val aggregate:Boolean=true, val parent:DispatchQueue=global
// TODO: figure out if this can be optional in akka
override def mailboxSize(actorRef: ActorRef) = 0
override def register(actorRef: ActorRef) = {
if( actorRef.mailbox == null ) {
val queue = parent.createSerialQueue(actorRef.toString)
if( aggregate ) {
actorRef.mailbox = new AggregatingHawtDispatcherMailbox(queue)
} else {
actorRef.mailbox = new HawtDispatcherMailbox(queue)
}
}
super.register(actorRef)
override def createMailbox(actorRef: ActorRef): AnyRef = {
val queue = parent.createSerialQueue(actorRef.toString)
if (aggregate) new AggregatingHawtDispatcherMailbox(queue)
else new HawtDispatcherMailbox(queue)
}
override def toString = "HawtDispatchEventDrivenDispatcher"

View file

@ -71,21 +71,41 @@ trait MessageQueue {
*/
trait MessageDispatcher extends Logging {
protected val uuids = new ConcurrentSkipListSet[String]
def dispatch(invocation: MessageInvocation)
def start
def shutdown
def register(actorRef: ActorRef) = uuids add actorRef.uuid
def register(actorRef: ActorRef) {
if(actorRef.mailbox eq null)
actorRef.mailbox = createMailbox(actorRef)
uuids add actorRef.uuid
}
def unregister(actorRef: ActorRef) = {
uuids remove actorRef.uuid
//actorRef.mailbox = null //FIXME should we null out the mailbox here?
if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
}
def canBeShutDown: Boolean = uuids.isEmpty
def isShutdown: Boolean
/**
* Returns the size of the mailbox for the specified actor
*/
def mailboxSize(actorRef: ActorRef):Int = 0
/**
* Creates and returns a mailbox for the given actor
*/
protected def createMailbox(actorRef: ActorRef): AnyRef = null
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDemultiplexer {
def select

View file

@ -29,8 +29,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
val iter = selectedInvocations.iterator
while (iter.hasNext) {
val invocation = iter.next
val invoker = messageInvokers.get(invocation.receiver)
if (invoker ne null) invoker.invoke(invocation)
val invoker = invocation.receiver
if (invoker ne null) invoker invoke invocation
iter.remove
}
}

View file

@ -103,14 +103,14 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
private def process(selectedInvocations: List[MessageInvocation]) = synchronized {
var nrOfBusyMessages = 0
val totalNrOfActors = messageInvokers.size
val totalNrOfActors = uuids.size
val totalNrOfBusyActors = busyActors.size
val invocations = selectedInvocations.iterator
while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) {
val invocation = invocations.next
if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]")
if (!busyActors.contains(invocation.receiver)) {
val invoker = messageInvokers.get(invocation.receiver)
val invoker = invocation.receiver
if (invoker eq null) throw new IllegalActorStateException(
"Message invoker for invocation [" + invocation + "] is null")
resume(invocation.receiver)

View file

@ -28,11 +28,11 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
if (actor.mailbox eq null) {
actor.mailbox = if (mailboxCapacity > 0)
new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue
else
new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue
override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity > 0)
new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue
else
new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue
}
override def register(actorRef: ActorRef) = {