fixing HawtDispatchEventDrivenDispatcher so that it has at least one non-daemon thread while it's active

This commit is contained in:
Hiram Chirino 2010-07-21 15:10:22 -04:00
parent 06f4a838a9
commit 7a3a7763c0
4 changed files with 61 additions and 29 deletions

View file

@ -950,7 +950,8 @@ sealed class LocalActorRef private[akka](
* Callback for the dispatcher. This is the ingle entry point to the user Actor implementation.
*/
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {
if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
if (isShutdown)
Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
else {
currentMessage = Option(messageHandle)
try {

View file

@ -112,8 +112,10 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))
}
})
} else log.warning(
"%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver)
} else {
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver)
}
/**
* Process the messages in the mailbox of the given actor.

View file

@ -22,44 +22,73 @@ import org.fusesource.hawtdispatch.DispatchQueue
import org.fusesource.hawtdispatch.ScalaDispatch._
import actors.threadpool.AtomicInteger
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.CountDownLatch
object HawtDispatchEventDrivenDispatcher {
private val retained = new AtomicInteger()
@volatile private var shutdownLatch: CountDownLatch = _
private def retain = {
if( retained.getAndIncrement == 0 ) {
shutdownLatch = new CountDownLatch(1)
new Thread("HawtDispatch Non-Daemon") {
override def run = {
try {
shutdownLatch.await
} catch {
case _ =>
}
println("done");
}
}.start()
}
}
private def release = {
if( retained.decrementAndGet == 0 ) {
shutdownLatch.countDown
shutdownLatch = null
}
}
}
/**
* <p>
* An HawtDispatch based MessageDispatcher.
* An HawtDispatch based MessageDispatcher. Actors with this dispatcher are executed
* on the HawtDispatch thread pool which is restricted to only executing non blocking
* operations. Therefore, you can only use this dispatcher with actors which are purely
* computational or which use non-blocking IO.
* </p>
* <p>
* This dispatcher delivers messages to the actors in the order that they
* were producer at the sender.
* </p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class HawtDispatchEventDrivenDispatcher(val name: String) extends MessageDispatcher {
// a counter used to track if the dispatcher is in use.
private val active = new AtomicBoolean()
private val retained = new AtomicInteger
class HawtDispatchEventDrivenDispatcher(parent:DispatchQueue=globalQueue) extends MessageDispatcher {
private val active = new AtomicBoolean(false)
def start = {
val rc = active.compareAndSet(false, true)
assert( rc )
retained.incrementAndGet
if( active.compareAndSet(false, true) ) {
HawtDispatchEventDrivenDispatcher.retain
}
}
def shutdown = {
val rc = active.compareAndSet(true, false)
assert( rc )
retained.decrementAndGet
if( active.compareAndSet(true, false) ) {
HawtDispatchEventDrivenDispatcher.release
}
}
def isShutdown = {
retained.get == 0
}
def isShutdown = !active.get
def dispatch(invocation: MessageInvocation) = if(active.get) {
retained.incrementAndGet
def dispatch(invocation: MessageInvocation) = if(active.get()) {
getMailbox(invocation.receiver) {
try {
invocation.invoke
} finally {
retained.decrementAndGet
}
invocation.invoke
}
} else {
log.warning("%s is shut down,\n\tignoring the the messages sent to\n\t%s", toString, invocation.receiver)
@ -77,11 +106,11 @@ class HawtDispatchEventDrivenDispatcher(val name: String) extends MessageDispatc
override def register(actorRef: ActorRef) = {
if( actorRef.mailbox == null ) {
actorRef.mailbox = createQueue(actorRef.toString)
actorRef.mailbox = parent.createSerialQueue(actorRef.toString)
}
super.register(actorRef)
}
override def toString = "HawtDispatchEventDrivenDispatcher["+name+"]"
override def toString = "HawtDispatchEventDrivenDispatcher"
}

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.dispatch.{HawtDispatchEventDrivenDispatcher, Di
object HawtDispatchEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
self.dispatcher = new HawtDispatchEventDrivenDispatcher(self.uuid)
self.dispatcher = new HawtDispatchEventDrivenDispatcher()
def receive = {
case "Hello" =>
self.reply("World")
@ -21,7 +21,7 @@ object HawtDispatchEventDrivenDispatcherActorSpec {
val oneWay = new CountDownLatch(1)
}
class OneWayTestActor extends Actor {
self.dispatcher = new HawtDispatchEventDrivenDispatcher(self.uuid)
self.dispatcher = new HawtDispatchEventDrivenDispatcher()
def receive = {
case "OneWay" => OneWayTestActor.oneWay.countDown
}