split Mailbox and MessageQueue, see #1844

- this enables using any MessageQueue in BalancingDispatcher,
  CallingThreadDispatcher and in general leads to less conflation of
  concepts
- add MessageQueue.cleanUp(owner, deadLetterQueue) for the benefit of
  durable mailboxes
- change MailboxType.create to take an optional owner and generate only
  a MessageQueue, not a Mailbox
This commit is contained in:
Roland 2012-02-19 10:28:56 +01:00
parent 62be4dafee
commit 2f3737195b
18 changed files with 171 additions and 144 deletions

View file

@ -9,10 +9,12 @@ import java.util.LinkedList
import scala.annotation.tailrec
import com.typesafe.config.Config
import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.dispatch.{ MailboxType, TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.util.duration.intToDurationInt
import akka.util.{ Switch, Duration }
import akka.util.NonFatal
import akka.actor.ActorContext
import akka.dispatch.MessageQueue
/*
* Locking rules:
@ -75,9 +77,12 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
val q = ref.get
if (q ne null) && (q ne own)
} {
while (q.peek ne null) {
val owner = mbox.actor.self
var msg = q.q.dequeue()
while (msg ne null) {
// this is safe because this method is only ever called while holding the suspendSwitch monitor
own.push(q.pop)
own.q.enqueue(owner, msg)
msg = q.q.dequeue()
}
}
}
@ -115,6 +120,7 @@ object CallingThreadDispatcher {
*/
class CallingThreadDispatcher(
_prerequisites: DispatcherPrerequisites,
val mailboxType: MailboxType,
val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) {
import CallingThreadDispatcher._
@ -122,7 +128,7 @@ class CallingThreadDispatcher(
override def id: String = Id
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor)
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor, mailboxType)
protected[akka] override def shutdown() {}
@ -183,17 +189,17 @@ class CallingThreadDispatcher(
case mbox: CallingThreadMailbox
val queue = mbox.queue
val execute = mbox.suspendSwitch.fold {
queue.push(handle)
queue.q.enqueue(receiver.self, handle)
false
} {
queue.push(handle)
queue.q.enqueue(receiver.self, handle)
if (!queue.isActive) {
queue.enter
true
} else false
}
if (execute) runQueue(mbox, queue)
case m m.enqueue(receiver.self, handle)
case m m.messageQueue.enqueue(receiver.self, handle)
}
}
@ -219,7 +225,7 @@ class CallingThreadDispatcher(
queue.leave
null
} {
val ret = if (mbox.isClosed) null else queue.pop
val ret = if (mbox.isClosed) null else queue.q.dequeue()
if (ret eq null) queue.leave
ret
}
@ -261,19 +267,13 @@ class CallingThreadDispatcher(
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance = new CallingThreadDispatcher(prerequisites)
private val instance = new CallingThreadDispatcher(prerequisites, mailboxType())
override def dispatcher(): MessageDispatcher = instance
}
class NestingQueue {
private var q = new LinkedList[Envelope]()
def size = q.size
def isEmpty = q.isEmpty
def push(handle: Envelope) { q.offer(handle) }
def peek = q.peek
def pop = q.poll
class NestingQueue(val q: MessageQueue) {
@volatile
private var active = false
def enter { if (active) sys.error("already active") else active = true }
@ -281,11 +281,11 @@ class NestingQueue {
def isActive = active
}
class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue {
class CallingThreadMailbox(_receiver: ActorCell, val mailboxType: MailboxType) extends Mailbox(_receiver, null) with DefaultSystemMessageQueue {
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = {
val queue = new NestingQueue
val queue = new NestingQueue(mailboxType.create(Some(actor)))
CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue)
queue
}
@ -296,11 +296,6 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
val ctdLock = new ReentrantLock
val suspendSwitch = new Switch
override def enqueue(receiver: ActorRef, msg: Envelope) {}
override def dequeue() = null
override def hasMessages = queue.isEmpty
override def numberOfMessages = queue.size
override def cleanUp(): Unit = {
/*
* This is called from dispatcher.unregister, i.e. under this.lock. If
@ -308,8 +303,10 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
* the gather operation, tough luck: no guaranteed delivery to deadLetters.
*/
suspendSwitch.locked {
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, queue)
val q = queue
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q)
super.cleanUp()
q.q.cleanUp(actor, actor.systemImpl.deadLetterQueue)
}
}
}