diff --git a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index 6bacec73be..24c566b48c 100644 --- a/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -12,22 +12,11 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) extends MessageDispatcher { @volatile protected var active: Boolean = false protected val queue = new ReactiveMessageQueue(name) - protected val messageInvokers = new HashMap[ActorRef, ActorRef] protected var selectorThread: Thread = _ protected val guard = new Object def dispatch(invocation: MessageInvocation) = queue.append(invocation) - override def register(actorRef: ActorRef) = synchronized { - messageInvokers.put(actorRef, actorRef) - super.register(actorRef) - } - - override def unregister(actorRef: ActorRef) = synchronized { - messageInvokers.remove(actorRef) - super.unregister(actorRef) - } - def shutdown = if (active) { log.debug("Shutting down %s", toString) active = false diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 1f8a6bfe9c..5f8469eb84 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -89,12 +89,9 @@ class ExecutorBasedEventDrivenDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size - override def register(actorRef: ActorRef) = { - if (actorRef.mailbox eq null ) { - if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedQueue[MessageInvocation] - else actorRef.mailbox = new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) - } - super.register(actorRef) + override def createMailbox(actorRef: ActorRef): AnyRef = { + if (mailboxCapacity <= 0) new ConcurrentLinkedQueue[MessageInvocation] + else new LinkedBlockingQueue[MessageInvocation](mailboxCapacity) } def dispatch(receiver: ActorRef): Unit = if (active) { diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 4e5d626aed..f9409e91fb 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -182,13 +182,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool + protected override def createMailbox(actorRef: ActorRef): AnyRef = { + if (mailboxCapacity <= 0) new ConcurrentLinkedDeque[MessageInvocation] + else new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) + } + override def register(actorRef: ActorRef) = { verifyActorsAreOfSameType(actorRef) - // The actor will need a ConcurrentLinkedDeque based mailbox - if (actorRef.mailbox == null) { - if (mailboxCapacity <= 0) actorRef.mailbox = new ConcurrentLinkedDeque[MessageInvocation] - else actorRef.mailbox = new LinkedBlockingDeque[MessageInvocation](mailboxCapacity) - } pooledActors.add(actorRef) super.register(actorRef) } diff --git a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala index e0ddf05d26..cf3f71295c 100644 --- a/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/HawtDispatcher.scala @@ -185,16 +185,10 @@ class HawtDispatcher(val aggregate:Boolean=true, val parent:DispatchQueue=global // TODO: figure out if this can be optional in akka override def mailboxSize(actorRef: ActorRef) = 0 - override def register(actorRef: ActorRef) = { - if( actorRef.mailbox == null ) { - val queue = parent.createSerialQueue(actorRef.toString) - if( aggregate ) { - actorRef.mailbox = new AggregatingHawtDispatcherMailbox(queue) - } else { - actorRef.mailbox = new HawtDispatcherMailbox(queue) - } - } - super.register(actorRef) + override def createMailbox(actorRef: ActorRef): AnyRef = { + val queue = parent.createSerialQueue(actorRef.toString) + if (aggregate) new AggregatingHawtDispatcherMailbox(queue) + else new HawtDispatcherMailbox(queue) } override def toString = "HawtDispatchEventDrivenDispatcher" diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 395c572f0e..49f4cc3839 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -71,21 +71,41 @@ trait MessageQueue { */ trait MessageDispatcher extends Logging { protected val uuids = new ConcurrentSkipListSet[String] + def dispatch(invocation: MessageInvocation) + def start + def shutdown - def register(actorRef: ActorRef) = uuids add actorRef.uuid + + def register(actorRef: ActorRef) { + if(actorRef.mailbox eq null) + actorRef.mailbox = createMailbox(actorRef) + uuids add actorRef.uuid + } def unregister(actorRef: ActorRef) = { uuids remove actorRef.uuid + //actorRef.mailbox = null //FIXME should we null out the mailbox here? if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } + def canBeShutDown: Boolean = uuids.isEmpty + def isShutdown: Boolean + + /** + * Returns the size of the mailbox for the specified actor + */ def mailboxSize(actorRef: ActorRef):Int = 0 + + /** + * Creates and returns a mailbox for the given actor + */ + protected def createMailbox(actorRef: ActorRef): AnyRef = null } /** - * @author Jonas Bonér + * @author Jonas Bonér */ trait MessageDemultiplexer { def select diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala index d0850aa830..f76465f7c7 100644 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala @@ -29,8 +29,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String) val iter = selectedInvocations.iterator while (iter.hasNext) { val invocation = iter.next - val invoker = messageInvokers.get(invocation.receiver) - if (invoker ne null) invoker.invoke(invocation) + val invoker = invocation.receiver + if (invoker ne null) invoker invoke invocation iter.remove } } diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala index 530184d4b2..0bb8f3de45 100644 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala @@ -103,14 +103,14 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) private def process(selectedInvocations: List[MessageInvocation]) = synchronized { var nrOfBusyMessages = 0 - val totalNrOfActors = messageInvokers.size + val totalNrOfActors = uuids.size val totalNrOfBusyActors = busyActors.size val invocations = selectedInvocations.iterator while (invocations.hasNext && totalNrOfActors > totalNrOfBusyActors && passFairnessCheck(nrOfBusyMessages)) { val invocation = invocations.next if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]") if (!busyActors.contains(invocation.receiver)) { - val invoker = messageInvokers.get(invocation.receiver) + val invoker = invocation.receiver if (invoker eq null) throw new IllegalActorStateException( "Message invoker for invocation [" + invocation + "] is null") resume(invocation.receiver) diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 589500f413..8fe07e17ac 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -28,11 +28,11 @@ class ThreadBasedDispatcher(private val actor: ActorRef, private var selectorThread: Thread = _ @volatile private var active: Boolean = false - if (actor.mailbox eq null) { - actor.mailbox = if (mailboxCapacity > 0) - new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue - else - new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue + override def createMailbox(actorRef: ActorRef): AnyRef = { + if (mailboxCapacity > 0) + new BoundedTransferQueue[MessageInvocation](mailboxCapacity,pushTimeout,pushTimeoutUnit) with ThreadMessageQueue + else + new LinkedTransferQueue[MessageInvocation] with ThreadMessageQueue } override def register(actorRef: ActorRef) = {