Adding a rather untested BoundedBlockingQueue to wrap PriorityQueue for BoundedPriorityMessageQueue

This commit is contained in:
Viktor Klang 2011-03-11 14:51:24 +01:00
parent 5d3b669fac
commit a743dcfd02
4 changed files with 479 additions and 56 deletions

View file

@ -231,7 +231,7 @@ class PriorityExecutorBasedEventDrivenDispatcher(
val comparator: java.util.Comparator[MessageInvocation],
throughput: Int = Dispatchers.THROUGHPUT,
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: UnboundedMailbox = UnboundedMailbox(false),
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: ThreadPoolConfig = ThreadPoolConfig()
) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
@ -242,13 +242,13 @@ class PriorityExecutorBasedEventDrivenDispatcher(
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false)) // Needed for Java API usage
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false), config)
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config)
def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false)) // Needed for Java API usage
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
}
trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
@ -259,9 +259,9 @@ trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
def dispatcher = self
}
case BoundedMailbox(blocking, capacity, pushTimeOut) => throw new IllegalStateException("PriorityMailbox does not work when a Bounded mailbox is specified.")
/*new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox {
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox {
def dispatcher = self
}*/
}
}
}

View file

@ -7,9 +7,8 @@ package akka.dispatch
import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import akka.AkkaException
import java.util.{Queue, List, Comparator}
import java.util.{Queue, List, Comparator, PriorityQueue}
import java.util.concurrent._
import concurrent.forkjoin.LinkedTransferQueue
import akka.util._
class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
@ -58,7 +57,7 @@ trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[Me
def pushTimeOut: Duration
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.toMillis > 0) {
if (pushTimeOut.length > 0 && pushTimeOut.toMillis > 0) {
if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
} else this put handle
@ -81,8 +80,6 @@ class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[M
PriorityBlockingQueue[MessageInvocation](11, cmp) with
UnboundedMessageQueueSemantics
/* PriorityBlockingQueue cannot be bounded
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
PriorityBlockingQueue[MessageInvocation](capacity, cmp) with
BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](capacity, cmp)) with
BoundedMessageQueueSemantics
*/

View file

@ -0,0 +1,315 @@
package akka.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
backing match {
case null => throw new IllegalArgumentException("Backing Queue may not be null")
case b: BlockingQueue[_] =>
require(maxCapacity > 0)
require(b.size() == 0)
require(b.remainingCapacity >= maxCapacity)
case b: Queue[_] =>
require(b.size() == 0)
require(maxCapacity > 0)
}
protected val lock = new ReentrantLock(true)
private val notEmpty = lock.newCondition()
private val notFull = lock.newCondition()
def put(e: E): Unit = { //Blocks until not full
if (e eq null) throw new NullPointerException
lock.lock()
try {
while (backing.size() == maxCapacity)
notFull.await()
require(backing.offer(e))
notEmpty.signal()
} finally {
lock.unlock()
}
}
def take(): E = { //Blocks until not empty
lock.lockInterruptibly()
try {
while (backing.size() == 0)
notEmpty.await()
val e = backing.poll()
require(e ne null)
notFull.signal()
e
} finally {
lock.unlock()
}
}
def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false
if (e eq null) throw new NullPointerException
lock.lock()
try {
if (backing.size() == maxCapacity) false
else {
require(backing.offer(e)) //Should never fail
notEmpty.signal()
true
}
} finally {
lock.unlock()
}
}
def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail
if (e eq null) throw new NullPointerException
var nanos = unit.toNanos(timeout)
lock.lockInterruptibly()
try {
while(backing.size() == maxCapacity) {
if (nanos <= 0)
return false
else
nanos = notFull.awaitNanos(nanos)
}
require(backing.offer(e)) //Should never fail
notEmpty.signal()
true
} finally {
lock.unlock()
}
}
def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail
var nanos = unit.toNanos(timeout)
lock.lockInterruptibly()
try {
var result: E = null.asInstanceOf[E]
var hasResult = false
while(!hasResult) {
hasResult = backing.poll() match {
case null if nanos <= 0 =>
result = null.asInstanceOf[E]
true
case null =>
try {
nanos = notEmpty.awaitNanos(nanos)
} catch {
case ie: InterruptedException =>
notEmpty.signal()
throw ie
}
false
case e =>
notFull.signal()
result = e
true
}
}
result
} finally {
lock.unlock()
}
}
def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null
lock.lock()
try {
backing.poll() match {
case null => null.asInstanceOf[E]
case e =>
notFull.signal()
e
}
} finally {
lock.unlock
}
}
override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false
if (e eq null) throw new NullPointerException
lock.lock()
try {
if (backing remove e) {
notFull.signal()
true
} else false
} finally {
lock.unlock()
}
}
override def contains(e: AnyRef): Boolean = {
if (e eq null) throw new NullPointerException
lock.lock()
try {
backing contains e
} finally {
lock.unlock()
}
}
override def clear(): Unit = {
lock.lock()
try {
backing.clear
} finally {
lock.unlock()
}
}
def remainingCapacity(): Int = {
lock.lock()
try {
maxCapacity - backing.size()
} finally {
lock.unlock()
}
}
def size(): Int = {
lock.lock()
try {
backing.size()
} finally {
lock.unlock()
}
}
def peek(): E = {
lock.lock()
try {
backing.peek()
} finally {
lock.unlock()
}
}
def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)
def drainTo(c: Collection[_ >: E], maxElements: Int): Int = {
if (c eq null) throw new NullPointerException
if (c eq this) throw new IllegalArgumentException
if (maxElements <= 0) 0
else {
lock.lock()
try {
var n = 0
var e: E = null.asInstanceOf[E]
while(n < maxElements) {
backing.poll() match {
case null => return n
case e =>
c add e
n += 1
}
}
n
} finally {
lock.unlock()
}
}
}
override def containsAll(c: Collection[_]): Boolean = {
lock.lock()
try {
backing containsAll c
} finally {
lock.unlock()
}
}
override def removeAll(c: Collection[_]): Boolean = {
lock.lock()
try {
if (backing.removeAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal()
if (sz > 0) notEmpty.signal() //FIXME needed?
true
} else false
} finally {
lock.unlock()
}
}
override def retainAll(c: Collection[_]): Boolean = {
lock.lock()
try {
if (backing.retainAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal() //FIXME needed?
if (sz > 0) notEmpty.signal()
true
} else false
} finally {
lock.unlock()
}
}
def iterator(): Iterator[E] = {
lock.lock
try {
val elements = backing.toArray
new Iterator[E] {
var at = 0
var last = -1
def hasNext(): Boolean = at < elements.length
def next(): E = {
if (at >= elements.length) throw new NoSuchElementException
last = at
at += 1
elements(last).asInstanceOf[E]
}
def remove(): Unit = {
if (last < 0) throw new IllegalStateException
val target = elements(last)
last = -1 //To avoid 2 subsequent removes without a next in between
lock.lock()
try {
val i = backing.iterator()
while(i.hasNext) {
if (i.next eq target) {
i.remove()
notFull.signal()
return ()
}
}
} finally {
lock.unlock()
}
}
}
} finally {
lock.unlock
}
}
override def toArray(): Array[AnyRef] = {
lock.lock()
try {
backing.toArray
} finally {
lock.unlock()
}
}
override def isEmpty(): Boolean = {
lock.lock()
try {
backing.isEmpty()
} finally {
lock.unlock()
}
}
//FIXME Implement toArray[T] => Array[T]
}

View file

@ -1,54 +1,165 @@
package akka.actor.dispatch
package akka.dispatch
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.actor. {Actor, ActorRegistry}
import akka.actor.Actor.{actorOf}
import java.util.concurrent. {TimeUnit, CountDownLatch, BlockingQueue}
import java.util.{Queue}
import akka.util._
import akka.util.Duration._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@RunWith(classOf[JUnitRunner])
abstract class MailboxSpec extends
WordSpec with
MustMatchers with
BeforeAndAfterAll with
BeforeAndAfterEach {
def name: String
import akka.actor.Actor
import akka.util.Duration
import akka.dispatch._
import Actor._
def factory: MailboxType => MessageQueue
import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
name should {
"create a !blockDequeue && unbounded mailbox" in {
val config = UnboundedMailbox(false)
val q = factory(config)
ensureInitialMailboxState(config, q)
class MailboxTypeSpec extends JUnitSuite {
@Test def shouldDoNothing = assert(true)
implicit val within = Duration(1,TimeUnit.SECONDS)
/*
private val unit = TimeUnit.MILLISECONDS
val f = spawn {
q.dequeue
}
@Test def shouldCreateUnboundedQueue = {
val m = UnboundedMailbox(false)
assert(m.newMailbox("uuid").asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === Integer.MAX_VALUE)
f.await.resultOrException must be === Some(null)
}
"create a !blockDequeue and bounded mailbox with 10 capacity and with push timeout" in {
val config = BoundedMailbox(false, 10, Duration(10,TimeUnit.MILLISECONDS))
val q = factory(config)
ensureInitialMailboxState(config, q)
val exampleMessage = createMessageInvocation("test")
for(i <- 1 to config.capacity) q.enqueue(exampleMessage)
q.size must be === config.capacity
q.isEmpty must be === false
intercept[MessageQueueAppendFailedException] {
q.enqueue(exampleMessage)
}
q.dequeue must be === exampleMessage
q.size must be (config.capacity - 1)
q.isEmpty must be === false
}
"dequeue in one thread what was enqueued by another" in {
implicit val within = Duration(10,TimeUnit.SECONDS)
val config = BoundedMailbox(false, 1000, Duration(10, TimeUnit.MILLISECONDS))
val q = factory(config)
ensureInitialMailboxState(config, q)
def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn {
val messages = Vector() ++ (for(i <- fromNum to toNum) yield createMessageInvocation(i))
for(i <- messages) q.enqueue(i)
messages
}
val producer1 = createProducer(1, 500)
val producer2 = createProducer(501, 1000)
def createConsumer: Future[Vector[MessageInvocation]] = spawn {
var r = Vector[MessageInvocation]()
while(!producer1.isCompleted || !producer2.isCompleted || !q.isEmpty) {
q.dequeue match {
case null =>
case message => r = r :+ message
}
}
r
}
val consumer1 = createConsumer
val consumer2 = createConsumer
Futures.awaitAll(List(producer1, producer2, consumer1, consumer2))
val c1 = consumer1.result.get
val c2 = consumer2.result.get
val p1 = producer1.result.get
val p2 = producer2.result.get
(p1.size + p2.size) must be === 1000
(c1.size + c2.size) must be === 1000
(c1 forall (!c2.contains(_))) must be (true) //No messages produced may exist in the
(c2 forall (!c1.contains(_))) must be (true)
(p1 forall ( m => c1.contains(m) || c2.contains(m))) must be (true)
(p2 forall ( m => c1.contains(m) || c2.contains(m))) must be (true)
}
}
//CANDIDATE FOR TESTKIT
def spawn[T <: AnyRef](fun: => T)(implicit within: Duration): Future[T] = {
val result = new DefaultCompletableFuture[T](within.length, within.unit)
val t = new Thread(new Runnable {
def run = try {
result.completeWithResult(fun)
} catch {
case e: Throwable => result.completeWithException(e)
}
})
t.start
result
}
def createMessageInvocation(msg: Any): MessageInvocation = {
new MessageInvocation(
actorOf(new Actor { //Dummy actor
def receive = { case _ => }
}), msg, None, None)
}
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
q must not be null
q match {
case aQueue: BlockingQueue[_] =>
config match {
case BoundedMailbox(_,capacity,_) => aQueue.remainingCapacity must be === capacity
case UnboundedMailbox(_) => aQueue.remainingCapacity must be === Int.MaxValue
}
case _ =>
}
q.size must be === 0
q.isEmpty must be === true
}
}
@Test def shouldCreateBoundedQueue = {
val m = BoundedMailbox(blocking = false, capacity = 1)
assert(m.newMailbox("uuid").asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === 1)
class DefaultMailboxSpec extends MailboxSpec {
lazy val name = "The default mailbox implementation"
def factory = {
case UnboundedMailbox(blockDequeue) =>
new DefaultUnboundedMessageQueue(blockDequeue)
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking)
}
@Test(expected = classOf[MessageQueueAppendFailedException]) def shouldThrowMessageQueueAppendFailedExceptionWhenTimeOutEnqueue = {
val m = BoundedMailbox(false, 1, Duration(1, unit))
val testActor = actorOf( new Actor { def receive = { case _ => }} )
val mbox = m.newMailbox("uuid")
(1 to 10000) foreach { i => mbox.enqueue(new MessageInvocation(testActor, i, None, None, None)) }
}
@Test def shouldBeAbleToDequeueUnblocking = {
val m = BoundedMailbox(false, 1, Duration(1, unit))
val mbox = m.newMailbox("uuid")
val latch = new CountDownLatch(1)
val t = new Thread { override def run = {
mbox.dequeue
latch.countDown
}}
t.start
val result = latch.await(5000,unit)
if (!result)
t.interrupt
assert(result === true)
}
*/
}
class PriorityMailboxSpec extends MailboxSpec {
val comparator = new java.util.Comparator[MessageInvocation] {
def compare(a: MessageInvocation, b: MessageInvocation): Int = {
a.## - b.##
}
}
lazy val name = "The priority mailbox implementation"
def factory = {
case UnboundedMailbox(blockDequeue) =>
new UnboundedPriorityMessageQueue(blockDequeue, comparator)
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator)
}
}