diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 63ce310848..7a86c47c51 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -189,3 +189,13 @@ class ExecutorBasedEventDrivenDispatcher( buildThreadPool } } + +/** + * Usable to create a single-threaded dispatcher + */ +object SingleThread extends Function1[ThreadPoolBuilder,Unit] { + def apply(b: ThreadPoolBuilder) { + b setCorePoolSize 1 + b setMaxPoolSize 1 + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index eda5a86a9e..090be85cee 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -11,6 +11,14 @@ import se.scalablesolutions.akka.config.Config.config import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue} import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue} +object ThreadBasedDispatcher { + def oneThread(b: ThreadPoolBuilder) { + b setCorePoolSize 1 + b setMaxPoolSize 1 + b setAllowCoreThreadTimeout true + } +} + /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * @@ -18,16 +26,14 @@ import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, Lin */ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxConfig: MailboxConfig - ) extends MessageDispatcher { + ) extends ExecutorBasedEventDrivenDispatcher( + actor.getClass.getName + ":" + actor.uuid, + Dispatchers.THROUGHPUT, + -1, + mailboxConfig, + ThreadBasedDispatcher.oneThread) { def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true)) def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java - - private val name = actor.getClass.getName + ":" + actor.uuid - private val threadName = "akka:thread-based:dispatcher:" + name - private var selectorThread: Thread = _ - @volatile private var active: Boolean = false - - override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true) override def register(actorRef: ActorRef) = { if(actorRef != actor) @@ -36,35 +42,5 @@ class ThreadBasedDispatcher(private val actor: ActorRef, super.register(actorRef) } - def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue] - - def mailboxSize(a: ActorRef) = mailbox.size - - def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation - - def start = if (!active) { - log.debug("Starting up %s", toString) - active = true - selectorThread = new Thread(threadName) { - override def run = { - while (active) { - try { - actor.invoke(mailbox.dequeue) - } catch { case e: InterruptedException => active = false } - } - } - } - selectorThread.start - } - - def isShutdown = !active - - def shutdown = if (active) { - log.debug("Shutting down %s", toString) - active = false - selectorThread.interrupt - uuids.clear - } - - override def toString = "ThreadBasedDispatcher[" + threadName + "]" + override def toString = "ThreadBasedDispatcher[" + name + "]" } \ No newline at end of file