2010-09-21 18:52:41 +02:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2010-09-21 18:52:41 +02:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.dispatch
|
2010-09-21 18:52:41 +02:00
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.AkkaException
|
2010-09-21 18:52:41 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
import java.util.{ Comparator, PriorityQueue }
|
2010-09-21 18:52:41 +02:00
|
|
|
import java.util.concurrent._
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.util._
|
2010-09-21 18:52:41 +02:00
|
|
|
|
2011-04-29 17:15:00 +02:00
|
|
|
/**
 * Exception type used in this file to signal that a message could not be
 * appended to a message queue (see `BoundedMessageQueueSemantics.enqueue`).
 *
 * @param message description of the failure
 * @param cause   optional underlying cause; defaults to `null`
 */
class MessageQueueAppendFailedException(message: String, cause: Throwable = null)
  extends AkkaException(message, cause)
|
2010-09-21 18:52:41 +02:00
|
|
|
|
|
|
|
|
/**
 * Queue of message invocations.
 *
 * Regular messages go through the abstract `enqueue`/`dequeue` pair supplied by
 * implementations; system messages are held in a separate
 * `ConcurrentLinkedQueue` owned by this trait.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
trait MessageQueue {

  // Created unlocked. NOTE(review): presumably taken by the dispatcher while it
  // processes this queue — confirm against the dispatcher implementation.
  val dispatcherLock: SimpleLock = new SimpleLock(startLocked = false)

  // Created unlocked. NOTE(review): presumably locked while the owner is
  // suspended — confirm against the callers.
  val suspended: SimpleLock = new SimpleLock(startLocked = false) //(startLocked = true)

  /** Backing store for system messages, kept apart from regular messages. */
  val systemMessages: ConcurrentLinkedQueue[SystemMessageInvocation] =
    new ConcurrentLinkedQueue[SystemMessageInvocation]()

  /** Appends a regular message invocation; the strategy is implementation-defined. */
  def enqueue(handle: MessageInvocation): Unit

  /** Removes and returns the next regular message invocation. */
  def dequeue(): MessageInvocation

  /** Offers `handle` to the system message queue; the Boolean result of `offer` is discarded. */
  def systemEnqueue(handle: SystemMessageInvocation): Unit = {
    systemMessages.offer(handle)
  }

  /** Polls the system message queue; yields `null` when it is empty (`Queue.poll` contract). */
  def systemDequeue(): SystemMessageInvocation = systemMessages.poll()

  /** Number of queued regular messages, as defined by the implementation. */
  def size: Int

  /** Whether the implementation currently holds no regular messages. */
  def isEmpty: Boolean

  /**
   * Invokes system messages one at a time, in the order `systemDequeue()` hands
   * them out, and stops as soon as `systemDequeue()` returns `null`.
   */
  def processAllSystemMessages(): Unit = {
    @scala.annotation.tailrec
    def drain(current: SystemMessageInvocation): Unit =
      if (current ne null) {
        current.invoke()
        // Fetch the next message only after the current one has been invoked.
        drain(systemDequeue())
      }

    drain(systemDequeue())
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Mailbox configuration.
 *
 * The trait is sealed, so every implementation lives in this source file;
 * the ones declared here are `UnboundedMailbox` and `BoundedMailbox`.
 */
sealed trait MailboxType
|
|
|
|
|
|
2011-04-27 12:21:19 +02:00
|
|
|
/**
 * `MailboxType` variant that takes no parameters.
 *
 * NOTE(review): presumably selects a queue without a capacity limit (e.g.
 * `DefaultUnboundedMessageQueue`) — confirm where the mailbox type is matched.
 */
case class UnboundedMailbox() extends MailboxType
|
2010-09-21 18:52:41 +02:00
|
|
|
/**
 * `MailboxType` variant carrying a capacity and a push time-out.
 *
 * @param capacity    defaults to `Dispatchers.MAILBOX_CAPACITY`, or to
 *                    `Int.MaxValue` when that setting is negative
 * @param pushTimeOut defaults to `Dispatchers.MAILBOX_PUSH_TIME_OUT`
 * @throws IllegalArgumentException if `capacity` is negative or `pushTimeOut`
 *                                  is `null` (capacity is checked first)
 */
case class BoundedMailbox(
  capacity: Int = if (Dispatchers.MAILBOX_CAPACITY >= 0) Dispatchers.MAILBOX_CAPACITY else Int.MaxValue,
  pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT)
  extends MailboxType {

  // Constructor-time validation; the order of the two checks is significant
  // when both arguments are invalid.
  if (capacity < 0)
    throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")

  if (pushTimeOut eq null)
    throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `MessageQueue` operations expressed in terms of the `BlockingQueue` this
 * trait is mixed into: `enqueue` delegates to `add` and `dequeue` to `poll`.
 */
trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] ⇒

  /** Adds `handle` via `BlockingQueue.add`; the Boolean result is discarded. */
  @inline
  final def enqueue(handle: MessageInvocation): Unit = {
    self.add(handle)
  }

  /** Non-blocking removal via `poll`; yields `null` when the queue is empty. */
  @inline
  final def dequeue(): MessageInvocation = self.poll()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `MessageQueue` operations for a `BlockingQueue` whose `enqueue` honours a
 * push time-out supplied by the implementing class.
 */
trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] ⇒

  /** Time-out applied when appending; a non-positive length selects a blocking `put`. */
  def pushTimeOut: Duration

  /**
   * Appends `handle` to the queue.
   *
   * With a positive `pushTimeOut.length` the message is offered with that
   * time-out; otherwise `put` is used, which waits for space without a time-out.
   *
   * @throws MessageQueueAppendFailedException if the timed `offer` returns `false`
   */
  final def enqueue(handle: MessageInvocation): Unit =
    if (pushTimeOut.length <= 0) self.put(handle)
    else if (!self.offer(handle, pushTimeOut.length, pushTimeOut.unit))
      throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)

  /** Non-blocking removal via `poll`; yields `null` when the queue is empty. */
  @inline
  final def dequeue(): MessageInvocation = self.poll()
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `LinkedBlockingQueue` created with its no-argument constructor and combined
 * with `UnboundedMessageQueueSemantics`.
 */
