merge in Viktor’s dispatcher uuid map removal

This commit is contained in:
Roland 2011-10-18 16:47:07 +02:00
commit df29faca2d
3 changed files with 22 additions and 25 deletions

View file

@ -62,8 +62,8 @@ object MessageDispatcher {
abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable {
import MessageDispatcher._
protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val _tasks = new AtomicLong(0L)
protected val _actors = new AtomicLong(0L)
protected val guard = new ReentrantGuard
protected val active = new Switch(false)
@ -115,7 +115,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
final def detach(actor: ActorCell) {
guard withGuard {
unregister(actor)
if (uuids.isEmpty && _tasks.get == 0) {
if (_tasks.get == 0 && _actors.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
@ -147,7 +147,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
private val taskCleanup: () Unit =
() if (_tasks.decrementAndGet() == 0) {
guard withGuard {
if (_tasks.get == 0 && uuids.isEmpty) {
if (_tasks.get == 0 && _actors.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
@ -165,9 +165,8 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
* and only call it under the dispatcher-guard, see "attach" for the only invocation
*/
protected[akka] def register(actor: ActorCell) {
if (uuids add actor.uuid) {
systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps?
} else System.err.println("Couldn't register: " + actor)
_actors.incrementAndGet()
systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps?
}
/**
@ -175,12 +174,11 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
* and only call it under the dispatcher-guard, see "detach" for the only invocation
*/
protected[akka] def unregister(actor: ActorCell) {
if (uuids remove actor.uuid) {
val mailBox = actor.mailbox
mailBox.becomeClosed()
actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here
cleanUpMailboxFor(actor, mailBox)
} else System.err.println("Couldn't unregister: " + actor)
_actors.decrementAndGet()
val mailBox = actor.mailbox
mailBox.becomeClosed()
actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here
cleanUpMailboxFor(actor, mailBox)
}
/**
@ -214,7 +212,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
shutdownSchedule = SCHEDULED
app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
if (uuids.isEmpty && _tasks.get == 0) {
if (_tasks.get == 0) {
active switchOff {
shutdown() // shut down in the dispatcher's references is zero
}
@ -235,16 +233,21 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
*/
def suspend(actor: ActorCell): Unit =
if (uuids.contains(actor.uuid)) actor.mailbox.becomeSuspended()
def suspend(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if (mbox.dispatcher eq this)
mbox.becomeSuspended()
}
/*
* After the call to this method, the dispatcher must begin any new message processing for the specified reference
*/
def resume(actor: ActorCell): Unit = if (uuids.contains(actor.uuid)) {
def resume(actor: ActorCell): Unit = {
val mbox = actor.mailbox
mbox.becomeOpen()
registerForExecution(mbox, false, false)
if (mbox.dispatcher eq this) {
mbox.becomeOpen()
registerForExecution(mbox, false, false)
}
}
/**