Refactor Mailbox handling

- rename MessageInvocation to Envelope (same for System...)
- rename MessageQueue to Mailbox and include ExecutableMailbox code
- make MailboxType act as factory, so users can provide their own
- properly type mailbox field as Mailbox everywhere
- adapt CallingThreadDispatcher and some tests
- create DeadletterMailbox and use it to replace actor’s mailbox on
  terminate()
This commit is contained in:
Roland 2011-09-21 15:01:47 +02:00
parent d6eb76852a
commit 7c63f94169
13 changed files with 293 additions and 244 deletions

View file

@ -124,7 +124,7 @@ object ActorModelSpec {
getStats(actor.ref).unregisters.incrementAndGet()
}
protected[akka] abstract override def dispatch(invocation: MessageInvocation) {
protected[akka] abstract override def dispatch(invocation: Envelope) {
getStats(invocation.receiver.ref).msgsReceived.incrementAndGet()
super.dispatch(invocation)
}

View file

@ -7,7 +7,7 @@ import org.junit.Test
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.actor.Actor._
import akka.dispatch.{ MessageQueue, Dispatchers }
import akka.dispatch.{ Mailbox, Dispatchers }
import akka.actor.{ LocalActorRef, IllegalActorStateException, Actor, Props }
object BalancingDispatcherSpec {
@ -80,8 +80,8 @@ class BalancingDispatcherSpec extends JUnitSuite with MustMatchers {
}
finishedCounter.await(5, TimeUnit.SECONDS)
fast.underlying.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
slow.underlying.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
fast.underlying.mailbox.asInstanceOf[Mailbox].isEmpty must be(true)
slow.underlying.mailbox.asInstanceOf[Mailbox].isEmpty must be(true)
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount)

View file

@ -15,7 +15,7 @@ import akka.actor.{ LocalActorRef, Actor, ActorRegistry, NullChannel }
abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
def name: String
def factory: MailboxType MessageQueue
def factory: MailboxType Mailbox
name should {
"create an unbounded mailbox" in {
@ -80,14 +80,14 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
result
}
def createMessageInvocation(msg: Any): MessageInvocation = {
new MessageInvocation(
def createMessageInvocation(msg: Any): Envelope = {
new Envelope(
actorOf(new Actor { //Dummy actor
def receive = { case _ }
}).asInstanceOf[LocalActorRef].underlying, msg, NullChannel)
}
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
def ensureInitialMailboxState(config: MailboxType, q: Mailbox) {
q must not be null
q match {
case aQueue: BlockingQueue[_]
@ -106,7 +106,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
val q = factory(config)
ensureInitialMailboxState(config, q)
def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn {
def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn {
val messages = Vector() ++ (for (i fromNum to toNum) yield createMessageInvocation(i))
for (i messages) q.enqueue(i)
messages
@ -117,8 +117,8 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
val producers = for (i (1 to totalMessages by step).toList) yield createProducer(i, i + step - 1)
def createConsumer: Future[Vector[MessageInvocation]] = spawn {
var r = Vector[MessageInvocation]()
def createConsumer: Future[Vector[Envelope]] = spawn {
var r = Vector[Envelope]()
while (producers.exists(_.isCompleted == false) || !q.isEmpty) {
q.dequeue match {
case null
@ -146,8 +146,8 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
class DefaultMailboxSpec extends MailboxSpec {
lazy val name = "The default mailbox implementation"
def factory = {
case UnboundedMailbox() new DefaultUnboundedMessageQueue()
case BoundedMailbox(capacity, pushTimeOut) new DefaultBoundedMessageQueue(capacity, pushTimeOut)
case u: UnboundedMailbox u.create(null)
case b: BoundedMailbox b.create(null)
}
}
@ -155,7 +155,7 @@ class PriorityMailboxSpec extends MailboxSpec {
val comparator = PriorityGenerator(_.##)
lazy val name = "The priority mailbox implementation"
def factory = {
case UnboundedMailbox() new UnboundedPriorityMessageQueue(comparator)
case BoundedMailbox(capacity, pushTimeOut) new BoundedPriorityMessageQueue(capacity, pushTimeOut, comparator)
case UnboundedMailbox() UnboundedPriorityMailbox(comparator).create(null)
case BoundedMailbox(capacity, pushTimeOut) BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null)
}
}

View file

@ -31,9 +31,9 @@ private[akka] trait ActorContext {
def hotswap_=(stack: Stack[PartialFunction[Any, Unit]]): Unit
def currentMessage: MessageInvocation
def currentMessage: Envelope
def currentMessage_=(invocation: MessageInvocation): Unit
def currentMessage_=(invocation: Envelope): Unit
def sender: Option[ActorRef]
@ -69,7 +69,7 @@ private[akka] class ActorCell(
var terminated = false
@volatile
var mailbox: AnyRef = _
var mailbox: Mailbox = _
@volatile
var futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@ -93,7 +93,7 @@ private[akka] class ActorCell(
var receiveTimeout: Option[Long] = _receiveTimeout // TODO: currently settable from outside for compatibility
@volatile //FIXME volatile can be removed
var currentMessage: MessageInvocation = null
var currentMessage: Envelope = null
val actor: AtomicReference[Actor] = new AtomicReference[Actor]() //FIXME We can most probably make this just a regular reference to Actor
@ -112,7 +112,7 @@ private[akka] class ActorCell(
if (props.supervisor.isDefined) props.supervisor.get.link(self)
dispatcher.attach(this)
Actor.registry.register(self)
dispatcher.systemDispatch(SystemMessageInvocation(this, Create, NullChannel))
dispatcher.systemDispatch(SystemEnvelope(this, Create, NullChannel))
}
def newActor(restart: Boolean): Actor = {
@ -150,7 +150,7 @@ private[akka] class ActorCell(
private[akka] def stop(): Unit =
if (!terminated) {
//terminated = true // TODO: turn this into tristate with Running, Terminating, Terminated and use AtomicReferenceFieldUpdater
dispatcher.systemDispatch(SystemMessageInvocation(this, Terminate, NullChannel))
dispatcher.systemDispatch(SystemEnvelope(this, Terminate, NullChannel))
}
def link(actorRef: ActorRef): ActorRef = {
@ -186,7 +186,7 @@ private[akka] class ActorCell(
def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
if (isRunning) dispatcher dispatchMessage new MessageInvocation(this, message, channel)
if (isRunning) dispatcher dispatchMessage new Envelope(this, message, channel)
else throw new ActorInitializationException("Actor " + self + " is dead")
def postMessageToMailboxAndCreateFutureResultWithTimeout(
@ -197,7 +197,7 @@ private[akka] class ActorCell(
case f: ActorPromise f
case _ new ActorPromise(timeout)(dispatcher)
}
dispatcher dispatchMessage new MessageInvocation(this, message, future)
dispatcher dispatchMessage new Envelope(this, message, future)
future
} else throw new ActorInitializationException("Actor " + self + " is dead")
@ -224,10 +224,10 @@ private[akka] class ActorCell(
case msg msg.channel
}
def systemInvoke(envelope: SystemMessageInvocation): Unit = {
def systemInvoke(envelope: SystemEnvelope): Boolean = {
var isTerminated = terminated
def create(recreation: Boolean): Unit = try {
def create(recreation: Boolean): Boolean = try {
actor.get() match {
case null
val created = newActor(restart = false)
@ -237,17 +237,27 @@ private[akka] class ActorCell(
restart(new Exception("Restart commanded"), None, None)
case _
}
true
} catch {
case e
envelope.channel.sendException(e)
if (supervisor.isDefined) supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos) else throw e
if (supervisor.isDefined) {
supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos)
false // don't continue processing messages right now
} else throw e
}
def suspend(): Unit = dispatcher suspend this
def suspend(): Boolean = {
dispatcher suspend this
true
}
def resume(): Unit = dispatcher resume this
def resume(): Boolean = {
dispatcher resume this
true
}
def terminate(): Unit = {
def terminate(): Boolean = {
receiveTimeout = None
cancelReceiveTimeout
Actor.registry.unregister(self)
@ -266,6 +276,9 @@ private[akka] class ActorCell(
}
}
// TODO CHECK: stop message dequeuing, which means that mailbox will not be restarted and GCed
false
} finally {
try {
if (supervisor.isDefined)
@ -288,6 +301,8 @@ private[akka] class ActorCell(
case Resume resume()
case Terminate terminate()
}
} else {
false
}
} catch {
case e //Should we really catch everything here?
@ -299,7 +314,7 @@ private[akka] class ActorCell(
}
}
def invoke(messageHandle: MessageInvocation): Unit = {
def invoke(messageHandle: Envelope): Unit = {
var isTerminated = terminated
guard.lock.lock()
try {

View file

@ -4,7 +4,7 @@
package akka.actor
import akka.util.ByteString
import akka.dispatch.MessageInvocation
import akka.dispatch.Envelope
import akka.event.EventHandler
import java.net.InetSocketAddress
@ -130,11 +130,11 @@ object IO {
}
sealed trait IOSuspendable[+A]
sealed trait CurrentMessage { def message: MessageInvocation }
private case class ByteStringLength(continuation: (ByteString) IOSuspendable[Any], handle: Handle, message: MessageInvocation, length: Int) extends IOSuspendable[ByteString] with CurrentMessage
private case class ByteStringDelimited(continuation: (ByteString) IOSuspendable[Any], handle: Handle, message: MessageInvocation, delimter: ByteString, inclusive: Boolean, scanned: Int) extends IOSuspendable[ByteString] with CurrentMessage
private case class ByteStringAny(continuation: (ByteString) IOSuspendable[Any], handle: Handle, message: MessageInvocation) extends IOSuspendable[ByteString] with CurrentMessage
private case class Retry(message: MessageInvocation) extends IOSuspendable[Nothing]
sealed trait CurrentMessage { def message: Envelope }
private case class ByteStringLength(continuation: (ByteString) IOSuspendable[Any], handle: Handle, message: Envelope, length: Int) extends IOSuspendable[ByteString] with CurrentMessage
private case class ByteStringDelimited(continuation: (ByteString) IOSuspendable[Any], handle: Handle, message: Envelope, delimter: ByteString, inclusive: Boolean, scanned: Int) extends IOSuspendable[ByteString] with CurrentMessage
private case class ByteStringAny(continuation: (ByteString) IOSuspendable[Any], handle: Handle, message: Envelope) extends IOSuspendable[ByteString] with CurrentMessage
private case class Retry(message: Envelope) extends IOSuspendable[Nothing]
private case object Idle extends IOSuspendable[Nothing]
}
@ -147,7 +147,7 @@ trait IO {
implicit protected def ioActor: Actor with IO = this
private val _messages: mutable.Queue[MessageInvocation] = mutable.Queue.empty
private val _messages: mutable.Queue[Envelope] = mutable.Queue.empty
private var _state: Map[Handle, HandleState] = Map.empty

View file

@ -63,8 +63,8 @@ class BalancingDispatcher(
super.unregister(actor)
}
override protected[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
override protected[akka] def dispatch(invocation: Envelope) = {
val mbox = invocation.receiver.mailbox
if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
//We were busy and we got to donate the message to some other lucky guy, we're done here
} else {
@ -73,7 +73,7 @@ class BalancingDispatcher(
}
}
override protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
override protected[akka] def reRegisterForExecution(mbox: Mailbox): Unit = {
try {
donationInProgress.value = true
while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
@ -86,7 +86,7 @@ class BalancingDispatcher(
/**
* Returns true if it successfully donated a message
*/
protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
protected def donateFrom(donorMbox: Mailbox): Boolean = {
val actors = members // copy to prevent concurrent modifications having any impact
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
@ -101,7 +101,7 @@ class BalancingDispatcher(
/**
* Returns true if the donation succeeded or false otherwise
*/
protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try {
protected def attemptDonationOf(message: Envelope, donorMbox: Mailbox): Boolean = try {
donationInProgress.value = true
val actors = members // copy to prevent concurrent modifications having any impact
doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
@ -114,7 +114,7 @@ class BalancingDispatcher(
* Rewrites the message and adds that message to the recipients mailbox
* returns true if the message is non-null
*/
protected def donate(organ: MessageInvocation, recipient: ActorCell): Boolean = {
protected def donate(organ: Envelope, recipient: ActorCell): Boolean = {
if (organ ne null) {
recipient.postMessageToMailbox(organ.message, organ.channel)
true
@ -124,14 +124,14 @@ class BalancingDispatcher(
/**
* Returns an available recipient for the message, if any
*/
protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorCell], startIndex: Int): ActorCell = {
protected def doFindDonorRecipient(donorMbox: Mailbox, potentialRecipients: Vector[ActorCell], startIndex: Int): ActorCell = {
val prSz = potentialRecipients.size
var i = 0
var recipient: ActorCell = null
while ((i < prSz) && (recipient eq null)) {
val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
val mbox = getMailbox(actor)
val mbox = actor.mailbox
if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
recipient = actor //Found!

View file

@ -90,16 +90,16 @@ class Dispatcher(
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
protected[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
protected[akka] def dispatch(invocation: Envelope) = {
val mbox = invocation.receiver.mailbox
if (mbox ne null) {
mbox enqueue invocation
registerForExecution(mbox)
}
}
protected[akka] def systemDispatch(invocation: SystemMessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
protected[akka] def systemDispatch(invocation: SystemEnvelope) = {
val mbox = invocation.receiver.mailbox
if (mbox ne null) {
mbox systemEnqueue invocation
registerForExecution(mbox)
@ -115,31 +115,7 @@ class Dispatcher(
}
}
/**
* @return the mailbox associated with the actor
*/
protected def getMailbox(receiver: ActorCell) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).isEmpty
override def mailboxSize(actor: ActorCell): Int = getMailbox(actor).size
def createMailbox(actor: ActorCell): AnyRef = mailboxType match {
case b: UnboundedMailbox
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
@inline
final def dispatcher = Dispatcher.this
@inline
final def enqueue(m: MessageInvocation) = this.add(m)
@inline
final def dequeue(): MessageInvocation = this.poll()
}
case b: BoundedMailbox
new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox {
@inline
final def dispatcher = Dispatcher.this
}
}
def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(this)
protected[akka] def start {}
@ -150,7 +126,7 @@ class Dispatcher(
}
}
protected[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
protected[akka] def registerForExecution(mbox: Mailbox): Unit = {
if (mbox.dispatcherLock.tryLock()) {
if (active.isOn && (!mbox.suspended.locked || !mbox.systemMessages.isEmpty)) { //If the dispatcher is active and the actor not suspended
try {
@ -167,11 +143,11 @@ class Dispatcher(
}
}
protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
protected[akka] def reRegisterForExecution(mbox: Mailbox): Unit =
registerForExecution(mbox)
protected override def cleanUpMailboxFor(actor: ActorCell) {
val m = getMailbox(actor)
val m = actor.mailbox
if (!m.isEmpty) {
var invocation = m.dequeue
lazy val exception = new ActorKilledException("Actor has been stopped")
@ -185,68 +161,15 @@ class Dispatcher(
override val toString = getClass.getSimpleName + "[" + name + "]"
def suspend(actor: ActorCell): Unit =
getMailbox(actor).suspended.tryLock
actor.mailbox.suspended.tryLock
def resume(actor: ActorCell): Unit = {
val mbox = getMailbox(actor)
val mbox = actor.mailbox
mbox.suspended.tryUnlock
reRegisterForExecution(mbox)
}
}
/**
* This is the behavior of an Dispatchers mailbox.
*/
trait ExecutableMailbox extends Runnable { self: MessageQueue
def dispatcher: Dispatcher
final def run = {
try { processMailbox() } catch {
case ie: InterruptedException Thread.currentThread().interrupt() //Restore interrupt
} finally {
dispatcherLock.unlock()
if (!self.isEmpty || !self.systemMessages.isEmpty)
dispatcher.reRegisterForExecution(this)
}
}
/**
* Process the messages in the mailbox
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox() {
processAllSystemMessages()
if (!self.suspended.locked) {
var nextMessage = self.dequeue
if (nextMessage ne null) { //If we have a message
if (dispatcher.throughput <= 1) //If we only run one message per process
nextMessage.invoke //Just run it
else { //But otherwise, if we are throttled, we need to do some book-keeping
var processedMessages = 0
val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
else 0
do {
nextMessage.invoke
processAllSystemMessages()
nextMessage =
if (self.suspended.locked) {
null // If we are suspended, abort
} else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
null //We reached our boundaries, abort
else self.dequeue //Dequeue the next message
}
} while (nextMessage ne null)
}
}
}
}
}
object PriorityGenerator {
/**
* Creates a PriorityGenerator that uses the supplied function as priority generator
@ -260,13 +183,15 @@ object PriorityGenerator {
* A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
* PriorityDispatcher
*/
abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] {
abstract class PriorityGenerator extends java.util.Comparator[Envelope] {
def gen(message: Any): Int
final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int =
final def compare(thisMessage: Envelope, thatMessage: Envelope): Int =
gen(thisMessage.message) - gen(thatMessage.message)
}
// TODO: should this be deleted, given that any dispatcher can now use UnboundedPriorityMailbox?
/**
* A version of Dispatcher that gives all actors registered to it a priority mailbox,
* prioritized according to the supplied comparator.
@ -275,50 +200,29 @@ abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation]
*/
class PriorityDispatcher(
name: String,
val comparator: java.util.Comparator[MessageInvocation],
val comparator: java.util.Comparator[Envelope],
throughput: Int = Dispatchers.THROUGHPUT,
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, executorServiceFactoryProvider) with PriorityMailbox {
executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, executorServiceFactoryProvider) {
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) =
def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int, mailboxType: MailboxType) =
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int) =
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
def this(name: String, comparator: java.util.Comparator[Envelope], executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, executorServiceFactoryProvider)
def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
def this(name: String, comparator: java.util.Comparator[Envelope]) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
}
/**
* Can be used to give an Dispatcher's actors priority-enabled mailboxes
*
* Usage:
* new Dispatcher(...) with PriorityMailbox {
* val comparator = ...comparator that determines mailbox priority ordering...
* }
*/
trait PriorityMailbox { self: Dispatcher
def comparator: java.util.Comparator[MessageInvocation]
override def createMailbox(actor: ActorCell): AnyRef = self.mailboxType match {
case b: UnboundedMailbox
new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox {
@inline
final def dispatcher = self
}
case b: BoundedMailbox
new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox {
@inline
final def dispatcher = self
}
override def createMailbox(actor: ActorCell): Mailbox = mailboxType match {
case _: UnboundedMailbox UnboundedPriorityMailbox(comparator).create(this)
case BoundedMailbox(cap, timeout) BoundedPriorityMailbox(comparator, cap, timeout).create(this)
}
}

View file

@ -5,76 +5,174 @@
package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue }
import java.util.concurrent._
import akka.util._
import java.util.Queue
import akka.actor.ActorContext
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageQueue {
val dispatcherLock = new SimpleLock(startLocked = false)
val suspended = new SimpleLock(startLocked = false) //(startLocked = true)
val systemMessages = new ConcurrentLinkedQueue[SystemMessageInvocation]()
trait Mailbox extends Runnable {
/*
* Internal implementation of MessageDispatcher uses these, don't touch or rely on
*/
final val dispatcherLock = new SimpleLock(startLocked = false)
final val suspended = new SimpleLock(startLocked = false) //(startLocked = true)
final val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]()
def enqueue(handle: MessageInvocation)
def dequeue(): MessageInvocation
def systemEnqueue(handle: SystemMessageInvocation): Unit = systemMessages.offer(handle)
def systemDequeue(): SystemMessageInvocation = systemMessages.poll()
def size: Int
def isEmpty: Boolean
final def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages.offer(handle)
final def systemDequeue(): SystemEnvelope = systemMessages.poll()
def processAllSystemMessages(): Unit = {
var nextMessage = systemDequeue()
while (nextMessage ne null) {
nextMessage.invoke()
nextMessage = systemDequeue()
def dispatcher: MessageDispatcher
final def run = {
try { processMailbox() } catch {
case ie: InterruptedException Thread.currentThread().interrupt() //Restore interrupt
} finally {
dispatcherLock.unlock()
if (!isEmpty || !systemMessages.isEmpty)
dispatcher.reRegisterForExecution(this)
}
}
/**
* Process the messages in the mailbox
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox() {
if (processAllSystemMessages() && !suspended.locked) {
var nextMessage = dequeue()
if (nextMessage ne null) { //If we have a message
if (dispatcher.throughput <= 1) //If we only run one message per process
nextMessage.invoke //Just run it
else { //But otherwise, if we are throttled, we need to do some book-keeping
var processedMessages = 0
val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
else 0
do {
nextMessage.invoke
nextMessage =
if (!processAllSystemMessages() || suspended.locked) {
null // If we are suspended, abort
} else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
processedMessages += 1
if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
null //We reached our boundaries, abort
else dequeue //Dequeue the next message
}
} while (nextMessage ne null)
}
}
}
}
def processAllSystemMessages(): Boolean = {
var nextMessage = systemDequeue()
while (nextMessage ne null) {
if (!nextMessage.invoke()) return false
nextMessage = systemDequeue()
}
true
}
/*
* These method need to be implemented in subclasses; they should not rely on the internal stuff above.
*/
def enqueue(handle: Envelope)
def dequeue(): Envelope
def size: Int
def isEmpty: Boolean
}
trait UnboundedMessageQueueSemantics { self: QueueMailbox
val queue: Queue[Envelope]
final def enqueue(handle: Envelope): Unit = queue add handle
final def dequeue(): Envelope = queue.poll()
}
trait BoundedMessageQueueSemantics { self: BlockingQueueMailbox
def pushTimeOut: Duration
final def enqueue(handle: Envelope) {
if (pushTimeOut.length > 0) {
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
}
} else queue put handle
}
final def dequeue(): Envelope = queue.poll()
}
trait QueueMailbox extends Mailbox {
val queue: Queue[Envelope]
final def size = queue.size
final def isEmpty = queue.isEmpty
}
trait BlockingQueueMailbox extends QueueMailbox {
val queue: BlockingQueue[Envelope]
}
abstract class PriorityBlockingQueueMailbox(cmp: Comparator[Envelope], val dispatcher: MessageDispatcher) extends BlockingQueueMailbox {
override val queue = new PriorityBlockingQueue(11, cmp) // 11 is the default initial capacity in PriorityQueue.java
}
abstract class ConcurrentLinkedQueueMailbox(val dispatcher: MessageDispatcher) extends QueueMailbox {
override val queue = new ConcurrentLinkedQueue[Envelope]
}
abstract class LinkedBlockingQueueMailbox(val dispatcher: MessageDispatcher) extends BlockingQueueMailbox {
override val queue = new LinkedBlockingQueue[Envelope]
}
/**
* Mailbox configuration.
*/
sealed trait MailboxType
trait MailboxType {
def create(dispatcher: MessageDispatcher): Mailbox
}
case class UnboundedMailbox() extends MailboxType {
override def create(dispatcher: MessageDispatcher) = new ConcurrentLinkedQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics
}
case class UnboundedMailbox() extends MailboxType
case class BoundedMailbox(
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
}
trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation]
@inline
final def enqueue(handle: MessageInvocation): Unit = this add handle
@inline
final def dequeue(): MessageInvocation = this.poll()
}
trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation]
def pushTimeOut: Duration
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.length > 0) {
this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
}
} else this put handle
override def create(dispatcher: MessageDispatcher) = new LinkedBlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics {
val capacity = BoundedMailbox.this.capacity
val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
@inline
final def dequeue(): MessageInvocation = this.poll()
}
class DefaultUnboundedMessageQueue extends LinkedBlockingQueue[MessageInvocation] with UnboundedMessageQueueSemantics
case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxType {
override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with UnboundedMessageQueueSemantics
}
class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends LinkedBlockingQueue[MessageInvocation](capacity) with BoundedMessageQueueSemantics
case class BoundedPriorityMailbox(
val cmp: Comparator[Envelope],
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with BoundedMessageQueueSemantics {
val capacity = BoundedPriorityMailbox.this.capacity
val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}
}
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with BoundedMessageQueueSemantics

View file

@ -16,7 +16,7 @@ import akka.actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final case class MessageInvocation(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) {
final case class Envelope(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
final def invoke() { receiver invoke this }
@ -29,9 +29,12 @@ case object Suspend extends SystemMessage
case object Resume extends SystemMessage
case object Terminate extends SystemMessage
final case class SystemMessageInvocation(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) {
final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
final def invoke() { receiver systemInvoke this }
/**
* @return whether to proceed with processing other messages
*/
final def invoke(): Boolean = receiver systemInvoke this
}
final case class TaskInvocation(function: () Unit, cleanup: () Unit) extends Runnable {
@ -70,7 +73,18 @@ abstract class MessageDispatcher extends Serializable {
/**
* Creates and returns a mailbox for the given actor.
*/
protected[akka] def createMailbox(actor: ActorCell): AnyRef
protected[akka] def createMailbox(actor: ActorCell): Mailbox
/**
* Create a blackhole mailbox for the purpose of replacing the real one upon actor termination
*/
protected[akka] def createDeadletterMailbox = new Mailbox {
override def dispatcher = MessageDispatcher.this
override def enqueue(envelope: Envelope) {}
override def dequeue() = null
override def isEmpty = true
override def size = 0
}
/**
* Name of this dispatcher.
@ -98,7 +112,7 @@ abstract class MessageDispatcher extends Serializable {
}
}
protected[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation)
protected[akka] final def dispatchMessage(invocation: Envelope): Unit = dispatch(invocation)
protected[akka] final def dispatchTask(block: () Unit): Unit = {
_tasks.getAndIncrement()
@ -156,7 +170,7 @@ abstract class MessageDispatcher extends Serializable {
protected[akka] def unregister(actor: ActorCell) = {
if (uuids remove actor.uuid) {
cleanUpMailboxFor(actor)
actor.mailbox = null
actor.mailbox = createDeadletterMailbox
if (uuids.isEmpty && _tasks.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
@ -229,12 +243,21 @@ abstract class MessageDispatcher extends Serializable {
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
protected[akka] def dispatch(invocation: MessageInvocation)
protected[akka] def dispatch(invocation: Envelope)
/**
* Callback for processMailbox() which is called after one sweep of processing is done.
*/
protected[akka] def reRegisterForExecution(mbox: Mailbox)
// TODO check whether this should not actually be a property of the mailbox
protected[akka] def throughput: Int
protected[akka] def throughputDeadlineTime: Int
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
protected[akka] def systemDispatch(invocation: SystemMessageInvocation)
protected[akka] def systemDispatch(invocation: SystemEnvelope)
protected[akka] def executeTask(invocation: TaskInvocation)
@ -251,12 +274,12 @@ abstract class MessageDispatcher extends Serializable {
/**
* Returns the size of the mailbox for the specified actor
*/
def mailboxSize(actor: ActorCell): Int
def mailboxSize(actor: ActorCell): Int = actor.mailbox.size
/**
* Returns the "current" emptiness status of the mailbox for the specified actor
*/
def mailboxIsEmpty(actor: ActorCell): Boolean
def mailboxIsEmpty(actor: ActorCell): Boolean = actor.mailbox.isEmpty
/**
* Returns the amount of tasks queued for execution

View file

@ -4,7 +4,7 @@
package akka.util
import akka.dispatch.MessageInvocation
import akka.dispatch.Envelope
import akka.config.{ Config, ModuleNotAvailableException }
import akka.cluster.RemoteSupport
import akka.actor._
@ -99,8 +99,8 @@ object ReflectiveAccess {
}
type Mailbox = {
def enqueue(message: MessageInvocation)
def dequeue: MessageInvocation
def enqueue(message: Envelope)
def dequeue: Envelope
}
type TransactionLogObject = {
@ -118,7 +118,7 @@ object ReflectiveAccess {
}
type TransactionLog = {
def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef)
def recordEntry(messageHandle: Envelope, actorRef: LocalActorRef)
def recordEntry(entry: Array[Byte])
def recordSnapshot(snapshot: Array[Byte])
def entries: Vector[Array[Byte]]

View file

@ -16,7 +16,7 @@ import akka.actor._
import akka.camel.{ Ack, Failure, Message }
import akka.camel.CamelMessageConversion.toExchangeAdapter
import scala.reflect.BeanProperty
import akka.dispatch.{ FutureTimeoutException, Promise, MessageInvocation, MessageDispatcher }
import akka.dispatch.{ FutureTimeoutException, Promise, MessageDispatcher }
/**
* @author Martin Krasser

View file

@ -8,7 +8,7 @@ import akka.config.Supervision._
import akka.actor.{ uuidFrom, newUuid }
import akka.actor._
import DeploymentConfig._
import akka.dispatch.MessageInvocation
import akka.dispatch.Envelope
import akka.util.{ ReflectiveAccess, Duration }
import akka.cluster.{ RemoteClientSettings, MessageSerializer }
import akka.cluster.RemoteProtocol
@ -107,9 +107,9 @@ object ActorSerialization {
l.underlying.mailbox match {
case null throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
case q: java.util.Queue[_]
val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
val l = new scala.collection.mutable.ListBuffer[Envelope]
val it = q.iterator
while (it.hasNext) l += it.next.asInstanceOf[MessageInvocation]
while (it.hasNext) l += it.next.asInstanceOf[Envelope]
l map { m
RemoteActorSerialization.createRemoteMessageProtocolBuilder(

View file

@ -16,17 +16,17 @@ import akka.dispatch._
/*
* Locking rules:
*
* While not suspended, messages are processed (!isActive) or queued
* thread-locally (isActive). While suspended, messages are queued
* While not suspendSwitch, messages are processed (!isActive) or queued
* thread-locally (isActive). While suspendSwitch, messages are queued
* thread-locally. When resuming, all messages are atomically scooped from all
* non-active threads and queued on the resuming thread's queue, to be
* processed immediately. Processing a queue checks suspend before each
* invocation, leaving the active state if suspended. For this to work
* invocation, leaving the active state if suspendSwitch. For this to work
* reliably, the active flag needs to be set atomically with the initial check
* for suspend. Scooping up messages means replacing the ThreadLocal's contents
* with an empty new NestingQueue.
*
* All accesses to the queue must be done under the suspended-switch's lock, so
* All accesses to the queue must be done under the suspendSwitch-switch's lock, so
* within one of its methods taking a closure argument.
*/
@ -83,7 +83,7 @@ object CallingThreadDispatcher {
* Dispatcher which runs invocations on the current thread only. This
* dispatcher does not create any new threads, but it can be used from
* different threads concurrently for the same actor. The dispatch strategy is
* to run on the current thread unless the target actor is either suspended or
* to run on the current thread unless the target actor is either suspendSwitch or
* already running on the current thread (if it is running on a different
* thread, then this thread will block until that other invocation is
* finished); if the invocation is not run, it is queued in a thread-local
@ -93,7 +93,7 @@ object CallingThreadDispatcher {
*
* Suspending and resuming are global actions for one actor, meaning they can
* affect different threads, which leads to complications. If messages are
* queued (thread-locally) during the suspended period, the only thread to run
* queued (thread-locally) during the suspendSwitch period, the only thread to run
* them upon resume is the thread actually calling the resume method. Hence,
* all thread-local queues which are not currently being drained (possible,
* since suspend-queue-resume might happen entirely during an invocation on a
@ -107,7 +107,7 @@ object CallingThreadDispatcher {
class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher {
import CallingThreadDispatcher._
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this)
private def getMailbox(actor: ActorCell) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
@ -115,17 +115,22 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
protected[akka] override def shutdown() {}
protected[akka] override def reRegisterForExecution(mbox: Mailbox) {}
protected[akka] override def throughput = 0
protected[akka] override def throughputDeadlineTime = 0
protected[akka] override def timeoutMs = 100L
override def suspend(actor: ActorCell) {
getMailbox(actor).suspended.switchOn
getMailbox(actor).suspendSwitch.switchOn
}
override def resume(actor: ActorCell) {
val mbox = getMailbox(actor)
val queue = mbox.queue
val wasActive = queue.isActive
val switched = mbox.suspended.switchOff {
val switched = mbox.suspendSwitch.switchOff {
gatherFromAllInactiveQueues(mbox, queue)
}
if (switched && !wasActive) {
@ -137,7 +142,7 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).queue.isEmpty
protected[akka] override def systemDispatch(handle: SystemMessageInvocation) {
protected[akka] override def systemDispatch(handle: SystemEnvelope) {
val mbox = getMailbox(handle.receiver)
mbox.lock.lock
try {
@ -147,13 +152,13 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
}
}
protected[akka] override def dispatch(handle: MessageInvocation) {
protected[akka] override def dispatch(handle: Envelope) {
val mbox = getMailbox(handle.receiver)
val queue = mbox.queue
val execute = mbox.suspended.fold {
val execute = mbox.suspendSwitch.fold {
queue.push(handle)
if (warnings && handle.channel.isInstanceOf[Promise[_]]) {
EventHandler.warning(this, "suspended, creating Future could deadlock; target: %s" format handle.receiver)
EventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format handle.receiver)
}
false
} {
@ -180,14 +185,14 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
*
* If the catch block is executed, then a non-empty mailbox may be stalled as
* there is no-one who cares to execute it before the next message is sent or
* it is suspended and resumed.
* it is suspendSwitch and resumed.
*/
@tailrec
private def runQueue(mbox: CallingThreadMailbox, queue: NestingQueue) {
assert(queue.isActive)
mbox.lock.lock
val recurse = try {
val handle = mbox.suspended.fold[MessageInvocation] {
val handle = mbox.suspendSwitch.fold[Envelope] {
queue.leave
null
} {
@ -224,10 +229,10 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
}
class NestingQueue {
private var q = new LinkedList[MessageInvocation]()
private var q = new LinkedList[Envelope]()
def size = q.size
def isEmpty = q.isEmpty
def push(handle: MessageInvocation) { q.offer(handle) }
def push(handle: Envelope) { q.offer(handle) }
def peek = q.peek
def pop = q.poll
@ -238,7 +243,7 @@ class NestingQueue {
def isActive = active
}
class CallingThreadMailbox {
class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox {
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = new NestingQueue
@ -247,6 +252,10 @@ class CallingThreadMailbox {
def queue = q.get
val lock = new ReentrantLock
val suspendSwitch = new Switch
val suspended = new Switch(false)
override def enqueue(msg: Envelope) {}
override def dequeue() = null
override def isEmpty = true
override def size = 0
}