diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 90c09892ce..ccbc1a8675 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -950,7 +950,8 @@ sealed class LocalActorRef private[akka]( * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. */ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { - if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) + if (isShutdown) + Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) else { currentMessage = Option(messageHandle) try { diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 89fb90d16e..42d1ae1620 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -112,8 +112,10 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) } }) - } else log.warning( - "%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver) + } else { + log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver) + } + /** * Process the messages in the mailbox of the given actor. diff --git a/akka-core/src/main/scala/dispatch/HawtDispatchEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/HawtDispatchEventDrivenDispatcher.scala index 6c0c1686fa..b6e4cbd6cf 100644 --- a/akka-core/src/main/scala/dispatch/HawtDispatchEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/HawtDispatchEventDrivenDispatcher.scala @@ -22,44 +22,73 @@ import org.fusesource.hawtdispatch.DispatchQueue import org.fusesource.hawtdispatch.ScalaDispatch._ import actors.threadpool.AtomicInteger import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.CountDownLatch + +object HawtDispatchEventDrivenDispatcher { + + private val retained = new AtomicInteger() + @volatile private var shutdownLatch: CountDownLatch = _ + + private def retain = { + if( retained.getAndIncrement == 0 ) { + shutdownLatch = new CountDownLatch(1) + new Thread("HawtDispatch Non-Daemon") { + override def run = { + try { + shutdownLatch.await + } catch { + case _ => + } + println("done"); + } + }.start() + } + } + + private def release = { + if( retained.decrementAndGet == 0 ) { + shutdownLatch.countDown + shutdownLatch = null + } + } + +} /** *
- * An HawtDispatch based MessageDispatcher. + * An HawtDispatch based MessageDispatcher. Actors with this dispatcher are executed + * on the HawtDispatch thread pool which is restricted to only executing non blocking + * operations. Therefore, you can only use this dispatcher with actors which are purely + * computational or which use non-blocking IO. + *
+ *+ * This dispatcher delivers messages to the actors in the order that they + * were producer at the sender. *
* * @author Hiram Chirino */ -class HawtDispatchEventDrivenDispatcher(val name: String) extends MessageDispatcher { - - // a counter used to track if the dispatcher is in use. - private val active = new AtomicBoolean() - private val retained = new AtomicInteger +class HawtDispatchEventDrivenDispatcher(parent:DispatchQueue=globalQueue) extends MessageDispatcher { + private val active = new AtomicBoolean(false) + def start = { - val rc = active.compareAndSet(false, true) - assert( rc ) - retained.incrementAndGet + if( active.compareAndSet(false, true) ) { + HawtDispatchEventDrivenDispatcher.retain + } } def shutdown = { - val rc = active.compareAndSet(true, false) - assert( rc ) - retained.decrementAndGet + if( active.compareAndSet(true, false) ) { + HawtDispatchEventDrivenDispatcher.release + } } - def isShutdown = { - retained.get == 0 - } + def isShutdown = !active.get - def dispatch(invocation: MessageInvocation) = if(active.get) { - retained.incrementAndGet + def dispatch(invocation: MessageInvocation) = if(active.get()) { getMailbox(invocation.receiver) { - try { - invocation.invoke - } finally { - retained.decrementAndGet - } + invocation.invoke } } else { log.warning("%s is shut down,\n\tignoring the the messages sent to\n\t%s", toString, invocation.receiver) @@ -77,11 +106,11 @@ class HawtDispatchEventDrivenDispatcher(val name: String) extends MessageDispatc override def register(actorRef: ActorRef) = { if( actorRef.mailbox == null ) { - actorRef.mailbox = createQueue(actorRef.toString) + actorRef.mailbox = parent.createSerialQueue(actorRef.toString) } super.register(actorRef) } - override def toString = "HawtDispatchEventDrivenDispatcher["+name+"]" + override def toString = "HawtDispatchEventDrivenDispatcher" } \ No newline at end of file diff --git a/akka-core/src/test/scala/HawtDispatchEventDrivenDispatcherActorSpec.scala b/akka-core/src/test/scala/HawtDispatchEventDrivenDispatcherActorSpec.scala index fbaa7e8b43..ce070ab7a7 100644 --- a/akka-core/src/test/scala/HawtDispatchEventDrivenDispatcherActorSpec.scala +++ b/akka-core/src/test/scala/HawtDispatchEventDrivenDispatcherActorSpec.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.dispatch.{HawtDispatchEventDrivenDispatcher, Di object HawtDispatchEventDrivenDispatcherActorSpec { class TestActor extends Actor { - self.dispatcher = new HawtDispatchEventDrivenDispatcher(self.uuid) + self.dispatcher = new HawtDispatchEventDrivenDispatcher() def receive = { case "Hello" => self.reply("World") @@ -21,7 +21,7 @@ object HawtDispatchEventDrivenDispatcherActorSpec { val oneWay = new CountDownLatch(1) } class OneWayTestActor extends Actor { - self.dispatcher = new HawtDispatchEventDrivenDispatcher(self.uuid) + self.dispatcher = new HawtDispatchEventDrivenDispatcher() def receive = { case "OneWay" => OneWayTestActor.oneWay.countDown }