diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 49f4cc3839..640ded8039 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -96,7 +96,7 @@ trait MessageDispatcher extends Logging { /** * Returns the size of the mailbox for the specified actor */ - def mailboxSize(actorRef: ActorRef):Int = 0 + def mailboxSize(actorRef: ActorRef):Int /** * Creates and returns a mailbox for the given actor diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala index f76465f7c7..c698b22c15 100644 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala @@ -11,6 +11,7 @@ package se.scalablesolutions.akka.dispatch import java.util.{LinkedList, List} +import se.scalablesolutions.akka.actor.ActorRef class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String) extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) { @@ -39,6 +40,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String) selectorThread.start } + def mailboxSize(a: ActorRef) = 0 + def isShutdown = !active override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]" diff --git a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala index 0bb8f3de45..1b01e90298 100644 --- a/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala @@ -7,8 +7,7 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.locks.ReentrantLock import java.util.{HashSet, HashMap, LinkedList, List} - -import se.scalablesolutions.akka.actor.IllegalActorStateException +import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException} /** * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
@@ -139,6 +138,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) else nrOfBusyMessages < 100 } + def mailboxSize(a: ActorRef) = 0 + def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 8fe07e17ac..65ee9ed845 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -44,6 +44,8 @@ class ThreadBasedDispatcher(private val actor: ActorRef, def mailbox = actor.mailbox.asInstanceOf[ThreadMessageQueue] + def mailboxSize(a: ActorRef) = mailbox.size + def dispatch(invocation: MessageInvocation) = mailbox append invocation def start = if (!active) { @@ -73,14 +75,13 @@ class ThreadBasedDispatcher(private val actor: ActorRef, override def toString = "ThreadBasedDispatcher[" + threadName + "]" } -trait ThreadMessageQueue extends MessageQueue { self: TransferQueue[MessageInvocation] => - +trait ThreadMessageQueue extends MessageQueue with TransferQueue[MessageInvocation] { final def append(invocation: MessageInvocation): Unit = { - if(!self.tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer - if(!self.offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting + if(!tryTransfer(invocation)) { //First, try to send the invocation to a waiting consumer + if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out") } } - final def next: MessageInvocation = self.take + final def next: MessageInvocation = take }