This commit is contained in:
Viktor Klang 2010-10-22 17:50:48 +02:00
parent 7b56928bca
commit 3ecb38b5b7
8 changed files with 322 additions and 212 deletions

View file

@ -8,8 +8,9 @@ import se.scalablesolutions.akka.actor.{ActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.util.ReflectiveAccess.EnterpriseModule
import java.util.Queue
import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
import se.scalablesolutions.akka.util.Switch
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent. {ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/**
* Default settings are:
@ -69,11 +70,11 @@ class ExecutorBasedEventDrivenDispatcher(
val throughput: Int = Dispatchers.THROUGHPUT,
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
_mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: (ThreadPoolBuilder) => Unit = _ => ())
extends MessageDispatcher with ThreadPoolBuilder {
val config: ThreadPoolConfig = ThreadPoolConfig())
extends MessageDispatcher {
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(_name, throughput, throughputDeadlineTime, mailboxType, _ => ()) // Needed for Java API usage
this(_name, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
def this(_name: String, throughput: Int, mailboxType: MailboxType) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
@ -84,14 +85,12 @@ class ExecutorBasedEventDrivenDispatcher(
def this(_name: String) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
val name = "akka:event-driven:dispatcher:" + _name
val mailboxType = Some(_mailboxType)
private[akka] val active = new Switch(false)
val name = "akka:event-driven:dispatcher:" + _name
//Initialize
init
private[akka] val threadFactory = new MonitorableThreadFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](null)
def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver)
@ -99,6 +98,8 @@ class ExecutorBasedEventDrivenDispatcher(
registerForExecution(mbox)
}
def isShutdown = active.isOff
/**
* @return the mailbox associated with the actor
*/
@ -112,8 +113,7 @@ class ExecutorBasedEventDrivenDispatcher(
}
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
val cap = if (mailboxCapacity == -1) capacity else mailboxCapacity
new DefaultBoundedMessageQueue(cap, pushTimeOut, blocking) with ExecutableMailbox {
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox {
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
}
@ -131,25 +131,28 @@ class ExecutorBasedEventDrivenDispatcher(
case JMSBasedDurableMailbox(serializer) => throw new UnsupportedOperationException("JMSBasedDurableMailbox is not yet supported")
}
def start = active switchOn {
def start: Unit = if (active.isOff) active switchOn {
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
if (executorService.get() eq null) {
val newExecutor = config.createExecutorService(threadFactory)
if (!executorService.compareAndSet(null,newExecutor))
log.error("Thought the ExecutorService was missing but appeared out of nowhere!")
}
}
def shutdown = active switchOff {
log.debug("Shutting down %s", toString)
executor.shutdownNow
uuids.clear
}
def ensureNotActive(): Unit = if (active.isOn) {
throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running")
def shutdown: Unit = if (active.isOn) active switchOff {
val old = executorService.getAndSet(null)
if (old ne null) {
log.debug("Shutting down %s", toString)
old.shutdownNow()
uuids.clear
}
}
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) {
try {
executor execute mbox
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
mbox.dispatcherLock.unlock()
@ -171,13 +174,6 @@ class ExecutorBasedEventDrivenDispatcher(
mbox.suspended.switchOff
registerForExecution(mbox)
}
// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config(this)
buildThreadPool
}
}
/**