diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 72fb681495..6cdd6245b0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -75,7 +75,7 @@ final case class TaskInvocation(app: ActorSystem, function: () ⇒ Unit, cleanup } object MessageDispatcher { - val UNSCHEDULED = 0 + val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher val SCHEDULED = 1 val RESCHEDULED = 2 @@ -85,12 +85,9 @@ object MessageDispatcher { /** * @author Jonas Bonér */ -abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { +abstract class MessageDispatcher(val app: ActorSystem) extends AbstractMessageDispatcher with Serializable { import MessageDispatcher._ - - protected val _inhabitants = new AtomicLong(0L) - - private val shutdownSchedule = new AtomicInteger(UNSCHEDULED) + import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater } /** * Creates and returns a mailbox for the given actor. @@ -124,40 +121,41 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { protected[akka] final def dispatchTask(block: () ⇒ Unit) { val invocation = TaskInvocation(app, block, taskCleanup) - _inhabitants.getAndIncrement() + inhabitantsUpdater.incrementAndGet(this) try { executeTask(invocation) } catch { case e ⇒ - _inhabitants.decrementAndGet + inhabitantsUpdater.decrementAndGet(this) throw e } } @tailrec - private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = _inhabitants.get() match { + private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = inhabitantsUpdater.get(this) match { case 0 ⇒ - shutdownSchedule.get match { + shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ - if (shutdownSchedule.compareAndSet(UNSCHEDULED, SCHEDULED)) { + if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ - if (shutdownSchedule.compareAndSet(SCHEDULED, RESCHEDULED)) () + if (shutdownScheduleUpdater.compareAndSet(this, SCHEDULED, RESCHEDULED)) () else ifSensibleToDoSoThenScheduleShutdown() case RESCHEDULED ⇒ () } case _ ⇒ () } - private val taskCleanup: () ⇒ Unit = () ⇒ if (_inhabitants.decrementAndGet() == 0) ifSensibleToDoSoThenScheduleShutdown() + private final val taskCleanup: () ⇒ Unit = + () ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown() /** * Don't call this, this calls you. See "attach" for only invocation */ protected[akka] def register(actor: ActorCell) { - _inhabitants.incrementAndGet() + inhabitantsUpdater.incrementAndGet(this) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps? } @@ -166,7 +164,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { * Don't call this, this calls you. See "detach" for the only invocation */ protected[akka] def unregister(actor: ActorCell) { - _inhabitants.decrementAndGet() + inhabitantsUpdater.decrementAndGet(this) val mailBox = actor.mailbox mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up actor.mailbox = deadLetterMailbox @@ -203,17 +201,17 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { private val shutdownAction = new Runnable { @tailrec final def run() { - shutdownSchedule.get match { + shutdownScheduleUpdater.get(MessageDispatcher.this) match { case UNSCHEDULED ⇒ () case SCHEDULED ⇒ try { - if (_inhabitants.get == 0) //Warning, racy + if (inhabitantsUpdater.get(MessageDispatcher.this) == 0) //Warning, racy shutdown() } finally { - shutdownSchedule.getAndSet(UNSCHEDULED) //TODO perhaps check if it was modified since we checked? + shutdownScheduleUpdater.getAndSet(MessageDispatcher.this, UNSCHEDULED) //TODO perhaps check if it was modified since we checked? } case RESCHEDULED ⇒ - if (shutdownSchedule.compareAndSet(RESCHEDULED, SCHEDULED)) + if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) else run() }