diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 054697e10a..f15bc3cdc4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -274,6 +274,11 @@ abstract class MessageDispatcher extends Serializable { protected[akka] def throughput: Int protected[akka] def throughputDeadlineTime: Int + @inline + protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime > 0 + @inline + protected[akka] final val isThroughputDefined = throughput > 1 + protected[akka] def executeTask(invocation: TaskInvocation) /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 97a53d93a7..eddb636264 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -155,31 +155,29 @@ abstract class Mailbox extends AbstractMailbox with MessageQueue with SystemMess * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ final def processMailbox() { - processAllSystemMessages() + processAllSystemMessages() //First, process all system messages if (isActive) { var nextMessage = dequeue() if (nextMessage ne null) { //If we have a message - if (dispatcher.throughput <= 1) { //If we only run one message per process { - nextMessage.invoke //Just run it - processAllSystemMessages() - } else { //But otherwise, if we are throttled, we need to do some book-keeping + if (dispatcher.isThroughputDefined) { //If we're using throughput, we need to do some book-keeping var processedMessages = 0 - val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0 - val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) - else 0 + val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 do { nextMessage.invoke - processAllSystemMessages() + processAllSystemMessages() //After we're done, process all system messages nextMessage = if (isActive) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries processedMessages += 1 - if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out + if ((processedMessages >= dispatcher.throughput) || (dispatcher.isThroughputDeadlineTimeDefined && System.nanoTime >= deadlineNs)) // If we're throttled, break out null //We reached our boundaries, abort else dequeue //Dequeue the next message } else null //Abort } while (nextMessage ne null) + } else { //If we only run one message per process + nextMessage.invoke //Just run it + processAllSystemMessages() //After we're done, process all system messages } } } @@ -218,7 +216,7 @@ trait SystemMessageQueue { } trait DefaultSystemMessageQueue { self: SystemMessageQueue ⇒ - val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]() + final val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]() def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages offer handle @@ -271,8 +269,8 @@ case class UnboundedMailbox() extends MailboxType { } case class BoundedMailbox( - val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, - val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType { + final val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, + final val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") @@ -284,17 +282,17 @@ case class BoundedMailbox( } } -case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxType { +case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { - val queue = new PriorityBlockingQueue[Envelope](11, cmp) + final val queue = new PriorityBlockingQueue[Envelope](11, cmp) final val dispatcher = _dispatcher } } case class BoundedPriorityMailbox( - val cmp: Comparator[Envelope], - val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, - val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType { + final val cmp: Comparator[Envelope], + final val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY }, + final val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")