Removing Un(der)used locking utils (locking is evil) and removing the last locks from the MessageDispatcher

This commit is contained in:
Viktor Klang 2011-11-16 15:54:14 +01:00
parent 5593e860bc
commit 13bfee782f
9 changed files with 129 additions and 244 deletions

View file

@ -5,13 +5,14 @@
package akka.dispatch
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicLong
import akka.event.Logging.Error
import akka.config.Configuration
import akka.util.{ Duration, Switch, ReentrantGuard }
import atomic.{ AtomicInteger, AtomicLong }
import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
import akka.actor._
import akka.actor.ActorSystem
import locks.ReentrantLock
import scala.annotation.tailrec
/**
@ -87,12 +88,9 @@ object MessageDispatcher {
abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
import MessageDispatcher._
protected val _tasks = new AtomicLong(0L)
protected val _actors = new AtomicLong(0L)
protected val guard = new ReentrantGuard
protected val active = new Switch(false)
protected val _inhabitants = new AtomicLong(0L)
private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
private val shutdownSchedule = new AtomicInteger(UNSCHEDULED)
/**
* Creates and returns a mailbox for the given actor.
@ -113,91 +111,65 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
* Attaches the specified actor instance to this dispatcher
*/
final def attach(actor: ActorCell) {
guard.lock.lock()
try {
startIfUnstarted()
register(actor)
} finally {
guard.lock.unlock()
}
register(actor)
}
/**
* Detaches the specified actor instance from this dispatcher
*/
final def detach(actor: ActorCell) {
guard.lock.lock()
try {
unregister(actor)
if (_tasks.get == 0 && _actors.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
shutdownSchedule = RESCHEDULED
case RESCHEDULED //Already marked for reschedule
}
}
} finally { guard.lock.unlock() }
}
protected final def startIfUnstarted() {
if (active.isOff) {
guard.lock.lock()
try { active.switchOn { start() } }
finally { guard.lock.unlock() }
}
unregister(actor)
ifSensibleToDoSoThenScheduleShutdown()
}
protected[akka] final def dispatchTask(block: () Unit) {
_tasks.getAndIncrement()
val invocation = TaskInvocation(app, block, taskCleanup)
_inhabitants.getAndIncrement()
try {
startIfUnstarted()
executeTask(TaskInvocation(app, block, taskCleanup))
executeTask(invocation)
} catch {
case e
_tasks.decrementAndGet
_inhabitants.decrementAndGet
throw e
}
}
private val taskCleanup: () Unit =
() if (_tasks.decrementAndGet() == 0) {
guard.lock.lock()
try {
if (_tasks.get == 0 && _actors.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
shutdownSchedule = RESCHEDULED
case RESCHEDULED //Already marked for reschedule
}
}
} finally { guard.lock.unlock() }
}
@tailrec
private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = _inhabitants.get() match {
case 0
shutdownSchedule.get match {
case UNSCHEDULED
if (shutdownSchedule.compareAndSet(UNSCHEDULED, SCHEDULED)) {
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
()
} else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED
if (shutdownSchedule.compareAndSet(SCHEDULED, RESCHEDULED)) ()
else ifSensibleToDoSoThenScheduleShutdown()
case RESCHEDULED ()
}
case _ ()
}
private val taskCleanup: () Unit = () if (_inhabitants.decrementAndGet() == 0) ifSensibleToDoSoThenScheduleShutdown()
/**
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "attach" for the only invocation
* Don't call this, this calls you. See "attach" for only invocation
*/
protected[akka] def register(actor: ActorCell) {
_actors.incrementAndGet()
_inhabitants.incrementAndGet()
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps?
}
/**
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "detach" for the only invocation
* Don't call this, this calls you. See "detach" for the only invocation
*/
protected[akka] def unregister(actor: ActorCell) {
_actors.decrementAndGet()
_inhabitants.decrementAndGet()
val mailBox = actor.mailbox
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
actor.mailbox = deadLetterMailbox
mailBox.becomeClosed() // FIXME reschedule in tell if possible race with cleanUp is detected in order to properly clean up
cleanUpMailboxFor(actor, mailBox)
mailBox.cleanUp()
}
@ -229,23 +201,22 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
}
private val shutdownAction = new Runnable {
def run() {
guard.lock.lock()
try {
shutdownSchedule match {
case RESCHEDULED
shutdownSchedule = SCHEDULED
@tailrec
final def run() {
shutdownSchedule.get match {
case UNSCHEDULED ()
case SCHEDULED
try {
if (_inhabitants.get == 0) //Warning, racy
shutdown()
} finally {
shutdownSchedule.getAndSet(UNSCHEDULED) //TODO perhaps check if it was modified since we checked?
}
case RESCHEDULED
if (shutdownSchedule.compareAndSet(RESCHEDULED, SCHEDULED))
app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
if (_tasks.get == 0 && _actors.get() == 0) {
active switchOff {
shutdown() // shut down in the dispatcher's references is zero
}
}
shutdownSchedule = UNSCHEDULED
case UNSCHEDULED //Do nothing
}
} finally { guard.lock.unlock() }
else run()
}
}
}
@ -301,13 +272,9 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
protected[akka] def executeTask(invocation: TaskInvocation)
/**
* Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
*/
protected[akka] def start(): Unit
/**
* Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
* Must be idempotent
*/
protected[akka] def shutdown(): Unit
@ -320,11 +287,6 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
* Returns the "current" emptiness status of the mailbox for the specified actor
*/
def mailboxIsEmpty(actor: ActorCell): Boolean = !actor.mailbox.hasMessages
/**
* Returns the amount of tasks queued for execution
*/
def tasks: Long = _tasks.get
}
/**