From 0d740db75b5e2d8ddba2f603fb5be71ecc6bf94a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 11 Mar 2011 16:48:44 +0100 Subject: [PATCH] Beefed up the concurrency level for the mailbox tests --- .../scala/akka/dispatch/MailboxHandling.scala | 6 +- .../akka/dispatch/MailboxConfigSpec.scala | 107 +++++++++++------- 2 files changed, 68 insertions(+), 45 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 80bceec96a..e0586a40a7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -80,6 +80,6 @@ class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[M PriorityBlockingQueue[MessageInvocation](11, cmp) with UnboundedMessageQueueSemantics - class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends - BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](capacity, cmp)) with - BoundedMessageQueueSemantics +class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends + BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with + BoundedMessageQueueSemantics diff --git a/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 21537ff930..7a469868a4 100644 --- a/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -58,49 +58,31 @@ abstract class MailboxSpec extends q.isEmpty must be === false } - "dequeue in one thread what was enqueued by another" in { - implicit val within = Duration(10,TimeUnit.SECONDS) - val config = BoundedMailbox(false, 1000, Duration(10, TimeUnit.MILLISECONDS)) - val q = factory(config) - ensureInitialMailboxState(config, q) - - def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn { - val messages = Vector() ++ (for(i <- fromNum to toNum) yield createMessageInvocation(i)) - for(i <- messages) q.enqueue(i) - messages - } - - val producer1 = createProducer(1, 500) - val producer2 = createProducer(501, 1000) - - def createConsumer: Future[Vector[MessageInvocation]] = spawn { - var r = Vector[MessageInvocation]() - while(!producer1.isCompleted || !producer2.isCompleted || !q.isEmpty) { - q.dequeue match { - case null => - case message => r = r :+ message - } - } - r - } - - val consumer1 = createConsumer - val consumer2 = createConsumer - - Futures.awaitAll(List(producer1, producer2, consumer1, consumer2)) - - val c1 = consumer1.result.get - val c2 = consumer2.result.get - val p1 = producer1.result.get - val p2 = producer2.result.get - - (p1.size + p2.size) must be === 1000 - (c1.size + c2.size) must be === 1000 - (c1 forall (!c2.contains(_))) must be (true) //No messages produced may exist in the - (c2 forall (!c1.contains(_))) must be (true) - (p1 forall ( m => c1.contains(m) || c2.contains(m))) must be (true) - (p2 forall ( m => c1.contains(m) || c2.contains(m))) must be (true) + "dequeue what was enqueued properly for unbounded mailboxes" in { + testEnqueueDequeue(UnboundedMailbox(false)) } + + "dequeue what was enqueued properly for bounded mailboxes" in { + testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(-1, TimeUnit.MILLISECONDS))) + } + + "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { + testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(100, TimeUnit.MILLISECONDS))) + } + + /** FIXME Adapt test so it works with the last dequeue + + "dequeue what was enqueued properly for unbounded mailboxes with blockDeque" in { + testEnqueueDequeue(UnboundedMailbox(true)) + } + + "dequeue what was enqueued properly for bounded mailboxes with blockDeque" in { + testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(-1, TimeUnit.MILLISECONDS))) + } + + "dequeue what was enqueued properly for bounded mailboxes with blockDeque and pushTimeout" in { + testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(100, TimeUnit.MILLISECONDS))) + }*/ } //CANDIDATE FOR TESTKIT @@ -137,6 +119,47 @@ abstract class MailboxSpec extends q.size must be === 0 q.isEmpty must be === true } + + def testEnqueueDequeue(config: MailboxType) { + implicit val within = Duration(10,TimeUnit.SECONDS) + val q = factory(config) + ensureInitialMailboxState(config, q) + + def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn { + val messages = Vector() ++ (for(i <- fromNum to toNum) yield createMessageInvocation(i)) + for(i <- messages) q.enqueue(i) + messages + } + + val totalMessages = 10000 + val step = 500 + + val producers = for(i <- (1 to totalMessages by step).toList) yield createProducer(i,i+step-1) + + def createConsumer: Future[Vector[MessageInvocation]] = spawn { + var r = Vector[MessageInvocation]() + while(producers.exists(_.isCompleted == false) || !q.isEmpty) { + q.dequeue match { + case null => + case message => r = r :+ message + } + } + r + } + + val consumers = for(i <- (1 to 4).toList) yield createConsumer + + val ps = producers.map(_.await.resultOrException.get) + val cs = consumers.map(_.await.resultOrException.get) + + ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages + cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages + //No message is allowed to be consumed by more than one consumer + cs.flatten.distinct.size must be === totalMessages + //All produced messages should have been consumed + (cs.flatten diff ps.flatten).size must be === 0 + (ps.flatten diff cs.flatten).size must be === 0 + } } class DefaultMailboxSpec extends MailboxSpec {