diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java index 21d41ac921..0f37dec003 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractMailbox.java @@ -5,8 +5,14 @@ package akka.dispatch; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; abstract class AbstractMailbox { private volatile int _status; // not initialized because this is faster: 0 == Open - protected final static AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(AbstractMailbox.class, "_status"); + protected final static AtomicIntegerFieldUpdater updater = + AtomicIntegerFieldUpdater.newUpdater(AbstractMailbox.class, "_status"); + + private volatile SystemMessage _systemQueue; // not initialized because this is faster + protected final static AtomicReferenceFieldUpdater systemQueueUpdater = + AtomicReferenceFieldUpdater.newUpdater(AbstractMailbox.class, SystemMessage.class, "_systemQueue"); } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 268beb890a..910777685c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -243,6 +243,7 @@ private[akka] class ActorCell( if (props.supervisor.isDefined) { props.supervisor.get match { case l: LocalActorRef ⇒ + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ l.underlying.dispatcher.systemDispatch(l.underlying, akka.dispatch.Supervise(self)) //FIXME TODO Support all ActorRefs? case other ⇒ throw new UnsupportedOperationException("Supervision failure: " + other + " cannot be a supervisor, only LocalActorRefs can") } @@ -251,19 +252,23 @@ private[akka] class ActorCell( dispatcher.attach(this) } + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ def resume(): Unit = dispatcher.systemDispatch(this, Resume()) - private[akka] def stop(): Unit = - dispatcher.systemDispatch(this, Terminate()) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) def startsMonitoring(subject: ActorRef): ActorRef = { + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ dispatcher.systemDispatch(this, Link(subject)) subject } def stopsMonitoring(subject: ActorRef): ActorRef = { + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ dispatcher.systemDispatch(this, Unlink(subject)) subject } @@ -421,18 +426,18 @@ private[akka] class ActorCell( val isClosed = mailbox.isClosed //Fence plus volatile read if (!isClosed) { message match { - case Create(_) ⇒ create() - case Recreate(cause, _) ⇒ recreate(cause) - case Link(subject, _) ⇒ + case Create() ⇒ create() + case Recreate(cause) ⇒ recreate(cause) + case Link(subject) ⇒ app.deathWatch.subscribe(self, subject) if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now monitoring " + subject) - case Unlink(subject, _) ⇒ + case Unlink(subject) ⇒ app.deathWatch.unsubscribe(self, subject) if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "stopped monitoring " + subject) - case Suspend(_) ⇒ suspend() - case Resume(_) ⇒ resume() - case Terminate(_) ⇒ terminate() - case Supervise(child, _) ⇒ supervise(child) + case Suspend() ⇒ suspend() + case Resume() ⇒ resume() + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) } } } catch { @@ -491,6 +496,7 @@ private[akka] class ActorCell( def handleChildTerminated(child: ActorRef): Unit = _children = props.faultHandler.handleChildTerminated(child, _children) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) def checkReceiveTimeout() { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 48b72d576b..58502fa334 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -12,6 +12,7 @@ import akka.util.{ Duration, Switch, ReentrantGuard } import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy } import akka.actor._ import akka.AkkaApplication +import scala.annotation.tailrec /** * @author Jonas Bonér @@ -24,17 +25,45 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel } } -sealed trait SystemMessage extends PossiblyHarmful { - def next: SystemMessage +object SystemMessage { + @tailrec + final def size(list: SystemMessage, acc: Int = 0): Int = { + if (list eq null) acc else size(list.next, acc + 1) + } + + @tailrec + final def reverse(list: SystemMessage, acc: SystemMessage = null): SystemMessage = { + if (list eq null) acc else { + val next = list.next + list.next = acc + reverse(next, list) + } + } } -case class Create(next: SystemMessage = null) extends SystemMessage -case class Recreate(cause: Throwable, next: SystemMessage = null) extends SystemMessage -case class Suspend(next: SystemMessage = null) extends SystemMessage -case class Resume(next: SystemMessage = null) extends SystemMessage -case class Terminate(next: SystemMessage = null) extends SystemMessage -case class Supervise(child: ActorRef, next: SystemMessage = null) extends SystemMessage -case class Link(subject: ActorRef, next: SystemMessage = null) extends SystemMessage -case class Unlink(subject: ActorRef, next: SystemMessage = null) extends SystemMessage + +/** + * System messages are handled specially: they form their own queue within + * each actor’s mailbox. This queue is encoded in the messages themselves to + * avoid extra allocations and overhead. The next pointer is a normal var, and + * it does not need to be volatile because in the enqueuing method its update + * is immediately succeeded by a volatile write and all reads happen after the + * volatile read in the dequeuing thread. Afterwards, the obtained list of + * system messages is handled in a single thread only and not ever passed around, + * hence no further synchronization is needed. + * + * ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + */ +sealed trait SystemMessage extends PossiblyHarmful { + var next: SystemMessage = _ +} +case class Create() extends SystemMessage +case class Recreate(cause: Throwable) extends SystemMessage +case class Suspend() extends SystemMessage +case class Resume() extends SystemMessage +case class Terminate() extends SystemMessage +case class Supervise(child: ActorRef) extends SystemMessage +case class Link(subject: ActorRef) extends SystemMessage +case class Unlink(subject: ActorRef) extends SystemMessage final case class TaskInvocation(app: AkkaApplication, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { def run() { @@ -85,7 +114,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } override def dequeue() = null override def systemEnqueue(handle: SystemMessage): Unit = () - override def systemDequeue(): SystemMessage = null + override def systemDrain(): SystemMessage = null override def hasMessages = false override def hasSystemMessages = false override def numberOfMessages = 0 @@ -172,6 +201,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable */ protected[akka] def register(actor: ActorCell) { _actors.incrementAndGet() + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps? } @@ -194,10 +224,10 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable protected def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { if (mailBox.hasSystemMessages) { - var envelope = mailBox.systemDequeue() - while (envelope ne null) { - deadLetterMailbox.systemEnqueue(envelope) - envelope = mailBox.systemDequeue() + var message = mailBox.systemDrain() + while (message ne null) { + deadLetterMailbox.systemEnqueue(message) + message = message.next } } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 551eea21ce..f38e8d8f54 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -91,10 +91,10 @@ class BalancingDispatcher( protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { if (mailBox.hasSystemMessages) { - var envelope = mailBox.systemDequeue() - while (envelope ne null) { - deadLetterMailbox.systemEnqueue(envelope) //Send to dead letter queue - envelope = mailBox.systemDequeue() + var messages = mailBox.systemDrain() + while (messages ne null) { + deadLetterMailbox.systemEnqueue(messages) //Send to dead letter queue + messages = messages.next } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 095a0f9cd5..df730efd8c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -134,6 +134,12 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } } + /* + * AtomicReferenceFieldUpdater for system queue + */ + protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this) + protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new) + def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Closed ⇒ false @@ -184,10 +190,18 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } def processAllSystemMessages() { - var nextMessage = systemDequeue() - while (nextMessage ne null) { - actor systemInvoke nextMessage - nextMessage = systemDequeue() + var nextMessage = systemDrain() + try { + while (nextMessage ne null) { + actor systemInvoke nextMessage + nextMessage = nextMessage.next + // don’t ever execute normal message when system message present! + if (nextMessage eq null) nextMessage = systemDrain() + } + } catch { + case e ⇒ + actor.app.eventHandler.error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!") + throw e } } @@ -208,22 +222,41 @@ trait MessageQueue { } trait SystemMessageQueue { + /** + * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list. + */ def systemEnqueue(message: SystemMessage): Unit - def systemDequeue(): SystemMessage + /** + * Dequeue all messages from system queue and return them as single-linked list. + */ + def systemDrain(): SystemMessage def hasSystemMessages: Boolean } -trait DefaultSystemMessageQueue { self: SystemMessageQueue ⇒ +trait DefaultSystemMessageQueue { self: Mailbox ⇒ - final val systemMessages = new ConcurrentLinkedQueue[SystemMessage]() + @tailrec + final def systemEnqueue(message: SystemMessage): Unit = { + val head = systemQueueGet + /* + * this write is safely published by the compareAndSet contained within + * systemQueuePut; “Intra-Thread Semantics” on page 12 of the JSR133 spec + * guarantees that “head” uses the value obtained from systemQueueGet above. + * Hence, SystemMessage.next does not need to be volatile. + */ + message.next = head + if (!systemQueuePut(head, message)) systemEnqueue(message) + } - def systemEnqueue(message: SystemMessage): Unit = systemMessages offer message + @tailrec + final def systemDrain(): SystemMessage = { + val head = systemQueueGet + if (systemQueuePut(head, null)) SystemMessage.reverse(head) else systemDrain() + } - def systemDequeue(): SystemMessage = systemMessages.poll() - - def hasSystemMessages: Boolean = !systemMessages.isEmpty + def hasSystemMessages: Boolean = systemQueueGet ne null } trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {