Switching dispatching strategy to 1 runnable per mailbox and removing use of TransferQueue
This commit is contained in:
parent
03a54e1285
commit
6ae312fcd6
4 changed files with 34 additions and 227 deletions
|
|
@ -85,30 +85,15 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
*/
|
||||
trait ExecutableMailbox extends Runnable { self: MessageQueue =>
|
||||
final def run = {
|
||||
var lockAcquiredOnce = false
|
||||
var finishedBeforeMailboxEmpty = false
|
||||
// this do-while loop is required to prevent missing new messages between the end of the inner while
|
||||
// loop and releasing the lock
|
||||
do {
|
||||
finishedBeforeMailboxEmpty = false //Reset this every run
|
||||
if (dispatcherLock.tryLock()) {
|
||||
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
|
||||
lockAcquiredOnce = true
|
||||
finishedBeforeMailboxEmpty = try {
|
||||
processMailbox()
|
||||
} catch {
|
||||
case e =>
|
||||
dispatcherLock.unlock()
|
||||
if (!self.isEmpty)
|
||||
registerForExecution(self)
|
||||
throw e
|
||||
}
|
||||
|
||||
dispatcherLock.unlock()
|
||||
if (finishedBeforeMailboxEmpty)
|
||||
registerForExecution(self)
|
||||
}
|
||||
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !self.isEmpty))
|
||||
val reschedule = try {
|
||||
processMailbox()
|
||||
} finally {
|
||||
dispatcherLock.unlock()
|
||||
}
|
||||
|
||||
if (reschedule || !self.isEmpty)
|
||||
registerForExecution(self)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -144,6 +129,20 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
registerForExecution(mbox)
|
||||
}
|
||||
|
||||
protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) {
|
||||
if (mailbox.dispatcherLock.tryLock()) {
|
||||
try {
|
||||
executor execute mailbox
|
||||
} catch {
|
||||
case e: RejectedExecutionException =>
|
||||
mailbox.dispatcherLock.unlock()
|
||||
throw e
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the mailbox associated with the actor
|
||||
*/
|
||||
|
|
@ -158,11 +157,6 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
new DefaultUnboundedMessageQueue(blockDequeue = false) with ExecutableMailbox
|
||||
}
|
||||
|
||||
protected def registerForExecution(mailbox: MessageQueue with ExecutableMailbox): Unit = if (active) {
|
||||
executor execute mailbox
|
||||
} else {
|
||||
log.warning("%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, mailbox)
|
||||
}
|
||||
|
||||
def start = if (!active) {
|
||||
log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
|
||||
|
|
@ -186,6 +180,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
// FIXME: should we have an unbounded queue and not bounded as default ????
|
||||
private[akka] def init = {
|
||||
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
||||
//withNewThreadPoolWithLinkedBlockingQueueWithCapacity(16)
|
||||
config(this)
|
||||
buildThreadPool
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue