Moved active flag into MessageDispatcher and let it handle the callbacks, also fixed race in DataFlowSpec

This commit is contained in:
Viktor Klang 2010-10-24 16:01:00 +02:00
parent 53e67d6b3e
commit 990b933d8f
6 changed files with 60 additions and 35 deletions

View file

@ -4,12 +4,11 @@
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Uuid, ActorInitializationException}
import org.multiverse.commitbarriers.CountDownCommitBarrier
import java.util.concurrent._
import se.scalablesolutions.akka.util. {ReentrantGuard, Logging, HashCode}
import se.scalablesolutions.akka.util. {Switch, ReentrantGuard, Logging, HashCode}
import se.scalablesolutions.akka.actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -56,20 +55,34 @@ final class MessageInvocation(val receiver: ActorRef,
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDispatcher extends MailboxFactory with Logging {
protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val guard = new ReentrantGuard
protected val active = new Switch(false)
/**
* Attaches the specified actorRef to this dispatcher
*/
final def attach(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef)
}
/**
* Detaches the specified actorRef from this dispatcher
*/
final def detach(actorRef: ActorRef): Unit = guard withGuard {
unregister(actorRef)
}
private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = if (active.isOn) {
dispatch(invocation)
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
protected def register(actorRef: ActorRef) {
if (uuids.isEmpty()) start
if (uuids.isEmpty()) {
active.switchOn {
start
}
}
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
uuids add actorRef.uuid
}
@ -77,28 +90,52 @@ trait MessageDispatcher extends MailboxFactory with Logging {
protected def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) {
actorRef.mailbox = null
if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero
if (uuids.isEmpty){
active switchOff {
shutdown // shut down in the dispatcher's references is zero
}
}
}
}
/**
* Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
*/
def stopAllLinkedActors {
val i = uuids.iterator
while(i.hasNext()) {
val uuid = i.next()
ActorRegistry.actorFor(uuid) match {
case Some(actor) => actor.stop
case None => log.warn("stopAllLinkedActors couldn't find linked actor: " + uuid)
case None =>
log.error("stopAllLinkedActors couldn't find linked actor: " + uuid)
}
}
if(uuids.isEmpty) shutdown
}
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
*/
def suspend(actorRef: ActorRef): Unit
/*
* After the call to this method, the dispatcher must begin any new message processing for the specified reference
*/
def resume(actorRef: ActorRef): Unit
def dispatch(invocation: MessageInvocation): Unit
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
protected def dispatch(invocation: MessageInvocation): Unit
/**
* Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
*/
protected def start: Unit
/**
* Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
*/
protected def shutdown: Unit
/**