Adding support for mailboxIsEmpty on MessageDispatcher and removing getMailboxSize and mailboxSize from ActorRef, use actorref.dispatcher.mailboxSize(actorref) and actorref.dispatcher.mailboxIsEmpty(actorref)

This commit is contained in:
Viktor Klang 2011-06-07 13:23:24 -05:00
parent ed5ac01d72
commit 417fcc779d
6 changed files with 14 additions and 15 deletions

View file

@ -410,17 +410,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S
*/ */
def startLink(actorRef: ActorRef): ActorRef def startLink(actorRef: ActorRef): ActorRef
/**
* Returns the mailbox size.
*/
def mailboxSize = dispatcher.mailboxSize(this)
/**
* Akka Java API. <p/>
* Returns the mailbox size.
*/
def getMailboxSize: Int = mailboxSize
/** /**
* Returns the supervisor, if there is one. * Returns the supervisor, if there is one.
*/ */
@ -952,7 +941,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
protected[akka] def checkReceiveTimeout() { protected[akka] def checkReceiveTimeout() {
cancelReceiveTimeout() cancelReceiveTimeout()
if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS))
} }
} }

View file

@ -110,6 +110,8 @@ class Dispatcher(
*/ */
protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
def mailboxIsEmpty(actorRef: ActorRef): Boolean = getMailbox(actorRef).isEmpty
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {

View file

@ -235,6 +235,11 @@ trait MessageDispatcher {
*/ */
def mailboxSize(actorRef: ActorRef): Int def mailboxSize(actorRef: ActorRef): Int
/**
* Returns the "current" emptiness status of the mailbox for the specified actor
*/
def mailboxIsEmpty(actorRef: ActorRef): Boolean
/** /**
* Returns the amount of futures queued for execution * Returns the amount of futures queued for execution
*/ */

View file

@ -116,7 +116,7 @@ trait SmallestMailboxSelector {
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
while (take > 0) { while (take > 0) {
set = delegates.sortWith(_.mailboxSize < _.mailboxSize).take(take) ++ set //Question, doesn't this risk selecting the same actor multiple times? set = delegates.sortWith((a, b) a.dispatcher.mailboxSize(a) < b.dispatcher.mailboxSize(b)).take(take) ++ set //Question, doesn't this risk selecting the same actor multiple times?
take -= set.size take -= set.size
} }
@ -187,7 +187,7 @@ trait BoundedCapacitor {
trait MailboxPressureCapacitor { trait MailboxPressureCapacitor {
def pressureThreshold: Int def pressureThreshold: Int
def pressure(delegates: Seq[ActorRef]): Int = def pressure(delegates: Seq[ActorRef]): Int =
delegates count { _.mailboxSize > pressureThreshold } delegates count { a a.dispatcher.mailboxSize(a) > pressureThreshold }
} }
/** /**

View file

@ -206,7 +206,7 @@ case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends Infini
def this(items: java.util.List[ActorRef]) = this(items.toList) def this(items: java.util.List[ActorRef]) = this(items.toList)
def hasNext = items != Nil def hasNext = items != Nil
def next = items.reduceLeft((a1, a2) if (a1.mailboxSize < a2.mailboxSize) a1 else a2) def next = items.reduceLeft((a1, a2) if (a1.dispatcher.mailboxSize(a1) < a2.dispatcher.mailboxSize(a2)) a1 else a2)
override def exists(f: ActorRef Boolean): Boolean = items.exists(f) override def exists(f: ActorRef Boolean): Boolean = items.exists(f)
} }

View file

@ -135,6 +135,8 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa
override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size
def mailboxIsEmpty(actorRef: ActorRef): Boolean = getMailbox(actorRef).queue.isEmpty
private[akka] override def dispatch(handle: MessageInvocation) { private[akka] override def dispatch(handle: MessageInvocation) {
val mbox = getMailbox(handle.receiver) val mbox = getMailbox(handle.receiver)
val queue = mbox.queue val queue = mbox.queue
@ -210,6 +212,7 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa
class NestingQueue { class NestingQueue {
private var q = new LinkedList[MessageInvocation]() private var q = new LinkedList[MessageInvocation]()
def size = q.size def size = q.size
def isEmpty = q.isEmpty
def push(handle: MessageInvocation) { q.offer(handle) } def push(handle: MessageInvocation) { q.offer(handle) }
def peek = q.peek def peek = q.peek
def pop = q.poll def pop = q.poll