From 2f3737195b848d0de05479d2fa9b1507f9adb729 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 19 Feb 2012 10:28:56 +0100 Subject: [PATCH 1/4] split Mailbox and MessageQueue, see #1844 - this enables using any MessageQueue in BalancingDispatcher, CallingThreadDispatcher and in general leads to less conflation of concepts - add MessageQueue.cleanUp(owner, deadLetterQueue) for the benefit of durable mailboxes - change MailboxType.create to take an optional owner and generate only a MessageQueue, not a Mailbox --- .../test/scala/akka/actor/FSMTimingSpec.scala | 4 +- .../akka/actor/dispatch/ActorModelSpec.scala | 2 +- .../dispatch/BalancingDispatcherSpec.scala | 4 +- .../akka/dispatch/MailboxConfigSpec.scala | 28 +++---- .../CallingThreadDispatcherModelSpec.scala | 3 +- .../src/main/scala/akka/actor/ActorCell.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 20 +++-- .../akka/dispatch/BalancingDispatcher.scala | 63 ++++++--------- .../main/scala/akka/dispatch/Dispatcher.scala | 4 +- .../main/scala/akka/dispatch/Mailbox.scala | 78 ++++++++++--------- .../src/main/scala/akka/routing/Routing.scala | 8 +- .../actor/mailbox/BeanstalkBasedMailbox.scala | 9 ++- .../actor/mailbox/FiledBasedMailbox.scala | 10 ++- .../akka/actor/mailbox/DurableMailbox.scala | 4 +- .../actor/mailbox/MongoBasedMailbox.scala | 11 ++- .../actor/mailbox/RedisBasedMailbox.scala | 9 ++- .../actor/mailbox/ZooKeeperBasedMailbox.scala | 9 ++- .../testkit/CallingThreadDispatcher.scala | 47 ++++++----- 18 files changed, 171 insertions(+), 144 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 59468125eb..1f708983bd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -182,7 +182,7 @@ object FSMTimingSpec { when(TestCancelTimer) { case Event(Tick, _) ⇒ setTimer("hallo", Tock, 1 milli, false) - TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.messageQueue.hasMessages, 1 second) cancelTimer("hallo") sender ! Tick setTimer("hallo", Tock, 500 millis, false) @@ -209,7 +209,7 @@ object FSMTimingSpec { case Event(Tick, _) ⇒ suspend(self) setTimer("named", Tock, 1 millis, false) - TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.messageQueue.hasMessages, 1 second) stay forMax (1 millis) replying Tick case Event(Tock, _) ⇒ goto(TestCancelStateTimerInNamedTimerMessage2) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 88358e9f16..87a9cf9734 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -374,7 +374,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path } } foreach { case cell: ActorCell ⇒ - System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) + System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.messageQueue.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) } System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index 4060587b73..ec0982982c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -76,8 +76,8 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) { } finishedCounter.await(5, TimeUnit.SECONDS) - fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) - slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) + fast.underlying.mailbox.asInstanceOf[Mailbox].messageQueue.hasMessages must be(false) + slow.underlying.mailbox.asInstanceOf[Mailbox].messageQueue.hasMessages must be(false) fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 4f787a730f..21c6c75cb2 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -6,15 +6,14 @@ import java.util.concurrent.ConcurrentLinkedQueue import akka.util._ import akka.util.duration._ import akka.testkit.AkkaSpec -import akka.actor.ActorRef -import akka.actor.ActorContext +import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef } import com.typesafe.config.Config @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { def name: String - def factory: MailboxType ⇒ Mailbox + def factory: MailboxType ⇒ MessageQueue name should { "create an unbounded mailbox" in { @@ -77,7 +76,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system) - def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { + def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) { q must not be null q match { case aQueue: BlockingQueue[_] ⇒ @@ -136,8 +135,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn class DefaultMailboxSpec extends MailboxSpec { lazy val name = "The default mailbox implementation" def factory = { - case u: UnboundedMailbox ⇒ u.create(null) - case b: BoundedMailbox ⇒ b.create(null) + case u: UnboundedMailbox ⇒ u.create(None) + case b: BoundedMailbox ⇒ b.create(None) } } @@ -145,8 +144,8 @@ class PriorityMailboxSpec extends MailboxSpec { val comparator = PriorityGenerator(_.##) lazy val name = "The priority mailbox implementation" def factory = { - case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null) - case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) + case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(None) + case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None) } } @@ -158,11 +157,13 @@ object CustomMailboxSpec { """ class MyMailboxType(config: Config) extends MailboxType { - override def create(owner: ActorContext) = new MyMailbox(owner) + override def create(owner: Option[ActorContext]) = owner match { + case Some(o) ⇒ new MyMailbox(o) + case None ⇒ throw new Exception("no mailbox owner given") + } } - class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) - with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + class MyMailbox(owner: ActorContext) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope]() } } @@ -171,8 +172,9 @@ object CustomMailboxSpec { class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) { "Dispatcher configuration" must { "support custom mailboxType" in { - val dispatcher = system.dispatchers.lookup("my-dispatcher") - dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox]) + val actor = system.actorOf(Props.empty.withDispatcher("my-dispatcher")) + val queue = actor.asInstanceOf[LocalActorRef].underlying.mailbox.messageQueue + queue.getClass must be(classOf[CustomMailboxSpec.MyMailbox]) } } } diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index 4693a56536..5b023054d4 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -9,6 +9,7 @@ import com.typesafe.config.Config import akka.dispatch.DispatcherPrerequisites import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcherConfigurator +import akka.dispatch.UnboundedMailbox object CallingThreadDispatcherModelSpec { import ActorModelSpec._ @@ -31,7 +32,7 @@ object CallingThreadDispatcherModelSpec { extends MessageDispatcherConfigurator(config, prerequisites) { private val instance: MessageDispatcher = - new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor { + new CallingThreadDispatcher(prerequisites, UnboundedMailbox()) with MessageDispatcherInterceptor { override def id: String = config.getString("id") } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 39bd52aa87..0a77457cf9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -590,7 +590,7 @@ private[akka] class ActorCell( final def checkReceiveTimeout() { val recvtimeout = receiveTimeoutData - if (recvtimeout._1 > 0 && !mailbox.hasMessages) { + if (recvtimeout._1 > 0 && !mailbox.messageQueue.hasMessages) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 6cdcf8817d..f31e0d830f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -452,15 +452,19 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf def deadLetters: ActorRef = provider.deadLetters - val deadLetterMailbox: Mailbox = new Mailbox(null) { + val deadLetterQueue: MessageQueue = new MessageQueue { + def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } + def dequeue() = null + def hasMessages = false + def numberOfMessages = 0 + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () + } + + val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) { becomeClosed() - override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } - override def dequeue() = null - override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } - override def systemDrain(): SystemMessage = null - override def hasMessages = false - override def hasSystemMessages = false - override def numberOfMessages = 0 + def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver) + def systemDrain(): SystemMessage = null + def hasSystemMessages = false } def locker: Locker = provider.locker diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index a7f03db5a0..e95f54b88b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -40,47 +40,9 @@ class BalancingDispatcher( def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path })) - val messageQueue: MessageQueue = mailboxType match { - case UnboundedMailbox() ⇒ - new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final val queue = new ConcurrentLinkedQueue[Envelope] - } + val messageQueue: MessageQueue = mailboxType.create(None) - case BoundedMailbox(cap, timeout) ⇒ - new QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final val queue = new LinkedBlockingQueue[Envelope](cap) - final val pushTimeOut = timeout - } - - case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") - } - - protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) - - class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { - final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) - - final def dequeue(): Envelope = messageQueue.dequeue() - - final def numberOfMessages: Int = messageQueue.numberOfMessages - - final def hasMessages: Boolean = messageQueue.hasMessages - - override def cleanUp(): Unit = { - //Don't call the original implementation of this since it scraps all messages, and we don't want to do that - if (hasSystemMessages) { - val dlq = actor.systemImpl.deadLetterMailbox - var message = systemDrain() - while (message ne null) { - // message must be “virgin” before being able to systemEnqueue again - val next = message.next - message.next = null - dlq.systemEnqueue(actor.self, message) - message = next - } - } - } - } + protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor, messageQueue) protected[akka] override def register(actor: ActorCell): Unit = { super.register(actor) @@ -111,4 +73,23 @@ class BalancingDispatcher( scheduleOne() } -} \ No newline at end of file +} + +class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue) + extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue { + + override def cleanUp(): Unit = { + //Don't call the original implementation of this since it scraps all messages, and we don't want to do that + if (hasSystemMessages) { + val dlq = actor.systemImpl.deadLetterMailbox + var message = systemDrain() + while (message ne null) { + // message must be “virgin” before being able to systemEnqueue again + val next = message.next + message.next = null + dlq.systemEnqueue(actor.self, message) + message = next + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 2046f02286..46d7b249df 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -40,7 +40,7 @@ class Dispatcher( protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox - mbox.enqueue(receiver.self, invocation) + mbox.messageQueue.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) } @@ -65,7 +65,7 @@ class Dispatcher( } } - protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor) + protected[akka] def createMailbox(actor: ActorCell): Mailbox = new Mailbox(actor, mailboxType.create(Some(actor))) with DefaultSystemMessageQueue protected[akka] def shutdown: Unit = Option(executorService.getAndSet(new ExecutorServiceDelegate { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 4c50cb5c8d..1a6515ea2d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -34,24 +34,14 @@ object Mailbox { final val debug = false } -/** - * Custom mailbox implementations are implemented by extending this class. - * E.g. - * - * class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) - * with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { - * val queue = new ConcurrentLinkedQueue[Envelope]() - * } - * - */ -abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell]) - /** * Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation, * but can't be exposed to user defined mailbox subclasses. * */ -private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { +private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue) + extends SystemMessageQueue with Runnable { + import Mailbox._ @volatile @@ -152,7 +142,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new) final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { - case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages + case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || messageQueue.hasMessages case Closed ⇒ false case _ ⇒ hasSystemMessageHint || hasSystemMessages } @@ -176,7 +166,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue left: Int = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit = if (shouldProcessMessage) { - val next = dequeue() + val next = messageQueue.dequeue() if (next ne null) { if (Mailbox.debug) println(actor.self + " processing message " + next) actor invoke next @@ -216,25 +206,20 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue */ protected[dispatch] def cleanUp(): Unit = if (actor ne null) { // actor is null for the deadLetterMailbox - val dlq = actor.systemImpl.deadLetterMailbox + val dlm = actor.systemImpl.deadLetterMailbox if (hasSystemMessages) { var message = systemDrain() while (message ne null) { // message must be “virgin” before being able to systemEnqueue again val next = message.next message.next = null - dlq.systemEnqueue(actor.self, message) + dlm.systemEnqueue(actor.self, message) message = next } } - if (hasMessages) { - var envelope = dequeue - while (envelope ne null) { - dlq.enqueue(actor.self, envelope) - envelope = dequeue - } - } + if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() + messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue) } } @@ -260,9 +245,20 @@ trait MessageQueue { * Indicates whether this queue is non-empty. */ def hasMessages: Boolean + + /** + * Called when the mailbox this queue belongs to is disposed of. Normally it + * is expected to transfer all remaining messages into the dead letter queue + * which is passed in. The owner of this MessageQueue is passed in if + * available (e.g. for creating DeadLetters()), “/deadletters” otherwise. + */ + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit } -trait SystemMessageQueue { +/** + * Internal mailbox implementation detail. + */ +private[akka] trait SystemMessageQueue { /** * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list. */ @@ -276,7 +272,10 @@ trait SystemMessageQueue { def hasSystemMessages: Boolean } -trait DefaultSystemMessageQueue { self: Mailbox ⇒ +/** + * Internal mailbox implementation detail. + */ +private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { @@ -329,21 +328,30 @@ trait QueueBasedMessageQueue extends MessageQueue { def queue: Queue[Envelope] def numberOfMessages = queue.size def hasMessages = !queue.isEmpty + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = { + if (hasMessages) { + var envelope = dequeue + while (envelope ne null) { + deadLetters.enqueue(owner.self, envelope) + envelope = dequeue + } + } + } } /** * Mailbox configuration. */ trait MailboxType { - def create(receiver: ActorContext): Mailbox + def create(owner: Option[ActorContext]): MessageQueue } /** * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - final override def create(receiver: ActorContext): Mailbox = - new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + final override def create(owner: Option[ActorContext]): MessageQueue = + new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope]() } } @@ -353,16 +361,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - final override def create(receiver: ActorContext): Mailbox = - new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + final override def create(owner: Option[ActorContext]): MessageQueue = + new QueueBasedMessageQueue with BoundedMessageQueueSemantics { final val queue = new LinkedBlockingQueue[Envelope](capacity) final val pushTimeOut = BoundedMailbox.this.pushTimeOut } } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - final override def create(receiver: ActorContext): Mailbox = - new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + final override def create(owner: Option[ActorContext]): MessageQueue = + new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new PriorityBlockingQueue[Envelope](11, cmp) } } @@ -372,8 +380,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - final override def create(receiver: ActorContext): Mailbox = - new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + final override def create(owner: Option[ActorContext]): MessageQueue = + new QueueBasedMessageQueue with BoundedMessageQueueSemantics { final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 57847f3553..e77d6ac469 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -773,7 +773,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def hasMessages(a: ActorRef): Boolean = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages + case x: LocalActorRef ⇒ x.underlying.mailbox.messageQueue.hasMessages case _ ⇒ false } @@ -797,7 +797,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def numberOfMessages(a: ActorRef): Int = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages + case x: LocalActorRef ⇒ x.underlying.mailbox.messageQueue.numberOfMessages case _ ⇒ 0 } @@ -1249,9 +1249,9 @@ case class DefaultResizer( case a: LocalActorRef ⇒ val cell = a.underlying pressureThreshold match { - case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages + case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.messageQueue.hasMessages case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null - case threshold ⇒ cell.mailbox.numberOfMessages >= threshold + case threshold ⇒ cell.mailbox.messageQueue.numberOfMessages >= threshold } case x ⇒ false diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 489d97d176..68aaa6067e 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -12,11 +12,16 @@ import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config +import akka.config.ConfigurationException +import akka.dispatch.MessageQueue class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {} class BeanstalkBasedMailboxType(config: Config) extends MailboxType { - override def create(owner: ActorContext) = new BeanstalkBasedMailbox(owner) + override def create(owner: Option[ActorContext]): MessageQueue = owner match { + case Some(o) ⇒ new BeanstalkBasedMailbox(o) + case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") + } } /** @@ -110,4 +115,6 @@ class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) private def reconnect(name: String): ThreadLocal[Client] = { new ThreadLocal[Client] { override def initialValue: Client = connect(name) } } + + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index ccdbdc4145..70368e2f40 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -6,15 +6,19 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils import akka.actor.ActorContext -import akka.dispatch.Envelope +import akka.dispatch.{ Envelope, MessageQueue } import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.util.NonFatal +import akka.config.ConfigurationException class FileBasedMailboxType(config: Config) extends MailboxType { - override def create(owner: ActorContext) = new FileBasedMailbox(owner) + override def create(owner: Option[ActorContext]): MessageQueue = owner match { + case Some(o) ⇒ new FileBasedMailbox(o) + case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") + } } class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { @@ -72,4 +76,6 @@ class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with case NonFatal(_) ⇒ false } + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () + } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 69f7fb50c1..92072c8cf3 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -4,7 +4,7 @@ package akka.actor.mailbox import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem } -import akka.dispatch.{ Envelope, DefaultSystemMessageQueue, CustomMailbox } +import akka.dispatch.{ Envelope, MessageQueue } import akka.remote.MessageSerializer import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol } @@ -12,7 +12,7 @@ private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r } -abstract class DurableMailbox(val owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { +abstract class DurableMailbox(val owner: ActorContext) extends MessageQueue { import DurableExecutableMailboxConfig._ def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem] diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index d17a1221a8..ac969695bb 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -14,11 +14,16 @@ import akka.dispatch.{ Await, Promise, Envelope } import java.util.concurrent.TimeoutException import akka.dispatch.MailboxType import com.typesafe.config.Config +import akka.config.ConfigurationException +import akka.dispatch.MessageQueue class MongoBasedMailboxException(message: String) extends AkkaException(message) class MongoBasedMailboxType(config: Config) extends MailboxType { - override def create(owner: ActorContext) = new MongoBasedMailbox(owner) + override def create(owner: Option[ActorContext]): MessageQueue = owner match { + case Some(o) ⇒ new MongoBasedMailbox(o) + case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") + } } /** @@ -37,6 +42,8 @@ class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) { implicit val mailboxBSONSer = new BSONSerializableMailbox(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! + private val dispatcher = owner.dispatcher + private val settings = MongoBasedMailboxExtension(owner.system) val log = Logging(system, "MongoBasedMailbox") @@ -132,4 +139,6 @@ class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) { } } } + + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () } diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index b6cf3febc6..9a01d462f8 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -12,11 +12,16 @@ import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.util.NonFatal +import akka.config.ConfigurationException +import akka.dispatch.MessageQueue class RedisBasedMailboxException(message: String) extends AkkaException(message) class RedisBasedMailboxType(config: Config) extends MailboxType { - override def create(owner: ActorContext) = new RedisBasedMailbox(owner) + override def create(owner: Option[ActorContext]): MessageQueue = owner match { + case Some(o) ⇒ new RedisBasedMailbox(o) + case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") + } } class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { @@ -80,5 +85,7 @@ class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) wit throw error } } + + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () } diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 90fd381af1..1434f6c1d9 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -13,11 +13,16 @@ import akka.actor.ActorRef import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.util.NonFatal +import akka.config.ConfigurationException +import akka.dispatch.MessageQueue class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { - override def create(owner: ActorContext) = new ZooKeeperBasedMailbox(owner) + override def create(owner: Option[ActorContext]): MessageQueue = owner match { + case Some(o) ⇒ new ZooKeeperBasedMailbox(o) + case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") + } } class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { @@ -62,7 +67,7 @@ class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) case e: Exception ⇒ false } - override def cleanUp() { + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = { try { zkClient.close() } catch { diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 8b2d15a079..8cbe954aac 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -9,10 +9,12 @@ import java.util.LinkedList import scala.annotation.tailrec import com.typesafe.config.Config import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } -import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } +import akka.dispatch.{ MailboxType, TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } import akka.util.duration.intToDurationInt import akka.util.{ Switch, Duration } import akka.util.NonFatal +import akka.actor.ActorContext +import akka.dispatch.MessageQueue /* * Locking rules: @@ -75,9 +77,12 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension { val q = ref.get if (q ne null) && (q ne own) } { - while (q.peek ne null) { + val owner = mbox.actor.self + var msg = q.q.dequeue() + while (msg ne null) { // this is safe because this method is only ever called while holding the suspendSwitch monitor - own.push(q.pop) + own.q.enqueue(owner, msg) + msg = q.q.dequeue() } } } @@ -115,6 +120,7 @@ object CallingThreadDispatcher { */ class CallingThreadDispatcher( _prerequisites: DispatcherPrerequisites, + val mailboxType: MailboxType, val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) { import CallingThreadDispatcher._ @@ -122,7 +128,7 @@ class CallingThreadDispatcher( override def id: String = Id - protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor) + protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor, mailboxType) protected[akka] override def shutdown() {} @@ -183,17 +189,17 @@ class CallingThreadDispatcher( case mbox: CallingThreadMailbox ⇒ val queue = mbox.queue val execute = mbox.suspendSwitch.fold { - queue.push(handle) + queue.q.enqueue(receiver.self, handle) false } { - queue.push(handle) + queue.q.enqueue(receiver.self, handle) if (!queue.isActive) { queue.enter true } else false } if (execute) runQueue(mbox, queue) - case m ⇒ m.enqueue(receiver.self, handle) + case m ⇒ m.messageQueue.enqueue(receiver.self, handle) } } @@ -219,7 +225,7 @@ class CallingThreadDispatcher( queue.leave null } { - val ret = if (mbox.isClosed) null else queue.pop + val ret = if (mbox.isClosed) null else queue.q.dequeue() if (ret eq null) queue.leave ret } @@ -261,19 +267,13 @@ class CallingThreadDispatcher( class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends MessageDispatcherConfigurator(config, prerequisites) { - private val instance = new CallingThreadDispatcher(prerequisites) + + private val instance = new CallingThreadDispatcher(prerequisites, mailboxType()) override def dispatcher(): MessageDispatcher = instance } -class NestingQueue { - private var q = new LinkedList[Envelope]() - def size = q.size - def isEmpty = q.isEmpty - def push(handle: Envelope) { q.offer(handle) } - def peek = q.peek - def pop = q.poll - +class NestingQueue(val q: MessageQueue) { @volatile private var active = false def enter { if (active) sys.error("already active") else active = true } @@ -281,11 +281,11 @@ class NestingQueue { def isActive = active } -class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue { +class CallingThreadMailbox(_receiver: ActorCell, val mailboxType: MailboxType) extends Mailbox(_receiver, null) with DefaultSystemMessageQueue { private val q = new ThreadLocal[NestingQueue]() { override def initialValue = { - val queue = new NestingQueue + val queue = new NestingQueue(mailboxType.create(Some(actor))) CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue) queue } @@ -296,11 +296,6 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with val ctdLock = new ReentrantLock val suspendSwitch = new Switch - override def enqueue(receiver: ActorRef, msg: Envelope) {} - override def dequeue() = null - override def hasMessages = queue.isEmpty - override def numberOfMessages = queue.size - override def cleanUp(): Unit = { /* * This is called from dispatcher.unregister, i.e. under this.lock. If @@ -308,8 +303,10 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with * the gather operation, tough luck: no guaranteed delivery to deadLetters. */ suspendSwitch.locked { - CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, queue) + val q = queue + CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q) super.cleanUp() + q.q.cleanUp(actor, actor.systemImpl.deadLetterQueue) } } } From 587950b863d23367fbe623bd0d12469e53046adf Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 20 Feb 2012 15:15:20 +0100 Subject: [PATCH 2/4] =?UTF-8?q?fix=20BSONSerializer=E2=80=99s=20use=20of?= =?UTF-8?q?=20paths?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/scala/akka/actor/mailbox/BSONSerialization.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala index 5aa314eb55..8da4db9376 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala @@ -26,7 +26,7 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB val b = Map.newBuilder[String, Any] b += "_id" -> msg._id b += "ownerPath" -> msg.ownerPath - b += "senderPath" -> msg.sender.path + b += "senderPath" -> msg.sender.path.toString /** * TODO - Figure out a way for custom serialization of the message instance From 3eeaadd804431312bd705439dfebae07d4d418ea Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 21 Feb 2012 13:22:25 +0100 Subject: [PATCH 3/4] move back to explicit enqueue/dequeue delegation from Mailbox to MessageQueue, see #1844 --- .../src/test/scala/akka/actor/FSMTimingSpec.scala | 4 ++-- .../test/scala/akka/actor/dispatch/ActorModelSpec.scala | 2 +- .../akka/actor/dispatch/BalancingDispatcherSpec.scala | 4 ++-- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 2 +- akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala | 2 +- akka-actor/src/main/scala/akka/dispatch/Mailbox.scala | 9 +++++++-- akka-actor/src/main/scala/akka/routing/Routing.scala | 8 ++++---- .../scala/akka/testkit/CallingThreadDispatcher.scala | 2 +- 8 files changed, 19 insertions(+), 14 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 1f708983bd..59468125eb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -182,7 +182,7 @@ object FSMTimingSpec { when(TestCancelTimer) { case Event(Tick, _) ⇒ setTimer("hallo", Tock, 1 milli, false) - TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.messageQueue.hasMessages, 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) cancelTimer("hallo") sender ! Tick setTimer("hallo", Tock, 500 millis, false) @@ -209,7 +209,7 @@ object FSMTimingSpec { case Event(Tick, _) ⇒ suspend(self) setTimer("named", Tock, 1 millis, false) - TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.messageQueue.hasMessages, 1 second) + TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) stay forMax (1 millis) replying Tick case Event(Tock, _) ⇒ goto(TestCancelStateTimerInNamedTimerMessage2) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 87a9cf9734..88358e9f16 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -374,7 +374,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path } } foreach { case cell: ActorCell ⇒ - System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.messageQueue.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) + System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) } System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index ec0982982c..4060587b73 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -76,8 +76,8 @@ class BalancingDispatcherSpec extends AkkaSpec(BalancingDispatcherSpec.config) { } finishedCounter.await(5, TimeUnit.SECONDS) - fast.underlying.mailbox.asInstanceOf[Mailbox].messageQueue.hasMessages must be(false) - slow.underlying.mailbox.asInstanceOf[Mailbox].messageQueue.hasMessages must be(false) + fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) + slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > (slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 0a77457cf9..39bd52aa87 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -590,7 +590,7 @@ private[akka] class ActorCell( final def checkReceiveTimeout() { val recvtimeout = receiveTimeoutData - if (recvtimeout._1 > 0 && !mailbox.messageQueue.hasMessages) { + if (recvtimeout._1 > 0 && !mailbox.hasMessages) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(Duration(recvtimeout._1, TimeUnit.MILLISECONDS), self, ReceiveTimeout)) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 46d7b249df..5537b01244 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -40,7 +40,7 @@ class Dispatcher( protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox - mbox.messageQueue.enqueue(receiver.self, invocation) + mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 1a6515ea2d..f25c6571e8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -44,6 +44,11 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes import Mailbox._ + def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg) + def dequeue(): Envelope = messageQueue.dequeue() + def hasMessages: Boolean = messageQueue.hasMessages + def numberOfMessages: Int = messageQueue.numberOfMessages + @volatile protected var _statusDoNotCallMeDirectly: Status = _ //0 by default @@ -142,7 +147,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new) final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { - case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || messageQueue.hasMessages + case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Closed ⇒ false case _ ⇒ hasSystemMessageHint || hasSystemMessages } @@ -166,7 +171,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes left: Int = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit = if (shouldProcessMessage) { - val next = messageQueue.dequeue() + val next = dequeue() if (next ne null) { if (Mailbox.debug) println(actor.self + " processing message " + next) actor invoke next diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index e77d6ac469..57847f3553 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -773,7 +773,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def hasMessages(a: ActorRef): Boolean = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.messageQueue.hasMessages + case x: LocalActorRef ⇒ x.underlying.mailbox.hasMessages case _ ⇒ false } @@ -797,7 +797,7 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ * routers based on mailbox and actor internal state. */ protected def numberOfMessages(a: ActorRef): Int = a match { - case x: LocalActorRef ⇒ x.underlying.mailbox.messageQueue.numberOfMessages + case x: LocalActorRef ⇒ x.underlying.mailbox.numberOfMessages case _ ⇒ 0 } @@ -1249,9 +1249,9 @@ case class DefaultResizer( case a: LocalActorRef ⇒ val cell = a.underlying pressureThreshold match { - case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.messageQueue.hasMessages + case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null - case threshold ⇒ cell.mailbox.messageQueue.numberOfMessages >= threshold + case threshold ⇒ cell.mailbox.numberOfMessages >= threshold } case x ⇒ false diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 8cbe954aac..aba582ae68 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -199,7 +199,7 @@ class CallingThreadDispatcher( } else false } if (execute) runQueue(mbox, queue) - case m ⇒ m.messageQueue.enqueue(receiver.self, handle) + case m ⇒ m.enqueue(receiver.self, handle) } } From 882249c5cfef3879a7a1fb61d992f4ac49321d0b Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 21 Feb 2012 13:40:05 +0100 Subject: [PATCH 4/4] rename durable mailboxes to message queues, see #1844 keep *MailboxType in order to not confuse users. --- .../scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala | 6 +++--- .../main/scala/akka/actor/mailbox/FiledBasedMailbox.scala | 6 +++--- .../main/scala/akka/actor/mailbox/DurableMailbox.scala | 4 ++-- .../main/scala/akka/actor/mailbox/BSONSerialization.scala | 2 +- .../main/scala/akka/actor/mailbox/MongoBasedMailbox.scala | 8 ++++---- .../main/scala/akka/actor/mailbox/RedisBasedMailbox.scala | 6 +++--- .../scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala | 6 +++--- 7 files changed, 19 insertions(+), 19 deletions(-) diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 68aaa6067e..2e3232b22a 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess class BeanstalkBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new BeanstalkBasedMailbox(o) + case Some(o) ⇒ new BeanstalkBasedMessageQueue(o) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -27,13 +27,13 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType { /** * @author Jonas Bonér */ -class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { +class BeanstalkBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { private val settings = BeanstalkBasedMailboxExtension(owner.system) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt - val log = Logging(system, "BeanstalkBasedMailbox") + val log = Logging(system, "BeanstalkBasedMessageQueue") private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 70368e2f40..8f4754ebbf 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -16,14 +16,14 @@ import akka.config.ConfigurationException class FileBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new FileBasedMailbox(o) + case Some(o) ⇒ new FileBasedMessageQueue(o) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { +class FileBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - val log = Logging(system, "FileBasedMailbox") + val log = Logging(system, "FileBasedMessageQueue") private val settings = FileBasedMailboxExtension(owner.system) val queuePath = settings.QueuePath diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 92072c8cf3..77b932911d 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -12,7 +12,7 @@ private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r } -abstract class DurableMailbox(val owner: ActorContext) extends MessageQueue { +abstract class DurableMessageQueue(val owner: ActorContext) extends MessageQueue { import DurableExecutableMailboxConfig._ def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem] @@ -22,7 +22,7 @@ abstract class DurableMailbox(val owner: ActorContext) extends MessageQueue { } -trait DurableMessageSerialization { this: DurableMailbox ⇒ +trait DurableMessageSerialization { this: DurableMessageQueue ⇒ def serialize(durableMessage: Envelope): Array[Byte] = { diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala index 8da4db9376..aad04630c8 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala @@ -18,7 +18,7 @@ import akka.remote.RemoteProtocol.MessageProtocol import akka.remote.MessageSerializer import akka.actor.ExtendedActorSystem -class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] with Logging { +class BSONSerializableMessageQueue(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] with Logging { protected[akka] def serializeDurableMsg(msg: MongoDurableMessage)(implicit serializer: BSONSerializer) = { diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index ac969695bb..647411b747 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -21,7 +21,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message) class MongoBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new MongoBasedMailbox(o) + case Some(o) ⇒ new MongoBasedMessageQueue(o) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -37,16 +37,16 @@ class MongoBasedMailboxType(config: Config) extends MailboxType { * * @author Brendan W. McAdams */ -class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) { +class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage - implicit val mailboxBSONSer = new BSONSerializableMailbox(system) + implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! private val dispatcher = owner.dispatcher private val settings = MongoBasedMailboxExtension(owner.system) - val log = Logging(system, "MongoBasedMailbox") + val log = Logging(system, "MongoBasedMessageQueue") @volatile private var mongo = connect() diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index 9a01d462f8..78a8bc3b37 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -19,19 +19,19 @@ class RedisBasedMailboxException(message: String) extends AkkaException(message) class RedisBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new RedisBasedMailbox(o) + case Some(o) ⇒ new RedisBasedMessageQueue(o) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { +class RedisBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { private val settings = RedisBasedMailboxExtension(owner.system) @volatile private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling - val log = Logging(system, "RedisBasedMailbox") + val log = Logging(system, "RedisBasedMessageQueue") def enqueue(receiver: ActorRef, envelope: Envelope) { log.debug("ENQUEUING message in redis-based mailbox [%s]".format(envelope)) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 1434f6c1d9..abeb430c64 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -20,18 +20,18 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new ZooKeeperBasedMailbox(o) + case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { +class ZooKeeperBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { private val settings = ZooKeeperBasedMailboxExtension(owner.system) val queueNode = "/queues" val queuePathTemplate = queueNode + "/%s" - val log = Logging(system, "ZooKeeperBasedMailbox") + val log = Logging(system, "ZooKeeperBasedMessageQueue") private val zkClient = new AkkaZkClient( settings.ZkServerAddresses,