Adding unbounded and bounded MessageQueues based on PriorityBlockingQueue

This commit is contained in:
Viktor Klang 2011-03-09 18:11:45 +01:00
parent 9f4144c5f3
commit 42cfe2720a
2 changed files with 24 additions and 8 deletions

View file

@ -7,7 +7,7 @@ package akka.dispatch
import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import akka.AkkaException
import java.util.{Queue, List}
import java.util.{Queue, List, Comparator}
import java.util.concurrent._
import concurrent.forkjoin.LinkedTransferQueue
import akka.util._
@ -40,8 +40,8 @@ case class BoundedMailbox(
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
}
class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
extends LinkedBlockingQueue[MessageInvocation] with MessageQueue {
trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
def blockDequeue: Boolean
final def enqueue(handle: MessageInvocation) {
this add handle
@ -53,8 +53,9 @@ class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
}
}
class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Duration, blockDequeue: Boolean)
extends LinkedBlockingQueue[MessageInvocation](capacity) with MessageQueue {
trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
def blockDequeue: Boolean
def pushTimeOut: Duration
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.toMillis > 0) {
@ -66,4 +67,20 @@ class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Duration, blockDequ
final def dequeue(): MessageInvocation =
if (blockDequeue) this.take()
else this.poll()
}
}
class DefaultUnboundedMessageQueue(val blockDequeue: Boolean) extends
LinkedBlockingQueue[MessageInvocation] with
UnboundedMessageQueueSemantics
class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean) extends
LinkedBlockingQueue[MessageInvocation](capacity) with
BoundedMessageQueueSemantics
class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
PriorityBlockingQueue[MessageInvocation](11, cmp) with
UnboundedMessageQueueSemantics
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
PriorityBlockingQueue[MessageInvocation](capacity, cmp) with
BoundedMessageQueueSemantics