Merge pull request #16634 from dimbleby/stable-priority-mailbox

Introduce stable priority mailboxes.
This commit is contained in:
Roland Kuhn 2015-01-28 15:10:24 +01:00
commit 50d1569f37
8 changed files with 289 additions and 13 deletions

View file

@ -195,6 +195,15 @@ class PriorityMailboxSpec extends MailboxSpec {
}
}
class StablePriorityMailboxSpec extends MailboxSpec {
val comparator = PriorityGenerator(_.##)
lazy val name = "The stable priority mailbox implementation"
def factory = {
case UnboundedMailbox() new UnboundedStablePriorityMailbox(comparator).create(None, None)
case BoundedMailbox(capacity, pushTimeOut) new BoundedStablePriorityMailbox(comparator, capacity, pushTimeOut).create(None, None)
}
}
class ControlAwareMailboxSpec extends MailboxSpec {
lazy val name = "The control aware mailbox implementation"
def factory = {

View file

@ -0,0 +1,88 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch
import language.postfixOps
import com.typesafe.config.Config
import akka.actor.{ Props, ActorSystem, Actor }
import akka.testkit.{ DefaultTimeout, AkkaSpec }
import scala.concurrent.duration._
object StablePriorityDispatcherSpec {
val config = """
unbounded-stable-prio-dispatcher {
mailbox-type = "akka.dispatch.StablePriorityDispatcherSpec$Unbounded"
}
bounded-stable-prio-dispatcher {
mailbox-type = "akka.dispatch.StablePriorityDispatcherSpec$Bounded"
}
"""
class Unbounded(settings: ActorSystem.Settings, config: Config) extends UnboundedStablePriorityMailbox(PriorityGenerator({
case i: Int if i <= 100 i // Small integers have high priority
case i: Int 101 // Don't care for other integers
case 'Result Int.MaxValue
}: Any Int))
class Bounded(settings: ActorSystem.Settings, config: Config) extends BoundedStablePriorityMailbox(PriorityGenerator({
case i: Int if i <= 100 i // Small integers have high priority
case i: Int 101 // Don't care for other integers
case 'Result Int.MaxValue
}: Any Int), 1000, 10 seconds)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StablePriorityDispatcherSpec extends AkkaSpec(StablePriorityDispatcherSpec.config) with DefaultTimeout {
"A StablePriorityDispatcher" must {
"Order its messages according to the specified comparator while preserving FIFO for equal priority messages, " +
"using an unbounded mailbox" in {
val dispatcherKey = "unbounded-stable-prio-dispatcher"
testOrdering(dispatcherKey)
}
"Order its messages according to the specified comparator while preserving FIFO for equal priority messages, " +
"using a bounded mailbox" in {
val dispatcherKey = "bounded-stable-prio-dispatcher"
testOrdering(dispatcherKey)
}
def testOrdering(dispatcherKey: String) {
val msgs = (1 to 200) toList
val shuffled = scala.util.Random.shuffle(msgs)
// It's important that the actor under test is not a top level actor
// with RepointableActorRef, since messages might be queued in
// UnstartedCell and then sent to the StablePriorityQueue and consumed immediately
// without the ordering taking place.
val actor = system.actorOf(Props(new Actor {
context.actorOf(Props(new Actor {
val acc = scala.collection.mutable.ListBuffer[Int]()
shuffled foreach { m self ! m }
self.tell('Result, testActor)
def receive = {
case i: Int acc += i
case 'Result sender() ! acc.toList
}
}).withDispatcher(dispatcherKey))
def receive = Actor.emptyBehavior
}))
// Low messages should come out first, and in priority order. High messages follow - they are equal priority and
// should come out in the same order in which they were sent.
val lo = (1 to 100) toList
val hi = shuffled filter { _ > 100 }
expectMsgType[List[_]] should be(lo ++ hi)
}
}
}

View file

@ -8,7 +8,7 @@ import java.util.concurrent._
import akka.AkkaException
import akka.dispatch.sysmsg._
import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter }
import akka.util.{ Unsafe, BoundedBlockingQueue }
import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe }
import akka.util.Helpers.ConfigOps
import akka.event.Logging.Error
import scala.concurrent.duration.Duration
@ -696,6 +696,48 @@ object BoundedPriorityMailbox {
}
}
/**
* UnboundedStablePriorityMailbox is an unbounded mailbox that allows for prioritization of its contents. Unlike the
* [[UnboundedPriorityMailbox]] it preserves ordering for messages of equal priority.
* Extend this class and provide the Comparator in the constructor.
*/
class UnboundedStablePriorityMailbox(val cmp: Comparator[Envelope], val initialCapacity: Int)
extends MailboxType with ProducesMessageQueue[UnboundedStablePriorityMailbox.MessageQueue] {
def this(cmp: Comparator[Envelope]) = this(cmp, 11)
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new UnboundedStablePriorityMailbox.MessageQueue(initialCapacity, cmp)
}
object UnboundedStablePriorityMailbox {
class MessageQueue(initialCapacity: Int, cmp: Comparator[Envelope])
extends StablePriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue {
final def queue: Queue[Envelope] = this
}
}
/**
* BoundedStablePriorityMailbox is a bounded mailbox that allows for prioritization of its contents. Unlike the
* [[BoundedPriorityMailbox]] it preserves ordering for messages of equal priority.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new BoundedStablePriorityMailbox.MessageQueue(capacity, cmp, pushTimeOut)
}
object BoundedStablePriorityMailbox {
class MessageQueue(capacity: Int, cmp: Comparator[Envelope], val pushTimeOut: Duration)
extends BoundedBlockingQueue[Envelope](capacity, new StablePriorityQueue[Envelope](11, cmp))
with BoundedQueueBasedMessageQueue {
final def queue: BlockingQueue[Envelope] = this
}
}
/**
* UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque.
*/

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.util
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import java.util.{ AbstractQueue, Comparator, Iterator, PriorityQueue }
/**
* PriorityQueueStabilizer wraps a priority queue so that it respects FIFO for elements of equal priority.
* @tparam E - The type of the elements of this Queue
*/
trait PriorityQueueStabilizer[E <: AnyRef] extends AbstractQueue[E] {
val backingQueue: AbstractQueue[PriorityQueueStabilizer.WrappedElement[E]]
val seqNum = new AtomicLong(0)
override def peek(): E = {
val wrappedElement = backingQueue.peek()
if (wrappedElement eq null) null.asInstanceOf[E] else wrappedElement.element
}
override def size(): Int = backingQueue.size()
override def offer(e: E): Boolean = {
if (e eq null) throw new NullPointerException
val wrappedElement = new PriorityQueueStabilizer.WrappedElement(e, seqNum.incrementAndGet)
backingQueue.offer(wrappedElement)
}
override def iterator(): Iterator[E] = new Iterator[E] {
private[this] val backingIterator = backingQueue.iterator()
def hasNext: Boolean = backingIterator.hasNext
def next(): E = backingIterator.next().element
def remove() = backingIterator.remove()
}
override def poll(): E = {
val wrappedElement = backingQueue.poll()
if (wrappedElement eq null) null.asInstanceOf[E] else wrappedElement.element
}
}
object PriorityQueueStabilizer {
class WrappedElement[E](val element: E, val seqNum: Long)
class WrappedElementComparator[E](val cmp: Comparator[E]) extends Comparator[WrappedElement[E]] {
def compare(e1: WrappedElement[E], e2: WrappedElement[E]): Int = {
val baseComparison = cmp.compare(e1.element, e2.element)
if (baseComparison != 0) baseComparison
else {
val diff = e1.seqNum - e2.seqNum
java.lang.Long.signum(diff)
}
}
}
}
/**
* StablePriorityQueue is a priority queue that preserves order for elements of equal priority.
* @param capacity - the initial capacity of this Queue, needs to be > 0.
* @param cmp - Comparator for comparing Queue elements
* @tparam E - The type of the elements of this Queue
*/
class StablePriorityQueue[E <: AnyRef](capacity: Int, cmp: Comparator[E]) extends PriorityQueueStabilizer[E] {
val backingQueue = new PriorityQueue[PriorityQueueStabilizer.WrappedElement[E]](
capacity,
new PriorityQueueStabilizer.WrappedElementComparator[E](cmp))
}
/**
* StablePriorityBlockingQueue is a blocking priority queue that preserves order for elements of equal priority.
* @param capacity - the initial capacity of this Queue, needs to be > 0.
* @param cmp - Comparator for comparing Queue elements
* @tparam E - The type of the elements of this Queue
*/
class StablePriorityBlockingQueue[E <: AnyRef](capacity: Int, cmp: Comparator[E]) extends PriorityQueueStabilizer[E] {
val backingQueue = new PriorityBlockingQueue[PriorityQueueStabilizer.WrappedElement[E]](
capacity,
new PriorityQueueStabilizer.WrappedElementComparator[E](cmp))
}

View file

@ -25,7 +25,7 @@ import akka.event.LoggingAdapter;
//#imports-prio-mailbox
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedPriorityMailbox;
import akka.dispatch.UnboundedStablePriorityMailbox;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.JavaTestKit;
import com.typesafe.config.Config;
@ -74,7 +74,7 @@ public class DispatcherDocTest {
.withDispatcher("my-pinned-dispatcher"));
//#defining-pinned-dispatcher
}
@SuppressWarnings("unused")
public void compileLookup() {
//#lookup
@ -188,7 +188,7 @@ public class DispatcherDocTest {
static
//#prio-mailbox
public class MyPrioMailbox extends UnboundedPriorityMailbox {
public class MyPrioMailbox extends UnboundedStablePriorityMailbox {
// needed for reflective instantiation
public MyPrioMailbox(ActorSystem.Settings settings, Config config) {
// Create a new PriorityGenerator, lower prio means more important

View file

@ -82,8 +82,8 @@ dispatcher which will execute it. Then the mailbox is determined as follows:
Default Mailbox
---------------
When the mailbox is not specified as described above the default mailbox
is used. By default it is an unbounded mailbox, which is backed by a
When the mailbox is not specified as described above the default mailbox
is used. By default it is an unbounded mailbox, which is backed by a
``java.util.concurrent.ConcurrentLinkedQueue``.
``SingleConsumerOnlyUnboundedMailbox`` is an even more efficient mailbox, and
@ -155,6 +155,8 @@ Akka comes shipped with a number of mailbox implementations:
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox
- Blocking: Yes
- Bounded: No
@ -163,7 +165,9 @@ Akka comes shipped with a number of mailbox implementations:
* BoundedPriorityMailbox
- Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox
- Blocking: Yes
@ -171,6 +175,30 @@ Akka comes shipped with a number of mailbox implementations:
- Configuration name: "akka.dispatch.BoundedPriorityMailbox"
* UnboundedStablePriorityMailbox
- Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer``
- FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox"
* BoundedStablePriorityMailbox
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue``
- FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox
- Blocking: Yes
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedStablePriorityMailbox"
* UnboundedControlAwareMailbox
- Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority

View file

@ -187,13 +187,13 @@ object DispatcherDocSpec {
//#prio-mailbox
import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedPriorityMailbox
import akka.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config
// We inherit, in this case, from UnboundedPriorityMailbox
// We inherit, in this case, from UnboundedStablePriorityMailbox
// and seed it with the priority generator
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedPriorityMailbox(
extends UnboundedStablePriorityMailbox(
// Create a new PriorityGenerator, lower prio means more important
PriorityGenerator {
// 'highpriority messages should be treated first if possible

View file

@ -82,8 +82,8 @@ dispatcher which will execute it. Then the mailbox is determined as follows:
Default Mailbox
---------------
When the mailbox is not specified as described above the default mailbox
is used. By default it is an unbounded mailbox, which is backed by a
When the mailbox is not specified as described above the default mailbox
is used. By default it is an unbounded mailbox, which is backed by a
``java.util.concurrent.ConcurrentLinkedQueue``.
``SingleConsumerOnlyUnboundedMailbox`` is an even more efficient mailbox, and
@ -155,6 +155,8 @@ Akka comes shipped with a number of mailbox implementations:
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox
- Blocking: Yes
- Bounded: No
@ -163,7 +165,9 @@ Akka comes shipped with a number of mailbox implementations:
* BoundedPriorityMailbox
- Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox
- Blocking: Yes
@ -171,6 +175,30 @@ Akka comes shipped with a number of mailbox implementations:
- Configuration name: "akka.dispatch.BoundedPriorityMailbox"
* UnboundedStablePriorityMailbox
- Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer``
- FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox"
* BoundedStablePriorityMailbox
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue``
- FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox
- Blocking: Yes
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedStablePriorityMailbox"
* UnboundedControlAwareMailbox
- Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority