move back to explicit enqueue/dequeue delegation from Mailbox to MessageQueue, see #1844
This commit is contained in:
parent
587950b863
commit
3eeaadd804
8 changed files with 19 additions and 14 deletions
|
|
@ -182,7 +182,7 @@ object FSMTimingSpec {
|
|||
when(TestCancelTimer) {
|
||||
case Event(Tick, _) ⇒
|
||||
setTimer("hallo", Tock, 1 milli, false)
|
||||
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.messageQueue.hasMessages, 1 second)
|
||||
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
|
||||
cancelTimer("hallo")
|
||||
sender ! Tick
|
||||
setTimer("hallo", Tock, 500 millis, false)
|
||||
|
|
@ -209,7 +209,7 @@ object FSMTimingSpec {
|
|||
case Event(Tick, _) ⇒
|
||||
suspend(self)
|
||||
setTimer("named", Tock, 1 millis, false)
|
||||
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.messageQueue.hasMessages, 1 second)
|
||||
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
|
||||
stay forMax (1 millis) replying Tick
|
||||
case Event(Tock, _) ⇒
|
||||
goto(TestCancelStateTimerInNamedTimerMessage2)
|
||||
|
|
|
|||
|
|
@ -374,7 +374,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
|
|||
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path }
|
||||
} foreach {
|
||||
case cell: ActorCell ⇒
|
||||
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.messageQueue.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain()))
|
||||
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain()))
|
||||
}
|
||||
|
||||
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages)
|
||||
|
|
|
|||
|
|
@ -76,8 +76,8 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) {
|
|||
}
|
||||
|
||||
finishedCounter.await(5, TimeUnit.SECONDS)
|
||||
fast.underlying.mailbox.asInstanceOf[Mailbox].messageQueue.hasMessages must be(false)
|
||||
slow.underlying.mailbox.asInstanceOf[Mailbox].messageQueue.hasMessages must be(false)
|
||||
fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
|
||||
slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
|
||||
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
|
||||
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be >
|
||||
(slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount)
|
||||
|
|
|
|||
|
|
@ -590,7 +590,7 @@ private[akka] class ActorCell(
|
|||
|
||||
final def checkReceiveTimeout() {
|
||||
val recvtimeout = receiveTimeoutData
|
||||
if (recvtimeout._1 > 0 && !mailbox.messageQueue.hasMessages) {
|
||||
if (recvtimeout._1 > 0 && !mailbox.hasMessages) {
|
||||
recvtimeout._2.cancel() //Cancel any ongoing future
|
||||
//Only reschedule if desired and there are currently no more messages to be processed
|
||||
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout))
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class Dispatcher(
|
|||
|
||||
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
|
||||
val mbox = receiver.mailbox
|
||||
mbox.messageQueue.enqueue(receiver.self, invocation)
|
||||
mbox.enqueue(receiver.self, invocation)
|
||||
registerForExecution(mbox, true, false)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -44,6 +44,11 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
|
|||
|
||||
import Mailbox._
|
||||
|
||||
def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)
|
||||
def dequeue(): Envelope = messageQueue.dequeue()
|
||||
def hasMessages: Boolean = messageQueue.hasMessages
|
||||
def numberOfMessages: Int = messageQueue.numberOfMessages
|
||||
|
||||
@volatile
|
||||
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
|
||||
|
||||
|
|
@ -142,7 +147,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
|
|||
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new)
|
||||
|
||||
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
|
||||
case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || messageQueue.hasMessages
|
||||
case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
|
||||
case Closed ⇒ false
|
||||
case _ ⇒ hasSystemMessageHint || hasSystemMessages
|
||||
}
|
||||
|
|
@ -166,7 +171,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
|
|||
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
|
||||
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
|
||||
if (shouldProcessMessage) {
|
||||
val next = messageQueue.dequeue()
|
||||
val next = dequeue()
|
||||
if (next ne null) {
|
||||
if (Mailbox.debug) println(actor.self + " processing message " + next)
|
||||
actor invoke next
|
||||
|
|
|
|||
|
|
@ -773,7 +773,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
|
|||
* routers based on mailbox and actor internal state.
|
||||
*/
|
||||
protected def hasMessages(a: ActorRef): Boolean = a match {
|
||||
case x: LocalActorRef ⇒ x.underlying.mailbox.messageQueue.hasMessages
|
||||
case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
|
|
@ -797,7 +797,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒
|
|||
* routers based on mailbox and actor internal state.
|
||||
*/
|
||||
protected def numberOfMessages(a: ActorRef): Int = a match {
|
||||
case x: LocalActorRef ⇒ x.underlying.mailbox.messageQueue.numberOfMessages
|
||||
case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages
|
||||
case _ ⇒ 0
|
||||
}
|
||||
|
||||
|
|
@ -1249,9 +1249,9 @@ case class DefaultResizer(
|
|||
case a: LocalActorRef ⇒
|
||||
val cell = a.underlying
|
||||
pressureThreshold match {
|
||||
case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.messageQueue.hasMessages
|
||||
case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages
|
||||
case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null
|
||||
case threshold ⇒ cell.mailbox.messageQueue.numberOfMessages >= threshold
|
||||
case threshold ⇒ cell.mailbox.numberOfMessages >= threshold
|
||||
}
|
||||
case x ⇒
|
||||
false
|
||||
|
|
|
|||
|
|
@ -199,7 +199,7 @@ class CallingThreadDispatcher(
|
|||
} else false
|
||||
}
|
||||
if (execute) runQueue(mbox, queue)
|
||||
case m ⇒ m.messageQueue.enqueue(receiver.self, handle)
|
||||
case m ⇒ m.enqueue(receiver.self, handle)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue