diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 9cb2c14513..89b3dc7736 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -410,17 +410,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] with S */ def startLink(actorRef: ActorRef): ActorRef - /** - * Returns the mailbox size. - */ - def mailboxSize = dispatcher.mailboxSize(this) - - /** - * Akka Java API.
- * Returns the mailbox size. - */ - def getMailboxSize: Int = mailboxSize - /** * Returns the supervisor, if there is one. */ @@ -952,7 +941,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, protected[akka] def checkReceiveTimeout() { cancelReceiveTimeout() - if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed + if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index bf02af5997..b78b99cabf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -110,6 +110,8 @@ class Dispatcher( */ protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox] + def mailboxIsEmpty(actorRef: ActorRef): Boolean = getMailbox(actorRef).isEmpty + override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 0c87581e5e..40aae3f691 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -235,6 +235,11 @@ trait MessageDispatcher { */ def mailboxSize(actorRef: ActorRef): Int + /** + * Returns the "current" emptiness status of the mailbox for the specified actor + */ + def mailboxIsEmpty(actorRef: ActorRef): Boolean + /** * Returns the amount of futures queued for execution */ diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index c036616521..0c2c9d6378 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -116,7 +116,7 @@ trait SmallestMailboxSelector { var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount while (take > 0) { - set = delegates.sortWith(_.mailboxSize < _.mailboxSize).take(take) ++ set //Question, doesn't this risk selecting the same actor multiple times? + set = delegates.sortWith((a, b) ⇒ a.dispatcher.mailboxSize(a) < b.dispatcher.mailboxSize(b)).take(take) ++ set //Question, doesn't this risk selecting the same actor multiple times? take -= set.size } @@ -187,7 +187,7 @@ trait BoundedCapacitor { trait MailboxPressureCapacitor { def pressureThreshold: Int def pressure(delegates: Seq[ActorRef]): Int = - delegates count { _.mailboxSize > pressureThreshold } + delegates count { a ⇒ a.dispatcher.mailboxSize(a) > pressureThreshold } } /** diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 454760594a..e707e23f23 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -206,7 +206,7 @@ case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends Infini def this(items: java.util.List[ActorRef]) = this(items.toList) def hasNext = items != Nil - def next = items.reduceLeft((a1, a2) ⇒ if (a1.mailboxSize < a2.mailboxSize) a1 else a2) + def next = items.reduceLeft((a1, a2) ⇒ if (a1.dispatcher.mailboxSize(a1) < a2.dispatcher.mailboxSize(a2)) a1 else a2) override def exists(f: ActorRef ⇒ Boolean): Boolean = items.exists(f) } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 90d9bfda83..8e21641636 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -135,6 +135,8 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size + def mailboxIsEmpty(actorRef: ActorRef): Boolean = getMailbox(actorRef).queue.isEmpty + private[akka] override def dispatch(handle: MessageInvocation) { val mbox = getMailbox(handle.receiver) val queue = mbox.queue @@ -210,6 +212,7 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa class NestingQueue { private var q = new LinkedList[MessageInvocation]() def size = q.size + def isEmpty = q.isEmpty def push(handle: MessageInvocation) { q.offer(handle) } def peek = q.peek def pop = q.poll