#3340 - Adding support for 0ms push timeout for blocking mailboxes

This commit is contained in:
Viktor Klang 2013-05-13 15:39:52 +02:00
parent d0ed7385b2
commit 6aab7b8705
3 changed files with 69 additions and 39 deletions

View file

@ -8,6 +8,7 @@ import com.typesafe.config.ConfigFactory
import akka.testkit._ import akka.testkit._
import akka.dispatch._ import akka.dispatch._
import akka.TestUtils.verifyActorTermination import akka.TestUtils.verifyActorTermination
import scala.concurrent.duration.Duration
object ActorMailboxSpec { object ActorMailboxSpec {
val mailboxConf = ConfigFactory.parseString(""" val mailboxConf = ConfigFactory.parseString("""
@ -31,6 +32,12 @@ object ActorMailboxSpec {
mailbox-type = "akka.dispatch.BoundedMailbox" mailbox-type = "akka.dispatch.BoundedMailbox"
} }
bounded-mailbox-with-zero-pushtimeout {
mailbox-capacity = 1000
mailbox-push-timeout-time = 0s
mailbox-type = "akka.dispatch.BoundedMailbox"
}
akka.actor.deployment { akka.actor.deployment {
/default-default { /default-default {
} }
@ -43,6 +50,9 @@ object ActorMailboxSpec {
/default-bounded { /default-bounded {
mailbox = bounded-mailbox mailbox = bounded-mailbox
} }
/default-bounded-mailbox-with-zero-pushtimeout {
mailbox = bounded-mailbox-with-zero-pushtimeout
}
/default-unbounded-deque { /default-unbounded-deque {
mailbox = akka.actor.mailbox.unbounded-deque-based mailbox = akka.actor.mailbox.unbounded-deque-based
} }
@ -97,12 +107,13 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
import ActorMailboxSpec._ import ActorMailboxSpec._
def checkMailboxQueue(props: Props, name: String, types: Seq[Class[_]]): Unit = { def checkMailboxQueue(props: Props, name: String, types: Seq[Class[_]]): MessageQueue = {
val actor = system.actorOf(props, name) val actor = system.actorOf(props, name)
actor ! "ping" actor ! "ping"
val q = expectMsgType[MessageQueue] val q = expectMsgType[MessageQueue]
types foreach (t assert(t isInstance q, s"Type [${q.getClass.getName}] is not assignable to [${t.getName}]")) types foreach (t assert(t isInstance q, s"Type [${q.getClass.getName}] is not assignable to [${t.getName}]"))
q
} }
"An Actor" must { "An Actor" must {
@ -154,10 +165,15 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau
checkMailboxQueue(Props[QueueReportingActor], "unbounded-bounded", BoundedMailboxTypes) checkMailboxQueue(Props[QueueReportingActor], "unbounded-bounded", BoundedMailboxTypes)
} }
"get a bounded message queue by when defined in dispatcher" in { "get a bounded message queue when defined in dispatcher" in {
checkMailboxQueue(Props[QueueReportingActor], "bounded-default", BoundedMailboxTypes) checkMailboxQueue(Props[QueueReportingActor], "bounded-default", BoundedMailboxTypes)
} }
"get a bounded message queue with 0 push timeout when defined in dispatcher" in {
val q = checkMailboxQueue(Props[QueueReportingActor], "default-bounded-mailbox-with-zero-pushtimeout", BoundedMailboxTypes)
q.asInstanceOf[BoundedMessageQueueSemantics].pushTimeOut must be === Duration.Zero
}
"get an unbounded message queue when it's configured as mailbox overriding bounded in dispatcher" in { "get an unbounded message queue when it's configured as mailbox overriding bounded in dispatcher" in {
checkMailboxQueue(Props[QueueReportingActor], "bounded-unbounded", UnboundedMailboxTypes) checkMailboxQueue(Props[QueueReportingActor], "bounded-unbounded", UnboundedMailboxTypes)
} }

View file

@ -9,7 +9,7 @@ import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue }
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell } import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell }
import akka.testkit.AkkaSpec import akka.testkit.{ EventFilter, AkkaSpec }
import scala.concurrent.{ Future, Promise, Await, ExecutionContext } import scala.concurrent.{ Future, Promise, Await, ExecutionContext }
import scala.concurrent.duration._ import scala.concurrent.duration._
@ -69,6 +69,10 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond)) testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond))
} }
"dequeue what was enqueued properly for bounded mailboxes with 0 pushTimeout" in {
testEnqueueDequeue(BoundedMailbox(10, 0 millisecond), 20, 10, false)
}
"dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in {
testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds)) testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds))
} }
@ -121,45 +125,55 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
q.hasMessages must be === false q.hasMessages must be === false
} }
def testEnqueueDequeue(config: MailboxType) { def testEnqueueDequeue(config: MailboxType,
implicit val within = 10 seconds enqueueN: Int = 10000,
dequeueN: Int = 10000,
parallel: Boolean = true): Unit = within(10 seconds) {
val q = factory(config) val q = factory(config)
ensureInitialMailboxState(config, q) ensureInitialMailboxState(config, q)
def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn { EventFilter.warning(pattern = ".*received dead letter from Actor.*MailboxSpec/deadLetters.*",
val messages = Vector() ++ (for (i fromNum to toNum) yield createMessageInvocation(i)) occurrences = (enqueueN - dequeueN)) intercept {
for (i messages) q.enqueue(null, i)
messages
}
val totalMessages = 10000 def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn {
val step = 500 val messages = Vector() ++ (for (i fromNum to toNum) yield createMessageInvocation(i))
for (i messages) q.enqueue(testActor, i)
val producers = for (i (1 to totalMessages by step).toList) yield createProducer(i, i + step - 1) messages
def createConsumer: Future[Vector[Envelope]] = spawn {
var r = Vector[Envelope]()
while (producers.exists(_.isCompleted == false) || q.hasMessages) {
q.dequeue match {
case null
case message r = r :+ message
} }
val producers = {
val step = 500
val ps = for (i (1 to enqueueN by step).toList) yield createProducer(i, Math.min(enqueueN, i + step - 1))
if (parallel == false)
ps foreach { Await.ready(_, remaining) }
ps
}
def createConsumer: Future[Vector[Envelope]] = spawn {
var r = Vector[Envelope]()
while (producers.exists(_.isCompleted == false) || q.hasMessages)
Option(q.dequeue) foreach { message r = r :+ message }
r
}
val consumers = List.fill(maxConsumers)(createConsumer)
val ps = producers.map(Await.result(_, remaining))
val cs = consumers.map(Await.result(_, remaining))
ps.map(_.size).sum must be === enqueueN //Must have produced 1000 messages
cs.map(_.size).sum must be === dequeueN //Must have consumed all produced messages
//No message is allowed to be consumed by more than one consumer
cs.flatten.distinct.size must be === dequeueN
//All consumed messages must have been produced
(cs.flatten diff ps.flatten).size must be === 0
//The ones that were produced and not consumed
(ps.flatten diff cs.flatten).size must be === (enqueueN - dequeueN)
} }
r
}
val consumers = List.fill(maxConsumers)(createConsumer)
val ps = producers.map(Await.result(_, within))
val cs = consumers.map(Await.result(_, within))
ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages
cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages
//No message is allowed to be consumed by more than one consumer
cs.flatten.distinct.size must be === totalMessages
//All produced messages should have been consumed
(cs.flatten diff ps.flatten).size must be === 0
(ps.flatten diff cs.flatten).size must be === 0
} }
} }

View file

@ -454,7 +454,7 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
override def queue: BlockingQueue[Envelope] override def queue: BlockingQueue[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope): Unit = def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0) { if (pushTimeOut.length >= 0) {
if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit)) if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(handle.message, handle.sender, receiver), handle.sender) DeadLetter(handle.message, handle.sender, receiver), handle.sender)
@ -490,14 +490,14 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
override def queue: BlockingDeque[Envelope] override def queue: BlockingDeque[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope): Unit = def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0) { if (pushTimeOut.length >= 0) {
if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit)) if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(handle.message, handle.sender, receiver), handle.sender) DeadLetter(handle.message, handle.sender, receiver), handle.sender)
} else queue put handle } else queue put handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0) { if (pushTimeOut.length >= 0) {
if (!queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit)) if (!queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(handle.message, handle.sender, receiver), handle.sender) DeadLetter(handle.message, handle.sender, receiver), handle.sender)