Fixing 2 wrong types in PriorityEBEDD and added tests for the message processing ordering
This commit is contained in:
parent
6537c75625
commit
3770a32216
3 changed files with 93 additions and 7 deletions
|
|
@ -173,11 +173,7 @@ class DefaultMailboxSpec extends MailboxSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
class PriorityMailboxSpec extends MailboxSpec {
|
class PriorityMailboxSpec extends MailboxSpec {
|
||||||
val comparator = new java.util.Comparator[MessageInvocation] {
|
val comparator = PriorityGenerator(_.##)
|
||||||
def compare(a: MessageInvocation, b: MessageInvocation): Int = {
|
|
||||||
a.## - b.##
|
|
||||||
}
|
|
||||||
}
|
|
||||||
lazy val name = "The priority mailbox implementation"
|
lazy val name = "The priority mailbox implementation"
|
||||||
def factory = {
|
def factory = {
|
||||||
case UnboundedMailbox(blockDequeue) =>
|
case UnboundedMailbox(blockDequeue) =>
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,51 @@
|
||||||
|
package akka.dispatch
|
||||||
|
|
||||||
|
import akka.actor.Actor._
|
||||||
|
import akka.actor.Actor
|
||||||
|
import org.scalatest.WordSpec
|
||||||
|
import org.scalatest.matchers.MustMatchers
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
|
||||||
|
class PriorityDispatcherSpec extends WordSpec with MustMatchers {
|
||||||
|
|
||||||
|
"A PriorityExecutorBasedEventDrivenDispatcher" must {
|
||||||
|
"Order it's messages according to the specified comparator using an unbounded mailbox" in {
|
||||||
|
testOrdering(UnboundedMailbox(false))
|
||||||
|
}
|
||||||
|
|
||||||
|
"Order it's messages according to the specified comparator using a bounded mailbox" in {
|
||||||
|
testOrdering(BoundedMailbox(false,1000))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def testOrdering(mboxType: MailboxType) {
|
||||||
|
val dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("Test",
|
||||||
|
PriorityGenerator({
|
||||||
|
case i: Int => i //Reverse order
|
||||||
|
case 'Result => Int.MaxValue
|
||||||
|
}: Any => Int),
|
||||||
|
throughput = 1,
|
||||||
|
mailboxType = mboxType
|
||||||
|
)
|
||||||
|
|
||||||
|
val actor = actorOf(new Actor {
|
||||||
|
self.dispatcher = dispatcher
|
||||||
|
var acc: List[Int] = Nil
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case i: Int => acc = i :: acc
|
||||||
|
case 'Result => self reply_? acc
|
||||||
|
}
|
||||||
|
}).start
|
||||||
|
|
||||||
|
dispatcher.suspend(actor) //Make sure the actor isn't treating any messages, let it buffer the incoming messages
|
||||||
|
|
||||||
|
val msgs = (1 to 100).toList
|
||||||
|
for(m <- msgs) actor ! m
|
||||||
|
|
||||||
|
dispatcher.resume(actor) //Signal the actor to start treating it's message backlog
|
||||||
|
|
||||||
|
actor.!!![List[Int]]('Result).await.result.get must be === (msgs.reverse)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -229,9 +229,39 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object PriorityGenerator {
|
||||||
|
/**
|
||||||
|
* Creates a PriorityGenerator that uses the supplied function as priority generator
|
||||||
|
*/
|
||||||
|
def apply(priorityFunction: Any => Int): PriorityGenerator = new PriorityGenerator {
|
||||||
|
def gen(message: Any): Int = priorityFunction(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API
|
||||||
|
* Creates a PriorityGenerator that uses the supplied function as priority generator
|
||||||
|
*/
|
||||||
|
def apply(priorityFunction: akka.japi.Function[Any, Int]): PriorityGenerator = new PriorityGenerator {
|
||||||
|
def gen(message: Any): Int = priorityFunction(message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a
|
||||||
|
* PriorityExecutorBasedEventDrivenDispatcher
|
||||||
|
*/
|
||||||
|
abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] {
|
||||||
|
def gen(message: Any): Int
|
||||||
|
|
||||||
|
final def compare(thisMessage: MessageInvocation, thatMessage: MessageInvocation): Int =
|
||||||
|
gen(thisMessage.message) - gen(thatMessage.message)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox,
|
* A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox,
|
||||||
* prioritized according to the supplied comparator.
|
* prioritized according to the supplied comparator.
|
||||||
|
*
|
||||||
|
* The dispatcher will process the messages with the _lowest_ priority first.
|
||||||
*/
|
*/
|
||||||
class PriorityExecutorBasedEventDrivenDispatcher(
|
class PriorityExecutorBasedEventDrivenDispatcher(
|
||||||
name: String,
|
name: String,
|
||||||
|
|
@ -242,10 +272,10 @@ class PriorityExecutorBasedEventDrivenDispatcher(
|
||||||
config: ThreadPoolConfig = ThreadPoolConfig()
|
config: ThreadPoolConfig = ThreadPoolConfig()
|
||||||
) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
|
) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: UnboundedMailbox) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
|
||||||
this(name, comparator, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
|
this(name, comparator, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: UnboundedMailbox) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: MailboxType) =
|
||||||
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
|
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
|
||||||
|
|
||||||
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
|
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
|
||||||
|
|
@ -258,6 +288,15 @@ class PriorityExecutorBasedEventDrivenDispatcher(
|
||||||
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Can be used to give an ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes
|
||||||
|
*
|
||||||
|
* Usage:
|
||||||
|
* new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox {
|
||||||
|
* val comparator = ...comparator that determines mailbox priority ordering...
|
||||||
|
* }
|
||||||
|
*/
|
||||||
trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
|
trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
|
||||||
def comparator: java.util.Comparator[MessageInvocation]
|
def comparator: java.util.Comparator[MessageInvocation]
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue