2012-06-13 17:57:56 +02:00
|
|
|
/**
|
2013-01-09 01:47:48 +01:00
|
|
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
2012-06-13 17:57:56 +02:00
|
|
|
*/
|
2011-03-11 14:51:24 +01:00
|
|
|
package akka.dispatch
|
2011-12-19 21:46:37 +01:00
|
|
|
|
2012-06-21 16:09:14 +02:00
|
|
|
import language.postfixOps
|
|
|
|
|
|
2012-06-25 12:55:25 +02:00
|
|
|
import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue }
|
2012-06-13 17:57:56 +02:00
|
|
|
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
|
2011-12-21 19:37:18 +01:00
|
|
|
import com.typesafe.config.Config
|
2012-06-25 11:12:08 +02:00
|
|
|
import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell }
|
2013-05-13 15:39:52 +02:00
|
|
|
import akka.testkit.{ EventFilter, AkkaSpec }
|
2013-04-05 15:20:25 +02:00
|
|
|
import scala.concurrent.{ Future, Promise, Await, ExecutionContext }
|
2012-09-21 14:50:06 +02:00
|
|
|
import scala.concurrent.duration._
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-10-21 17:01:22 +02:00
|
|
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
2011-10-11 16:05:48 +02:00
|
|
|
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
|
2011-05-18 17:25:30 +02:00
|
|
|
def name: String
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2012-02-19 10:28:56 +01:00
|
|
|
def factory: MailboxType ⇒ MessageQueue
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2013-04-03 20:19:15 +02:00
|
|
|
def supportsBeingBounded = true
|
|
|
|
|
|
|
|
|
|
def maxConsumers = 4
|
|
|
|
|
|
2013-04-05 15:20:25 +02:00
|
|
|
private val exampleMessage = createMessageInvocation("test")
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
name should {
|
2013-04-05 15:20:25 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"create an unbounded mailbox" in {
|
|
|
|
|
val config = UnboundedMailbox()
|
|
|
|
|
val q = factory(config)
|
|
|
|
|
ensureInitialMailboxState(config, q)
|
2013-04-05 15:20:25 +02:00
|
|
|
}
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2013-04-05 15:20:25 +02:00
|
|
|
"UnboundedMailbox.numberOfMessages must be consistent with queue size" in {
|
|
|
|
|
ensureSingleConsumerEnqueueDequeue(UnboundedMailbox())
|
|
|
|
|
}
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2013-04-05 15:20:25 +02:00
|
|
|
"BoundedMailbox.numberOfMessages must be consistent with queue size" in {
|
|
|
|
|
ensureSingleConsumerEnqueueDequeue(BoundedMailbox(1000, 10 milliseconds))
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"create a bounded mailbox with 10 capacity and with push timeout" in {
|
2011-11-23 15:15:44 +01:00
|
|
|
val config = BoundedMailbox(10, 10 milliseconds)
|
2011-05-18 17:25:30 +02:00
|
|
|
val q = factory(config)
|
|
|
|
|
ensureInitialMailboxState(config, q)
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2013-04-05 15:20:25 +02:00
|
|
|
for (i ← 1 to config.capacity) q.enqueue(testActor, exampleMessage)
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-09-21 16:27:31 +02:00
|
|
|
q.numberOfMessages must be === config.capacity
|
|
|
|
|
q.hasMessages must be === true
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2012-06-08 13:56:53 +02:00
|
|
|
system.eventStream.subscribe(testActor, classOf[DeadLetter])
|
|
|
|
|
q.enqueue(testActor, exampleMessage)
|
|
|
|
|
expectMsg(DeadLetter(exampleMessage.message, system.deadLetters, testActor))
|
|
|
|
|
system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
q.dequeue must be === exampleMessage
|
2011-09-21 16:27:31 +02:00
|
|
|
q.numberOfMessages must be(config.capacity - 1)
|
|
|
|
|
q.hasMessages must be === true
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
2010-09-09 15:49:59 +02:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"dequeue what was enqueued properly for unbounded mailboxes" in {
|
|
|
|
|
testEnqueueDequeue(UnboundedMailbox())
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"dequeue what was enqueued properly for bounded mailboxes" in {
|
2011-11-23 15:15:44 +01:00
|
|
|
testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond))
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
|
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
"dequeue what was enqueued properly for bounded mailboxes with 0 pushTimeout" in {
|
|
|
|
|
testEnqueueDequeue(BoundedMailbox(10, 0 millisecond), 20, 10, false)
|
|
|
|
|
}
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
"dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in {
|
2011-11-23 15:15:44 +01:00
|
|
|
testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds))
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
//CANDIDATE FOR TESTKIT
|
2013-04-05 15:20:25 +02:00
|
|
|
def spawn[T <: AnyRef](fun: ⇒ T): Future[T] = Future(fun)(ExecutionContext.global)
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2012-06-13 17:57:56 +02:00
|
|
|
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters, system)
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2013-04-05 15:20:25 +02:00
|
|
|
def ensureMailboxSize(q: MessageQueue, expected: Int): Unit = q.numberOfMessages match {
|
|
|
|
|
case -1 | `expected` ⇒
|
|
|
|
|
q.hasMessages must be === (expected != 0)
|
|
|
|
|
case other ⇒
|
|
|
|
|
other must be === expected
|
|
|
|
|
q.hasMessages must be === (expected != 0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def ensureSingleConsumerEnqueueDequeue(config: MailboxType) {
|
|
|
|
|
val q = factory(config)
|
|
|
|
|
ensureMailboxSize(q, 0)
|
|
|
|
|
q.dequeue must be === null
|
|
|
|
|
for (i ← 1 to 100) {
|
|
|
|
|
q.enqueue(testActor, exampleMessage)
|
|
|
|
|
ensureMailboxSize(q, i)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ensureMailboxSize(q, 100)
|
|
|
|
|
|
|
|
|
|
for (i ← 99 to 0 by -1) {
|
|
|
|
|
q.dequeue() must be === exampleMessage
|
|
|
|
|
ensureMailboxSize(q, i)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
q.dequeue must be === null
|
|
|
|
|
ensureMailboxSize(q, 0)
|
|
|
|
|
}
|
|
|
|
|
|
2012-02-19 10:28:56 +01:00
|
|
|
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
|
2011-05-18 17:25:30 +02:00
|
|
|
q must not be null
|
|
|
|
|
q match {
|
|
|
|
|
case aQueue: BlockingQueue[_] ⇒
|
|
|
|
|
config match {
|
|
|
|
|
case BoundedMailbox(capacity, _) ⇒ aQueue.remainingCapacity must be === capacity
|
|
|
|
|
case UnboundedMailbox() ⇒ aQueue.remainingCapacity must be === Int.MaxValue
|
2011-03-11 16:48:44 +01:00
|
|
|
}
|
2011-05-18 17:25:30 +02:00
|
|
|
case _ ⇒
|
|
|
|
|
}
|
2011-09-21 16:27:31 +02:00
|
|
|
q.numberOfMessages must be === 0
|
|
|
|
|
q.hasMessages must be === false
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
|
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
def testEnqueueDequeue(config: MailboxType,
|
|
|
|
|
enqueueN: Int = 10000,
|
|
|
|
|
dequeueN: Int = 10000,
|
|
|
|
|
parallel: Boolean = true): Unit = within(10 seconds) {
|
2011-05-18 17:25:30 +02:00
|
|
|
val q = factory(config)
|
|
|
|
|
ensureInitialMailboxState(config, q)
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
EventFilter.warning(pattern = ".*received dead letter from Actor.*MailboxSpec/deadLetters.*",
|
|
|
|
|
occurrences = (enqueueN - dequeueN)) intercept {
|
2011-05-18 17:25:30 +02:00
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn {
|
|
|
|
|
val messages = Vector() ++ (for (i ← fromNum to toNum) yield createMessageInvocation(i))
|
|
|
|
|
for (i ← messages) q.enqueue(testActor, i)
|
|
|
|
|
messages
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val producers = {
|
|
|
|
|
val step = 500
|
|
|
|
|
val ps = for (i ← (1 to enqueueN by step).toList) yield createProducer(i, Math.min(enqueueN, i + step - 1))
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
if (parallel == false)
|
|
|
|
|
ps foreach { Await.ready(_, remaining) }
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
ps
|
2011-03-11 16:48:44 +01:00
|
|
|
}
|
|
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
def createConsumer: Future[Vector[Envelope]] = spawn {
|
|
|
|
|
var r = Vector[Envelope]()
|
|
|
|
|
|
|
|
|
|
while (producers.exists(_.isCompleted == false) || q.hasMessages)
|
|
|
|
|
Option(q.dequeue) foreach { message ⇒ r = r :+ message }
|
|
|
|
|
|
|
|
|
|
r
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val consumers = List.fill(maxConsumers)(createConsumer)
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
val ps = producers.map(Await.result(_, remaining))
|
|
|
|
|
val cs = consumers.map(Await.result(_, remaining))
|
2011-03-11 16:48:44 +01:00
|
|
|
|
2013-05-13 15:39:52 +02:00
|
|
|
ps.map(_.size).sum must be === enqueueN //Must have produced 1000 messages
|
|
|
|
|
cs.map(_.size).sum must be === dequeueN //Must have consumed all produced messages
|
|
|
|
|
//No message is allowed to be consumed by more than one consumer
|
|
|
|
|
cs.flatten.distinct.size must be === dequeueN
|
|
|
|
|
//All consumed messages must have been produced
|
|
|
|
|
(cs.flatten diff ps.flatten).size must be === 0
|
|
|
|
|
//The ones that were produced and not consumed
|
|
|
|
|
(ps.flatten diff cs.flatten).size must be === (enqueueN - dequeueN)
|
|
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
|
|
|
|
class DefaultMailboxSpec extends MailboxSpec {
|
|
|
|
|
lazy val name = "The default mailbox implementation"
|
|
|
|
|
def factory = {
|
2012-06-13 17:57:56 +02:00
|
|
|
case u: UnboundedMailbox ⇒ u.create(None, None)
|
|
|
|
|
case b: BoundedMailbox ⇒ b.create(None, None)
|
2010-09-09 15:49:59 +02:00
|
|
|
}
|
|
|
|
|
}
|
2011-03-11 14:51:24 +01:00
|
|
|
|
|
|
|
|
class PriorityMailboxSpec extends MailboxSpec {
|
2011-04-11 17:08:10 +02:00
|
|
|
val comparator = PriorityGenerator(_.##)
|
2011-03-11 14:51:24 +01:00
|
|
|
lazy val name = "The priority mailbox implementation"
|
|
|
|
|
def factory = {
|
2012-06-13 17:57:56 +02:00
|
|
|
case UnboundedMailbox() ⇒ new UnboundedPriorityMailbox(comparator).create(None, None)
|
|
|
|
|
case BoundedMailbox(capacity, pushTimeOut) ⇒ new BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None, None)
|
2011-03-11 14:51:24 +01:00
|
|
|
}
|
2011-06-13 22:36:46 +02:00
|
|
|
}
|
2011-12-19 20:36:06 +01:00
|
|
|
|
|
|
|
|
object CustomMailboxSpec {
|
|
|
|
|
val config = """
|
|
|
|
|
my-dispatcher {
|
2012-02-09 20:40:09 +01:00
|
|
|
mailbox-type = "akka.dispatch.CustomMailboxSpec$MyMailboxType"
|
2011-12-19 20:36:06 +01:00
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
|
2012-02-26 21:26:25 +01:00
|
|
|
class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
|
2012-06-13 17:57:56 +02:00
|
|
|
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = owner match {
|
2012-02-19 10:28:56 +01:00
|
|
|
case Some(o) ⇒ new MyMailbox(o)
|
|
|
|
|
case None ⇒ throw new Exception("no mailbox owner given")
|
|
|
|
|
}
|
2011-12-21 19:37:18 +01:00
|
|
|
}
|
|
|
|
|
|
2012-06-13 17:57:56 +02:00
|
|
|
class MyMailbox(owner: ActorRef) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
2011-12-19 20:36:06 +01:00
|
|
|
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
|
|
|
|
class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) {
|
|
|
|
|
"Dispatcher configuration" must {
|
|
|
|
|
"support custom mailboxType" in {
|
2012-02-19 10:28:56 +01:00
|
|
|
val actor = system.actorOf(Props.empty.withDispatcher("my-dispatcher"))
|
2012-06-13 17:57:56 +02:00
|
|
|
awaitCond(actor match {
|
|
|
|
|
case r: RepointableRef ⇒ r.isStarted
|
|
|
|
|
case _ ⇒ true
|
|
|
|
|
}, 1 second, 10 millis)
|
|
|
|
|
val queue = actor.asInstanceOf[ActorRefWithCell].underlying.asInstanceOf[ActorCell].mailbox.messageQueue
|
2012-02-19 10:28:56 +01:00
|
|
|
queue.getClass must be(classOf[CustomMailboxSpec.MyMailbox])
|
2011-12-19 20:36:06 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2013-04-03 20:19:15 +02:00
|
|
|
|
|
|
|
|
class SingleConsumerOnlyMailboxSpec extends MailboxSpec {
|
|
|
|
|
lazy val name = "The single-consumer-only mailbox implementation"
|
|
|
|
|
override def maxConsumers = 1
|
|
|
|
|
def factory = {
|
|
|
|
|
case u: UnboundedMailbox ⇒ SingleConsumerOnlyUnboundedMailbox().create(None, None)
|
|
|
|
|
case b: BoundedMailbox ⇒ pending; null
|
|
|
|
|
}
|
|
|
|
|
}
|