class DefaultUnboundedMessageQueue
  extends LinkedBlockingQueue[MessageInvocation]
  with UnboundedMessageQueueSemantics
|
2011-03-09 18:11:45 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `LinkedBlockingQueue` limited to `capacity` elements and combined with
 * `BoundedMessageQueueSemantics`.
 *
 * @param capacity    passed to the `LinkedBlockingQueue` constructor
 * @param pushTimeOut time-out used by `enqueue`
 */
class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration)
  extends LinkedBlockingQueue[MessageInvocation](capacity)
  with BoundedMessageQueueSemantics
|
2011-03-09 18:11:45 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `PriorityBlockingQueue` ordered by `cmp` and combined with
 * `UnboundedMessageQueueSemantics`.
 *
 * @param cmp comparator handed to the `PriorityBlockingQueue` constructor
 */
class UnboundedPriorityMessageQueue(cmp: Comparator[MessageInvocation])
  // 11 is the initial-capacity argument of the PriorityBlockingQueue constructor.
  extends PriorityBlockingQueue[MessageInvocation](11, cmp)
  with UnboundedMessageQueueSemantics
|
2011-03-09 18:11:45 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
/**
 * `BoundedBlockingQueue` wrapping a `java.util.PriorityQueue` ordered by `cmp`,
 * combined with `BoundedMessageQueueSemantics`.
 *
 * @param capacity    passed to the `BoundedBlockingQueue` constructor
 * @param pushTimeOut time-out used by `enqueue`
 * @param cmp         comparator handed to the backing `PriorityQueue`
 */
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, cmp: Comparator[MessageInvocation])
  // 11 is the initial-capacity argument of the PriorityQueue constructor.
  extends BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp))
  with BoundedMessageQueueSemantics
|