diff --git a/akka-actor/src/main/java/akka/actor/AbstractActorCell.java b/akka-actor/src/main/java/akka/actor/AbstractActorCell.java new file mode 100644 index 0000000000..d6005f463c --- /dev/null +++ b/akka-actor/src/main/java/akka/actor/AbstractActorCell.java @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor; + +import akka.util.Unsafe; + +final class AbstractActorCell { + final static long mailboxOffset; + + static { + try { + mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 23cd796ad2..c74010668b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -9,13 +9,12 @@ import scala.annotation.tailrec import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit.MILLISECONDS import akka.event.Logging.{ Debug, Warning, Error } -import akka.util.{ Duration, Helpers } import akka.japi.Procedure import java.io.{ NotSerializableException, ObjectOutputStream } import akka.serialization.SerializationExtension -import akka.util.NonFatal import akka.event.Logging.LogEventException import collection.immutable.{ TreeSet, Stack, TreeMap } +import akka.util.{ Unsafe, Duration, Helpers, NonFatal } //TODO: everything here for current compatibility - could be limited more @@ -319,7 +318,7 @@ private[akka] class ActorCell( val props: Props, @volatile var parent: InternalActorRef, /*no member*/ _receiveTimeout: Option[Duration]) extends UntypedActorContext { - + import AbstractActorCell.mailboxOffset import ActorCell._ final def systemImpl = system @@ -412,8 +411,7 @@ private[akka] class ActorCell( var currentMessage: Envelope = _ var actor: Actor = _ private var behaviorStack: Stack[Actor.Receive] = Stack.empty - @volatile //This must be volatile since it isn't protected by the mailbox status - var mailbox: Mailbox = _ + @volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status var nextNameSequence: Long = 0 var watching: Set[ActorRef] = emptyActorRefSet var watchedBy: Set[ActorRef] = emptyActorRefSet @@ -428,6 +426,24 @@ private[akka] class ActorCell( @inline final val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) + /** + * INTERNAL API + * + * Returns a reference to the current mailbox + */ + @inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, mailboxOffset).asInstanceOf[Mailbox] + + /** + * INTERNAL API + * + * replaces the current mailbox using getAndSet semantics + */ + @tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = { + val oldMailbox = mailbox + if (!Unsafe.instance.compareAndSwapObject(this, mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox) + else oldMailbox + } + /** * UntypedActorContext impl */ @@ -440,7 +456,7 @@ private[akka] class ActorCell( * Create the mailbox and enqueue the Create() message to ensure that * this is processed before anything else. */ - mailbox = dispatcher.createMailbox(this) + swapMailbox(dispatcher.createMailbox(this)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ mailbox.systemEnqueue(self, Create()) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 8e160276e8..4692486307 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -310,16 +310,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext case 0 ⇒ shutdownSchedule match { case UNSCHEDULED ⇒ - if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) { - scheduleShutdownAction() - () - } else ifSensibleToDoSoThenScheduleShutdown() + if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction() + else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) () else ifSensibleToDoSoThenScheduleShutdown() - case RESCHEDULED ⇒ () + case RESCHEDULED ⇒ } - case _ ⇒ () + case _ ⇒ } private def scheduleShutdownAction(): Unit = { @@ -349,9 +347,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext protected[akka] def unregister(actor: ActorCell) { if (debug) actors.remove(this, actor.self) addInhabitants(-1) - val mailBox = actor.mailbox + val mailBox = actor.swapMailbox(deadLetterMailbox) mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up - actor.mailbox = deadLetterMailbox mailBox.cleanUp() } @@ -359,7 +356,6 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext @tailrec final def run() { shutdownSchedule match { - case UNSCHEDULED ⇒ () case SCHEDULED ⇒ try { if (inhabitants == 0) shutdown() //Warning, racy @@ -369,6 +365,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext case RESCHEDULED ⇒ if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction() else run() + case UNSCHEDULED ⇒ } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 43e8944105..e50f9150a4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -50,9 +50,9 @@ class BalancingDispatcher( private class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue) extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue { override def cleanUp(): Unit = { + val dlq = actor.systemImpl.deadLetterMailbox //Don't call the original implementation of this since it scraps all messages, and we don't want to do that - if (hasSystemMessages) { - val dlq = actor.systemImpl.deadLetterMailbox + while (hasSystemMessages) { var message = systemDrain() while (message ne null) { // message must be “virgin” before being able to systemEnqueue again diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 35b1e35012..b81a2fc0ba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -235,7 +235,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes protected[dispatch] def cleanUp(): Unit = if (actor ne null) { // actor is null for the deadLetterMailbox val dlm = actor.systemImpl.deadLetterMailbox - if (hasSystemMessages) { + while (hasSystemMessages) { var message = systemDrain() while (message ne null) { // message must be “virgin” before being able to systemEnqueue again