diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 4e48806a8c..1c0a9bdc84 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -9,7 +9,7 @@ import akka.util.{ReflectiveAccess, Switch} import java.util.Queue import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} +import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue, LinkedBlockingQueue} /** * Default settings are: @@ -128,7 +128,7 @@ class ExecutorBasedEventDrivenDispatcher( private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { - if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) { + if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) { try { executorService.get() execute mbox } catch { @@ -143,14 +143,14 @@ class ExecutorBasedEventDrivenDispatcher( def suspend(actorRef: ActorRef) { log.slf4j.debug("Suspending {}",actorRef.uuid) - getMailbox(actorRef).suspended.switchOn + getMailbox(actorRef).suspended.tryLock } def resume(actorRef: ActorRef) { log.slf4j.debug("Resuming {}",actorRef.uuid) val mbox = getMailbox(actorRef) - mbox.suspended.switchOff - registerForExecution(mbox) + if (mbox.suspended.tryUnlock) + registerForExecution(mbox) } } @@ -162,12 +162,12 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => def dispatcher: ExecutorBasedEventDrivenDispatcher final def run = { - val reschedule = try { + try { try { processMailbox() } catch { case ie: InterruptedException => true } } finally { dispatcherLock.unlock() } - if (reschedule || !self.isEmpty) + if (!self.isEmpty) dispatcher.registerForExecution(this) } @@ -176,33 +176,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ - final def processMailbox(): Boolean = { - if (self.suspended.isOn) - true - else { + final def processMailbox() { + if (!self.suspended.locked) { var nextMessage = self.dequeue - if (nextMessage ne null) { - val throttle = dispatcher.throughput > 0 - var processedMessages = 0 - val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0 - val started = if (isDeadlineEnabled) System.currentTimeMillis else 0 - do { - nextMessage.invoke + if (nextMessage ne null) { //If we have a message + if (dispatcher.throughput <= 1) //If we only run one message per process + nextMessage.invoke //Just run it + else { //But otherwise, if we are throttled, we need to do some book-keeping + var processedMessages = 0 + val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 + val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 + do { + nextMessage.invoke - if (throttle) { // Will be elided when false - processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || - (isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out - return !self.isEmpty - } - - if (self.suspended.isOn) - return true - - nextMessage = self.dequeue - } while (nextMessage ne null) + nextMessage = + if (self.suspended.locked) { + null //If we are suspended, abort + } + else { //If we aren't suspended, we need to make sure we're not overstepping our boundraries + processedMessages += 1 + if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out + null //We reached our boundraries, abort + else + self.dequeue //Dequeue the next message + } + } while (nextMessage ne null) + } } - false } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 3d4a6c439b..54aec2607d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -95,13 +95,13 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( * @return */ private def processMailbox(mailbox: MessageQueue): Boolean = try { - if (mailbox.suspended.isOn) + if (mailbox.suspended.locked) return false var messageInvocation = mailbox.dequeue while (messageInvocation ne null) { messageInvocation.invoke - if (mailbox.suspended.isOn) + if (mailbox.suspended.locked) return false messageInvocation = mailbox.dequeue } @@ -180,12 +180,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( def suspend(actorRef: ActorRef) { - getMailbox(actorRef).suspended.switchOn + getMailbox(actorRef).suspended.tryLock } def resume(actorRef: ActorRef) { val mbox = getMailbox(actorRef) - mbox.suspended.switchOff + mbox.suspended.tryUnlock executorService.get() execute mbox } diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 9b364b3af1..68e8cf68ce 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -19,7 +19,7 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m */ trait MessageQueue { val dispatcherLock = new SimpleLock - val suspended = new Switch(false) + val suspended = new SimpleLock def enqueue(handle: MessageInvocation) def dequeue(): MessageInvocation def size: Int