Rewriting the Balancing dispatcher

This commit is contained in:
Viktor Klang 2011-09-23 09:33:53 +02:00
parent e4e8ddc2a7
commit 1662d25944
7 changed files with 136 additions and 129 deletions

View file

@ -16,7 +16,7 @@ class MessageQueueAppendFailedException(message: String, cause: Throwable = null
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Mailbox extends Runnable {
abstract class Mailbox extends MessageQueue with SystemMessageQueue with Runnable {
/*
* Internal implementation of MessageDispatcher uses these, don't touch or rely on
*/
@ -78,41 +78,48 @@ trait Mailbox extends Runnable {
}
}
def dispatcher: MessageDispatcher
}
trait MessageQueue {
/*
* These method need to be implemented in subclasses; they should not rely on the internal stuff above.
*/
def enqueue(handle: Envelope)
def dequeue(): Envelope
def numberOfMessages: Int
def systemEnqueue(handle: SystemEnvelope): Unit
def systemDequeue(): SystemEnvelope
def hasMessages: Boolean
def hasSystemMessages: Boolean
def dispatcher: MessageDispatcher
}
trait DefaultSystemMessageImpl { self: Mailbox
trait SystemMessageQueue {
def systemEnqueue(handle: SystemEnvelope): Unit
def systemDequeue(): SystemEnvelope
def hasSystemMessages: Boolean
}
trait DefaultSystemMessageQueue { self: SystemMessageQueue
val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]()
def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages offer handle
def systemDequeue(): SystemEnvelope = systemMessages.poll()
def hasSystemMessages: Boolean = !systemMessages.isEmpty
}
trait UnboundedMessageQueueSemantics { self: QueueMailbox
val queue: Queue[Envelope]
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
final def enqueue(handle: Envelope): Unit = queue add handle
final def dequeue(): Envelope = queue.poll()
}
trait BoundedMessageQueueSemantics { self: BlockingQueueMailbox
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingQueue[Envelope]
final def enqueue(handle: Envelope) {
if (pushTimeOut.length > 0) {
@ -125,20 +132,12 @@ trait BoundedMessageQueueSemantics { self: BlockingQueueMailbox ⇒
final def dequeue(): Envelope = queue.poll()
}
trait QueueMailbox extends Mailbox {
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
final def numberOfMessages = queue.size
final def hasMessages = !queue.isEmpty
}
abstract class NonblockingQueueMailbox(val dispatcher: MessageDispatcher) extends QueueMailbox {
def queue: Queue[Envelope]
}
abstract class BlockingQueueMailbox(val dispatcher: MessageDispatcher) extends QueueMailbox {
def queue: BlockingQueue[Envelope]
}
/**
* Mailbox configuration.
*/
@ -146,9 +145,13 @@ trait MailboxType {
def create(dispatcher: MessageDispatcher): Mailbox
}
/**
* It's a case class for Java (new UnboundedMailbox)
*/
case class UnboundedMailbox() extends MailboxType {
override def create(dispatcher: MessageDispatcher) = new NonblockingQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl {
override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
final val dispatcher = _dispatcher
}
}
@ -159,15 +162,17 @@ case class BoundedMailbox(
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(dispatcher: MessageDispatcher) = new BlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl {
override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingQueue[Envelope](capacity)
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
final val dispatcher = _dispatcher
}
}
case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxType {
override def create(dispatcher: MessageDispatcher) = new BlockingQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl {
override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
val queue = new PriorityBlockingQueue[Envelope](11, cmp)
final val dispatcher = _dispatcher
}
}
@ -179,9 +184,10 @@ case class BoundedPriorityMailbox(
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(dispatcher: MessageDispatcher) = new BlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl {
override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
final val dispatcher = _dispatcher
}
}