diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 1f708983bd..59468125eb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -182,7 +182,7 @@ object FSMTimingSpec { when(TestCancelTimer) { case Event(Tick, _) ⇒ setTimer("hallo", Tock, 1 milli, false) - TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.messageQueue.hasMessages, 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) cancelTimer("hallo") sender ! Tick setTimer("hallo", Tock, 500 millis, false) @@ -209,7 +209,7 @@ object FSMTimingSpec { case Event(Tick, _) ⇒ suspend(self) setTimer("named", Tock, 1 millis, false) - TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.messageQueue.hasMessages, 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) stay forMax (1 millis) replying Tick case Event(Tock, _) ⇒ goto(TestCancelStateTimerInNamedTimerMessage2) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 87a9cf9734..88358e9f16 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -374,7 +374,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path } } foreach { case cell: ActorCell ⇒ - System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.messageQueue.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) + System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) } System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index ec0982982c..4060587b73 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -76,8 +76,8 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) { } finishedCounter.await(5, TimeUnit.SECONDS) - fast.underlying.mailbox.asInstanceOf[Mailbox].messageQueue.hasMessages must be(false) - slow.underlying.mailbox.asInstanceOf[Mailbox].messageQueue.hasMessages must be(false) + fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) + slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 0a77457cf9..39bd52aa87 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -590,7 +590,7 @@ private[akka] class ActorCell( final def checkReceiveTimeout() { val recvtimeout = receiveTimeoutData - if (recvtimeout._1 > 0 && !mailbox.messageQueue.hasMessages) { + if (recvtimeout._1 > 0 && !mailbox.hasMessages) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 46d7b249df..5537b01244 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -40,7 +40,7 @@ class Dispatcher( protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox - mbox.messageQueue.enqueue(receiver.self, invocation) + mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 1a6515ea2d..f25c6571e8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -44,6 +44,11 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes import Mailbox._ + def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg) + def dequeue(): Envelope = messageQueue.dequeue() + def hasMessages: Boolean = messageQueue.hasMessages + def numberOfMessages: Int = messageQueue.numberOfMessages + @volatile protected var _statusDoNotCallMeDirectly: Status = _ //0 by default @@ -142,7 +147,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new) final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { - case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || messageQueue.hasMessages + case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Closed ⇒ false case _ ⇒ hasSystemMessageHint || hasSystemMessages } @@ -166,7 +171,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes left: Int = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit = if (shouldProcessMessage) { - val next = messageQueue.dequeue() + val next = dequeue() if (next ne null) { if (Mailbox.debug) println(actor.self + " processing message " + next) actor invoke next diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index e77d6ac469..57847f3553 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -773,7 +773,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def hasMessages(a: ActorRef): Boolean = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.messageQueue.hasMessages + case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages case _ ⇒ false } @@ -797,7 +797,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def numberOfMessages(a: ActorRef): Int = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.messageQueue.numberOfMessages + case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages case _ ⇒ 0 } @@ -1249,9 +1249,9 @@ case class DefaultResizer( case a: LocalActorRef ⇒ val cell = a.underlying pressureThreshold match { - case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.messageQueue.hasMessages + case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null - case threshold ⇒ cell.mailbox.messageQueue.numberOfMessages >= threshold + case threshold ⇒ cell.mailbox.numberOfMessages >= threshold } case x ⇒ false diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 8cbe954aac..aba582ae68 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -199,7 +199,7 @@ class CallingThreadDispatcher( } else false } if (execute) runQueue(mbox, queue) - case m ⇒ m.messageQueue.enqueue(receiver.self, handle) + case m ⇒ m.enqueue(receiver.self, handle) } }