diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index e3a72c4c9f..fab5580b9e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -231,7 +231,7 @@ class PriorityExecutorBasedEventDrivenDispatcher( val comparator: java.util.Comparator[MessageInvocation], throughput: Int = Dispatchers.THROUGHPUT, throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, - mailboxType: UnboundedMailbox = UnboundedMailbox(false), + mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, config: ThreadPoolConfig = ThreadPoolConfig() ) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { @@ -242,13 +242,13 @@ class PriorityExecutorBasedEventDrivenDispatcher( this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = - this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false)) // Needed for Java API usage + this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) = - this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false), config) + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config) def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = - this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, UnboundedMailbox(false)) // Needed for Java API usage + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage } trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => @@ -259,9 +259,9 @@ trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => def dispatcher = self } - case BoundedMailbox(blocking, capacity, pushTimeOut) => throw new IllegalStateException("PriorityMailbox does not work when a Bounded mailbox is specified.") - /*new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox { + case BoundedMailbox(blocking, capacity, pushTimeOut) => + new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox { def dispatcher = self - }*/ + } } } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 9e40dbfa68..80bceec96a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -7,9 +7,8 @@ package akka.dispatch import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} import akka.AkkaException -import java.util.{Queue, List, Comparator} +import java.util.{Queue, List, Comparator, PriorityQueue} import java.util.concurrent._ -import concurrent.forkjoin.LinkedTransferQueue import akka.util._ class MessageQueueAppendFailedException(message: String) extends AkkaException(message) @@ -58,7 +57,7 @@ trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[Me def pushTimeOut: Duration final def enqueue(handle: MessageInvocation) { - if (pushTimeOut.toMillis > 0) { + if (pushTimeOut.length > 0 && pushTimeOut.toMillis > 0) { if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit)) throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) } else this put handle @@ -81,8 +80,6 @@ class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[M PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics -/* PriorityBlockingQueue cannot be bounded class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends - PriorityBlockingQueue[MessageInvocation](capacity, cmp) with + BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](capacity, cmp)) with BoundedMessageQueueSemantics - */ \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala new file mode 100644 index 0000000000..7a229c31b3 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -0,0 +1,315 @@ +package akka.util + +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{ TimeUnit, BlockingQueue } +import java.util.{ AbstractQueue, Queue, Collection, Iterator } + +class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] { + + backing match { + case null => throw new IllegalArgumentException("Backing Queue may not be null") + case b: BlockingQueue[_] => + require(maxCapacity > 0) + require(b.size() == 0) + require(b.remainingCapacity >= maxCapacity) + case b: Queue[_] => + require(b.size() == 0) + require(maxCapacity > 0) + } + + protected val lock = new ReentrantLock(true) + + private val notEmpty = lock.newCondition() + private val notFull = lock.newCondition() + + def put(e: E): Unit = { //Blocks until not full + if (e eq null) throw new NullPointerException + lock.lock() + try { + while (backing.size() == maxCapacity) + notFull.await() + require(backing.offer(e)) + notEmpty.signal() + } finally { + lock.unlock() + } + } + + def take(): E = { //Blocks until not empty + lock.lockInterruptibly() + try { + while (backing.size() == 0) + notEmpty.await() + val e = backing.poll() + require(e ne null) + notFull.signal() + e + } finally { + lock.unlock() + } + } + + def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false + if (e eq null) throw new NullPointerException + lock.lock() + try { + if (backing.size() == maxCapacity) false + else { + require(backing.offer(e)) //Should never fail + notEmpty.signal() + true + } + } finally { + lock.unlock() + } + } + + def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail + if (e eq null) throw new NullPointerException + var nanos = unit.toNanos(timeout) + lock.lockInterruptibly() + try { + while(backing.size() == maxCapacity) { + if (nanos <= 0) + return false + else + nanos = notFull.awaitNanos(nanos) + } + require(backing.offer(e)) //Should never fail + notEmpty.signal() + true + } finally { + lock.unlock() + } + } + + def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail + var nanos = unit.toNanos(timeout) + lock.lockInterruptibly() + try { + var result: E = null.asInstanceOf[E] + var hasResult = false + while(!hasResult) { + hasResult = backing.poll() match { + case null if nanos <= 0 => + result = null.asInstanceOf[E] + true + case null => + try { + nanos = notEmpty.awaitNanos(nanos) + } catch { + case ie: InterruptedException => + notEmpty.signal() + throw ie + } + false + case e => + notFull.signal() + result = e + true + } + } + result + } finally { + lock.unlock() + } + } + + def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null + lock.lock() + try { + backing.poll() match { + case null => null.asInstanceOf[E] + case e => + notFull.signal() + e + } + } finally { + lock.unlock + } + } + + override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false + if (e eq null) throw new NullPointerException + lock.lock() + try { + if (backing remove e) { + notFull.signal() + true + } else false + } finally { + lock.unlock() + } + } + + override def contains(e: AnyRef): Boolean = { + if (e eq null) throw new NullPointerException + lock.lock() + try { + backing contains e + } finally { + lock.unlock() + } + } + + override def clear(): Unit = { + lock.lock() + try { + backing.clear + } finally { + lock.unlock() + } + } + + def remainingCapacity(): Int = { + lock.lock() + try { + maxCapacity - backing.size() + } finally { + lock.unlock() + } + } + + def size(): Int = { + lock.lock() + try { + backing.size() + } finally { + lock.unlock() + } + } + + def peek(): E = { + lock.lock() + try { + backing.peek() + } finally { + lock.unlock() + } + } + + def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue) + + def drainTo(c: Collection[_ >: E], maxElements: Int): Int = { + if (c eq null) throw new NullPointerException + if (c eq this) throw new IllegalArgumentException + if (maxElements <= 0) 0 + else { + lock.lock() + try { + var n = 0 + var e: E = null.asInstanceOf[E] + while(n < maxElements) { + backing.poll() match { + case null => return n + case e => + c add e + n += 1 + } + } + n + } finally { + lock.unlock() + } + } + } + + override def containsAll(c: Collection[_]): Boolean = { + lock.lock() + try { + backing containsAll c + } finally { + lock.unlock() + } + } + + override def removeAll(c: Collection[_]): Boolean = { + lock.lock() + try { + if (backing.removeAll(c)) { + val sz = backing.size() + if (sz < maxCapacity) notFull.signal() + if (sz > 0) notEmpty.signal() //FIXME needed? + true + } else false + } finally { + lock.unlock() + } + } + + override def retainAll(c: Collection[_]): Boolean = { + lock.lock() + try { + if (backing.retainAll(c)) { + val sz = backing.size() + if (sz < maxCapacity) notFull.signal() //FIXME needed? + if (sz > 0) notEmpty.signal() + true + } else false + } finally { + lock.unlock() + } + } + + def iterator(): Iterator[E] = { + lock.lock + try { + val elements = backing.toArray + new Iterator[E] { + var at = 0 + var last = -1 + + def hasNext(): Boolean = at < elements.length + + def next(): E = { + if (at >= elements.length) throw new NoSuchElementException + last = at + at += 1 + elements(last).asInstanceOf[E] + } + + def remove(): Unit = { + if (last < 0) throw new IllegalStateException + val target = elements(last) + last = -1 //To avoid 2 subsequent removes without a next in between + lock.lock() + try { + val i = backing.iterator() + while(i.hasNext) { + if (i.next eq target) { + i.remove() + notFull.signal() + return () + } + } + } finally { + lock.unlock() + } + } + } + } finally { + lock.unlock + } + } + + override def toArray(): Array[AnyRef] = { + lock.lock() + try { + backing.toArray + } finally { + lock.unlock() + } + } + + override def isEmpty(): Boolean = { + lock.lock() + try { + backing.isEmpty() + } finally { + lock.unlock() + } + } + + //FIXME Implement toArray[T] => Array[T] +} \ No newline at end of file diff --git a/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 5dd0dfbe6d..21537ff930 100644 --- a/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,54 +1,165 @@ -package akka.actor.dispatch +package akka.dispatch +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith +import akka.actor. {Actor, ActorRegistry} +import akka.actor.Actor.{actorOf} +import java.util.concurrent. {TimeUnit, CountDownLatch, BlockingQueue} +import java.util.{Queue} +import akka.util._ +import akka.util.Duration._ -import org.scalatest.junit.JUnitSuite -import org.junit.Test +@RunWith(classOf[JUnitRunner]) +abstract class MailboxSpec extends + WordSpec with + MustMatchers with + BeforeAndAfterAll with + BeforeAndAfterEach { + def name: String -import akka.actor.Actor -import akka.util.Duration -import akka.dispatch._ -import Actor._ + def factory: MailboxType => MessageQueue -import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit} -import java.util.concurrent.atomic.AtomicReference + name should { + "create a !blockDequeue && unbounded mailbox" in { + val config = UnboundedMailbox(false) + val q = factory(config) + ensureInitialMailboxState(config, q) -class MailboxTypeSpec extends JUnitSuite { - @Test def shouldDoNothing = assert(true) + implicit val within = Duration(1,TimeUnit.SECONDS) -/* - private val unit = TimeUnit.MILLISECONDS + val f = spawn { + q.dequeue + } - @Test def shouldCreateUnboundedQueue = { - val m = UnboundedMailbox(false) - assert(m.newMailbox("uuid").asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === Integer.MAX_VALUE) + f.await.resultOrException must be === Some(null) + } + + "create a !blockDequeue and bounded mailbox with 10 capacity and with push timeout" in { + val config = BoundedMailbox(false, 10, Duration(10,TimeUnit.MILLISECONDS)) + val q = factory(config) + ensureInitialMailboxState(config, q) + + val exampleMessage = createMessageInvocation("test") + + for(i <- 1 to config.capacity) q.enqueue(exampleMessage) + + q.size must be === config.capacity + q.isEmpty must be === false + + intercept[MessageQueueAppendFailedException] { + q.enqueue(exampleMessage) + } + + q.dequeue must be === exampleMessage + q.size must be (config.capacity - 1) + q.isEmpty must be === false + } + + "dequeue in one thread what was enqueued by another" in { + implicit val within = Duration(10,TimeUnit.SECONDS) + val config = BoundedMailbox(false, 1000, Duration(10, TimeUnit.MILLISECONDS)) + val q = factory(config) + ensureInitialMailboxState(config, q) + + def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn { + val messages = Vector() ++ (for(i <- fromNum to toNum) yield createMessageInvocation(i)) + for(i <- messages) q.enqueue(i) + messages + } + + val producer1 = createProducer(1, 500) + val producer2 = createProducer(501, 1000) + + def createConsumer: Future[Vector[MessageInvocation]] = spawn { + var r = Vector[MessageInvocation]() + while(!producer1.isCompleted || !producer2.isCompleted || !q.isEmpty) { + q.dequeue match { + case null => + case message => r = r :+ message + } + } + r + } + + val consumer1 = createConsumer + val consumer2 = createConsumer + + Futures.awaitAll(List(producer1, producer2, consumer1, consumer2)) + + val c1 = consumer1.result.get + val c2 = consumer2.result.get + val p1 = producer1.result.get + val p2 = producer2.result.get + + (p1.size + p2.size) must be === 1000 + (c1.size + c2.size) must be === 1000 + (c1 forall (!c2.contains(_))) must be (true) //No messages produced may exist in the + (c2 forall (!c1.contains(_))) must be (true) + (p1 forall ( m => c1.contains(m) || c2.contains(m))) must be (true) + (p2 forall ( m => c1.contains(m) || c2.contains(m))) must be (true) + } + } + + //CANDIDATE FOR TESTKIT + def spawn[T <: AnyRef](fun: => T)(implicit within: Duration): Future[T] = { + val result = new DefaultCompletableFuture[T](within.length, within.unit) + val t = new Thread(new Runnable { + def run = try { + result.completeWithResult(fun) + } catch { + case e: Throwable => result.completeWithException(e) + } + }) + t.start + result + } + + def createMessageInvocation(msg: Any): MessageInvocation = { + new MessageInvocation( + actorOf(new Actor { //Dummy actor + def receive = { case _ => } + }), msg, None, None) + } + + def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) { + q must not be null + q match { + case aQueue: BlockingQueue[_] => + config match { + case BoundedMailbox(_,capacity,_) => aQueue.remainingCapacity must be === capacity + case UnboundedMailbox(_) => aQueue.remainingCapacity must be === Int.MaxValue + } + case _ => + } + q.size must be === 0 + q.isEmpty must be === true + } } - @Test def shouldCreateBoundedQueue = { - val m = BoundedMailbox(blocking = false, capacity = 1) - assert(m.newMailbox("uuid").asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === 1) +class DefaultMailboxSpec extends MailboxSpec { + lazy val name = "The default mailbox implementation" + def factory = { + case UnboundedMailbox(blockDequeue) => + new DefaultUnboundedMessageQueue(blockDequeue) + case BoundedMailbox(blocking, capacity, pushTimeOut) => + new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) } - - @Test(expected = classOf[MessageQueueAppendFailedException]) def shouldThrowMessageQueueAppendFailedExceptionWhenTimeOutEnqueue = { - val m = BoundedMailbox(false, 1, Duration(1, unit)) - val testActor = actorOf( new Actor { def receive = { case _ => }} ) - val mbox = m.newMailbox("uuid") - (1 to 10000) foreach { i => mbox.enqueue(new MessageInvocation(testActor, i, None, None, None)) } - } - - - @Test def shouldBeAbleToDequeueUnblocking = { - val m = BoundedMailbox(false, 1, Duration(1, unit)) - val mbox = m.newMailbox("uuid") - val latch = new CountDownLatch(1) - val t = new Thread { override def run = { - mbox.dequeue - latch.countDown - }} - t.start - val result = latch.await(5000,unit) - if (!result) - t.interrupt - assert(result === true) - } - */ } + +class PriorityMailboxSpec extends MailboxSpec { + val comparator = new java.util.Comparator[MessageInvocation] { + def compare(a: MessageInvocation, b: MessageInvocation): Int = { + a.## - b.## + } + } + lazy val name = "The priority mailbox implementation" + def factory = { + case UnboundedMailbox(blockDequeue) => + new UnboundedPriorityMessageQueue(blockDequeue, comparator) + case BoundedMailbox(blocking, capacity, pushTimeOut) => + new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) + } +} \ No newline at end of file