diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
index 0216f9825e..00ff5c7817 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala
@@ -124,7 +124,7 @@ object ActorModelSpec {
getStats(actor.ref).unregisters.incrementAndGet()
}
- protected[akka] abstract override def dispatch(invocation: MessageInvocation) {
+ protected[akka] abstract override def dispatch(invocation: Envelope) {
getStats(invocation.receiver.ref).msgsReceived.incrementAndGet()
super.dispatch(invocation)
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala
index a2b31e4b3e..9e697c3af4 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala
@@ -7,7 +7,7 @@ import org.junit.Test
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.actor.Actor._
-import akka.dispatch.{ MessageQueue, Dispatchers }
+import akka.dispatch.{ Mailbox, Dispatchers }
import akka.actor.{ LocalActorRef, IllegalActorStateException, Actor, Props }
object BalancingDispatcherSpec {
@@ -80,8 +80,8 @@ class BalancingDispatcherSpec extends JUnitSuite with MustMatchers {
}
finishedCounter.await(5, TimeUnit.SECONDS)
- fast.underlying.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
- slow.underlying.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
+ fast.underlying.mailbox.asInstanceOf[Mailbox].isEmpty must be(true)
+ slow.underlying.mailbox.asInstanceOf[Mailbox].isEmpty must be(true)
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount)
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala
index c9ceab557e..a4625fe648 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala
@@ -15,7 +15,7 @@ import akka.actor.{ LocalActorRef, Actor, ActorRegistry, NullChannel }
abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
def name: String
- def factory: MailboxType ⇒ MessageQueue
+ def factory: MailboxType ⇒ Mailbox
name should {
"create an unbounded mailbox" in {
@@ -80,14 +80,14 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
result
}
- def createMessageInvocation(msg: Any): MessageInvocation = {
- new MessageInvocation(
+ def createMessageInvocation(msg: Any): Envelope = {
+ new Envelope(
actorOf(new Actor { //Dummy actor
def receive = { case _ ⇒ }
}).asInstanceOf[LocalActorRef].underlying, msg, NullChannel)
}
- def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
+ def ensureInitialMailboxState(config: MailboxType, q: Mailbox) {
q must not be null
q match {
case aQueue: BlockingQueue[_] ⇒
@@ -106,7 +106,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
val q = factory(config)
ensureInitialMailboxState(config, q)
- def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn {
+ def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn {
val messages = Vector() ++ (for (i ← fromNum to toNum) yield createMessageInvocation(i))
for (i ← messages) q.enqueue(i)
messages
@@ -117,8 +117,8 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
val producers = for (i ← (1 to totalMessages by step).toList) yield createProducer(i, i + step - 1)
- def createConsumer: Future[Vector[MessageInvocation]] = spawn {
- var r = Vector[MessageInvocation]()
+ def createConsumer: Future[Vector[Envelope]] = spawn {
+ var r = Vector[Envelope]()
while (producers.exists(_.isCompleted == false) || !q.isEmpty) {
q.dequeue match {
case null ⇒
@@ -146,8 +146,8 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
class DefaultMailboxSpec extends MailboxSpec {
lazy val name = "The default mailbox implementation"
def factory = {
- case UnboundedMailbox() ⇒ new DefaultUnboundedMessageQueue()
- case BoundedMailbox(capacity, pushTimeOut) ⇒ new DefaultBoundedMessageQueue(capacity, pushTimeOut)
+ case u: UnboundedMailbox ⇒ u.create(null)
+ case b: BoundedMailbox ⇒ b.create(null)
}
}
@@ -155,7 +155,7 @@ class PriorityMailboxSpec extends MailboxSpec {
val comparator = PriorityGenerator(_.##)
lazy val name = "The priority mailbox implementation"
def factory = {
- case UnboundedMailbox() ⇒ new UnboundedPriorityMessageQueue(comparator)
- case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedPriorityMessageQueue(capacity, pushTimeOut, comparator)
+ case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null)
+ case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null)
}
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
index 7ca838c5ee..0651a067f3 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
@@ -31,9 +31,9 @@ private[akka] trait ActorContext {
def hotswap_=(stack: Stack[PartialFunction[Any, Unit]]): Unit
- def currentMessage: MessageInvocation
+ def currentMessage: Envelope
- def currentMessage_=(invocation: MessageInvocation): Unit
+ def currentMessage_=(invocation: Envelope): Unit
def sender: Option[ActorRef]
@@ -69,7 +69,7 @@ private[akka] class ActorCell(
var terminated = false
@volatile
- var mailbox: AnyRef = _
+ var mailbox: Mailbox = _
@volatile
var futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@@ -93,7 +93,7 @@ private[akka] class ActorCell(
var receiveTimeout: Option[Long] = _receiveTimeout // TODO: currently settable from outside for compatibility
@volatile //FIXME volatile can be removed
- var currentMessage: MessageInvocation = null
+ var currentMessage: Envelope = null
val actor: AtomicReference[Actor] = new AtomicReference[Actor]() //FIXME We can most probably make this just a regular reference to Actor
@@ -112,7 +112,7 @@ private[akka] class ActorCell(
if (props.supervisor.isDefined) props.supervisor.get.link(self)
dispatcher.attach(this)
Actor.registry.register(self)
- dispatcher.systemDispatch(SystemMessageInvocation(this, Create, NullChannel))
+ dispatcher.systemDispatch(SystemEnvelope(this, Create, NullChannel))
}
def newActor(restart: Boolean): Actor = {
@@ -150,7 +150,7 @@ private[akka] class ActorCell(
private[akka] def stop(): Unit =
if (!terminated) {
//terminated = true // TODO: turn this into tristate with Running, Terminating, Terminated and use AtomicReferenceFieldUpdater
- dispatcher.systemDispatch(SystemMessageInvocation(this, Terminate, NullChannel))
+ dispatcher.systemDispatch(SystemEnvelope(this, Terminate, NullChannel))
}
def link(actorRef: ActorRef): ActorRef = {
@@ -186,7 +186,7 @@ private[akka] class ActorCell(
def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
- if (isRunning) dispatcher dispatchMessage new MessageInvocation(this, message, channel)
+ if (isRunning) dispatcher dispatchMessage new Envelope(this, message, channel)
else throw new ActorInitializationException("Actor " + self + " is dead")
def postMessageToMailboxAndCreateFutureResultWithTimeout(
@@ -197,7 +197,7 @@ private[akka] class ActorCell(
case f: ActorPromise ⇒ f
case _ ⇒ new ActorPromise(timeout)(dispatcher)
}
- dispatcher dispatchMessage new MessageInvocation(this, message, future)
+ dispatcher dispatchMessage new Envelope(this, message, future)
future
} else throw new ActorInitializationException("Actor " + self + " is dead")
@@ -224,10 +224,10 @@ private[akka] class ActorCell(
case msg ⇒ msg.channel
}
- def systemInvoke(envelope: SystemMessageInvocation): Unit = {
+ def systemInvoke(envelope: SystemEnvelope): Boolean = {
var isTerminated = terminated
- def create(recreation: Boolean): Unit = try {
+ def create(recreation: Boolean): Boolean = try {
actor.get() match {
case null ⇒
val created = newActor(restart = false)
@@ -237,17 +237,27 @@ private[akka] class ActorCell(
restart(new Exception("Restart commanded"), None, None)
case _ ⇒
}
+ true
} catch {
case e ⇒
envelope.channel.sendException(e)
- if (supervisor.isDefined) supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos) else throw e
+ if (supervisor.isDefined) {
+ supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos)
+ false // don't continue processing messages right now
+ } else throw e
}
- def suspend(): Unit = dispatcher suspend this
+ def suspend(): Boolean = {
+ dispatcher suspend this
+ true
+ }
- def resume(): Unit = dispatcher resume this
+ def resume(): Boolean = {
+ dispatcher resume this
+ true
+ }
- def terminate(): Unit = {
+ def terminate(): Boolean = {
receiveTimeout = None
cancelReceiveTimeout
Actor.registry.unregister(self)
@@ -266,6 +276,9 @@ private[akka] class ActorCell(
}
}
+ // TODO CHECK: stop message dequeuing, which means that mailbox will not be restarted and GCed
+ false
+
} finally {
try {
if (supervisor.isDefined)
@@ -288,6 +301,8 @@ private[akka] class ActorCell(
case Resume ⇒ resume()
case Terminate ⇒ terminate()
}
+ } else {
+ false
}
} catch {
case e ⇒ //Should we really catch everything here?
@@ -299,7 +314,7 @@ private[akka] class ActorCell(
}
}
- def invoke(messageHandle: MessageInvocation): Unit = {
+ def invoke(messageHandle: Envelope): Unit = {
var isTerminated = terminated
guard.lock.lock()
try {
diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala
index 92a9cb5563..15c25ee800 100644
--- a/akka-actor/src/main/scala/akka/actor/IO.scala
+++ b/akka-actor/src/main/scala/akka/actor/IO.scala
@@ -4,7 +4,7 @@
package akka.actor
import akka.util.ByteString
-import akka.dispatch.MessageInvocation
+import akka.dispatch.Envelope
import akka.event.EventHandler
import java.net.InetSocketAddress
@@ -130,11 +130,11 @@ object IO {
}
sealed trait IOSuspendable[+A]
- sealed trait CurrentMessage { def message: MessageInvocation }
- private case class ByteStringLength(continuation: (ByteString) ⇒ IOSuspendable[Any], handle: Handle, message: MessageInvocation, length: Int) extends IOSuspendable[ByteString] with CurrentMessage
- private case class ByteStringDelimited(continuation: (ByteString) ⇒ IOSuspendable[Any], handle: Handle, message: MessageInvocation, delimter: ByteString, inclusive: Boolean, scanned: Int) extends IOSuspendable[ByteString] with CurrentMessage
- private case class ByteStringAny(continuation: (ByteString) ⇒ IOSuspendable[Any], handle: Handle, message: MessageInvocation) extends IOSuspendable[ByteString] with CurrentMessage
- private case class Retry(message: MessageInvocation) extends IOSuspendable[Nothing]
+ sealed trait CurrentMessage { def message: Envelope }
+ private case class ByteStringLength(continuation: (ByteString) ⇒ IOSuspendable[Any], handle: Handle, message: Envelope, length: Int) extends IOSuspendable[ByteString] with CurrentMessage
+ private case class ByteStringDelimited(continuation: (ByteString) ⇒ IOSuspendable[Any], handle: Handle, message: Envelope, delimter: ByteString, inclusive: Boolean, scanned: Int) extends IOSuspendable[ByteString] with CurrentMessage
+ private case class ByteStringAny(continuation: (ByteString) ⇒ IOSuspendable[Any], handle: Handle, message: Envelope) extends IOSuspendable[ByteString] with CurrentMessage
+ private case class Retry(message: Envelope) extends IOSuspendable[Nothing]
private case object Idle extends IOSuspendable[Nothing]
}
@@ -147,7 +147,7 @@ trait IO {
implicit protected def ioActor: Actor with IO = this
- private val _messages: mutable.Queue[MessageInvocation] = mutable.Queue.empty
+ private val _messages: mutable.Queue[Envelope] = mutable.Queue.empty
private var _state: Map[Handle, HandleState] = Map.empty
diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
index 5997d922cd..88b7aeb184 100644
--- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala
@@ -63,8 +63,8 @@ class BalancingDispatcher(
super.unregister(actor)
}
- override protected[akka] def dispatch(invocation: MessageInvocation) = {
- val mbox = getMailbox(invocation.receiver)
+ override protected[akka] def dispatch(invocation: Envelope) = {
+ val mbox = invocation.receiver.mailbox
if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
//We were busy and we got to donate the message to some other lucky guy, we're done here
} else {
@@ -73,7 +73,7 @@ class BalancingDispatcher(
}
}
- override protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
+ override protected[akka] def reRegisterForExecution(mbox: Mailbox): Unit = {
try {
donationInProgress.value = true
while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
@@ -86,7 +86,7 @@ class BalancingDispatcher(
/**
* Returns true if it successfully donated a message
*/
- protected def donateFrom(donorMbox: MessageQueue with ExecutableMailbox): Boolean = {
+ protected def donateFrom(donorMbox: Mailbox): Boolean = {
val actors = members // copy to prevent concurrent modifications having any impact
// we risk to pick a thief which is unregistered from the dispatcher in the meantime, but that typically means
@@ -101,7 +101,7 @@ class BalancingDispatcher(
/**
* Returns true if the donation succeeded or false otherwise
*/
- protected def attemptDonationOf(message: MessageInvocation, donorMbox: MessageQueue with ExecutableMailbox): Boolean = try {
+ protected def attemptDonationOf(message: Envelope, donorMbox: Mailbox): Boolean = try {
donationInProgress.value = true
val actors = members // copy to prevent concurrent modifications having any impact
doFindDonorRecipient(donorMbox, actors, System.identityHashCode(message) % actors.size) match {
@@ -114,7 +114,7 @@ class BalancingDispatcher(
* Rewrites the message and adds that message to the recipients mailbox
* returns true if the message is non-null
*/
- protected def donate(organ: MessageInvocation, recipient: ActorCell): Boolean = {
+ protected def donate(organ: Envelope, recipient: ActorCell): Boolean = {
if (organ ne null) {
recipient.postMessageToMailbox(organ.message, organ.channel)
true
@@ -124,14 +124,14 @@ class BalancingDispatcher(
/**
* Returns an available recipient for the message, if any
*/
- protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorCell], startIndex: Int): ActorCell = {
+ protected def doFindDonorRecipient(donorMbox: Mailbox, potentialRecipients: Vector[ActorCell], startIndex: Int): ActorCell = {
val prSz = potentialRecipients.size
var i = 0
var recipient: ActorCell = null
while ((i < prSz) && (recipient eq null)) {
val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
- val mbox = getMailbox(actor)
+ val mbox = actor.mailbox
if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
recipient = actor //Found!
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
index b232ca6270..9f7ee83c06 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala
@@ -90,16 +90,16 @@ class Dispatcher(
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
- protected[akka] def dispatch(invocation: MessageInvocation) = {
- val mbox = getMailbox(invocation.receiver)
+ protected[akka] def dispatch(invocation: Envelope) = {
+ val mbox = invocation.receiver.mailbox
if (mbox ne null) {
mbox enqueue invocation
registerForExecution(mbox)
}
}
- protected[akka] def systemDispatch(invocation: SystemMessageInvocation) = {
- val mbox = getMailbox(invocation.receiver)
+ protected[akka] def systemDispatch(invocation: SystemEnvelope) = {
+ val mbox = invocation.receiver.mailbox
if (mbox ne null) {
mbox systemEnqueue invocation
registerForExecution(mbox)
@@ -115,31 +115,7 @@ class Dispatcher(
}
}
- /**
- * @return the mailbox associated with the actor
- */
- protected def getMailbox(receiver: ActorCell) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
-
- override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).isEmpty
-
- override def mailboxSize(actor: ActorCell): Int = getMailbox(actor).size
-
- def createMailbox(actor: ActorCell): AnyRef = mailboxType match {
- case b: UnboundedMailbox ⇒
- new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
- @inline
- final def dispatcher = Dispatcher.this
- @inline
- final def enqueue(m: MessageInvocation) = this.add(m)
- @inline
- final def dequeue(): MessageInvocation = this.poll()
- }
- case b: BoundedMailbox ⇒
- new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox {
- @inline
- final def dispatcher = Dispatcher.this
- }
- }
+ def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(this)
protected[akka] def start {}
@@ -150,7 +126,7 @@ class Dispatcher(
}
}
- protected[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
+ protected[akka] def registerForExecution(mbox: Mailbox): Unit = {
if (mbox.dispatcherLock.tryLock()) {
if (active.isOn && (!mbox.suspended.locked || !mbox.systemMessages.isEmpty)) { //If the dispatcher is active and the actor not suspended
try {
@@ -167,11 +143,11 @@ class Dispatcher(
}
}
- protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
+ protected[akka] def reRegisterForExecution(mbox: Mailbox): Unit =
registerForExecution(mbox)
protected override def cleanUpMailboxFor(actor: ActorCell) {
- val m = getMailbox(actor)
+ val m = actor.mailbox
if (!m.isEmpty) {
var invocation = m.dequeue
lazy val exception = new ActorKilledException("Actor has been stopped")
@@ -185,68 +161,15 @@ class Dispatcher(
override val toString = getClass.getSimpleName + "[" + name + "]"
def suspend(actor: ActorCell): Unit =
- getMailbox(actor).suspended.tryLock
+ actor.mailbox.suspended.tryLock
def resume(actor: ActorCell): Unit = {
- val mbox = getMailbox(actor)
+ val mbox = actor.mailbox
mbox.suspended.tryUnlock
reRegisterForExecution(mbox)
}
}
-/**
- * This is the behavior of an Dispatchers mailbox.
- */
-trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
-
- def dispatcher: Dispatcher
-
- final def run = {
- try { processMailbox() } catch {
- case ie: InterruptedException ⇒ Thread.currentThread().interrupt() //Restore interrupt
- } finally {
- dispatcherLock.unlock()
- if (!self.isEmpty || !self.systemMessages.isEmpty)
- dispatcher.reRegisterForExecution(this)
- }
- }
-
- /**
- * Process the messages in the mailbox
- *
- * @return true if the processing finished before the mailbox was empty, due to the throughput constraint
- */
- final def processMailbox() {
- processAllSystemMessages()
- if (!self.suspended.locked) {
- var nextMessage = self.dequeue
- if (nextMessage ne null) { //If we have a message
- if (dispatcher.throughput <= 1) //If we only run one message per process
- nextMessage.invoke //Just run it
- else { //But otherwise, if we are throttled, we need to do some book-keeping
- var processedMessages = 0
- val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
- val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
- else 0
- do {
- nextMessage.invoke
- processAllSystemMessages()
- nextMessage =
- if (self.suspended.locked) {
- null // If we are suspended, abort
- } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
- processedMessages += 1
- if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
- null //We reached our boundaries, abort
- else self.dequeue //Dequeue the next message
- }
- } while (nextMessage ne null)
- }
- }
- }
- }
-}
-
object PriorityGenerator {
/**
* Creates a PriorityGenerator that uses the supplied function as priority generator
@@ -260,13 +183,15 @@ object PriorityGenerator {
* A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
* PriorityDispatcher
*/
-abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] {
+abstract class PriorityGenerator extends java.util.Comparator[Envelope] {
def gen(message: Any): Int
- final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int =
+ final def compare(thisMessage: Envelope, thatMessage: Envelope): Int =
gen(thisMessage.message) - gen(thatMessage.message)
}
+// TODO: should this be deleted, given that any dispatcher can now use UnboundedPriorityMailbox?
+
/**
* A version of Dispatcher that gives all actors registered to it a priority mailbox,
* prioritized according to the supplied comparator.
@@ -275,50 +200,29 @@ abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation]
*/
class PriorityDispatcher(
name: String,
- val comparator: java.util.Comparator[MessageInvocation],
+ val comparator: java.util.Comparator[Envelope],
throughput: Int = Dispatchers.THROUGHPUT,
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
- executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, executorServiceFactoryProvider) with PriorityMailbox {
+ executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, executorServiceFactoryProvider) {
- def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
+ def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
- def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) =
+ def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int, mailboxType: MailboxType) =
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
- def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
+ def this(name: String, comparator: java.util.Comparator[Envelope], throughput: Int) =
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
- def this(name: String, comparator: java.util.Comparator[MessageInvocation], executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
+ def this(name: String, comparator: java.util.Comparator[Envelope], executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, executorServiceFactoryProvider)
- def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
+ def this(name: String, comparator: java.util.Comparator[Envelope]) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
-}
-/**
- * Can be used to give an Dispatcher's actors priority-enabled mailboxes
- *
- * Usage:
- * new Dispatcher(...) with PriorityMailbox {
- * val comparator = ...comparator that determines mailbox priority ordering...
- * }
- */
-trait PriorityMailbox { self: Dispatcher ⇒
- def comparator: java.util.Comparator[MessageInvocation]
-
- override def createMailbox(actor: ActorCell): AnyRef = self.mailboxType match {
- case b: UnboundedMailbox ⇒
- new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox {
- @inline
- final def dispatcher = self
- }
-
- case b: BoundedMailbox ⇒
- new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, comparator) with ExecutableMailbox {
- @inline
- final def dispatcher = self
- }
+ override def createMailbox(actor: ActorCell): Mailbox = mailboxType match {
+ case _: UnboundedMailbox ⇒ UnboundedPriorityMailbox(comparator).create(this)
+ case BoundedMailbox(cap, timeout) ⇒ BoundedPriorityMailbox(comparator, cap, timeout).create(this)
}
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala
index ece5c3d1a9..edb04c9a4d 100644
--- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala
@@ -5,76 +5,174 @@
package akka.dispatch
import akka.AkkaException
-
import java.util.{ Comparator, PriorityQueue }
import java.util.concurrent._
import akka.util._
+import java.util.Queue
+import akka.actor.ActorContext
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
/**
* @author Jonas Bonér
*/
-trait MessageQueue {
- val dispatcherLock = new SimpleLock(startLocked = false)
- val suspended = new SimpleLock(startLocked = false) //(startLocked = true)
- val systemMessages = new ConcurrentLinkedQueue[SystemMessageInvocation]()
+trait Mailbox extends Runnable {
+ /*
+ * Internal implementation of MessageDispatcher uses these, don't touch or rely on
+ */
+ final val dispatcherLock = new SimpleLock(startLocked = false)
+ final val suspended = new SimpleLock(startLocked = false) //(startLocked = true)
+ final val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]()
- def enqueue(handle: MessageInvocation)
- def dequeue(): MessageInvocation
- def systemEnqueue(handle: SystemMessageInvocation): Unit = systemMessages.offer(handle)
- def systemDequeue(): SystemMessageInvocation = systemMessages.poll()
- def size: Int
- def isEmpty: Boolean
+ final def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages.offer(handle)
+ final def systemDequeue(): SystemEnvelope = systemMessages.poll()
- def processAllSystemMessages(): Unit = {
- var nextMessage = systemDequeue()
- while (nextMessage ne null) {
- nextMessage.invoke()
- nextMessage = systemDequeue()
+ def dispatcher: MessageDispatcher
+
+ final def run = {
+ try { processMailbox() } catch {
+ case ie: InterruptedException ⇒ Thread.currentThread().interrupt() //Restore interrupt
+ } finally {
+ dispatcherLock.unlock()
+ if (!isEmpty || !systemMessages.isEmpty)
+ dispatcher.reRegisterForExecution(this)
}
}
+
+ /**
+ * Process the messages in the mailbox
+ *
+ * @return true if the processing finished before the mailbox was empty, due to the throughput constraint
+ */
+ final def processMailbox() {
+ if (processAllSystemMessages() && !suspended.locked) {
+ var nextMessage = dequeue()
+ if (nextMessage ne null) { //If we have a message
+ if (dispatcher.throughput <= 1) //If we only run one message per process
+ nextMessage.invoke //Just run it
+ else { //But otherwise, if we are throttled, we need to do some book-keeping
+ var processedMessages = 0
+ val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
+ val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
+ else 0
+ do {
+ nextMessage.invoke
+ nextMessage =
+ if (!processAllSystemMessages() || suspended.locked) {
+ null // If we are suspended, abort
+ } else { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
+ processedMessages += 1
+ if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
+ null //We reached our boundaries, abort
+ else dequeue //Dequeue the next message
+ }
+ } while (nextMessage ne null)
+ }
+ }
+ }
+ }
+
+ def processAllSystemMessages(): Boolean = {
+ var nextMessage = systemDequeue()
+ while (nextMessage ne null) {
+ if (!nextMessage.invoke()) return false
+ nextMessage = systemDequeue()
+ }
+ true
+ }
+
+ /*
+ * These method need to be implemented in subclasses; they should not rely on the internal stuff above.
+ */
+ def enqueue(handle: Envelope)
+ def dequeue(): Envelope
+
+ def size: Int
+ def isEmpty: Boolean
+}
+
+trait UnboundedMessageQueueSemantics { self: QueueMailbox ⇒
+ val queue: Queue[Envelope]
+
+ final def enqueue(handle: Envelope): Unit = queue add handle
+ final def dequeue(): Envelope = queue.poll()
+}
+
+trait BoundedMessageQueueSemantics { self: BlockingQueueMailbox ⇒
+ def pushTimeOut: Duration
+
+ final def enqueue(handle: Envelope) {
+ if (pushTimeOut.length > 0) {
+ queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
+ throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
+ }
+ } else queue put handle
+ }
+
+ final def dequeue(): Envelope = queue.poll()
+}
+
+trait QueueMailbox extends Mailbox {
+ val queue: Queue[Envelope]
+ final def size = queue.size
+ final def isEmpty = queue.isEmpty
+}
+
+trait BlockingQueueMailbox extends QueueMailbox {
+ val queue: BlockingQueue[Envelope]
+}
+
+abstract class PriorityBlockingQueueMailbox(cmp: Comparator[Envelope], val dispatcher: MessageDispatcher) extends BlockingQueueMailbox {
+ override val queue = new PriorityBlockingQueue(11, cmp) // 11 is the default initial capacity in PriorityQueue.java
+}
+
+abstract class ConcurrentLinkedQueueMailbox(val dispatcher: MessageDispatcher) extends QueueMailbox {
+ override val queue = new ConcurrentLinkedQueue[Envelope]
+}
+
+abstract class LinkedBlockingQueueMailbox(val dispatcher: MessageDispatcher) extends BlockingQueueMailbox {
+ override val queue = new LinkedBlockingQueue[Envelope]
}
/**
* Mailbox configuration.
*/
-sealed trait MailboxType
+trait MailboxType {
+ def create(dispatcher: MessageDispatcher): Mailbox
+}
+
+case class UnboundedMailbox() extends MailboxType {
+ override def create(dispatcher: MessageDispatcher) = new ConcurrentLinkedQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics
+}
-case class UnboundedMailbox() extends MailboxType
case class BoundedMailbox(
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
+
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
-}
-trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] ⇒
- @inline
- final def enqueue(handle: MessageInvocation): Unit = this add handle
- @inline
- final def dequeue(): MessageInvocation = this.poll()
-}
-
-trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] ⇒
- def pushTimeOut: Duration
-
- final def enqueue(handle: MessageInvocation) {
- if (pushTimeOut.length > 0) {
- this.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
- throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
- }
- } else this put handle
+ override def create(dispatcher: MessageDispatcher) = new LinkedBlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics {
+ val capacity = BoundedMailbox.this.capacity
+ val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
-
- @inline
- final def dequeue(): MessageInvocation = this.poll()
}
-class DefaultUnboundedMessageQueue extends LinkedBlockingQueue[MessageInvocation] with UnboundedMessageQueueSemantics
+case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxType {
+ override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with UnboundedMessageQueueSemantics
+}
-class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration) extends LinkedBlockingQueue[MessageInvocation](capacity) with BoundedMessageQueueSemantics
+case class BoundedPriorityMailbox(
+ val cmp: Comparator[Envelope],
+ val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
+ val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
-class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation]) extends PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics
+ if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
+ if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
+
+ override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with BoundedMessageQueueSemantics {
+ val capacity = BoundedPriorityMailbox.this.capacity
+ val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
+ }
+}
-class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation]) extends BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with BoundedMessageQueueSemantics
diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
index a5dddbda12..2586df3ec1 100644
--- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala
@@ -16,7 +16,7 @@ import akka.actor._
/**
* @author Jonas Bonér
*/
-final case class MessageInvocation(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) {
+final case class Envelope(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
final def invoke() { receiver invoke this }
@@ -29,9 +29,12 @@ case object Suspend extends SystemMessage
case object Resume extends SystemMessage
case object Terminate extends SystemMessage
-final case class SystemMessageInvocation(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) {
+final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
- final def invoke() { receiver systemInvoke this }
+ /**
+ * @return whether to proceed with processing other messages
+ */
+ final def invoke(): Boolean = receiver systemInvoke this
}
final case class TaskInvocation(function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable {
@@ -70,7 +73,18 @@ abstract class MessageDispatcher extends Serializable {
/**
* Creates and returns a mailbox for the given actor.
*/
- protected[akka] def createMailbox(actor: ActorCell): AnyRef
+ protected[akka] def createMailbox(actor: ActorCell): Mailbox
+
+ /**
+ * Create a blackhole mailbox for the purpose of replacing the real one upon actor termination
+ */
+ protected[akka] def createDeadletterMailbox = new Mailbox {
+ override def dispatcher = MessageDispatcher.this
+ override def enqueue(envelope: Envelope) {}
+ override def dequeue() = null
+ override def isEmpty = true
+ override def size = 0
+ }
/**
* Name of this dispatcher.
@@ -98,7 +112,7 @@ abstract class MessageDispatcher extends Serializable {
}
}
- protected[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation)
+ protected[akka] final def dispatchMessage(invocation: Envelope): Unit = dispatch(invocation)
protected[akka] final def dispatchTask(block: () ⇒ Unit): Unit = {
_tasks.getAndIncrement()
@@ -156,7 +170,7 @@ abstract class MessageDispatcher extends Serializable {
protected[akka] def unregister(actor: ActorCell) = {
if (uuids remove actor.uuid) {
cleanUpMailboxFor(actor)
- actor.mailbox = null
+ actor.mailbox = createDeadletterMailbox
if (uuids.isEmpty && _tasks.get == 0) {
shutdownSchedule match {
case UNSCHEDULED ⇒
@@ -229,12 +243,21 @@ abstract class MessageDispatcher extends Serializable {
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
- protected[akka] def dispatch(invocation: MessageInvocation)
+ protected[akka] def dispatch(invocation: Envelope)
+
+ /**
+ * Callback for processMailbox() which is called after one sweep of processing is done.
+ */
+ protected[akka] def reRegisterForExecution(mbox: Mailbox)
+
+ // TODO check whether this should not actually be a property of the mailbox
+ protected[akka] def throughput: Int
+ protected[akka] def throughputDeadlineTime: Int
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
- protected[akka] def systemDispatch(invocation: SystemMessageInvocation)
+ protected[akka] def systemDispatch(invocation: SystemEnvelope)
protected[akka] def executeTask(invocation: TaskInvocation)
@@ -251,12 +274,12 @@ abstract class MessageDispatcher extends Serializable {
/**
* Returns the size of the mailbox for the specified actor
*/
- def mailboxSize(actor: ActorCell): Int
+ def mailboxSize(actor: ActorCell): Int = actor.mailbox.size
/**
* Returns the "current" emptiness status of the mailbox for the specified actor
*/
- def mailboxIsEmpty(actor: ActorCell): Boolean
+ def mailboxIsEmpty(actor: ActorCell): Boolean = actor.mailbox.isEmpty
/**
* Returns the amount of tasks queued for execution
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 7938966770..3bf47e1c8d 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -4,7 +4,7 @@
package akka.util
-import akka.dispatch.MessageInvocation
+import akka.dispatch.Envelope
import akka.config.{ Config, ModuleNotAvailableException }
import akka.cluster.RemoteSupport
import akka.actor._
@@ -99,8 +99,8 @@ object ReflectiveAccess {
}
type Mailbox = {
- def enqueue(message: MessageInvocation)
- def dequeue: MessageInvocation
+ def enqueue(message: Envelope)
+ def dequeue: Envelope
}
type TransactionLogObject = {
@@ -118,7 +118,7 @@ object ReflectiveAccess {
}
type TransactionLog = {
- def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef)
+ def recordEntry(messageHandle: Envelope, actorRef: LocalActorRef)
def recordEntry(entry: Array[Byte])
def recordSnapshot(snapshot: Array[Byte])
def entries: Vector[Array[Byte]]
diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
index aa487ff588..68fb071ec7 100644
--- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
+++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala
@@ -16,7 +16,7 @@ import akka.actor._
import akka.camel.{ Ack, Failure, Message }
import akka.camel.CamelMessageConversion.toExchangeAdapter
import scala.reflect.BeanProperty
-import akka.dispatch.{ FutureTimeoutException, Promise, MessageInvocation, MessageDispatcher }
+import akka.dispatch.{ FutureTimeoutException, Promise, MessageDispatcher }
/**
* @author Martin Krasser
diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
index 65f3719441..765e6b01af 100644
--- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
+++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala
@@ -8,7 +8,7 @@ import akka.config.Supervision._
import akka.actor.{ uuidFrom, newUuid }
import akka.actor._
import DeploymentConfig._
-import akka.dispatch.MessageInvocation
+import akka.dispatch.Envelope
import akka.util.{ ReflectiveAccess, Duration }
import akka.cluster.{ RemoteClientSettings, MessageSerializer }
import akka.cluster.RemoteProtocol
@@ -107,9 +107,9 @@ object ActorSerialization {
l.underlying.mailbox match {
case null ⇒ throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
case q: java.util.Queue[_] ⇒
- val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
+ val l = new scala.collection.mutable.ListBuffer[Envelope]
val it = q.iterator
- while (it.hasNext) l += it.next.asInstanceOf[MessageInvocation]
+ while (it.hasNext) l += it.next.asInstanceOf[Envelope]
l map { m ⇒
RemoteActorSerialization.createRemoteMessageProtocolBuilder(
diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
index b9fc9204bc..e5c75a1940 100644
--- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
@@ -16,17 +16,17 @@ import akka.dispatch._
/*
* Locking rules:
*
- * While not suspended, messages are processed (!isActive) or queued
- * thread-locally (isActive). While suspended, messages are queued
+ * While not suspendSwitch, messages are processed (!isActive) or queued
+ * thread-locally (isActive). While suspendSwitch, messages are queued
* thread-locally. When resuming, all messages are atomically scooped from all
* non-active threads and queued on the resuming thread's queue, to be
* processed immediately. Processing a queue checks suspend before each
- * invocation, leaving the active state if suspended. For this to work
+ * invocation, leaving the active state if suspendSwitch. For this to work
* reliably, the active flag needs to be set atomically with the initial check
* for suspend. Scooping up messages means replacing the ThreadLocal's contents
* with an empty new NestingQueue.
*
- * All accesses to the queue must be done under the suspended-switch's lock, so
+ * All accesses to the queue must be done under the suspendSwitch-switch's lock, so
* within one of its methods taking a closure argument.
*/
@@ -83,7 +83,7 @@ object CallingThreadDispatcher {
* Dispatcher which runs invocations on the current thread only. This
* dispatcher does not create any new threads, but it can be used from
* different threads concurrently for the same actor. The dispatch strategy is
- * to run on the current thread unless the target actor is either suspended or
+ * to run on the current thread unless the target actor is either suspendSwitch or
* already running on the current thread (if it is running on a different
* thread, then this thread will block until that other invocation is
* finished); if the invocation is not run, it is queued in a thread-local
@@ -93,7 +93,7 @@ object CallingThreadDispatcher {
*
* Suspending and resuming are global actions for one actor, meaning they can
* affect different threads, which leads to complications. If messages are
- * queued (thread-locally) during the suspended period, the only thread to run
+ * queued (thread-locally) during the suspendSwitch period, the only thread to run
* them upon resume is the thread actually calling the resume method. Hence,
* all thread-local queues which are not currently being drained (possible,
* since suspend-queue-resume might happen entirely during an invocation on a
@@ -107,7 +107,7 @@ object CallingThreadDispatcher {
class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher {
import CallingThreadDispatcher._
- protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox
+ protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this)
private def getMailbox(actor: ActorCell) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
@@ -115,17 +115,22 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
protected[akka] override def shutdown() {}
+ protected[akka] override def reRegisterForExecution(mbox: Mailbox) {}
+
+ protected[akka] override def throughput = 0
+ protected[akka] override def throughputDeadlineTime = 0
+
protected[akka] override def timeoutMs = 100L
override def suspend(actor: ActorCell) {
- getMailbox(actor).suspended.switchOn
+ getMailbox(actor).suspendSwitch.switchOn
}
override def resume(actor: ActorCell) {
val mbox = getMailbox(actor)
val queue = mbox.queue
val wasActive = queue.isActive
- val switched = mbox.suspended.switchOff {
+ val switched = mbox.suspendSwitch.switchOff {
gatherFromAllInactiveQueues(mbox, queue)
}
if (switched && !wasActive) {
@@ -137,7 +142,7 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).queue.isEmpty
- protected[akka] override def systemDispatch(handle: SystemMessageInvocation) {
+ protected[akka] override def systemDispatch(handle: SystemEnvelope) {
val mbox = getMailbox(handle.receiver)
mbox.lock.lock
try {
@@ -147,13 +152,13 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
}
}
- protected[akka] override def dispatch(handle: MessageInvocation) {
+ protected[akka] override def dispatch(handle: Envelope) {
val mbox = getMailbox(handle.receiver)
val queue = mbox.queue
- val execute = mbox.suspended.fold {
+ val execute = mbox.suspendSwitch.fold {
queue.push(handle)
if (warnings && handle.channel.isInstanceOf[Promise[_]]) {
- EventHandler.warning(this, "suspended, creating Future could deadlock; target: %s" format handle.receiver)
+ EventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format handle.receiver)
}
false
} {
@@ -180,14 +185,14 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
*
* If the catch block is executed, then a non-empty mailbox may be stalled as
* there is no-one who cares to execute it before the next message is sent or
- * it is suspended and resumed.
+ * it is suspendSwitch and resumed.
*/
@tailrec
private def runQueue(mbox: CallingThreadMailbox, queue: NestingQueue) {
assert(queue.isActive)
mbox.lock.lock
val recurse = try {
- val handle = mbox.suspended.fold[MessageInvocation] {
+ val handle = mbox.suspendSwitch.fold[Envelope] {
queue.leave
null
} {
@@ -224,10 +229,10 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
}
class NestingQueue {
- private var q = new LinkedList[MessageInvocation]()
+ private var q = new LinkedList[Envelope]()
def size = q.size
def isEmpty = q.isEmpty
- def push(handle: MessageInvocation) { q.offer(handle) }
+ def push(handle: Envelope) { q.offer(handle) }
def peek = q.peek
def pop = q.poll
@@ -238,7 +243,7 @@ class NestingQueue {
def isActive = active
}
-class CallingThreadMailbox {
+class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox {
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = new NestingQueue
@@ -247,6 +252,10 @@ class CallingThreadMailbox {
def queue = q.get
val lock = new ReentrantLock
+ val suspendSwitch = new Switch
- val suspended = new Switch(false)
+ override def enqueue(msg: Envelope) {}
+ override def dequeue() = null
+ override def isEmpty = true
+ override def size = 0
}