Possible optimization for EBEDD

This commit is contained in:
Viktor Klang 2011-02-14 02:34:40 +01:00
parent 0fe4d8c6ee
commit fef5bc40a8
3 changed files with 36 additions and 36 deletions

View file

@ -9,7 +9,7 @@ import akka.util.{ReflectiveAccess, Switch}
import java.util.Queue import java.util.Queue
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue}
/** /**
* Default settings are: * Default settings are:
@ -128,7 +128,7 @@ class ExecutorBasedEventDrivenDispatcher(
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) { if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) {
try { try {
executorService.get() execute mbox executorService.get() execute mbox
} catch { } catch {
@ -143,14 +143,14 @@ class ExecutorBasedEventDrivenDispatcher(
def suspend(actorRef: ActorRef) { def suspend(actorRef: ActorRef) {
log.slf4j.debug("Suspending {}",actorRef.uuid) log.slf4j.debug("Suspending {}",actorRef.uuid)
getMailbox(actorRef).suspended.switchOn getMailbox(actorRef).suspended.tryLock
} }
def resume(actorRef: ActorRef) { def resume(actorRef: ActorRef) {
log.slf4j.debug("Resuming {}",actorRef.uuid) log.slf4j.debug("Resuming {}",actorRef.uuid)
val mbox = getMailbox(actorRef) val mbox = getMailbox(actorRef)
mbox.suspended.switchOff if (mbox.suspended.tryUnlock)
registerForExecution(mbox) registerForExecution(mbox)
} }
} }
@ -162,12 +162,12 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
def dispatcher: ExecutorBasedEventDrivenDispatcher def dispatcher: ExecutorBasedEventDrivenDispatcher
final def run = { final def run = {
val reschedule = try { try {
try { processMailbox() } catch { case ie: InterruptedException => true } try { processMailbox() } catch { case ie: InterruptedException => true }
} finally { } finally {
dispatcherLock.unlock() dispatcherLock.unlock()
} }
if (reschedule || !self.isEmpty) if (!self.isEmpty)
dispatcher.registerForExecution(this) dispatcher.registerForExecution(this)
} }
@ -176,33 +176,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
* *
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint * @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/ */
final def processMailbox(): Boolean = { final def processMailbox() {
if (self.suspended.isOn) if (!self.suspended.locked) {
true
else {
var nextMessage = self.dequeue var nextMessage = self.dequeue
if (nextMessage ne null) { if (nextMessage ne null) { //If we have a message
val throttle = dispatcher.throughput > 0 if (dispatcher.throughput <= 1) //If we only run one message per process
var processedMessages = 0 nextMessage.invoke //Just run it
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 else { //But otherwise, if we are throttled, we need to do some book-keeping
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 var processedMessages = 0
do { val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
nextMessage.invoke val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0
do {
nextMessage.invoke
if (throttle) { // Will be elided when false nextMessage =
processedMessages += 1 if (self.suspended.locked) {
if ((processedMessages >= dispatcher.throughput) || null //If we are suspended, abort
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out }
return !self.isEmpty else { //If we aren't suspended, we need to make sure we're not overstepping our boundraries
} processedMessages += 1
if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
if (self.suspended.isOn) null //We reached our boundraries, abort
return true else
self.dequeue //Dequeue the next message
nextMessage = self.dequeue }
} while (nextMessage ne null) } while (nextMessage ne null)
}
} }
false
} }
} }
} }

View file

@ -95,13 +95,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
* @return * @return
*/ */
private def processMailbox(mailbox: MessageQueue): Boolean = try { private def processMailbox(mailbox: MessageQueue): Boolean = try {
if (mailbox.suspended.isOn) if (mailbox.suspended.locked)
return false return false
var messageInvocation = mailbox.dequeue var messageInvocation = mailbox.dequeue
while (messageInvocation ne null) { while (messageInvocation ne null) {
messageInvocation.invoke messageInvocation.invoke
if (mailbox.suspended.isOn) if (mailbox.suspended.locked)
return false return false
messageInvocation = mailbox.dequeue messageInvocation = mailbox.dequeue
} }
@ -180,12 +180,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
def suspend(actorRef: ActorRef) { def suspend(actorRef: ActorRef) {
getMailbox(actorRef).suspended.switchOn getMailbox(actorRef).suspended.tryLock
} }
def resume(actorRef: ActorRef) { def resume(actorRef: ActorRef) {
val mbox = getMailbox(actorRef) val mbox = getMailbox(actorRef)
mbox.suspended.switchOff mbox.suspended.tryUnlock
executorService.get() execute mbox executorService.get() execute mbox
} }

View file

@ -19,7 +19,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
*/ */
trait MessageQueue { trait MessageQueue {
val dispatcherLock = new SimpleLock val dispatcherLock = new SimpleLock
val suspended = new Switch(false) val suspended = new SimpleLock
def enqueue(handle: MessageInvocation) def enqueue(handle: MessageInvocation)
def dequeue(): MessageInvocation def dequeue(): MessageInvocation
def size: Int def size: Int