Add bounded deque-based mailbox

This commit is contained in:
phaller 2012-02-08 18:26:54 +01:00
parent ab60681a17
commit 4f43141c50
2 changed files with 92 additions and 9 deletions

View file

@ -318,8 +318,77 @@ trait QueueBasedMessageQueue extends MessageQueue {
final def hasMessages = !queue.isEmpty
}
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
final def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle
final def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int) =
handleIterator foreach { enqueueFirst(receiver, _) }
final def dequeue(): Envelope = queue.poll()
}
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
import java.util.concurrent.locks.ReentrantLock
/* used to enable atomic all-or-nothing enqueueAllFirst in the presence of potential
* capacity violations
*/
private val lock = new ReentrantLock(false)
def pushTimeOut: Duration
override def queue: BlockingDeque[Envelope]
final def enqueue(receiver: ActorRef, handle: Envelope) {
lock.lock()
try {
if (pushTimeOut.length > 0) {
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
} else queue put handle
} finally {
lock.unlock()
}
}
final def enqueueFirst(receiver: ActorRef, handle: Envelope) {
lock.lock()
try {
if (pushTimeOut.length > 0) {
queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
} else queue putFirst handle
} finally {
lock.unlock()
}
}
final def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int) {
lock.lock()
try {
handleIterator foreach { enqueueFirst(receiver, _) }
} finally {
lock.unlock()
}
}
final def dequeue(): Envelope = {
lock.lock()
try {
queue.poll()
} finally {
lock.unlock()
}
}
}
trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
def queue: Deque[Envelope]
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit
def enqueueAllFirst(receiver: ActorRef, handleIterator: Iterator[Envelope], size: Int): Unit
}
/**
@ -339,13 +408,6 @@ case class UnboundedMailbox() extends MailboxType {
}
}
case class UnboundedDequeBasedMailbox(config: Config) extends MailboxType {
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingDeque[Envelope]()
}
}
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
@ -377,3 +439,21 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
}
}
case class UnboundedDequeBasedMailbox(config: Config) extends MailboxType {
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingDeque[Envelope]()
}
}
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(receiver: ActorContext): Mailbox =
new Mailbox(receiver.asInstanceOf[ActorCell]) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingDeque[Envelope](capacity)
final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut
}
}