replace ConcurrentLinkedQueue with single-linked list for Mailbox.systemQueue

cost zero when empty, non-blocking, shave off 84 bytes per actor
This commit is contained in:
Roland 2011-10-18 18:06:17 +02:00
parent 01efcd7b50
commit 5c823ad50d
5 changed files with 116 additions and 41 deletions

View file

@ -5,8 +5,14 @@
package akka.dispatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
abstract class AbstractMailbox {
private volatile int _status; // not initialized because this is faster: 0 == Open
protected final static AtomicIntegerFieldUpdater<AbstractMailbox> updater = AtomicIntegerFieldUpdater.newUpdater(AbstractMailbox.class, "_status");
protected final static AtomicIntegerFieldUpdater<AbstractMailbox> updater =
AtomicIntegerFieldUpdater.newUpdater(AbstractMailbox.class, "_status");
private volatile SystemMessage _systemQueue; // not initialized because this is faster
protected final static AtomicReferenceFieldUpdater<AbstractMailbox, SystemMessage> systemQueueUpdater =
AtomicReferenceFieldUpdater.newUpdater(AbstractMailbox.class, SystemMessage.class, "_systemQueue");
}

View file

@ -243,6 +243,7 @@ private[akka] class ActorCell(
if (props.supervisor.isDefined) {
props.supervisor.get match {
case l: LocalActorRef
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
l.underlying.dispatcher.systemDispatch(l.underlying, akka.dispatch.Supervise(self)) //FIXME TODO Support all ActorRefs?
case other throw new UnsupportedOperationException("Supervision failure: " + other + " cannot be a supervisor, only LocalActorRefs can")
}
@ -251,19 +252,23 @@ private[akka] class ActorCell(
dispatcher.attach(this)
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
def resume(): Unit = dispatcher.systemDispatch(this, Resume())
private[akka] def stop(): Unit =
dispatcher.systemDispatch(this, Terminate())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
def startsMonitoring(subject: ActorRef): ActorRef = {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
dispatcher.systemDispatch(this, Link(subject))
subject
}
def stopsMonitoring(subject: ActorRef): ActorRef = {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
dispatcher.systemDispatch(this, Unlink(subject))
subject
}
@ -421,18 +426,18 @@ private[akka] class ActorCell(
val isClosed = mailbox.isClosed //Fence plus volatile read
if (!isClosed) {
message match {
case Create(_) create()
case Recreate(cause, _) recreate(cause)
case Link(subject, _)
case Create() create()
case Recreate(cause) recreate(cause)
case Link(subject)
app.deathWatch.subscribe(self, subject)
if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now monitoring " + subject)
case Unlink(subject, _)
case Unlink(subject)
app.deathWatch.unsubscribe(self, subject)
if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "stopped monitoring " + subject)
case Suspend(_) suspend()
case Resume(_) resume()
case Terminate(_) terminate()
case Supervise(child, _) supervise(child)
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
}
}
} catch {
@ -491,6 +496,7 @@ private[akka] class ActorCell(
def handleChildTerminated(child: ActorRef): Unit = _children = props.faultHandler.handleChildTerminated(child, _children)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))
def checkReceiveTimeout() {

View file

@ -12,6 +12,7 @@ import akka.util.{ Duration, Switch, ReentrantGuard }
import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
import akka.actor._
import akka.AkkaApplication
import scala.annotation.tailrec
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -24,17 +25,45 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel
}
}
sealed trait SystemMessage extends PossiblyHarmful {
def next: SystemMessage
object SystemMessage {
@tailrec
final def size(list: SystemMessage, acc: Int = 0): Int = {
if (list eq null) acc else size(list.next, acc + 1)
}
@tailrec
final def reverse(list: SystemMessage, acc: SystemMessage = null): SystemMessage = {
if (list eq null) acc else {
val next = list.next
list.next = acc
reverse(next, list)
}
}
}
case class Create(next: SystemMessage = null) extends SystemMessage
case class Recreate(cause: Throwable, next: SystemMessage = null) extends SystemMessage
case class Suspend(next: SystemMessage = null) extends SystemMessage
case class Resume(next: SystemMessage = null) extends SystemMessage
case class Terminate(next: SystemMessage = null) extends SystemMessage
case class Supervise(child: ActorRef, next: SystemMessage = null) extends SystemMessage
case class Link(subject: ActorRef, next: SystemMessage = null) extends SystemMessage
case class Unlink(subject: ActorRef, next: SystemMessage = null) extends SystemMessage
/**
* System messages are handled specially: they form their own queue within
* each actors mailbox. This queue is encoded in the messages themselves to
* avoid extra allocations and overhead. The next pointer is a normal var, and
* it does not need to be volatile because in the enqueuing method its update
* is immediately succeeded by a volatile write and all reads happen after the
* volatile read in the dequeuing thread. Afterwards, the obtained list of
* system messages is handled in a single thread only and not ever passed around,
* hence no further synchronization is needed.
*
* ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
*/
sealed trait SystemMessage extends PossiblyHarmful {
var next: SystemMessage = _
}
case class Create() extends SystemMessage
case class Recreate(cause: Throwable) extends SystemMessage
case class Suspend() extends SystemMessage
case class Resume() extends SystemMessage
case class Terminate() extends SystemMessage
case class Supervise(child: ActorRef) extends SystemMessage
case class Link(subject: ActorRef) extends SystemMessage
case class Unlink(subject: ActorRef) extends SystemMessage
final case class TaskInvocation(app: AkkaApplication, function: () Unit, cleanup: () Unit) extends Runnable {
def run() {
@ -85,7 +114,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") }
override def dequeue() = null
override def systemEnqueue(handle: SystemMessage): Unit = ()
override def systemDequeue(): SystemMessage = null
override def systemDrain(): SystemMessage = null
override def hasMessages = false
override def hasSystemMessages = false
override def numberOfMessages = 0
@ -172,6 +201,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
*/
protected[akka] def register(actor: ActorCell) {
_actors.incrementAndGet()
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps?
}
@ -194,10 +224,10 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
protected def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) {
if (mailBox.hasSystemMessages) {
var envelope = mailBox.systemDequeue()
while (envelope ne null) {
deadLetterMailbox.systemEnqueue(envelope)
envelope = mailBox.systemDequeue()
var message = mailBox.systemDrain()
while (message ne null) {
deadLetterMailbox.systemEnqueue(message)
message = message.next
}
}

View file

@ -91,10 +91,10 @@ class BalancingDispatcher(
protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) {
if (mailBox.hasSystemMessages) {
var envelope = mailBox.systemDequeue()
while (envelope ne null) {
deadLetterMailbox.systemEnqueue(envelope) //Send to dead letter queue
envelope = mailBox.systemDequeue()
var messages = mailBox.systemDrain()
while (messages ne null) {
deadLetterMailbox.systemEnqueue(messages) //Send to dead letter queue
messages = messages.next
}
}
}

View file

@ -134,6 +134,12 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
}
}
/*
* AtomicReferenceFieldUpdater for system queue
*/
protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this)
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new)
def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed false
@ -184,10 +190,18 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
}
def processAllSystemMessages() {
var nextMessage = systemDequeue()
while (nextMessage ne null) {
actor systemInvoke nextMessage
nextMessage = systemDequeue()
var nextMessage = systemDrain()
try {
while (nextMessage ne null) {
actor systemInvoke nextMessage
nextMessage = nextMessage.next
// dont ever execute normal message when system message present!
if (nextMessage eq null) nextMessage = systemDrain()
}
} catch {
case e
actor.app.eventHandler.error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")
throw e
}
}
@ -208,22 +222,41 @@ trait MessageQueue {
}
trait SystemMessageQueue {
/**
* Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list.
*/
def systemEnqueue(message: SystemMessage): Unit
def systemDequeue(): SystemMessage
/**
* Dequeue all messages from system queue and return them as single-linked list.
*/
def systemDrain(): SystemMessage
def hasSystemMessages: Boolean
}
trait DefaultSystemMessageQueue { self: SystemMessageQueue
trait DefaultSystemMessageQueue { self: Mailbox
final val systemMessages = new ConcurrentLinkedQueue[SystemMessage]()
@tailrec
final def systemEnqueue(message: SystemMessage): Unit = {
val head = systemQueueGet
/*
* this write is safely published by the compareAndSet contained within
* systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec
* guarantees that head uses the value obtained from systemQueueGet above.
* Hence, SystemMessage.next does not need to be volatile.
*/
message.next = head
if (!systemQueuePut(head, message)) systemEnqueue(message)
}
def systemEnqueue(message: SystemMessage): Unit = systemMessages offer message
@tailrec
final def systemDrain(): SystemMessage = {
val head = systemQueueGet
if (systemQueuePut(head, null)) SystemMessage.reverse(head) else systemDrain()
}
def systemDequeue(): SystemMessage = systemMessages.poll()
def hasSystemMessages: Boolean = !systemMessages.isEmpty
def hasSystemMessages: Boolean = systemQueueGet ne null
}
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {