diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 7a469868a4..5e08708037 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -173,11 +173,7 @@ class DefaultMailboxSpec extends MailboxSpec { } class PriorityMailboxSpec extends MailboxSpec { - val comparator = new java.util.Comparator[MessageInvocation] { - def compare(a: MessageInvocation, b: MessageInvocation): Int = { - a.## - b.## - } - } + val comparator = PriorityGenerator(_.##) lazy val name = "The priority mailbox implementation" def factory = { case UnboundedMailbox(blockDequeue) => diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala new file mode 100644 index 0000000000..383cf63f48 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -0,0 +1,51 @@ +package akka.dispatch + +import akka.actor.Actor._ +import akka.actor.Actor +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.CountDownLatch + +class PriorityDispatcherSpec extends WordSpec with MustMatchers { + + "A PriorityExecutorBasedEventDrivenDispatcher" must { + "Order it's messages according to the specified comparator using an unbounded mailbox" in { + testOrdering(UnboundedMailbox(false)) + } + + "Order it's messages according to the specified comparator using a bounded mailbox" in { + testOrdering(BoundedMailbox(false,1000)) + } + } + + def testOrdering(mboxType: MailboxType) { + val dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("Test", + PriorityGenerator({ + case i: Int => i //Reverse order + case 'Result => Int.MaxValue + }: Any => Int), + throughput = 1, + mailboxType = mboxType + ) + + val actor = actorOf(new Actor { + self.dispatcher = dispatcher + var acc: List[Int] = Nil + + def receive = { + case i: Int => acc = i :: acc + case 'Result => self reply_? acc + } + }).start + + dispatcher.suspend(actor) //Make sure the actor isn't treating any messages, let it buffer the incoming messages + + val msgs = (1 to 100).toList + for(m <- msgs) actor ! m + + dispatcher.resume(actor) //Signal the actor to start treating it's message backlog + + actor.!!![List[Int]]('Result).await.result.get must be === (msgs.reverse) + } + +} diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 4fec527642..019923b4b4 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -229,9 +229,39 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => } } +object PriorityGenerator { + /** + * Creates a PriorityGenerator that uses the supplied function as priority generator + */ + def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator { + def gen(message: Any): Int = priorityFunction(message) + } + + /** + * Java API + * Creates a PriorityGenerator that uses the supplied function as priority generator + */ + def apply(priorityFunction: akka.japi.Function[Any, Int]): PriorityGenerator = new PriorityGenerator { + def gen(message: Any): Int = priorityFunction(message) + } +} + +/** + * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a + * PriorityExecutorBasedEventDrivenDispatcher + */ +abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] { + def gen(message: Any): Int + + final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int = + gen(thisMessage.message) - gen(thatMessage.message) +} + /** * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox, * prioritized according to the supplied comparator. + * + * The dispatcher will process the messages with the _lowest_ priority first. */ class PriorityExecutorBasedEventDrivenDispatcher( name: String, @@ -242,10 +272,10 @@ class PriorityExecutorBasedEventDrivenDispatcher( config: ThreadPoolConfig = ThreadPoolConfig() ) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { - def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: UnboundedMailbox) = + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = this(name, comparator, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage - def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: UnboundedMailbox) = + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) = this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = @@ -258,6 +288,15 @@ class PriorityExecutorBasedEventDrivenDispatcher( this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage } + +/** + * Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes + * + * Usage: + * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox { + * val comparator = ...comparator that determines mailbox priority ordering... + * } + */ trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => def comparator: java.util.Comparator[MessageInvocation]