2011-03-11 14:51:24 +01:00
|
|
|
package akka.dispatch
|
|
|
|
|
import org.scalatest.WordSpec
|
|
|
|
|
import org.scalatest.matchers.MustMatchers
|
2011-05-18 17:25:30 +02:00
|
|
|
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
|
2011-03-11 14:51:24 +01:00
|
|
|
import org.scalatest.junit.JUnitRunner
|
|
|
|
|
import org.junit.runner.RunWith
|
2011-05-18 17:25:30 +02:00
|
|
|
import akka.actor.Actor.{ actorOf }
|
|
|
|
|
import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue }
|
|
|
|
|
import java.util.{ Queue }
|
2011-03-11 14:51:24 +01:00
|
|
|
import akka.util._
|
|
|
|
|
import akka.util.Duration._
|
2011-08-26 17:25:18 +02:00
|
|
|
import akka.actor.{ LocalActorRef, Actor, ActorRegistry, NullChannel }
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-03-11 14:51:24 +01:00
|
|
|
@RunWith(classOf[JUnitRunner])
|
2011-05-18 17:25:30 +02:00
|
|
|
abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
|
|
|
|
|
def name: String
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
def factory: MailboxType ⇒ MessageQueue
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
name should {
|
|
|
|
|
"create an unbounded mailbox" in {
|
|
|
|
|
val config = UnboundedMailbox()
|
|
|
|
|
val q = factory(config)
|
|
|
|
|
ensureInitialMailboxState(config, q)
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
implicit val within = Duration(1, TimeUnit.SECONDS)
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
val f = spawn {
|
|
|
|
|
q.dequeue
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
f.await.resultOrException must be === Some(null)
|
|
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"create a bounded mailbox with 10 capacity and with push timeout" in {
|
|
|
|
|
val config = BoundedMailbox(10, Duration(10, TimeUnit.MILLISECONDS))
|
|
|
|
|
val q = factory(config)
|
|
|
|
|
ensureInitialMailboxState(config, q)
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
val exampleMessage = createMessageInvocation("test")
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
for (i ← 1 to config.capacity) q.enqueue(exampleMessage)
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
q.size must be === config.capacity
|
|
|
|
|
q.isEmpty must be === false
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
intercept[MessageQueueAppendFailedException] {
|
|
|
|
|
q.enqueue(exampleMessage)
|
2011-03-11 16:48:44 +01:00
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
q.dequeue must be === exampleMessage
|
|
|
|
|
q.size must be(config.capacity - 1)
|
|
|
|
|
q.isEmpty must be === false
|
|
|
|
|
}
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"dequeue what was enqueued properly for unbounded mailboxes" in {
|
|
|
|
|
testEnqueueDequeue(UnboundedMailbox())
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"dequeue what was enqueued properly for bounded mailboxes" in {
|
|
|
|
|
testEnqueueDequeue(BoundedMailbox(10000, Duration(-1, TimeUnit.MILLISECONDS)))
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in {
|
|
|
|
|
testEnqueueDequeue(BoundedMailbox(10000, Duration(100, TimeUnit.MILLISECONDS)))
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
//CANDIDATE FOR TESTKIT
|
|
|
|
|
def spawn[T <: AnyRef](fun: ⇒ T)(implicit within: Duration): Future[T] = {
|
2011-05-23 11:31:01 +02:00
|
|
|
val result = new DefaultPromise[T](within.length, within.unit)
|
2011-05-18 17:25:30 +02:00
|
|
|
val t = new Thread(new Runnable {
|
|
|
|
|
def run = try {
|
|
|
|
|
result.completeWithResult(fun)
|
|
|
|
|
} catch {
|
|
|
|
|
case e: Throwable ⇒ result.completeWithException(e)
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2011-05-18 17:25:30 +02:00
|
|
|
})
|
|
|
|
|
t.start()
|
|
|
|
|
result
|
|
|
|
|
}
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
def createMessageInvocation(msg: Any): MessageInvocation = {
|
|
|
|
|
new MessageInvocation(
|
|
|
|
|
actorOf(new Actor { //Dummy actor
|
|
|
|
|
def receive = { case _ ⇒ }
|
2011-08-26 17:25:18 +02:00
|
|
|
}).asInstanceOf[LocalActorRef], msg, NullChannel)
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
|
|
|
|
|
q must not be null
|
|
|
|
|
q match {
|
|
|
|
|
case aQueue: BlockingQueue[_] ⇒
|
|
|
|
|
config match {
|
|
|
|
|
case BoundedMailbox(capacity, _) ⇒ aQueue.remainingCapacity must be === capacity
|
|
|
|
|
case UnboundedMailbox() ⇒ aQueue.remainingCapacity must be === Int.MaxValue
|
2011-03-11 16:48:44 +01:00
|
|
|
}
|
2011-05-18 17:25:30 +02:00
|
|
|
case _ ⇒
|
|
|
|
|
}
|
|
|
|
|
q.size must be === 0
|
|
|
|
|
q.isEmpty must be === true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def testEnqueueDequeue(config: MailboxType) {
|
|
|
|
|
implicit val within = Duration(10, TimeUnit.SECONDS)
|
|
|
|
|
val q = factory(config)
|
|
|
|
|
ensureInitialMailboxState(config, q)
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn {
|
|
|
|
|
val messages = Vector() ++ (for (i ← fromNum to toNum) yield createMessageInvocation(i))
|
|
|
|
|
for (i ← messages) q.enqueue(i)
|
|
|
|
|
messages
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val totalMessages = 10000
|
|
|
|
|
val step = 500
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
val producers = for (i ← (1 to totalMessages by step).toList) yield createProducer(i, i + step - 1)
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
def createConsumer: Future[Vector[MessageInvocation]] = spawn {
|
|
|
|
|
var r = Vector[MessageInvocation]()
|
|
|
|
|
while (producers.exists(_.isCompleted == false) || !q.isEmpty) {
|
|
|
|
|
q.dequeue match {
|
|
|
|
|
case null ⇒
|
|
|
|
|
case message ⇒ r = r :+ message
|
2011-03-11 16:48:44 +01:00
|
|
|
}
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
|
|
|
|
r
|
|
|
|
|
}
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
val consumers = for (i ← (1 to 4).toList) yield createConsumer
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
val ps = producers.map(_.await.resultOrException.get)
|
|
|
|
|
val cs = consumers.map(_.await.resultOrException.get)
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages
|
|
|
|
|
cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages
|
|
|
|
|
//No message is allowed to be consumed by more than one consumer
|
|
|
|
|
cs.flatten.distinct.size must be === totalMessages
|
|
|
|
|
//All produced messages should have been consumed
|
|
|
|
|
(cs.flatten diff ps.flatten).size must be === 0
|
|
|
|
|
(ps.flatten diff cs.flatten).size must be === 0
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
|
|
|
|
class DefaultMailboxSpec extends MailboxSpec {
|
|
|
|
|
lazy val name = "The default mailbox implementation"
|
|
|
|
|
def factory = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case UnboundedMailbox() ⇒ new DefaultUnboundedMessageQueue()
|
|
|
|
|
case BoundedMailbox(capacity, pushTimeOut) ⇒ new DefaultBoundedMessageQueue(capacity, pushTimeOut)
|
2010-09-09 15:49:59 +02:00
|
|
|
}
|
|
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
|
|
|
|
class PriorityMailboxSpec extends MailboxSpec {
|
2011-04-11 17:08:10 +02:00
|
|
|
val comparator = PriorityGenerator(_.##)
|
2011-03-11 14:51:24 +01:00
|
|
|
lazy val name = "The priority mailbox implementation"
|
|
|
|
|
def factory = {
|
2011-05-18 17:25:30 +02:00
|
|
|
case UnboundedMailbox() ⇒ new UnboundedPriorityMessageQueue(comparator)
|
|
|
|
|
case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedPriorityMessageQueue(capacity, pushTimeOut, comparator)
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2011-06-13 22:36:46 +02:00
|
|
|
}
|