From 42a80ec8b18bf014f582d58ff2a6af98ced663c2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 2 May 2012 14:27:13 +0200 Subject: [PATCH] Adding more comprehensive tests for durable mailbox implementations --- .../mailbox/BeanstalkBasedMailboxSpec.scala | 4 +- .../actor/mailbox/FileBasedMailboxSpec.scala | 3 + .../actor/mailbox/DurableMailboxSpec.scala | 54 +++++++++--------- .../actor/mailbox/MongoBasedMailboxSpec.scala | 56 ++----------------- .../actor/mailbox/RedisBasedMailboxSpec.scala | 4 +- .../mailbox/ZooKeeperBasedMailboxSpec.scala | 4 +- 6 files changed, 40 insertions(+), 85 deletions(-) diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala index 3b538fdf4c..77bc3e041c 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala @@ -1,5 +1,7 @@ package akka.actor.mailbox +import akka.dispatch.Mailbox + object BeanstalkBasedMailboxSpec { val config = """ Beanstalkd-dispatcher { @@ -25,6 +27,6 @@ class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", Beansta } override def atTermination(): Unit = beanstalkd.destroy() - + def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[BeanstalkBasedMessageQueue] } diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 5d428e0776..be82e0fcb3 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -2,6 +2,7 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils import com.typesafe.config.ConfigFactory +import akka.dispatch.Mailbox object FileBasedMailboxSpec { val config = """ @@ -24,6 +25,8 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp } } + def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue] + def clean { FileUtils.deleteDirectory(new java.io.File(queuePath)) } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index 970e0d6f1d..fc161cb11c 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -3,26 +3,19 @@ */ package akka.actor.mailbox -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.PoisonPill -import akka.actor.Props -import akka.dispatch.Await import akka.testkit.AkkaSpec import akka.testkit.TestLatch import akka.util.duration._ import java.io.InputStream import scala.annotation.tailrec import com.typesafe.config.Config +import akka.actor._ +import akka.dispatch.{ Mailbox, Await } object DurableMailboxSpecActorFactory { class MailboxTestActor extends Actor { - def receive = { case "sum" ⇒ sender ! "sum" } - } - - class Sender(latch: TestLatch) extends Actor { - def receive = { case "sum" ⇒ latch.countDown() } + def receive = { case x ⇒ sender ! x } } } @@ -54,32 +47,35 @@ abstract class DurableMailboxSpec(val backendName: String, config: String) exten if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result) } - def createMailboxTestActor(id: String): ActorRef = - system.actorOf(Props(new MailboxTestActor).withDispatcher(backendName + "-dispatcher")) + private val props = Props[MailboxTestActor].withDispatcher(backendName + "-dispatcher") + + def createMailboxTestActor(id: String = ""): ActorRef = id match { + case null | "" ⇒ system.actorOf(props) + case some ⇒ system.actorOf(props, some) + } + + def isDurableMailbox(m: Mailbox): Boolean "A " + backendName + " based mailbox backed actor" must { - "handle reply to ! for 1 message" in { - val latch = new TestLatch(1) - val queueActor = createMailboxTestActor(backendName + " should handle reply to !") - val sender = system.actorOf(Props(new Sender(latch))) - - queueActor.!("sum")(sender) - Await.ready(latch, 10 seconds) - queueActor ! PoisonPill - sender ! PoisonPill + "get a new, unique, durable mailbox" in { + val a1, a2 = createMailboxTestActor() + isDurableMailbox(a1.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) + isDurableMailbox(a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) + (a1.asInstanceOf[LocalActorRef].underlying.mailbox ne a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true) } - "handle reply to ! for multiple messages" in { - val latch = new TestLatch(5) - val queueActor = createMailboxTestActor(backendName + " should handle reply to !") - val sender = system.actorOf(Props(new Sender(latch))) + "deliver messages at most once" in { + val queueActor = createMailboxTestActor() + implicit val sender = testActor - for (i ← 1 to 10) queueActor.!("sum")(sender) + val msgs = 1 to 100 map { x ⇒ "foo" + x } - Await.ready(latch, 10 seconds) - queueActor ! PoisonPill - sender ! PoisonPill + msgs foreach { m ⇒ queueActor ! m } + + msgs foreach { m ⇒ expectMsg(m) } + + expectNoMsg() } } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala index 0579215df0..b4926f4ad7 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala @@ -7,7 +7,7 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } import akka.actor._ import akka.actor.Actor._ import java.util.concurrent.CountDownLatch -import akka.dispatch.MessageDispatcher +import akka.dispatch.{ Mailbox, MessageDispatcher } object MongoBasedMailboxSpec { val config = """ @@ -27,6 +27,8 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail lazy val mongod = new ProcessBuilder("mongod", "--dbpath", "mongoDB", "--bind_ip", "127.0.0.1", "--port", "27123").start() lazy val mongo = MongoConnection("localhost", 27123)("akka") + def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[MongoBasedMessageQueue] + override def atStartup(): Unit = { // start MongoDB daemon new java.io.File("mongoDB").mkdir() @@ -41,54 +43,4 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail } override def atTermination(): Unit = mongod.destroy() - -} - -/*object DurableMongoMailboxSpecActorFactory { - - class MongoMailboxTestActor extends Actor { - def receive = { - case "sum" => reply("sum") - } - } - - def createMongoMailboxTestActor(id: String)(implicit dispatcher: MessageDispatcher): ActorRef = { - val queueActor = actorOf(Props[MongoMailboxTestActor] - queueActor.dispatcher = dispatcher - queueActor - } -}*/ - -/*class MongoBasedMailboxSpec extends WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll { - import DurableMongoMailboxSpecActorFactory._ - - implicit val dispatcher = DurableDispatcher("mongodb", MongoNaiveDurableMailboxStorage, 1) - - "A MongoDB based naive mailbox backed actor" should { - "should handle reply to ! for 1 message" in { - val latch = new CountDownLatch(1) - val queueActor = createMongoMailboxTestActor("mongoDB Backend should handle Reply to !") - val sender = actorOf(Props(new Actor { def receive = { case "sum" => latch.countDown } }) - - queueActor.!("sum")(Some(sender)) - latch.await(10, TimeUnit.SECONDS) must be (true) - } - - "should handle reply to ! for multiple messages" in { - val latch = new CountDownLatch(5) - val queueActor = createMongoMailboxTestActor("mongoDB Backend should handle reply to !") - val sender = actorOf( new Actor { def receive = { case "sum" => latch.countDown } } ) - - queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) - queueActor.!("sum")(Some(sender)) - latch.await(10, TimeUnit.SECONDS) must be (true) - } - } - - override def beforeEach() { - registry.local.shutdownAll - } -}*/ +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala index 8dd0d09ce4..69a212e981 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala @@ -1,5 +1,7 @@ package akka.actor.mailbox +import akka.dispatch.Mailbox + object RedisBasedMailboxSpec { val config = """ Redis-dispatcher { @@ -40,5 +42,5 @@ class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailbo } override def atTermination(): Unit = redisServer.destroy() - + def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[RedisBasedMessageQueue] } diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index a2d9028bfe..a615bc6507 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -3,10 +3,10 @@ package akka.actor.mailbox import akka.actor.Actor import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ -import akka.dispatch.MessageDispatcher import akka.actor.ActorRef import com.typesafe.config.ConfigFactory import akka.util.duration._ +import akka.dispatch.{ Mailbox, MessageDispatcher } object ZooKeeperBasedMailboxSpec { val config = """ @@ -31,7 +31,7 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe } var zkServer: ZkServer = _ - + def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[ZooKeeperBasedMessageQueue] override def atStartup() { zkServer = AkkaZooKeeper.startLocalServer(dataPath, logPath) super.atStartup()