diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala index 0ac89fa654..9e256f2643 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala @@ -8,6 +8,7 @@ import com.typesafe.config.ConfigFactory import akka.testkit._ import akka.dispatch._ import akka.TestUtils.verifyActorTermination +import scala.concurrent.duration.Duration object ActorMailboxSpec { val mailboxConf = ConfigFactory.parseString(""" @@ -31,6 +32,12 @@ object ActorMailboxSpec { mailbox-type = "akka.dispatch.BoundedMailbox" } + bounded-mailbox-with-zero-pushtimeout { + mailbox-capacity = 1000 + mailbox-push-timeout-time = 0s + mailbox-type = "akka.dispatch.BoundedMailbox" + } + akka.actor.deployment { /default-default { } @@ -43,6 +50,9 @@ object ActorMailboxSpec { /default-bounded { mailbox = bounded-mailbox } + /default-bounded-mailbox-with-zero-pushtimeout { + mailbox = bounded-mailbox-with-zero-pushtimeout + } /default-unbounded-deque { mailbox = akka.actor.mailbox.unbounded-deque-based } @@ -97,12 +107,13 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau import ActorMailboxSpec._ - def checkMailboxQueue(props: Props, name: String, types: Seq[Class[_]]): Unit = { + def checkMailboxQueue(props: Props, name: String, types: Seq[Class[_]]): MessageQueue = { val actor = system.actorOf(props, name) actor ! "ping" val q = expectMsgType[MessageQueue] types foreach (t ⇒ assert(t isInstance q, s"Type [${q.getClass.getName}] is not assignable to [${t.getName}]")) + q } "An Actor" must { @@ -154,10 +165,15 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau checkMailboxQueue(Props[QueueReportingActor], "unbounded-bounded", BoundedMailboxTypes) } - "get a bounded message queue by when defined in dispatcher" in { + "get a bounded message queue when defined in dispatcher" in { checkMailboxQueue(Props[QueueReportingActor], "bounded-default", BoundedMailboxTypes) } + "get a bounded message queue with 0 push timeout when defined in dispatcher" in { + val q = checkMailboxQueue(Props[QueueReportingActor], "default-bounded-mailbox-with-zero-pushtimeout", BoundedMailboxTypes) + q.asInstanceOf[BoundedMessageQueueSemantics].pushTimeOut must be === Duration.Zero + } + "get an unbounded message queue when it's configured as mailbox overriding bounded in dispatcher" in { checkMailboxQueue(Props[QueueReportingActor], "bounded-unbounded", UnboundedMailboxTypes) } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 4ec8715863..2876ab34cc 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -9,7 +9,7 @@ import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue } import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } import com.typesafe.config.Config import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell } -import akka.testkit.AkkaSpec +import akka.testkit.{ EventFilter, AkkaSpec } import scala.concurrent.{ Future, Promise, Await, ExecutionContext } import scala.concurrent.duration._ @@ -69,6 +69,10 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond)) } + "dequeue what was enqueued properly for bounded mailboxes with 0 pushTimeout" in { + testEnqueueDequeue(BoundedMailbox(10, 0 millisecond), 20, 10, false) + } + "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds)) } @@ -121,45 +125,55 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn q.hasMessages must be === false } - def testEnqueueDequeue(config: MailboxType) { - implicit val within = 10 seconds + def testEnqueueDequeue(config: MailboxType, + enqueueN: Int = 10000, + dequeueN: Int = 10000, + parallel: Boolean = true): Unit = within(10 seconds) { val q = factory(config) ensureInitialMailboxState(config, q) - def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn { - val messages = Vector() ++ (for (i ← fromNum to toNum) yield createMessageInvocation(i)) - for (i ← messages) q.enqueue(null, i) - messages - } + EventFilter.warning(pattern = ".*received dead letter from Actor.*MailboxSpec/deadLetters.*", + occurrences = (enqueueN - dequeueN)) intercept { - val totalMessages = 10000 - val step = 500 - - val producers = for (i ← (1 to totalMessages by step).toList) yield createProducer(i, i + step - 1) - - def createConsumer: Future[Vector[Envelope]] = spawn { - var r = Vector[Envelope]() - while (producers.exists(_.isCompleted == false) || q.hasMessages) { - q.dequeue match { - case null ⇒ - case message ⇒ r = r :+ message + def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn { + val messages = Vector() ++ (for (i ← fromNum to toNum) yield createMessageInvocation(i)) + for (i ← messages) q.enqueue(testActor, i) + messages } + + val producers = { + val step = 500 + val ps = for (i ← (1 to enqueueN by step).toList) yield createProducer(i, Math.min(enqueueN, i + step - 1)) + + if (parallel == false) + ps foreach { Await.ready(_, remaining) } + + ps + } + + def createConsumer: Future[Vector[Envelope]] = spawn { + var r = Vector[Envelope]() + + while (producers.exists(_.isCompleted == false) || q.hasMessages) + Option(q.dequeue) foreach { message ⇒ r = r :+ message } + + r + } + + val consumers = List.fill(maxConsumers)(createConsumer) + + val ps = producers.map(Await.result(_, remaining)) + val cs = consumers.map(Await.result(_, remaining)) + + ps.map(_.size).sum must be === enqueueN //Must have produced 1000 messages + cs.map(_.size).sum must be === dequeueN //Must have consumed all produced messages + //No message is allowed to be consumed by more than one consumer + cs.flatten.distinct.size must be === dequeueN + //All consumed messages must have been produced + (cs.flatten diff ps.flatten).size must be === 0 + //The ones that were produced and not consumed + (ps.flatten diff cs.flatten).size must be === (enqueueN - dequeueN) } - r - } - - val consumers = List.fill(maxConsumers)(createConsumer) - - val ps = producers.map(Await.result(_, within)) - val cs = consumers.map(Await.result(_, within)) - - ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages - cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages - //No message is allowed to be consumed by more than one consumer - cs.flatten.distinct.size must be === totalMessages - //All produced messages should have been consumed - (cs.flatten diff ps.flatten).size must be === 0 - (ps.flatten diff cs.flatten).size must be === 0 } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index f35917c12a..13cf8b6315 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -454,7 +454,7 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { override def queue: BlockingQueue[Envelope] def enqueue(receiver: ActorRef, handle: Envelope): Unit = - if (pushTimeOut.length > 0) { + if (pushTimeOut.length >= 0) { if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit)) receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( DeadLetter(handle.message, handle.sender, receiver), handle.sender) @@ -490,14 +490,14 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { override def queue: BlockingDeque[Envelope] def enqueue(receiver: ActorRef, handle: Envelope): Unit = - if (pushTimeOut.length > 0) { + if (pushTimeOut.length >= 0) { if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit)) receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( DeadLetter(handle.message, handle.sender, receiver), handle.sender) } else queue put handle def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = - if (pushTimeOut.length > 0) { + if (pushTimeOut.length >= 0) { if (!queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit)) receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( DeadLetter(handle.message, handle.sender, receiver), handle.sender)