Adding final declarations on a number of case-classes in Mailbox.scala, and also made constants of methods that were called frequently in the hot path
This commit is contained in:
parent
913ef5d232
commit
4313a28fbd
2 changed files with 21 additions and 18 deletions
|
|
@ -155,31 +155,29 @@ abstract class Mailbox extends AbstractMailbox with MessageQueue with SystemMess
|
|||
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
|
||||
*/
|
||||
final def processMailbox() {
|
||||
processAllSystemMessages()
|
||||
processAllSystemMessages() //First, process all system messages
|
||||
|
||||
if (isActive) {
|
||||
var nextMessage = dequeue()
|
||||
if (nextMessage ne null) { //If we have a message
|
||||
if (dispatcher.throughput <= 1) { //If we only run one message per process {
|
||||
nextMessage.invoke //Just run it
|
||||
processAllSystemMessages()
|
||||
} else { //But otherwise, if we are throttled, we need to do some book-keeping
|
||||
if (dispatcher.isThroughputDefined) { //If we're using throughput, we need to do some book-keeping
|
||||
var processedMessages = 0
|
||||
val isDeadlineEnabled = dispatcher.throughputDeadlineTime > 0
|
||||
val deadlineNs = if (isDeadlineEnabled) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime)
|
||||
else 0
|
||||
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0
|
||||
do {
|
||||
nextMessage.invoke
|
||||
|
||||
processAllSystemMessages()
|
||||
processAllSystemMessages() //After we're done, process all system messages
|
||||
|
||||
nextMessage = if (isActive) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
|
||||
processedMessages += 1
|
||||
if ((processedMessages >= dispatcher.throughput) || (isDeadlineEnabled && System.nanoTime >= deadlineNs)) // If we're throttled, break out
|
||||
if ((processedMessages >= dispatcher.throughput) || (dispatcher.isThroughputDeadlineTimeDefined && System.nanoTime >= deadlineNs)) // If we're throttled, break out
|
||||
null //We reached our boundaries, abort
|
||||
else dequeue //Dequeue the next message
|
||||
} else null //Abort
|
||||
} while (nextMessage ne null)
|
||||
} else { //If we only run one message per process
|
||||
nextMessage.invoke //Just run it
|
||||
processAllSystemMessages() //After we're done, process all system messages
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -218,7 +216,7 @@ trait SystemMessageQueue {
|
|||
}
|
||||
|
||||
trait DefaultSystemMessageQueue { self: SystemMessageQueue ⇒
|
||||
val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]()
|
||||
final val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]()
|
||||
|
||||
def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages offer handle
|
||||
|
||||
|
|
@ -271,8 +269,8 @@ case class UnboundedMailbox() extends MailboxType {
|
|||
}
|
||||
|
||||
case class BoundedMailbox(
|
||||
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
|
||||
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
|
||||
final val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
|
||||
final val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
|
||||
|
||||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
|
@ -284,17 +282,17 @@ case class BoundedMailbox(
|
|||
}
|
||||
}
|
||||
|
||||
case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxType {
|
||||
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
|
||||
override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
val queue = new PriorityBlockingQueue[Envelope](11, cmp)
|
||||
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
|
||||
final val dispatcher = _dispatcher
|
||||
}
|
||||
}
|
||||
|
||||
case class BoundedPriorityMailbox(
|
||||
val cmp: Comparator[Envelope],
|
||||
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
|
||||
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
|
||||
final val cmp: Comparator[Envelope],
|
||||
final val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
|
||||
final val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
|
||||
|
||||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue