+act #3246 Added control aware mailbox types

This commit is contained in:
Dario Rexin 2014-03-11 17:03:05 +01:00
parent dfef14a590
commit c3950a7525
8 changed files with 414 additions and 0 deletions

View file

@ -16,6 +16,9 @@ import scala.concurrent.duration.FiniteDuration
import scala.annotation.tailrec
import scala.util.control.NonFatal
import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
/**
* INTERNAL API
*/
@ -676,6 +679,153 @@ object BoundedDequeBasedMailbox {
}
}
/**
* ControlAwareMessageQueue handles messages that extend [[akka.dispatch.ControlMessage]] with priority.
*/
trait ControlAwareMessageQueueSemantics extends QueueBasedMessageQueue {
def controlQueue: Queue[Envelope]
def queue: Queue[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope): Unit = handle match {
case envelope @ Envelope(_: ControlMessage, _) controlQueue add envelope
case envelope queue add envelope
}
def dequeue(): Envelope = {
val controlMsg = controlQueue.poll()
if (controlMsg ne null) controlMsg
else queue.poll()
}
override def numberOfMessages: Int = controlQueue.size() + queue.size()
override def hasMessages: Boolean = !(queue.isEmpty && controlQueue.isEmpty)
}
trait UnboundedControlAwareMessageQueueSemantics extends UnboundedMessageQueueSemantics with ControlAwareMessageQueueSemantics
trait BoundedControlAwareMessageQueueSemantics extends BoundedMessageQueueSemantics with ControlAwareMessageQueueSemantics
/**
* Messages that extend this trait will be handled with priority by control aware mailboxes.
*/
trait ControlMessage
/**
* UnboundedControlAwareMailbox is an unbounded MailboxType, that maintains two queues
* to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority.
*/
final case class UnboundedControlAwareMailbox() extends MailboxType with ProducesMessageQueue[UnboundedControlAwareMailbox.MessageQueue] {
// this constructor will be called via reflection when this mailbox type
// is used in the application config
def this(settings: ActorSystem.Settings, config: Config) = this()
def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new UnboundedControlAwareMailbox.MessageQueue
}
object UnboundedControlAwareMailbox {
class MessageQueue extends ControlAwareMessageQueueSemantics with UnboundedMessageQueueSemantics {
val controlQueue: Queue[Envelope] = new ConcurrentLinkedQueue[Envelope]()
val queue: Queue[Envelope] = new ConcurrentLinkedQueue[Envelope]()
}
}
/**
* BoundedControlAwareMailbox is a bounded MailboxType, that maintains two queues
* to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority.
*/
final case class BoundedControlAwareMailbox(capacity: Int, pushTimeOut: FiniteDuration) extends MailboxType with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
config.getNanosDuration("mailbox-push-timeout-time"))
def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new BoundedControlAwareMailbox.MessageQueue(capacity, pushTimeOut)
}
object BoundedControlAwareMailbox {
class MessageQueue(val capacity: Int, val pushTimeOut: FiniteDuration) extends BoundedControlAwareMessageQueueSemantics {
private final val size = new AtomicInteger(0)
private final val putLock = new ReentrantLock()
private final val notFull = putLock.newCondition()
// no need to use blocking queues here, as blocking is being handled in `enqueueWithTimeout`
val controlQueue = new ConcurrentLinkedQueue[Envelope]()
val queue = new ConcurrentLinkedQueue[Envelope]()
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = handle match {
case envelope @ Envelope(_: ControlMessage, _) enqueueWithTimeout(controlQueue, receiver, envelope)
case envelope enqueueWithTimeout(queue, receiver, envelope)
}
override def numberOfMessages: Int = size.get()
override def hasMessages: Boolean = numberOfMessages > 0
@tailrec
final override def dequeue(): Envelope = {
val count = size.get()
// if both queues are empty return null
if (count > 0) {
// if there are messages try to fetch the current head
// or retry if other consumer dequeued in the mean time
if (size.compareAndSet(count, count - 1)) {
val item = super.dequeue()
if (size.get < capacity) signalNotFull()
item
} else {
dequeue()
}
} else {
null
}
}
private def signalNotFull() {
putLock.lock()
try {
notFull.signal()
} finally {
putLock.unlock()
}
}
private final def enqueueWithTimeout(q: Queue[Envelope], receiver: ActorRef, envelope: Envelope) {
var remaining = pushTimeOut.toNanos
putLock.lockInterruptibly()
val inserted = try {
var stop = false
while (size.get() == capacity && !stop) {
remaining = notFull.awaitNanos(remaining)
stop = remaining <= 0
}
if (stop) {
false
} else {
q.add(envelope)
val c = size.incrementAndGet()
if (c < capacity) notFull.signal()
true
}
} finally {
putLock.unlock()
}
if (!inserted) {
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender)
}
}
}
}
/**
* Trait to signal that an Actor requires a certain type of message queue semantics.
*