Removing mailboxIsEmpty and mailboxSize from MessageDispatcher
This commit is contained in:
parent
ca96cb3973
commit
8db3f6aa0a
5 changed files with 7 additions and 21 deletions
|
|
@ -182,7 +182,7 @@ object FSMTimingSpec {
|
||||||
when(TestCancelTimer) {
|
when(TestCancelTimer) {
|
||||||
case Ev(Tick) ⇒
|
case Ev(Tick) ⇒
|
||||||
setTimer("hallo", Tock, 1 milli, false)
|
setTimer("hallo", Tock, 1 milli, false)
|
||||||
TestKit.awaitCond(!context.dispatcher.mailboxIsEmpty(context.asInstanceOf[ActorCell]), 1 second)
|
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
|
||||||
cancelTimer("hallo")
|
cancelTimer("hallo")
|
||||||
sender ! Tick
|
sender ! Tick
|
||||||
setTimer("hallo", Tock, 500 millis, false)
|
setTimer("hallo", Tock, 500 millis, false)
|
||||||
|
|
@ -209,7 +209,7 @@ object FSMTimingSpec {
|
||||||
case Ev(Tick) ⇒
|
case Ev(Tick) ⇒
|
||||||
suspend(self)
|
suspend(self)
|
||||||
setTimer("named", Tock, 1 millis, false)
|
setTimer("named", Tock, 1 millis, false)
|
||||||
TestKit.awaitCond(!context.dispatcher.mailboxIsEmpty(context.asInstanceOf[ActorCell]), 1 second)
|
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
|
||||||
stay forMax (1 millis) replying Tick
|
stay forMax (1 millis) replying Tick
|
||||||
case Ev(Tock) ⇒
|
case Ev(Tock) ⇒
|
||||||
goto(TestCancelStateTimerInNamedTimerMessage2)
|
goto(TestCancelStateTimerInNamedTimerMessage2)
|
||||||
|
|
|
||||||
|
|
@ -566,7 +566,7 @@ private[akka] class ActorCell(
|
||||||
|
|
||||||
final def checkReceiveTimeout() {
|
final def checkReceiveTimeout() {
|
||||||
val recvtimeout = receiveTimeoutData
|
val recvtimeout = receiveTimeoutData
|
||||||
if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) {
|
if (recvtimeout._1 > 0 && !mailbox.hasMessages) {
|
||||||
recvtimeout._2.cancel() //Cancel any ongoing future
|
recvtimeout._2.cancel() //Cancel any ongoing future
|
||||||
//Only reschedule if desired and there are currently no more messages to be processed
|
//Only reschedule if desired and there are currently no more messages to be processed
|
||||||
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout))
|
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout))
|
||||||
|
|
|
||||||
|
|
@ -257,16 +257,6 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
||||||
* Must be idempotent
|
* Must be idempotent
|
||||||
*/
|
*/
|
||||||
protected[akka] def shutdown(): Unit
|
protected[akka] def shutdown(): Unit
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the size of the mailbox for the specified actor
|
|
||||||
*/
|
|
||||||
def mailboxSize(actor: ActorCell): Int = actor.mailbox.numberOfMessages
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the "current" emptiness status of the mailbox for the specified actor
|
|
||||||
*/
|
|
||||||
def mailboxIsEmpty(actor: ActorCell): Boolean = !actor.mailbox.hasMessages
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -165,7 +165,7 @@ trait SmallestMailboxSelector {
|
||||||
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
|
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
|
||||||
|
|
||||||
def mailboxSize(a: ActorRef): Int = a match {
|
def mailboxSize(a: ActorRef): Int = a match {
|
||||||
case l: LocalActorRef ⇒ l.underlying.dispatcher.mailboxSize(l.underlying)
|
case l: LocalActorRef ⇒ l.underlying.mailbox.numberOfMessages
|
||||||
case _ ⇒ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority
|
case _ ⇒ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -282,7 +282,7 @@ trait MailboxPressureCapacitor {
|
||||||
def pressureThreshold: Int
|
def pressureThreshold: Int
|
||||||
def pressure(delegates: Seq[ActorRef]): Int =
|
def pressure(delegates: Seq[ActorRef]): Int =
|
||||||
delegates count {
|
delegates count {
|
||||||
case a: LocalActorRef ⇒ a.underlying.dispatcher.mailboxSize(a.underlying) > pressureThreshold
|
case a: LocalActorRef ⇒ a.underlying.mailbox.numberOfMessages > pressureThreshold
|
||||||
case _ ⇒ false
|
case _ ⇒ false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -165,10 +165,6 @@ class CallingThreadDispatcher(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def mailboxSize(actor: ActorCell) = getMailbox(actor) map (_.queue.size) getOrElse 0
|
|
||||||
|
|
||||||
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor) map (_.queue.isEmpty) getOrElse true
|
|
||||||
|
|
||||||
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
|
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
|
||||||
receiver.mailbox match {
|
receiver.mailbox match {
|
||||||
case mbox: CallingThreadMailbox ⇒
|
case mbox: CallingThreadMailbox ⇒
|
||||||
|
|
@ -304,6 +300,6 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
|
||||||
|
|
||||||
override def enqueue(receiver: ActorRef, msg: Envelope) {}
|
override def enqueue(receiver: ActorRef, msg: Envelope) {}
|
||||||
override def dequeue() = null
|
override def dequeue() = null
|
||||||
override def hasMessages = true
|
override def hasMessages = queue.isEmpty
|
||||||
override def numberOfMessages = 0
|
override def numberOfMessages = queue.size
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue