Moved active flag into MessageDispatcher and let it handle the callbacks, also fixed race in DataFlowSpec

This commit is contained in:
Viktor Klang 2010-10-24 16:01:00 +02:00
parent b80fb9096a
commit dbd2db691c
6 changed files with 60 additions and 35 deletions

View file

@ -964,7 +964,7 @@ class LocalActorRef private[akka] (
message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
} else {
val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
dispatcher dispatch invocation
dispatcher dispatchMessage invocation
}
}
@ -985,7 +985,7 @@ class LocalActorRef private[akka] (
else new DefaultCompletableFuture[T](timeout)
val invocation = new MessageInvocation(
this, message, senderOption, Some(future.asInstanceOf[CompletableFuture[Any]]), transactionSet.get)
dispatcher dispatch invocation
dispatcher dispatchMessage invocation
future
}
}

View file

@ -91,11 +91,10 @@ class ExecutorBasedEventDrivenDispatcher(
val name = "akka:event-driven:dispatcher:" + _name
val mailboxType = Some(_mailboxType)
private[akka] val active = new Switch(false)
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](config.createLazyExecutorService(threadFactory))
def dispatch(invocation: MessageInvocation) = {
protected def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
registerForExecution(mbox)
@ -132,11 +131,9 @@ class ExecutorBasedEventDrivenDispatcher(
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
protected def start: Unit = active switchOn {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
}
protected def start= log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
protected def shutdown: Unit = active switchOff {
protected def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.debug("Shutting down %s", toString)

View file

@ -44,7 +44,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
val mailboxType = Some(_mailboxType)
val name = "akka:event-driven-work-stealing:dispatcher:" + _name
private val active = new Switch(false)
/** Type of the actors registered in this dispatcher. */
@volatile private var actorType: Option[Class[_]] = None
@ -55,8 +54,6 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
/** The index in the pooled actors list which was last used to steal work */
@volatile private var lastThiefIndex = 0
/**
* @return the mailbox associated with the actor
*/
@ -64,11 +61,11 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
def dispatch(invocation: MessageInvocation) = if (active.isOn) {
protected def dispatch(invocation: MessageInvocation) {
val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation
executorService.get() execute mbox
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
}
/**
* Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
@ -170,11 +167,9 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
} else false
}
protected def start = active switchOn {
log.debug("Starting up %s",toString)
}
protected def start = log.debug("Starting up %s",toString)
protected def shutdown: Unit = active switchOff {
protected def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.debug("Shutting down %s", toString)

View file

@ -142,18 +142,14 @@ object HawtDispatcher {
class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher {
import HawtDispatcher._
private val active = new Switch(false)
val mailboxType: Option[MailboxType] = None
def start = active switchOn { retainNonDaemon }
protected def start { retainNonDaemon }
def shutdown = active switchOff { releaseNonDaemon }
protected def shutdown { releaseNonDaemon }
def dispatch(invocation: MessageInvocation) = if (active.isOn) {
protected def dispatch(invocation: MessageInvocation){
mailbox(invocation.receiver).dispatch(invocation)
} else {
log.warning("%s is shut down,\n\tignoring the the messages sent to\n\t%s", toString, invocation.receiver)
}
// hawtdispatch does not have a way to get queue sizes, getting an accurate

View file

@ -4,12 +4,11 @@
package se.scalablesolutions.akka.dispatch
import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Uuid, ActorInitializationException}
import org.multiverse.commitbarriers.CountDownCommitBarrier
import java.util.concurrent._
import se.scalablesolutions.akka.util. {ReentrantGuard, Logging, HashCode}
import se.scalablesolutions.akka.util. {Switch, ReentrantGuard, Logging, HashCode}
import se.scalablesolutions.akka.actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -56,20 +55,34 @@ final class MessageInvocation(val receiver: ActorRef,
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDispatcher extends MailboxFactory with Logging {
protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val guard = new ReentrantGuard
protected val active = new Switch(false)
/**
* Attaches the specified actorRef to this dispatcher
*/
final def attach(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef)
}
/**
* Detaches the specified actorRef from this dispatcher
*/
final def detach(actorRef: ActorRef): Unit = guard withGuard {
unregister(actorRef)
}
private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = if (active.isOn) {
dispatch(invocation)
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
protected def register(actorRef: ActorRef) {
if (uuids.isEmpty()) start
if (uuids.isEmpty()) {
active.switchOn {
start
}
}
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
uuids add actorRef.uuid
}
@ -77,28 +90,52 @@ trait MessageDispatcher extends MailboxFactory with Logging {
protected def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) {
actorRef.mailbox = null
if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero
if (uuids.isEmpty){
active switchOff {
shutdown // shut down in the dispatcher's references is zero
}
}
}
}
/**
* Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
*/
def stopAllLinkedActors {
val i = uuids.iterator
while(i.hasNext()) {
val uuid = i.next()
ActorRegistry.actorFor(uuid) match {
case Some(actor) => actor.stop
case None => log.warn("stopAllLinkedActors couldn't find linked actor: " + uuid)
case None =>
log.error("stopAllLinkedActors couldn't find linked actor: " + uuid)
}
}
if(uuids.isEmpty) shutdown
}
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
*/
def suspend(actorRef: ActorRef): Unit
/*
* After the call to this method, the dispatcher must begin any new message processing for the specified reference
*/
def resume(actorRef: ActorRef): Unit
def dispatch(invocation: MessageInvocation): Unit
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
protected def dispatch(invocation: MessageInvocation): Unit
/**
* Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
*/
protected def start: Unit
/**
* Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
*/
protected def shutdown: Unit
/**

View file

@ -28,8 +28,8 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
val x, y, z = new DataFlowVariable[Int]
thread {
z << x() + y()
latch.countDown
result.set(z())
latch.countDown
}
thread { x << 40 }
thread { y << 2 }