Making SerializedSuspendableExecutionContext use AbstractNodeQueue instead of ConcurrentLinkedQueue

This commit is contained in:
Viktor Klang 2013-03-28 20:14:37 +01:00
parent fb2decbcda
commit 3ab3de1eb6
161 changed files with 3553 additions and 1656 deletions

View file

@ -6,6 +6,7 @@ package akka.dispatch
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import java.util.concurrent._
import akka.AkkaException
import akka.dispatch.sysmsg._
import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter }
import akka.util.{ Unsafe, BoundedBlockingQueue }
import akka.event.Logging.Error
@ -192,12 +193,19 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
val s = status
updateStatus(s, s & ~Scheduled) || setAsIdle()
}
/*
* AtomicReferenceFieldUpdater for system queue.
*/
protected final def systemQueueGet: LatestFirstSystemMessageList =
// Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such
// it just exists as a typed view during compile-time. The actual return type is still SystemMessage.
new LatestFirstSystemMessageList(Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage])
protected final def systemQueueGet: SystemMessage =
Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new)
protected final def systemQueuePut(_old: LatestFirstSystemMessageList, _new: LatestFirstSystemMessageList): Boolean =
// Note: calling .head is not actually existing on the bytecode level as the parameters _old and _new
// are SystemMessage instances hidden during compile time behind the SystemMessageList value class.
// Without calling .head the parameters would be boxed in SystemMessageList wrapper.
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head)
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
@ -245,28 +253,28 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/
final def processAllSystemMessages() {
var interruption: Throwable = null
var nextMessage = systemDrain(null)
while ((nextMessage ne null) && !isClosed) {
val msg = nextMessage
nextMessage = nextMessage.next
msg.next = null
var messageList = systemDrain(SystemMessageList.LNil)
while ((messageList.nonEmpty) && !isClosed) {
val msg = messageList.head
messageList = messageList.tail
msg.unlink()
if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs)
// we know here that systemInvoke ensures that only "fatal" exceptions get rethrown
actor systemInvoke msg
if (Thread.interrupted())
interruption = new InterruptedException("Interrupted while processing system messages")
// dont ever execute normal message when system message present!
if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null)
if ((messageList.isEmpty) && !isClosed) messageList = systemDrain(SystemMessageList.LNil)
}
/*
* if we closed the mailbox, we must dump the remaining system messages
* to deadLetters (this is essential for DeathWatch)
*/
val dlm = actor.systemImpl.deadLetterMailbox
while (nextMessage ne null) {
val msg = nextMessage
nextMessage = nextMessage.next
msg.next = null
while (messageList.nonEmpty) {
val msg = messageList.head
messageList = messageList.tail
msg.unlink()
try dlm.systemEnqueue(actor.self, msg)
catch {
case e: InterruptedException interruption = e
@ -289,13 +297,13 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox
val dlm = actor.systemImpl.deadLetterMailbox
var message = systemDrain(NoMessage)
while (message ne null) {
var messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage))
while (messageList.nonEmpty) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
dlm.systemEnqueue(actor.self, message)
message = next
val msg = messageList.head
messageList = messageList.tail
msg.unlink()
dlm.systemEnqueue(actor.self, msg)
}
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
@ -374,7 +382,7 @@ private[akka] trait SystemMessageQueue {
/**
* Dequeue all messages from system queue and return them as single-linked list.
*/
def systemDrain(newContents: SystemMessage): SystemMessage
def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList
def hasSystemMessages: Boolean
}
@ -386,36 +394,26 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null)
assert(message.unlinked)
if (Mailbox.debug) println(receiver + " having enqueued " + message)
val head = systemQueueGet
if (head == NoMessage) {
val currentList = systemQueueGet
if (currentList.head == NoMessage) {
if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message)
} else {
/*
* This write is safely published by the compareAndSet contained within
* systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec
* guarantees that head uses the value obtained from systemQueueGet above.
* Hence, SystemMessage.next does not need to be volatile.
*/
message.next = head
if (!systemQueuePut(head, message)) {
message.next = null
if (!systemQueuePut(currentList, message :: currentList)) {
message.unlink()
systemEnqueue(receiver, message)
}
}
}
@tailrec
final def systemDrain(newContents: SystemMessage): SystemMessage = systemQueueGet match {
case NoMessage null
case head if (systemQueuePut(head, newContents)) SystemMessage.reverse(head) else systemDrain(newContents)
final def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = {
val currentList = systemQueueGet
if (systemQueuePut(currentList, newContents)) currentList.reverse else systemDrain(newContents)
}
def hasSystemMessages: Boolean = systemQueueGet match {
case null | NoMessage false
case _ true
}
def hasSystemMessages: Boolean = systemQueueGet.nonEmpty
}