Beefed up the concurrency level for the mailbox tests
This commit is contained in:
parent
a743dcfd02
commit
0d740db75b
2 changed files with 68 additions and 45 deletions
|
|
@ -80,6 +80,6 @@ class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[M
|
|||
PriorityBlockingQueue[MessageInvocation](11, cmp) with
|
||||
UnboundedMessageQueueSemantics
|
||||
|
||||
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
|
||||
BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](capacity, cmp)) with
|
||||
BoundedMessageQueueSemantics
|
||||
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
|
||||
BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with
|
||||
BoundedMessageQueueSemantics
|
||||
|
|
|
|||
|
|
@ -58,49 +58,31 @@ abstract class MailboxSpec extends
|
|||
q.isEmpty must be === false
|
||||
}
|
||||
|
||||
"dequeue in one thread what was enqueued by another" in {
|
||||
implicit val within = Duration(10,TimeUnit.SECONDS)
|
||||
val config = BoundedMailbox(false, 1000, Duration(10, TimeUnit.MILLISECONDS))
|
||||
val q = factory(config)
|
||||
ensureInitialMailboxState(config, q)
|
||||
|
||||
def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn {
|
||||
val messages = Vector() ++ (for(i <- fromNum to toNum) yield createMessageInvocation(i))
|
||||
for(i <- messages) q.enqueue(i)
|
||||
messages
|
||||
}
|
||||
|
||||
val producer1 = createProducer(1, 500)
|
||||
val producer2 = createProducer(501, 1000)
|
||||
|
||||
def createConsumer: Future[Vector[MessageInvocation]] = spawn {
|
||||
var r = Vector[MessageInvocation]()
|
||||
while(!producer1.isCompleted || !producer2.isCompleted || !q.isEmpty) {
|
||||
q.dequeue match {
|
||||
case null =>
|
||||
case message => r = r :+ message
|
||||
}
|
||||
}
|
||||
r
|
||||
}
|
||||
|
||||
val consumer1 = createConsumer
|
||||
val consumer2 = createConsumer
|
||||
|
||||
Futures.awaitAll(List(producer1, producer2, consumer1, consumer2))
|
||||
|
||||
val c1 = consumer1.result.get
|
||||
val c2 = consumer2.result.get
|
||||
val p1 = producer1.result.get
|
||||
val p2 = producer2.result.get
|
||||
|
||||
(p1.size + p2.size) must be === 1000
|
||||
(c1.size + c2.size) must be === 1000
|
||||
(c1 forall (!c2.contains(_))) must be (true) //No messages produced may exist in the
|
||||
(c2 forall (!c1.contains(_))) must be (true)
|
||||
(p1 forall ( m => c1.contains(m) || c2.contains(m))) must be (true)
|
||||
(p2 forall ( m => c1.contains(m) || c2.contains(m))) must be (true)
|
||||
"dequeue what was enqueued properly for unbounded mailboxes" in {
|
||||
testEnqueueDequeue(UnboundedMailbox(false))
|
||||
}
|
||||
|
||||
"dequeue what was enqueued properly for bounded mailboxes" in {
|
||||
testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(-1, TimeUnit.MILLISECONDS)))
|
||||
}
|
||||
|
||||
"dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in {
|
||||
testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(100, TimeUnit.MILLISECONDS)))
|
||||
}
|
||||
|
||||
/** FIXME Adapt test so it works with the last dequeue
|
||||
|
||||
"dequeue what was enqueued properly for unbounded mailboxes with blockDeque" in {
|
||||
testEnqueueDequeue(UnboundedMailbox(true))
|
||||
}
|
||||
|
||||
"dequeue what was enqueued properly for bounded mailboxes with blockDeque" in {
|
||||
testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(-1, TimeUnit.MILLISECONDS)))
|
||||
}
|
||||
|
||||
"dequeue what was enqueued properly for bounded mailboxes with blockDeque and pushTimeout" in {
|
||||
testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(100, TimeUnit.MILLISECONDS)))
|
||||
}*/
|
||||
}
|
||||
|
||||
//CANDIDATE FOR TESTKIT
|
||||
|
|
@ -137,6 +119,47 @@ abstract class MailboxSpec extends
|
|||
q.size must be === 0
|
||||
q.isEmpty must be === true
|
||||
}
|
||||
|
||||
def testEnqueueDequeue(config: MailboxType) {
|
||||
implicit val within = Duration(10,TimeUnit.SECONDS)
|
||||
val q = factory(config)
|
||||
ensureInitialMailboxState(config, q)
|
||||
|
||||
def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn {
|
||||
val messages = Vector() ++ (for(i <- fromNum to toNum) yield createMessageInvocation(i))
|
||||
for(i <- messages) q.enqueue(i)
|
||||
messages
|
||||
}
|
||||
|
||||
val totalMessages = 10000
|
||||
val step = 500
|
||||
|
||||
val producers = for(i <- (1 to totalMessages by step).toList) yield createProducer(i,i+step-1)
|
||||
|
||||
def createConsumer: Future[Vector[MessageInvocation]] = spawn {
|
||||
var r = Vector[MessageInvocation]()
|
||||
while(producers.exists(_.isCompleted == false) || !q.isEmpty) {
|
||||
q.dequeue match {
|
||||
case null =>
|
||||
case message => r = r :+ message
|
||||
}
|
||||
}
|
||||
r
|
||||
}
|
||||
|
||||
val consumers = for(i <- (1 to 4).toList) yield createConsumer
|
||||
|
||||
val ps = producers.map(_.await.resultOrException.get)
|
||||
val cs = consumers.map(_.await.resultOrException.get)
|
||||
|
||||
ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages
|
||||
cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages
|
||||
//No message is allowed to be consumed by more than one consumer
|
||||
cs.flatten.distinct.size must be === totalMessages
|
||||
//All produced messages should have been consumed
|
||||
(cs.flatten diff ps.flatten).size must be === 0
|
||||
(ps.flatten diff cs.flatten).size must be === 0
|
||||
}
|
||||
}
|
||||
|
||||
class DefaultMailboxSpec extends MailboxSpec {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue