Decoupling system message implementation details from the Mailbox

This commit is contained in:
Viktor Klang 2011-09-21 16:27:31 +02:00
parent 7c63f94169
commit 3d12e47e7d
10 changed files with 72 additions and 49 deletions

View file

@ -267,8 +267,8 @@ class ActorRefSpec extends WordSpec with MustMatchers {
val inetAddress = ReflectiveAccess.RemoteModule.configDefaultAddress
val expectedSerializedRepresentation = SerializedActorRef(
newUuid,
"nonsense",
a.uuid,
a.address,
inetAddress.getAddress.getHostAddress,
inetAddress.getPort,
a.timeout)

View file

@ -317,8 +317,12 @@ abstract class ActorModelSpec extends JUnitSuite {
assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel")
aStop.countDown()
a.stop()
b.stop()
a.stop
b.stop
while (a.isRunning && b.isRunning) {} //Busy wait for termination
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1)
}

View file

@ -80,8 +80,8 @@ class BalancingDispatcherSpec extends JUnitSuite with MustMatchers {
}
finishedCounter.await(5, TimeUnit.SECONDS)
fast.underlying.mailbox.asInstanceOf[Mailbox].isEmpty must be(true)
slow.underlying.mailbox.asInstanceOf[Mailbox].isEmpty must be(true)
fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount)

View file

@ -41,16 +41,16 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
for (i 1 to config.capacity) q.enqueue(exampleMessage)
q.size must be === config.capacity
q.isEmpty must be === false
q.numberOfMessages must be === config.capacity
q.hasMessages must be === true
intercept[MessageQueueAppendFailedException] {
q.enqueue(exampleMessage)
}
q.dequeue must be === exampleMessage
q.size must be(config.capacity - 1)
q.isEmpty must be === false
q.numberOfMessages must be(config.capacity - 1)
q.hasMessages must be === true
}
"dequeue what was enqueued properly for unbounded mailboxes" in {
@ -97,8 +97,8 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
}
case _
}
q.size must be === 0
q.isEmpty must be === true
q.numberOfMessages must be === 0
q.hasMessages must be === false
}
def testEnqueueDequeue(config: MailboxType) {
@ -119,7 +119,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
def createConsumer: Future[Vector[Envelope]] = spawn {
var r = Vector[Envelope]()
while (producers.exists(_.isCompleted == false) || !q.isEmpty) {
while (producers.exists(_.isCompleted == false) || q.hasMessages) {
q.dequeue match {
case null
case message r = r :+ message

View file

@ -264,7 +264,7 @@ class LocalActorRef private[akka] (
protected[akka] def underlyingActorInstance: Actor = {
var instance = actorCell.actor.get
while (instance eq null) {
while ((instance eq null) && actorCell.isRunning) {
try { Thread.sleep(1) } catch { case i: InterruptedException }
instance = actorCell.actor.get
}

View file

@ -65,7 +65,7 @@ class BalancingDispatcher(
override protected[akka] def dispatch(invocation: Envelope) = {
val mbox = invocation.receiver.mailbox
if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
if (donationInProgress.value == false && (mbox.hasMessages || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
//We were busy and we got to donate the message to some other lucky guy, we're done here
} else {
mbox enqueue invocation
@ -79,7 +79,7 @@ class BalancingDispatcher(
while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor
} finally { donationInProgress.value = false }
if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution
if (mbox.hasMessages) //If we still have messages left to process, reschedule for execution
super.reRegisterForExecution(mbox)
}
@ -131,10 +131,11 @@ class BalancingDispatcher(
while ((i < prSz) && (recipient eq null)) {
val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap
val mbox = actor.mailbox
if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself
recipient = actor //Found!
actor.mailbox match {
case `donorMbox` | `deadLetterMailbox` //Not interesting
case mbox
if (!mbox.hasMessages) //Don't donate to yourself
recipient = actor //Found!
}
i += 1

View file

@ -128,7 +128,7 @@ class Dispatcher(
protected[akka] def registerForExecution(mbox: Mailbox): Unit = {
if (mbox.dispatcherLock.tryLock()) {
if (active.isOn && (!mbox.suspended.locked || !mbox.systemMessages.isEmpty)) { //If the dispatcher is active and the actor not suspended
if (active.isOn && (!mbox.suspended.locked || mbox.hasSystemMessages)) { //If the dispatcher is active and the actor not suspended
try {
executorService.get() execute mbox
} catch {
@ -148,7 +148,7 @@ class Dispatcher(
protected override def cleanUpMailboxFor(actor: ActorCell) {
val m = actor.mailbox
if (!m.isEmpty) {
if (m.hasMessages) {
var invocation = m.dequeue
lazy val exception = new ActorKilledException("Actor has been stopped")
while (invocation ne null) {
@ -156,6 +156,9 @@ class Dispatcher(
invocation = m.dequeue
}
}
while (m.systemDequeue() ne null) {
//Empty the system messages
}
}
override val toString = getClass.getSimpleName + "[" + name + "]"

View file

@ -6,10 +6,10 @@ package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue }
import java.util.concurrent._
import akka.util._
import java.util.Queue
import akka.actor.ActorContext
import java.util.concurrent._
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
@ -22,19 +22,13 @@ trait Mailbox extends Runnable {
*/
final val dispatcherLock = new SimpleLock(startLocked = false)
final val suspended = new SimpleLock(startLocked = false) //(startLocked = true)
final val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]()
final def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages.offer(handle)
final def systemDequeue(): SystemEnvelope = systemMessages.poll()
def dispatcher: MessageDispatcher
final def run = {
try { processMailbox() } catch {
case ie: InterruptedException Thread.currentThread().interrupt() //Restore interrupt
} finally {
dispatcherLock.unlock()
if (!isEmpty || !systemMessages.isEmpty)
if (hasMessages || hasSystemMessages)
dispatcher.reRegisterForExecution(this)
}
}
@ -87,8 +81,24 @@ trait Mailbox extends Runnable {
def enqueue(handle: Envelope)
def dequeue(): Envelope
def size: Int
def isEmpty: Boolean
def numberOfMessages: Int
def systemEnqueue(handle: SystemEnvelope): Unit
def systemDequeue(): SystemEnvelope
def hasMessages: Boolean
def hasSystemMessages: Boolean
def dispatcher: MessageDispatcher
}
trait DefaultSystemMessageImpl { self: Mailbox
val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]()
def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages offer handle
def systemDequeue(): SystemEnvelope = systemMessages.poll()
def hasSystemMessages: Boolean = !systemMessages.isEmpty
}
trait UnboundedMessageQueueSemantics { self: QueueMailbox
@ -114,8 +124,8 @@ trait BoundedMessageQueueSemantics { self: BlockingQueueMailbox ⇒
trait QueueMailbox extends Mailbox {
val queue: Queue[Envelope]
final def size = queue.size
final def isEmpty = queue.isEmpty
final def numberOfMessages = queue.size
final def hasMessages = !queue.isEmpty
}
trait BlockingQueueMailbox extends QueueMailbox {
@ -142,7 +152,7 @@ trait MailboxType {
}
case class UnboundedMailbox() extends MailboxType {
override def create(dispatcher: MessageDispatcher) = new ConcurrentLinkedQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics
override def create(dispatcher: MessageDispatcher) = new ConcurrentLinkedQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl
}
case class BoundedMailbox(
@ -152,14 +162,14 @@ case class BoundedMailbox(
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(dispatcher: MessageDispatcher) = new LinkedBlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics {
override def create(dispatcher: MessageDispatcher) = new LinkedBlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl {
val capacity = BoundedMailbox.this.capacity
val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
}
case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxType {
override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with UnboundedMessageQueueSemantics
override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl
}
case class BoundedPriorityMailbox(
@ -170,7 +180,7 @@ case class BoundedPriorityMailbox(
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with BoundedMessageQueueSemantics {
override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl {
val capacity = BoundedPriorityMailbox.this.capacity
val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}

View file

@ -78,12 +78,17 @@ abstract class MessageDispatcher extends Serializable {
/**
* Create a blackhole mailbox for the purpose of replacing the real one upon actor termination
*/
protected[akka] def createDeadletterMailbox = new Mailbox {
override def dispatcher = MessageDispatcher.this
override def enqueue(envelope: Envelope) {}
protected[akka] val deadLetterMailbox = new Mailbox {
override def dispatcher = null //MessageDispatcher.this
dispatcherLock.tryLock()
override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") }
override def dequeue() = null
override def isEmpty = true
override def size = 0
override def systemEnqueue(handle: SystemEnvelope): Unit = ()
override def systemDequeue(): SystemEnvelope = null
override def hasMessages = false
override def hasSystemMessages = false
override def numberOfMessages = 0
}
/**
@ -170,7 +175,7 @@ abstract class MessageDispatcher extends Serializable {
protected[akka] def unregister(actor: ActorCell) = {
if (uuids remove actor.uuid) {
cleanUpMailboxFor(actor)
actor.mailbox = createDeadletterMailbox
actor.mailbox = deadLetterMailbox
if (uuids.isEmpty && _tasks.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
@ -274,12 +279,12 @@ abstract class MessageDispatcher extends Serializable {
/**
* Returns the size of the mailbox for the specified actor
*/
def mailboxSize(actor: ActorCell): Int = actor.mailbox.size
def mailboxSize(actor: ActorCell): Int = actor.mailbox.numberOfMessages
/**
* Returns the "current" emptiness status of the mailbox for the specified actor
*/
def mailboxIsEmpty(actor: ActorCell): Boolean = actor.mailbox.isEmpty
def mailboxIsEmpty(actor: ActorCell): Boolean = actor.mailbox.hasMessages
/**
* Returns the amount of tasks queued for execution

View file

@ -243,7 +243,7 @@ class NestingQueue {
def isActive = active
}
class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox {
class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox with DefaultSystemMessageImpl {
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = new NestingQueue
@ -256,6 +256,6 @@ class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox {
override def enqueue(msg: Envelope) {}
override def dequeue() = null
override def isEmpty = true
override def size = 0
override def hasMessages = true
override def numberOfMessages = 0
}