Commented out the BoundedPriorityMailbox, since it wasn´t bounded, and broke out the mailbox logic into PriorityMailbox
This commit is contained in:
parent
0b5858f46f
commit
00dea71492
2 changed files with 22 additions and 16 deletions
|
|
@ -228,36 +228,40 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
|
||||||
*/
|
*/
|
||||||
class PriorityExecutorBasedEventDrivenDispatcher(
|
class PriorityExecutorBasedEventDrivenDispatcher(
|
||||||
name: String,
|
name: String,
|
||||||
comparator: java.util.Comparator[MessageInvocation],
|
val comparator: java.util.Comparator[MessageInvocation],
|
||||||
throughput: Int = Dispatchers.THROUGHPUT,
|
throughput: Int = Dispatchers.THROUGHPUT,
|
||||||
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
|
||||||
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
|
mailboxType: UnboundedMailbox = UnboundedMailbox(false),
|
||||||
config: ThreadPoolConfig = ThreadPoolConfig()
|
config: ThreadPoolConfig = ThreadPoolConfig()
|
||||||
) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) {
|
) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: UnboundedMailbox) =
|
||||||
this(name, comparator, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
|
this(name, comparator, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: UnboundedMailbox) =
|
||||||
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
|
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
|
||||||
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false)) // Needed for Java API usage
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
|
||||||
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config)
|
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false), config)
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
|
||||||
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false)) // Needed for Java API usage
|
||||||
|
}
|
||||||
|
|
||||||
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
|
trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
|
||||||
|
def comparator: java.util.Comparator[MessageInvocation]
|
||||||
|
|
||||||
|
override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match {
|
||||||
case UnboundedMailbox(blocking) => new UnboundedPriorityMessageQueue(blocking, comparator) with ExecutableMailbox {
|
case UnboundedMailbox(blocking) => new UnboundedPriorityMessageQueue(blocking, comparator) with ExecutableMailbox {
|
||||||
def dispatcher = PriorityExecutorBasedEventDrivenDispatcher.this
|
def dispatcher = self
|
||||||
}
|
}
|
||||||
|
|
||||||
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
|
case BoundedMailbox(blocking, capacity, pushTimeOut) => throw new IllegalStateException("PriorityMailbox does not work when a Bounded mailbox is specified.")
|
||||||
new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox {
|
/*new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox {
|
||||||
def dispatcher = PriorityExecutorBasedEventDrivenDispatcher.this
|
def dispatcher = self
|
||||||
}
|
}*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -81,6 +81,8 @@ class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[M
|
||||||
PriorityBlockingQueue[MessageInvocation](11, cmp) with
|
PriorityBlockingQueue[MessageInvocation](11, cmp) with
|
||||||
UnboundedMessageQueueSemantics
|
UnboundedMessageQueueSemantics
|
||||||
|
|
||||||
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
|
/* PriorityBlockingQueue cannot be bounded
|
||||||
|
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
|
||||||
PriorityBlockingQueue[MessageInvocation](capacity, cmp) with
|
PriorityBlockingQueue[MessageInvocation](capacity, cmp) with
|
||||||
BoundedMessageQueueSemantics
|
BoundedMessageQueueSemantics
|
||||||
|
*/
|
||||||
Loading…
Add table
Add a link
Reference in a new issue