Introduce stable priority mailboxes.

Similar to existing priority mailboxes, but these preserve FIFO ordering
for messages of equal priority.
This commit is contained in:
dch 2015-01-10 14:44:25 +00:00
parent c56d670c03
commit 8df81e6b72
8 changed files with 289 additions and 13 deletions

View file

@ -8,7 +8,7 @@ import java.util.concurrent._
import akka.AkkaException
import akka.dispatch.sysmsg._
import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter }
import akka.util.{ Unsafe, BoundedBlockingQueue }
import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe }
import akka.util.Helpers.ConfigOps
import akka.event.Logging.Error
import scala.concurrent.duration.Duration
@ -696,6 +696,48 @@ object BoundedPriorityMailbox {
}
}
/**
* UnboundedStablePriorityMailbox is an unbounded mailbox that allows for prioritization of its contents. Unlike the
* [[UnboundedPriorityMailbox]] it preserves ordering for messages of equal priority.
* Extend this class and provide the Comparator in the constructor.
*/
class UnboundedStablePriorityMailbox(val cmp: Comparator[Envelope], val initialCapacity: Int)
extends MailboxType with ProducesMessageQueue[UnboundedStablePriorityMailbox.MessageQueue] {
def this(cmp: Comparator[Envelope]) = this(cmp, 11)
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new UnboundedStablePriorityMailbox.MessageQueue(initialCapacity, cmp)
}
object UnboundedStablePriorityMailbox {
class MessageQueue(initialCapacity: Int, cmp: Comparator[Envelope])
extends StablePriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue {
final def queue: Queue[Envelope] = this
}
}
/**
* BoundedStablePriorityMailbox is a bounded mailbox that allows for prioritization of its contents. Unlike the
* [[BoundedPriorityMailbox]] it preserves ordering for messages of equal priority.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new BoundedStablePriorityMailbox.MessageQueue(capacity, cmp, pushTimeOut)
}
object BoundedStablePriorityMailbox {
class MessageQueue(capacity: Int, cmp: Comparator[Envelope], val pushTimeOut: Duration)
extends BoundedBlockingQueue[Envelope](capacity, new StablePriorityQueue[Envelope](11, cmp))
with BoundedQueueBasedMessageQueue {
final def queue: BlockingQueue[Envelope] = this
}
}
/**
* UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque.
*/