Threw away old ThreadBasedDispatcher and replaced it with an EBEDD with 1 in core pool and 1 in max pool

This commit is contained in:
Viktor Klang 2010-09-20 16:56:12 +02:00
parent 16a7a3ecb0
commit b1462ade30
2 changed files with 25 additions and 39 deletions

View file

@ -189,3 +189,13 @@ class ExecutorBasedEventDrivenDispatcher(
buildThreadPool
}
}
/**
* Usable to create a single-threaded dispatcher
*/
object SingleThread extends Function1[ThreadPoolBuilder,Unit] {
def apply(b: ThreadPoolBuilder) {
b setCorePoolSize 1
b setMaxPoolSize 1
}
}

View file

@ -11,6 +11,14 @@ import se.scalablesolutions.akka.config.Config.config
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
object ThreadBasedDispatcher {
def oneThread(b: ThreadPoolBuilder) {
b setCorePoolSize 1
b setMaxPoolSize 1
b setAllowCoreThreadTimeout true
}
}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
@ -18,17 +26,15 @@ import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, Lin
*/
class ThreadBasedDispatcher(private val actor: ActorRef,
val mailboxConfig: MailboxConfig
) extends MessageDispatcher {
) extends ExecutorBasedEventDrivenDispatcher(
actor.getClass.getName + ":" + actor.uuid,
Dispatchers.THROUGHPUT,
-1,
mailboxConfig,
ThreadBasedDispatcher.oneThread) {
def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true))
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "akka:thread-based:dispatcher:" + name
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true)
override def register(actorRef: ActorRef) = {
if(actorRef != actor)
throw new IllegalArgumentException("Cannot register to anyone but " + actor)
@ -36,35 +42,5 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
super.register(actorRef)
}
def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue]
def mailboxSize(a: ActorRef) = mailbox.size
def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation
def start = if (!active) {
log.debug("Starting up %s", toString)
active = true
selectorThread = new Thread(threadName) {
override def run = {
while (active) {
try {
actor.invoke(mailbox.dequeue)
} catch { case e: InterruptedException => active = false }
}
}
}
selectorThread.start
}
def isShutdown = !active
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false
selectorThread.interrupt
uuids.clear
}
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
override def toString = "ThreadBasedDispatcher[" + name + "]"
}