Switching to a Java baseclass for the MessageDispatcher so we can use primitive fields and Atmoc field updaters for cache locality
This commit is contained in:
parent
13bfee782f
commit
39b374ba22
1 changed files with 17 additions and 19 deletions
|
|
@ -75,7 +75,7 @@ final case class TaskInvocation(app: ActorSystem, function: () ⇒ Unit, cleanup
|
|||
}
|
||||
|
||||
object MessageDispatcher {
|
||||
val UNSCHEDULED = 0
|
||||
val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher
|
||||
val SCHEDULED = 1
|
||||
val RESCHEDULED = 2
|
||||
|
||||
|
|
@ -85,12 +85,9 @@ object MessageDispatcher {
|
|||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
|
||||
abstract class MessageDispatcher(val app: ActorSystem) extends AbstractMessageDispatcher with Serializable {
|
||||
import MessageDispatcher._
|
||||
|
||||
protected val _inhabitants = new AtomicLong(0L)
|
||||
|
||||
private val shutdownSchedule = new AtomicInteger(UNSCHEDULED)
|
||||
import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater }
|
||||
|
||||
/**
|
||||
* Creates and returns a mailbox for the given actor.
|
||||
|
|
@ -124,40 +121,41 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
|
|||
|
||||
protected[akka] final def dispatchTask(block: () ⇒ Unit) {
|
||||
val invocation = TaskInvocation(app, block, taskCleanup)
|
||||
_inhabitants.getAndIncrement()
|
||||
inhabitantsUpdater.incrementAndGet(this)
|
||||
try {
|
||||
executeTask(invocation)
|
||||
} catch {
|
||||
case e ⇒
|
||||
_inhabitants.decrementAndGet
|
||||
inhabitantsUpdater.decrementAndGet(this)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
@tailrec
|
||||
private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = _inhabitants.get() match {
|
||||
private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = inhabitantsUpdater.get(this) match {
|
||||
case 0 ⇒
|
||||
shutdownSchedule.get match {
|
||||
shutdownScheduleUpdater.get(this) match {
|
||||
case UNSCHEDULED ⇒
|
||||
if (shutdownSchedule.compareAndSet(UNSCHEDULED, SCHEDULED)) {
|
||||
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
|
||||
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
|
||||
()
|
||||
} else ifSensibleToDoSoThenScheduleShutdown()
|
||||
case SCHEDULED ⇒
|
||||
if (shutdownSchedule.compareAndSet(SCHEDULED, RESCHEDULED)) ()
|
||||
if (shutdownScheduleUpdater.compareAndSet(this, SCHEDULED, RESCHEDULED)) ()
|
||||
else ifSensibleToDoSoThenScheduleShutdown()
|
||||
case RESCHEDULED ⇒ ()
|
||||
}
|
||||
case _ ⇒ ()
|
||||
}
|
||||
|
||||
private val taskCleanup: () ⇒ Unit = () ⇒ if (_inhabitants.decrementAndGet() == 0) ifSensibleToDoSoThenScheduleShutdown()
|
||||
private final val taskCleanup: () ⇒ Unit =
|
||||
() ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown()
|
||||
|
||||
/**
|
||||
* Don't call this, this calls you. See "attach" for only invocation
|
||||
*/
|
||||
protected[akka] def register(actor: ActorCell) {
|
||||
_inhabitants.incrementAndGet()
|
||||
inhabitantsUpdater.incrementAndGet(this)
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps?
|
||||
}
|
||||
|
|
@ -166,7 +164,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
|
|||
* Don't call this, this calls you. See "detach" for the only invocation
|
||||
*/
|
||||
protected[akka] def unregister(actor: ActorCell) {
|
||||
_inhabitants.decrementAndGet()
|
||||
inhabitantsUpdater.decrementAndGet(this)
|
||||
val mailBox = actor.mailbox
|
||||
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
|
||||
actor.mailbox = deadLetterMailbox
|
||||
|
|
@ -203,17 +201,17 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
|
|||
private val shutdownAction = new Runnable {
|
||||
@tailrec
|
||||
final def run() {
|
||||
shutdownSchedule.get match {
|
||||
shutdownScheduleUpdater.get(MessageDispatcher.this) match {
|
||||
case UNSCHEDULED ⇒ ()
|
||||
case SCHEDULED ⇒
|
||||
try {
|
||||
if (_inhabitants.get == 0) //Warning, racy
|
||||
if (inhabitantsUpdater.get(MessageDispatcher.this) == 0) //Warning, racy
|
||||
shutdown()
|
||||
} finally {
|
||||
shutdownSchedule.getAndSet(UNSCHEDULED) //TODO perhaps check if it was modified since we checked?
|
||||
shutdownScheduleUpdater.getAndSet(MessageDispatcher.this, UNSCHEDULED) //TODO perhaps check if it was modified since we checked?
|
||||
}
|
||||
case RESCHEDULED ⇒
|
||||
if (shutdownSchedule.compareAndSet(RESCHEDULED, SCHEDULED))
|
||||
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
|
||||
app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
|
||||
else run()
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue