Saving the planet and shufflin'

This commit is contained in:
Viktor Klang 2012-05-18 13:37:26 +02:00
parent 72f12c89cd
commit 5eba9fceef
10 changed files with 177 additions and 148 deletions

View file

@ -16,7 +16,10 @@ import akka.actor.ActorSystem
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
object Mailbox {
/**
* INTERNAL API
*/
private[akka] object Mailbox {
type Status = Int
@ -40,6 +43,7 @@ object Mailbox {
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
* but can't be exposed to user defined mailbox subclasses.
*
* INTERNAL API
*/
private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
@ -244,6 +248,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
}
}
/**
* A MessageQueue is the user-message "lane" of an Akka Mailbox.
* It needs to atleast support N producers and 1 consumer thread-safely.
*/
trait MessageQueue {
/**
* Try to enqueue the message to this queue, or throw an exception.
@ -325,6 +333,9 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
def hasSystemMessages: Boolean = systemQueueGet ne null
}
/**
* A QueueBasedMessageQueue is a MessageQueue backed by a java.util.Queue
*/
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
def numberOfMessages = queue.size
@ -340,11 +351,19 @@ trait QueueBasedMessageQueue extends MessageQueue {
}
}
/**
* UnboundedMessageQueueSemantics adds unbounded semantics to a QueueBasedMessageQueue,
* i.e. a non-blocking enqueue and dequeue.
*/
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def dequeue(): Envelope = queue.poll()
}
/**
* BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue,
* i.e. blocking enqueue with timeout
*/
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingQueue[Envelope]
@ -360,17 +379,28 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def dequeue(): Envelope = queue.poll()
}
/**
* DequeBasedMessageQueue refines QueueBasedMessageQueue to be backed by a java.util.Deque
*/
trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
def queue: Deque[Envelope]
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit
}
/**
* UnboundedDequeBasedMessageQueueSemantics adds unbounded semantics to a DequeBasedMessageQueue,
* i.e. a non-blocking enqueue and dequeue.
*/
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle
def dequeue(): Envelope = queue.poll()
}
/**
* BoundedMessageQueueSemantics adds bounded semantics to a DequeBasedMessageQueue,
* i.e. blocking enqueue with timeout
*/
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingDeque[Envelope]
@ -393,14 +423,14 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
}
/**
* Mailbox configuration.
* MailboxType is a factory to create MessageQueues for an optionally provided ActorContext
*/
trait MailboxType {
def create(owner: Option[ActorContext]): MessageQueue
}
/**
* It's a case class for Java (new UnboundedMailbox)
* UnboundedMailbox is the default unbounded MailboxType used by Akka Actors.
*/
case class UnboundedMailbox() extends MailboxType {
@ -412,6 +442,9 @@ case class UnboundedMailbox() extends MailboxType {
}
}
/**
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
*/
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
@ -428,17 +461,20 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
}
/**
* Extend me to provide the comparator
* UnboundedPriorityMailbox is an unbounded mailbox that allows for priorization of its contents.
* Extend this class and provide the Comparator in the constructor.
*/
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType {
def this(cmp: Comparator[Envelope]) = this(cmp, 11)
final override def create(owner: Option[ActorContext]): MessageQueue =
new PriorityBlockingQueue[Envelope](11, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
new PriorityBlockingQueue[Envelope](initialCapacity, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
}
/**
* Extend me to provide the comparator
* BoundedPriorityMailbox is a bounded mailbox that allows for priorization of its contents.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
@ -452,6 +488,9 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap
}
}
/**
* UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque.
*/
case class UnboundedDequeBasedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
@ -462,6 +501,9 @@ case class UnboundedDequeBasedMailbox() extends MailboxType {
}
}
/**
* BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque.
*/
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),