Adding some sanity and some CAS-magic

This commit is contained in:
Viktor Klang 2012-06-04 11:46:59 +02:00
parent a515377592
commit 2e788c9704
5 changed files with 50 additions and 18 deletions

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
import akka.util.Unsafe;
final class AbstractActorCell {
final static long mailboxOffset;
static {
try {
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}
}
}

View file

@ -9,13 +9,12 @@ import scala.annotation.tailrec
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.event.Logging.{ Debug, Warning, Error }
import akka.util.{ Duration, Helpers }
import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
import akka.util.NonFatal
import akka.event.Logging.LogEventException
import collection.immutable.{ TreeSet, Stack, TreeMap }
import akka.util.{ Unsafe, Duration, Helpers, NonFatal }
//TODO: everything here for current compatibility - could be limited more
@ -319,7 +318,7 @@ private[akka] class ActorCell(
val props: Props,
@volatile var parent: InternalActorRef,
/*no member*/ _receiveTimeout: Option[Duration]) extends UntypedActorContext {
import AbstractActorCell.mailboxOffset
import ActorCell._
final def systemImpl = system
@ -412,8 +411,7 @@ private[akka] class ActorCell(
var currentMessage: Envelope = _
var actor: Actor = _
private var behaviorStack: Stack[Actor.Receive] = Stack.empty
@volatile //This must be volatile since it isn't protected by the mailbox status
var mailbox: Mailbox = _
@volatile var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
var nextNameSequence: Long = 0
var watching: Set[ActorRef] = emptyActorRefSet
var watchedBy: Set[ActorRef] = emptyActorRefSet
@ -428,6 +426,24 @@ private[akka] class ActorCell(
@inline
final val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
/**
* INTERNAL API
*
* Returns a reference to the current mailbox
*/
@inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, mailboxOffset).asInstanceOf[Mailbox]
/**
* INTERNAL API
*
* replaces the current mailbox using getAndSet semantics
*/
@tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = {
val oldMailbox = mailbox
if (!Unsafe.instance.compareAndSwapObject(this, mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox)
else oldMailbox
}
/**
* UntypedActorContext impl
*/
@ -440,7 +456,7 @@ private[akka] class ActorCell(
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
mailbox = dispatcher.createMailbox(this)
swapMailbox(dispatcher.createMailbox(this))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())

View file

@ -310,16 +310,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
case 0
shutdownSchedule match {
case UNSCHEDULED
if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) {
scheduleShutdownAction()
()
} else ifSensibleToDoSoThenScheduleShutdown()
if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction()
else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED
if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) ()
else ifSensibleToDoSoThenScheduleShutdown()
case RESCHEDULED ()
case RESCHEDULED
}
case _ ()
case _
}
private def scheduleShutdownAction(): Unit = {
@ -349,9 +347,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
protected[akka] def unregister(actor: ActorCell) {
if (debug) actors.remove(this, actor.self)
addInhabitants(-1)
val mailBox = actor.mailbox
val mailBox = actor.swapMailbox(deadLetterMailbox)
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
actor.mailbox = deadLetterMailbox
mailBox.cleanUp()
}
@ -359,7 +356,6 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
@tailrec
final def run() {
shutdownSchedule match {
case UNSCHEDULED ()
case SCHEDULED
try {
if (inhabitants == 0) shutdown() //Warning, racy
@ -369,6 +365,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
case RESCHEDULED
if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction()
else run()
case UNSCHEDULED
}
}
}

View file

@ -50,9 +50,9 @@ class BalancingDispatcher(
private class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue) extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue {
override def cleanUp(): Unit = {
val dlq = actor.systemImpl.deadLetterMailbox
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that
if (hasSystemMessages) {
val dlq = actor.systemImpl.deadLetterMailbox
while (hasSystemMessages) {
var message = systemDrain()
while (message ne null) {
// message must be virgin before being able to systemEnqueue again

View file

@ -235,7 +235,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox
val dlm = actor.systemImpl.deadLetterMailbox
if (hasSystemMessages) {
while (hasSystemMessages) {
var message = systemDrain()
while (message ne null) {
// message must be virgin before being able to systemEnqueue again