diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 89158e1574..20b6027f15 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -195,6 +195,15 @@ class PriorityMailboxSpec extends MailboxSpec { } } +class StablePriorityMailboxSpec extends MailboxSpec { + val comparator = PriorityGenerator(_.##) + lazy val name = "The stable priority mailbox implementation" + def factory = { + case UnboundedMailbox() ⇒ new UnboundedStablePriorityMailbox(comparator).create(None, None) + case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedStablePriorityMailbox(comparator, capacity, pushTimeOut).create(None, None) + } +} + class ControlAwareMailboxSpec extends MailboxSpec { lazy val name = "The control aware mailbox implementation" def factory = { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/StablePriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/StablePriorityDispatcherSpec.scala new file mode 100644 index 0000000000..df0e9a4bfc --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/dispatch/StablePriorityDispatcherSpec.scala @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.dispatch + +import language.postfixOps + +import com.typesafe.config.Config + +import akka.actor.{ Props, ActorSystem, Actor } +import akka.testkit.{ DefaultTimeout, AkkaSpec } +import scala.concurrent.duration._ + +object StablePriorityDispatcherSpec { + val config = """ + unbounded-stable-prio-dispatcher { + mailbox-type = "akka.dispatch.StablePriorityDispatcherSpec$Unbounded" + } + bounded-stable-prio-dispatcher { + mailbox-type = "akka.dispatch.StablePriorityDispatcherSpec$Bounded" + } + """ + + class Unbounded(settings: ActorSystem.Settings, config: Config) extends UnboundedStablePriorityMailbox(PriorityGenerator({ + case i: Int if i <= 100 ⇒ i // Small integers have high priority + case i: Int ⇒ 101 // Don't care for other integers + case 'Result ⇒ Int.MaxValue + }: Any ⇒ Int)) + + class Bounded(settings: ActorSystem.Settings, config: Config) extends BoundedStablePriorityMailbox(PriorityGenerator({ + case i: Int if i <= 100 ⇒ i // Small integers have high priority + case i: Int ⇒ 101 // Don't care for other integers + case 'Result ⇒ Int.MaxValue + }: Any ⇒ Int), 1000, 10 seconds) + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class StablePriorityDispatcherSpec extends AkkaSpec(StablePriorityDispatcherSpec.config) with DefaultTimeout { + + "A StablePriorityDispatcher" must { + "Order its messages according to the specified comparator while preserving FIFO for equal priority messages, " + + "using an unbounded mailbox" in { + val dispatcherKey = "unbounded-stable-prio-dispatcher" + testOrdering(dispatcherKey) + } + + "Order its messages according to the specified comparator while preserving FIFO for equal priority messages, " + + "using a bounded mailbox" in { + val dispatcherKey = "bounded-stable-prio-dispatcher" + testOrdering(dispatcherKey) + } + + def testOrdering(dispatcherKey: String) { + val msgs = (1 to 200) toList + val shuffled = scala.util.Random.shuffle(msgs) + + // It's important that the actor under test is not a top level actor + // with RepointableActorRef, since messages might be queued in + // UnstartedCell and then sent to the StablePriorityQueue and consumed immediately + // without the ordering taking place. + val actor = system.actorOf(Props(new Actor { + context.actorOf(Props(new Actor { + + val acc = scala.collection.mutable.ListBuffer[Int]() + + shuffled foreach { m ⇒ self ! m } + + self.tell('Result, testActor) + + def receive = { + case i: Int ⇒ acc += i + case 'Result ⇒ sender() ! acc.toList + } + }).withDispatcher(dispatcherKey)) + + def receive = Actor.emptyBehavior + + })) + + // Low messages should come out first, and in priority order. High messages follow - they are equal priority and + // should come out in the same order in which they were sent. + val lo = (1 to 100) toList + val hi = shuffled filter { _ > 100 } + expectMsgType[List[_]] should be(lo ++ hi) + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 575e82b952..38779f5ae8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -8,7 +8,7 @@ import java.util.concurrent._ import akka.AkkaException import akka.dispatch.sysmsg._ import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter } -import akka.util.{ Unsafe, BoundedBlockingQueue } +import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe } import akka.util.Helpers.ConfigOps import akka.event.Logging.Error import scala.concurrent.duration.Duration @@ -696,6 +696,48 @@ object BoundedPriorityMailbox { } } +/** + * UnboundedStablePriorityMailbox is an unbounded mailbox that allows for prioritization of its contents. Unlike the + * [[UnboundedPriorityMailbox]] it preserves ordering for messages of equal priority. + * Extend this class and provide the Comparator in the constructor. + */ +class UnboundedStablePriorityMailbox(val cmp: Comparator[Envelope], val initialCapacity: Int) + extends MailboxType with ProducesMessageQueue[UnboundedStablePriorityMailbox.MessageQueue] { + def this(cmp: Comparator[Envelope]) = this(cmp, 11) + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = + new UnboundedStablePriorityMailbox.MessageQueue(initialCapacity, cmp) +} + +object UnboundedStablePriorityMailbox { + class MessageQueue(initialCapacity: Int, cmp: Comparator[Envelope]) + extends StablePriorityBlockingQueue[Envelope](initialCapacity, cmp) with UnboundedQueueBasedMessageQueue { + final def queue: Queue[Envelope] = this + } +} + +/** + * BoundedStablePriorityMailbox is a bounded mailbox that allows for prioritization of its contents. Unlike the + * [[BoundedPriorityMailbox]] it preserves ordering for messages of equal priority. + * Extend this class and provide the Comparator in the constructor. + */ +class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) + extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] { + + if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") + if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") + + final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = + new BoundedStablePriorityMailbox.MessageQueue(capacity, cmp, pushTimeOut) +} + +object BoundedStablePriorityMailbox { + class MessageQueue(capacity: Int, cmp: Comparator[Envelope], val pushTimeOut: Duration) + extends BoundedBlockingQueue[Envelope](capacity, new StablePriorityQueue[Envelope](11, cmp)) + with BoundedQueueBasedMessageQueue { + final def queue: BlockingQueue[Envelope] = this + } +} + /** * UnboundedDequeBasedMailbox is an unbounded MailboxType, backed by a Deque. */ diff --git a/akka-actor/src/main/scala/akka/util/StablePriorityQueue.scala b/akka-actor/src/main/scala/akka/util/StablePriorityQueue.scala new file mode 100644 index 0000000000..89b6eec975 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/StablePriorityQueue.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.util + +import java.util.concurrent.PriorityBlockingQueue +import java.util.concurrent.atomic.AtomicLong +import java.util.{ AbstractQueue, Comparator, Iterator, PriorityQueue } + +/** + * PriorityQueueStabilizer wraps a priority queue so that it respects FIFO for elements of equal priority. + * @tparam E - The type of the elements of this Queue + */ +trait PriorityQueueStabilizer[E <: AnyRef] extends AbstractQueue[E] { + val backingQueue: AbstractQueue[PriorityQueueStabilizer.WrappedElement[E]] + val seqNum = new AtomicLong(0) + + override def peek(): E = { + val wrappedElement = backingQueue.peek() + if (wrappedElement eq null) null.asInstanceOf[E] else wrappedElement.element + } + + override def size(): Int = backingQueue.size() + + override def offer(e: E): Boolean = { + if (e eq null) throw new NullPointerException + val wrappedElement = new PriorityQueueStabilizer.WrappedElement(e, seqNum.incrementAndGet) + backingQueue.offer(wrappedElement) + } + + override def iterator(): Iterator[E] = new Iterator[E] { + private[this] val backingIterator = backingQueue.iterator() + def hasNext: Boolean = backingIterator.hasNext + def next(): E = backingIterator.next().element + def remove() = backingIterator.remove() + } + + override def poll(): E = { + val wrappedElement = backingQueue.poll() + if (wrappedElement eq null) null.asInstanceOf[E] else wrappedElement.element + } +} + +object PriorityQueueStabilizer { + class WrappedElement[E](val element: E, val seqNum: Long) + class WrappedElementComparator[E](val cmp: Comparator[E]) extends Comparator[WrappedElement[E]] { + def compare(e1: WrappedElement[E], e2: WrappedElement[E]): Int = { + val baseComparison = cmp.compare(e1.element, e2.element) + if (baseComparison != 0) baseComparison + else { + val diff = e1.seqNum - e2.seqNum + java.lang.Long.signum(diff) + } + } + } +} + +/** + * StablePriorityQueue is a priority queue that preserves order for elements of equal priority. + * @param capacity - the initial capacity of this Queue, needs to be > 0. + * @param cmp - Comparator for comparing Queue elements + * @tparam E - The type of the elements of this Queue + */ +class StablePriorityQueue[E <: AnyRef](capacity: Int, cmp: Comparator[E]) extends PriorityQueueStabilizer[E] { + val backingQueue = new PriorityQueue[PriorityQueueStabilizer.WrappedElement[E]]( + capacity, + new PriorityQueueStabilizer.WrappedElementComparator[E](cmp)) +} + +/** + * StablePriorityBlockingQueue is a blocking priority queue that preserves order for elements of equal priority. + * @param capacity - the initial capacity of this Queue, needs to be > 0. + * @param cmp - Comparator for comparing Queue elements + * @tparam E - The type of the elements of this Queue + */ +class StablePriorityBlockingQueue[E <: AnyRef](capacity: Int, cmp: Comparator[E]) extends PriorityQueueStabilizer[E] { + val backingQueue = new PriorityBlockingQueue[PriorityQueueStabilizer.WrappedElement[E]]( + capacity, + new PriorityQueueStabilizer.WrappedElementComparator[E](cmp)) +} diff --git a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java index 85aee84fd1..bb1fe1a851 100644 --- a/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java +++ b/akka-docs/rst/java/code/docs/dispatcher/DispatcherDocTest.java @@ -25,7 +25,7 @@ import akka.event.LoggingAdapter; //#imports-prio-mailbox import akka.dispatch.PriorityGenerator; -import akka.dispatch.UnboundedPriorityMailbox; +import akka.dispatch.UnboundedStablePriorityMailbox; import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.JavaTestKit; import com.typesafe.config.Config; @@ -74,7 +74,7 @@ public class DispatcherDocTest { .withDispatcher("my-pinned-dispatcher")); //#defining-pinned-dispatcher } - + @SuppressWarnings("unused") public void compileLookup() { //#lookup @@ -188,7 +188,7 @@ public class DispatcherDocTest { static //#prio-mailbox - public class MyPrioMailbox extends UnboundedPriorityMailbox { + public class MyPrioMailbox extends UnboundedStablePriorityMailbox { // needed for reflective instantiation public MyPrioMailbox(ActorSystem.Settings settings, Config config) { // Create a new PriorityGenerator, lower prio means more important diff --git a/akka-docs/rst/java/mailboxes.rst b/akka-docs/rst/java/mailboxes.rst index 60d18f5b7f..e320d0c06a 100644 --- a/akka-docs/rst/java/mailboxes.rst +++ b/akka-docs/rst/java/mailboxes.rst @@ -82,8 +82,8 @@ dispatcher which will execute it. Then the mailbox is determined as follows: Default Mailbox --------------- -When the mailbox is not specified as described above the default mailbox -is used. By default it is an unbounded mailbox, which is backed by a +When the mailbox is not specified as described above the default mailbox +is used. By default it is an unbounded mailbox, which is backed by a ``java.util.concurrent.ConcurrentLinkedQueue``. ``SingleConsumerOnlyUnboundedMailbox`` is an even more efficient mailbox, and @@ -155,6 +155,8 @@ Akka comes shipped with a number of mailbox implementations: - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` + - Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox + - Blocking: Yes - Bounded: No @@ -163,7 +165,9 @@ Akka comes shipped with a number of mailbox implementations: * BoundedPriorityMailbox - - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + + - Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox - Blocking: Yes @@ -171,6 +175,30 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.BoundedPriorityMailbox" +* UnboundedStablePriorityMailbox + + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` + + - FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox + + - Blocking: Yes + + - Bounded: No + + - Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox" + +* BoundedStablePriorityMailbox + + - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue`` + + - FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: "akka.dispatch.BoundedStablePriorityMailbox" + * UnboundedControlAwareMailbox - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority diff --git a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala index dceac101c6..15e29a5dd3 100644 --- a/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/dispatcher/DispatcherDocSpec.scala @@ -187,13 +187,13 @@ object DispatcherDocSpec { //#prio-mailbox import akka.dispatch.PriorityGenerator - import akka.dispatch.UnboundedPriorityMailbox + import akka.dispatch.UnboundedStablePriorityMailbox import com.typesafe.config.Config - // We inherit, in this case, from UnboundedPriorityMailbox + // We inherit, in this case, from UnboundedStablePriorityMailbox // and seed it with the priority generator class MyPrioMailbox(settings: ActorSystem.Settings, config: Config) - extends UnboundedPriorityMailbox( + extends UnboundedStablePriorityMailbox( // Create a new PriorityGenerator, lower prio means more important PriorityGenerator { // 'highpriority messages should be treated first if possible diff --git a/akka-docs/rst/scala/mailboxes.rst b/akka-docs/rst/scala/mailboxes.rst index 3c85f4ad1e..cf2ed4a8a7 100644 --- a/akka-docs/rst/scala/mailboxes.rst +++ b/akka-docs/rst/scala/mailboxes.rst @@ -82,8 +82,8 @@ dispatcher which will execute it. Then the mailbox is determined as follows: Default Mailbox --------------- -When the mailbox is not specified as described above the default mailbox -is used. By default it is an unbounded mailbox, which is backed by a +When the mailbox is not specified as described above the default mailbox +is used. By default it is an unbounded mailbox, which is backed by a ``java.util.concurrent.ConcurrentLinkedQueue``. ``SingleConsumerOnlyUnboundedMailbox`` is an even more efficient mailbox, and @@ -155,6 +155,8 @@ Akka comes shipped with a number of mailbox implementations: - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` + - Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox + - Blocking: Yes - Bounded: No @@ -163,7 +165,9 @@ Akka comes shipped with a number of mailbox implementations: * BoundedPriorityMailbox - - Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + + - Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox - Blocking: Yes @@ -171,6 +175,30 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.BoundedPriorityMailbox" +* UnboundedStablePriorityMailbox + + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` + + - FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox + + - Blocking: Yes + + - Bounded: No + + - Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox" + +* BoundedStablePriorityMailbox + + - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue`` + + - FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: "akka.dispatch.BoundedStablePriorityMailbox" + * UnboundedControlAwareMailbox - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority