diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractMessageDispatcher.java b/akka-actor/src/main/java/akka/dispatch/AbstractMessageDispatcher.java index 6720cf2b54..c701931edc 100644 --- a/akka-actor/src/main/java/akka/dispatch/AbstractMessageDispatcher.java +++ b/akka-actor/src/main/java/akka/dispatch/AbstractMessageDispatcher.java @@ -4,15 +4,21 @@ package akka.dispatch; +import akka.util.Unsafe; + import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; abstract class AbstractMessageDispatcher { - private volatile int _shutdownSchedule; // not initialized because this is faster: 0 == UNSCHEDULED - protected final static AtomicIntegerFieldUpdater shutdownScheduleUpdater = - AtomicIntegerFieldUpdater.newUpdater(AbstractMessageDispatcher.class, "_shutdownSchedule"); + final static long shutdownScheduleOffset; + final static long inhabitantsOffset; - private volatile long _inhabitants; // not initialized because this is faster - protected final static AtomicLongFieldUpdater inhabitantsUpdater = - AtomicLongFieldUpdater.newUpdater(AbstractMessageDispatcher.class, "_inhabitants"); + static { + try { + shutdownScheduleOffset = Unsafe.instance.objectFieldOffset(MessageDispatcher.class.getDeclaredField("_shutdownScheduleDoNotCallMeDirectly")); + inhabitantsOffset = Unsafe.instance.objectFieldOffset(MessageDispatcher.class.getDeclaredField("_inhabitantsDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 82e734f596..f3703c53cf 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -6,17 +6,15 @@ package akka.dispatch import java.util.concurrent._ import akka.event.Logging.Error -import akka.util.Duration import akka.actor._ import akka.actor.ActorSystem import scala.annotation.tailrec import akka.event.EventStream import com.typesafe.config.Config import akka.serialization.SerializationExtension -import akka.util.NonFatal import akka.event.Logging.LogEventException import akka.jsr166y.{ ForkJoinTask, ForkJoinPool } -import akka.util.Index +import akka.util.{ Unsafe, Duration, NonFatal, Index } final case class Envelope(val message: Any, val sender: ActorRef)(system: ActorSystem) { if (message.isInstanceOf[AnyRef]) { @@ -176,7 +174,7 @@ private[akka] trait LoadMetrics { self: Executor ⇒ def atFullThrottle(): Boolean } -object MessageDispatcher { +private[akka] object MessageDispatcher { val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher val SCHEDULED = 1 val RESCHEDULED = 2 @@ -210,9 +208,24 @@ object MessageDispatcher { abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Executor with ExecutionContext { import MessageDispatcher._ - import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater } + import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset } import prerequisites._ + @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH! + @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH! + + @tailrec private final def addInhabitants(add: Long): Long = { + val u = Unsafe.instance + val c = u.getLongVolatile(this, inhabitantsOffset) + val r = c + add + if (u.compareAndSwapLong(this, inhabitantsOffset, c, r)) r else addInhabitants(add) + } + + final def inhabitants: Long = Unsafe.instance.getLongVolatile(this, inhabitantsOffset) + + private final def shutdownSchedule: Int = Unsafe.instance.getIntVolatile(this, shutdownScheduleOffset) + private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = Unsafe.instance.compareAndSwapInt(this, shutdownScheduleOffset, expect, update) + /** * Creates and returns a mailbox for the given actor. */ @@ -245,12 +258,12 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext final def execute(runnable: Runnable) { val invocation = TaskInvocation(eventStream, runnable, taskCleanup) - inhabitantsUpdater.incrementAndGet(this) + addInhabitants(+1) try { executeTask(invocation) } catch { case t ⇒ - inhabitantsUpdater.decrementAndGet(this) + addInhabitants(-1) throw t } } @@ -261,16 +274,16 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } @tailrec - private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = inhabitantsUpdater.get(this) match { + private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = inhabitants match { case 0 ⇒ - shutdownScheduleUpdater.get(this) match { + shutdownSchedule match { case UNSCHEDULED ⇒ - if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { + if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) { scheduleShutdownAction() () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ - if (shutdownScheduleUpdater.compareAndSet(this, SCHEDULED, RESCHEDULED)) () + if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) () else ifSensibleToDoSoThenScheduleShutdown() case RESCHEDULED ⇒ () } @@ -284,15 +297,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } } - private final val taskCleanup: () ⇒ Unit = - () ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown() + private final val taskCleanup: () ⇒ Unit = () ⇒ if (addInhabitants(-1) == 0) ifSensibleToDoSoThenScheduleShutdown() /** * If you override it, you must call it. But only ever once. See "attach" for only invocation. */ protected[akka] def register(actor: ActorCell) { if (debug) actors.put(this, actor.self) - inhabitantsUpdater.incrementAndGet(this) + addInhabitants(+1) } /** @@ -300,30 +312,26 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext */ protected[akka] def unregister(actor: ActorCell) { if (debug) actors.remove(this, actor.self) - inhabitantsUpdater.decrementAndGet(this) + addInhabitants(-1) val mailBox = actor.mailbox mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up actor.mailbox = deadLetterMailbox mailBox.cleanUp() } - def inhabitants: Long = inhabitantsUpdater.get(this) - private val shutdownAction = new Runnable { @tailrec final def run() { - shutdownScheduleUpdater.get(MessageDispatcher.this) match { + shutdownSchedule match { case UNSCHEDULED ⇒ () case SCHEDULED ⇒ try { - if (inhabitantsUpdater.get(MessageDispatcher.this) == 0) //Warning, racy - shutdown() + if (inhabitants == 0) shutdown() //Warning, racy } finally { - shutdownScheduleUpdater.getAndSet(MessageDispatcher.this, UNSCHEDULED) //TODO perhaps check if it was modified since we checked? + while (!updateShutdownSchedule(shutdownSchedule, UNSCHEDULED)) {} } case RESCHEDULED ⇒ - if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduleShutdownAction() + if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction() else run() } }