Removing the Atomic*Updaters from MessageDispatcher, switching to Unsafe all the way baby

This commit is contained in:
Viktor Klang 2012-05-11 15:00:37 +02:00
parent 0879c0e545
commit db3ce87917
2 changed files with 43 additions and 29 deletions

View file

@ -4,15 +4,21 @@
package akka.dispatch;
import akka.util.Unsafe;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
abstract class AbstractMessageDispatcher {
private volatile int _shutdownSchedule; // not initialized because this is faster: 0 == UNSCHEDULED
protected final static AtomicIntegerFieldUpdater<AbstractMessageDispatcher> shutdownScheduleUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractMessageDispatcher.class, "_shutdownSchedule");
final static long shutdownScheduleOffset;
final static long inhabitantsOffset;
private volatile long _inhabitants; // not initialized because this is faster
protected final static AtomicLongFieldUpdater<AbstractMessageDispatcher> inhabitantsUpdater =
AtomicLongFieldUpdater.newUpdater(AbstractMessageDispatcher.class, "_inhabitants");
static {
try {
shutdownScheduleOffset = Unsafe.instance.objectFieldOffset(MessageDispatcher.class.getDeclaredField("_shutdownScheduleDoNotCallMeDirectly"));
inhabitantsOffset = Unsafe.instance.objectFieldOffset(MessageDispatcher.class.getDeclaredField("_inhabitantsDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}
}
}

View file

@ -6,17 +6,15 @@ package akka.dispatch
import java.util.concurrent._
import akka.event.Logging.Error
import akka.util.Duration
import akka.actor._
import akka.actor.ActorSystem
import scala.annotation.tailrec
import akka.event.EventStream
import com.typesafe.config.Config
import akka.serialization.SerializationExtension
import akka.util.NonFatal
import akka.event.Logging.LogEventException
import akka.jsr166y.{ ForkJoinTask, ForkJoinPool }
import akka.util.Index
import akka.util.{ Unsafe, Duration, NonFatal, Index }
final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) {
if (message.isInstanceOf[AnyRef]) {
@ -176,7 +174,7 @@ private[akka] trait LoadMetrics { self: Executor ⇒
def atFullThrottle(): Boolean
}
object MessageDispatcher {
private[akka] object MessageDispatcher {
val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher
val SCHEDULED = 1
val RESCHEDULED = 2
@ -210,9 +208,24 @@ object MessageDispatcher {
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Executor with ExecutionContext {
import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater }
import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
import prerequisites._
@volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH!
@volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH!
@tailrec private final def addInhabitants(add: Long): Long = {
val u = Unsafe.instance
val c = u.getLongVolatile(this, inhabitantsOffset)
val r = c + add
if (u.compareAndSwapLong(this, inhabitantsOffset, c, r)) r else addInhabitants(add)
}
final def inhabitants: Long = Unsafe.instance.getLongVolatile(this, inhabitantsOffset)
private final def shutdownSchedule: Int = Unsafe.instance.getIntVolatile(this, shutdownScheduleOffset)
private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = Unsafe.instance.compareAndSwapInt(this, shutdownScheduleOffset, expect, update)
/**
* Creates and returns a mailbox for the given actor.
*/
@ -245,12 +258,12 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
final def execute(runnable: Runnable) {
val invocation = TaskInvocation(eventStream, runnable, taskCleanup)
inhabitantsUpdater.incrementAndGet(this)
addInhabitants(+1)
try {
executeTask(invocation)
} catch {
case t
inhabitantsUpdater.decrementAndGet(this)
addInhabitants(-1)
throw t
}
}
@ -261,16 +274,16 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
@tailrec
private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = inhabitantsUpdater.get(this) match {
private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = inhabitants match {
case 0
shutdownScheduleUpdater.get(this) match {
shutdownSchedule match {
case UNSCHEDULED
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) {
scheduleShutdownAction()
()
} else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED
if (shutdownScheduleUpdater.compareAndSet(this, SCHEDULED, RESCHEDULED)) ()
if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) ()
else ifSensibleToDoSoThenScheduleShutdown()
case RESCHEDULED ()
}
@ -284,15 +297,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
}
}
private final val taskCleanup: () Unit =
() if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown()
private final val taskCleanup: () Unit = () if (addInhabitants(-1) == 0) ifSensibleToDoSoThenScheduleShutdown()
/**
* If you override it, you must call it. But only ever once. See "attach" for only invocation.
*/
protected[akka] def register(actor: ActorCell) {
if (debug) actors.put(this, actor.self)
inhabitantsUpdater.incrementAndGet(this)
addInhabitants(+1)
}
/**
@ -300,30 +312,26 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
*/
protected[akka] def unregister(actor: ActorCell) {
if (debug) actors.remove(this, actor.self)
inhabitantsUpdater.decrementAndGet(this)
addInhabitants(-1)
val mailBox = actor.mailbox
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
actor.mailbox = deadLetterMailbox
mailBox.cleanUp()
}
def inhabitants: Long = inhabitantsUpdater.get(this)
private val shutdownAction = new Runnable {
@tailrec
final def run() {
shutdownScheduleUpdater.get(MessageDispatcher.this) match {
shutdownSchedule match {
case UNSCHEDULED ()
case SCHEDULED
try {
if (inhabitantsUpdater.get(MessageDispatcher.this) == 0) //Warning, racy
shutdown()
if (inhabitants == 0) shutdown() //Warning, racy
} finally {
shutdownScheduleUpdater.getAndSet(MessageDispatcher.this, UNSCHEDULED) //TODO perhaps check if it was modified since we checked?
while (!updateShutdownSchedule(shutdownSchedule, UNSCHEDULED)) {}
}
case RESCHEDULED
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
scheduleShutdownAction()
if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction()
else run()
}
}