Merge pull request #2050 from drexin/wip-3246-priority-mailbox-drexin
+act #3246 Added control aware mailbox types
This commit is contained in:
commit
ac5f4fc72e
8 changed files with 414 additions and 0 deletions
|
|
@ -16,6 +16,9 @@ import scala.concurrent.duration.FiniteDuration
|
|||
import scala.annotation.tailrec
|
||||
import scala.util.control.NonFatal
|
||||
import com.typesafe.config.Config
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -676,6 +679,153 @@ object BoundedDequeBasedMailbox {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ControlAwareMessageQueue handles messages that extend [[akka.dispatch.ControlMessage]] with priority.
|
||||
*/
|
||||
trait ControlAwareMessageQueueSemantics extends QueueBasedMessageQueue {
|
||||
def controlQueue: Queue[Envelope]
|
||||
def queue: Queue[Envelope]
|
||||
|
||||
def enqueue(receiver: ActorRef, handle: Envelope): Unit = handle match {
|
||||
case envelope @ Envelope(_: ControlMessage, _) ⇒ controlQueue add envelope
|
||||
case envelope ⇒ queue add envelope
|
||||
}
|
||||
|
||||
def dequeue(): Envelope = {
|
||||
val controlMsg = controlQueue.poll()
|
||||
|
||||
if (controlMsg ne null) controlMsg
|
||||
else queue.poll()
|
||||
}
|
||||
|
||||
override def numberOfMessages: Int = controlQueue.size() + queue.size()
|
||||
|
||||
override def hasMessages: Boolean = !(queue.isEmpty && controlQueue.isEmpty)
|
||||
}
|
||||
|
||||
trait UnboundedControlAwareMessageQueueSemantics extends UnboundedMessageQueueSemantics with ControlAwareMessageQueueSemantics
|
||||
trait BoundedControlAwareMessageQueueSemantics extends BoundedMessageQueueSemantics with ControlAwareMessageQueueSemantics
|
||||
|
||||
/**
|
||||
* Messages that extend this trait will be handled with priority by control aware mailboxes.
|
||||
*/
|
||||
trait ControlMessage
|
||||
|
||||
/**
|
||||
* UnboundedControlAwareMailbox is an unbounded MailboxType, that maintains two queues
|
||||
* to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority.
|
||||
*/
|
||||
final case class UnboundedControlAwareMailbox() extends MailboxType with ProducesMessageQueue[UnboundedControlAwareMailbox.MessageQueue] {
|
||||
|
||||
// this constructor will be called via reflection when this mailbox type
|
||||
// is used in the application config
|
||||
def this(settings: ActorSystem.Settings, config: Config) = this()
|
||||
|
||||
def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new UnboundedControlAwareMailbox.MessageQueue
|
||||
}
|
||||
|
||||
object UnboundedControlAwareMailbox {
|
||||
class MessageQueue extends ControlAwareMessageQueueSemantics with UnboundedMessageQueueSemantics {
|
||||
val controlQueue: Queue[Envelope] = new ConcurrentLinkedQueue[Envelope]()
|
||||
val queue: Queue[Envelope] = new ConcurrentLinkedQueue[Envelope]()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* BoundedControlAwareMailbox is a bounded MailboxType, that maintains two queues
|
||||
* to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority.
|
||||
*/
|
||||
final case class BoundedControlAwareMailbox(capacity: Int, pushTimeOut: FiniteDuration) extends MailboxType with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] {
|
||||
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
|
||||
config.getNanosDuration("mailbox-push-timeout-time"))
|
||||
|
||||
def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = new BoundedControlAwareMailbox.MessageQueue(capacity, pushTimeOut)
|
||||
}
|
||||
|
||||
object BoundedControlAwareMailbox {
|
||||
class MessageQueue(val capacity: Int, val pushTimeOut: FiniteDuration) extends BoundedControlAwareMessageQueueSemantics {
|
||||
|
||||
private final val size = new AtomicInteger(0)
|
||||
private final val putLock = new ReentrantLock()
|
||||
private final val notFull = putLock.newCondition()
|
||||
|
||||
// no need to use blocking queues here, as blocking is being handled in `enqueueWithTimeout`
|
||||
val controlQueue = new ConcurrentLinkedQueue[Envelope]()
|
||||
val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||
|
||||
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = handle match {
|
||||
case envelope @ Envelope(_: ControlMessage, _) ⇒ enqueueWithTimeout(controlQueue, receiver, envelope)
|
||||
case envelope ⇒ enqueueWithTimeout(queue, receiver, envelope)
|
||||
}
|
||||
|
||||
override def numberOfMessages: Int = size.get()
|
||||
override def hasMessages: Boolean = numberOfMessages > 0
|
||||
|
||||
@tailrec
|
||||
final override def dequeue(): Envelope = {
|
||||
val count = size.get()
|
||||
|
||||
// if both queues are empty return null
|
||||
if (count > 0) {
|
||||
// if there are messages try to fetch the current head
|
||||
// or retry if other consumer dequeued in the mean time
|
||||
if (size.compareAndSet(count, count - 1)) {
|
||||
val item = super.dequeue()
|
||||
|
||||
if (size.get < capacity) signalNotFull()
|
||||
|
||||
item
|
||||
} else {
|
||||
dequeue()
|
||||
}
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
private def signalNotFull() {
|
||||
putLock.lock()
|
||||
|
||||
try {
|
||||
notFull.signal()
|
||||
} finally {
|
||||
putLock.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
private final def enqueueWithTimeout(q: Queue[Envelope], receiver: ActorRef, envelope: Envelope) {
|
||||
var remaining = pushTimeOut.toNanos
|
||||
|
||||
putLock.lockInterruptibly()
|
||||
val inserted = try {
|
||||
var stop = false
|
||||
while (size.get() == capacity && !stop) {
|
||||
remaining = notFull.awaitNanos(remaining)
|
||||
stop = remaining <= 0
|
||||
}
|
||||
|
||||
if (stop) {
|
||||
false
|
||||
} else {
|
||||
q.add(envelope)
|
||||
val c = size.incrementAndGet()
|
||||
|
||||
if (c < capacity) notFull.signal()
|
||||
|
||||
true
|
||||
}
|
||||
} finally {
|
||||
putLock.unlock()
|
||||
}
|
||||
|
||||
if (!inserted) {
|
||||
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
|
||||
DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Trait to signal that an Actor requires a certain type of message queue semantics.
|
||||
*
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue