Adding more comprehensive tests for durable mailbox implementations
This commit is contained in:
parent
071ea34eb1
commit
42a80ec8b1
6 changed files with 40 additions and 85 deletions
|
|
@ -1,5 +1,7 @@
|
|||
package akka.actor.mailbox
|
||||
|
||||
import akka.dispatch.Mailbox
|
||||
|
||||
object BeanstalkBasedMailboxSpec {
|
||||
val config = """
|
||||
Beanstalkd-dispatcher {
|
||||
|
|
@ -25,6 +27,6 @@ class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", Beansta
|
|||
}
|
||||
|
||||
override def atTermination(): Unit = beanstalkd.destroy()
|
||||
|
||||
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[BeanstalkBasedMessageQueue]
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package akka.actor.mailbox
|
|||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.dispatch.Mailbox
|
||||
|
||||
object FileBasedMailboxSpec {
|
||||
val config = """
|
||||
|
|
@ -24,6 +25,8 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp
|
|||
}
|
||||
}
|
||||
|
||||
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue]
|
||||
|
||||
def clean {
|
||||
FileUtils.deleteDirectory(new java.io.File(queuePath))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,26 +3,19 @@
|
|||
*/
|
||||
package akka.actor.mailbox
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Props
|
||||
import akka.dispatch.Await
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.TestLatch
|
||||
import akka.util.duration._
|
||||
import java.io.InputStream
|
||||
import scala.annotation.tailrec
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor._
|
||||
import akka.dispatch.{ Mailbox, Await }
|
||||
|
||||
object DurableMailboxSpecActorFactory {
|
||||
|
||||
class MailboxTestActor extends Actor {
|
||||
def receive = { case "sum" ⇒ sender ! "sum" }
|
||||
}
|
||||
|
||||
class Sender(latch: TestLatch) extends Actor {
|
||||
def receive = { case "sum" ⇒ latch.countDown() }
|
||||
def receive = { case x ⇒ sender ! x }
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -54,32 +47,35 @@ abstract class DurableMailboxSpec(val backendName: String, config: String) exten
|
|||
if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result)
|
||||
}
|
||||
|
||||
def createMailboxTestActor(id: String): ActorRef =
|
||||
system.actorOf(Props(new MailboxTestActor).withDispatcher(backendName + "-dispatcher"))
|
||||
private val props = Props[MailboxTestActor].withDispatcher(backendName + "-dispatcher")
|
||||
|
||||
def createMailboxTestActor(id: String = ""): ActorRef = id match {
|
||||
case null | "" ⇒ system.actorOf(props)
|
||||
case some ⇒ system.actorOf(props, some)
|
||||
}
|
||||
|
||||
def isDurableMailbox(m: Mailbox): Boolean
|
||||
|
||||
"A " + backendName + " based mailbox backed actor" must {
|
||||
|
||||
"handle reply to ! for 1 message" in {
|
||||
val latch = new TestLatch(1)
|
||||
val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
|
||||
val sender = system.actorOf(Props(new Sender(latch)))
|
||||
|
||||
queueActor.!("sum")(sender)
|
||||
Await.ready(latch, 10 seconds)
|
||||
queueActor ! PoisonPill
|
||||
sender ! PoisonPill
|
||||
"get a new, unique, durable mailbox" in {
|
||||
val a1, a2 = createMailboxTestActor()
|
||||
isDurableMailbox(a1.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
|
||||
isDurableMailbox(a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
|
||||
(a1.asInstanceOf[LocalActorRef].underlying.mailbox ne a2.asInstanceOf[LocalActorRef].underlying.mailbox) must be(true)
|
||||
}
|
||||
|
||||
"handle reply to ! for multiple messages" in {
|
||||
val latch = new TestLatch(5)
|
||||
val queueActor = createMailboxTestActor(backendName + " should handle reply to !")
|
||||
val sender = system.actorOf(Props(new Sender(latch)))
|
||||
"deliver messages at most once" in {
|
||||
val queueActor = createMailboxTestActor()
|
||||
implicit val sender = testActor
|
||||
|
||||
for (i ← 1 to 10) queueActor.!("sum")(sender)
|
||||
val msgs = 1 to 100 map { x ⇒ "foo" + x }
|
||||
|
||||
Await.ready(latch, 10 seconds)
|
||||
queueActor ! PoisonPill
|
||||
sender ! PoisonPill
|
||||
msgs foreach { m ⇒ queueActor ! m }
|
||||
|
||||
msgs foreach { m ⇒ expectMsg(m) }
|
||||
|
||||
expectNoMsg()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
|
|||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import akka.dispatch.MessageDispatcher
|
||||
import akka.dispatch.{ Mailbox, MessageDispatcher }
|
||||
|
||||
object MongoBasedMailboxSpec {
|
||||
val config = """
|
||||
|
|
@ -27,6 +27,8 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail
|
|||
lazy val mongod = new ProcessBuilder("mongod", "--dbpath", "mongoDB", "--bind_ip", "127.0.0.1", "--port", "27123").start()
|
||||
lazy val mongo = MongoConnection("localhost", 27123)("akka")
|
||||
|
||||
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[MongoBasedMessageQueue]
|
||||
|
||||
override def atStartup(): Unit = {
|
||||
// start MongoDB daemon
|
||||
new java.io.File("mongoDB").mkdir()
|
||||
|
|
@ -41,54 +43,4 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail
|
|||
}
|
||||
|
||||
override def atTermination(): Unit = mongod.destroy()
|
||||
|
||||
}
|
||||
|
||||
/*object DurableMongoMailboxSpecActorFactory {
|
||||
|
||||
class MongoMailboxTestActor extends Actor {
|
||||
def receive = {
|
||||
case "sum" => reply("sum")
|
||||
}
|
||||
}
|
||||
|
||||
def createMongoMailboxTestActor(id: String)(implicit dispatcher: MessageDispatcher): ActorRef = {
|
||||
val queueActor = actorOf(Props[MongoMailboxTestActor]
|
||||
queueActor.dispatcher = dispatcher
|
||||
queueActor
|
||||
}
|
||||
}*/
|
||||
|
||||
/*class MongoBasedMailboxSpec extends WordSpec with MustMatchers with BeforeAndAfterEach with BeforeAndAfterAll {
|
||||
import DurableMongoMailboxSpecActorFactory._
|
||||
|
||||
implicit val dispatcher = DurableDispatcher("mongodb", MongoNaiveDurableMailboxStorage, 1)
|
||||
|
||||
"A MongoDB based naive mailbox backed actor" should {
|
||||
"should handle reply to ! for 1 message" in {
|
||||
val latch = new CountDownLatch(1)
|
||||
val queueActor = createMongoMailboxTestActor("mongoDB Backend should handle Reply to !")
|
||||
val sender = actorOf(Props(new Actor { def receive = { case "sum" => latch.countDown } })
|
||||
|
||||
queueActor.!("sum")(Some(sender))
|
||||
latch.await(10, TimeUnit.SECONDS) must be (true)
|
||||
}
|
||||
|
||||
"should handle reply to ! for multiple messages" in {
|
||||
val latch = new CountDownLatch(5)
|
||||
val queueActor = createMongoMailboxTestActor("mongoDB Backend should handle reply to !")
|
||||
val sender = actorOf( new Actor { def receive = { case "sum" => latch.countDown } } )
|
||||
|
||||
queueActor.!("sum")(Some(sender))
|
||||
queueActor.!("sum")(Some(sender))
|
||||
queueActor.!("sum")(Some(sender))
|
||||
queueActor.!("sum")(Some(sender))
|
||||
queueActor.!("sum")(Some(sender))
|
||||
latch.await(10, TimeUnit.SECONDS) must be (true)
|
||||
}
|
||||
}
|
||||
|
||||
override def beforeEach() {
|
||||
registry.local.shutdownAll
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
|
@ -1,5 +1,7 @@
|
|||
package akka.actor.mailbox
|
||||
|
||||
import akka.dispatch.Mailbox
|
||||
|
||||
object RedisBasedMailboxSpec {
|
||||
val config = """
|
||||
Redis-dispatcher {
|
||||
|
|
@ -40,5 +42,5 @@ class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailbo
|
|||
}
|
||||
|
||||
override def atTermination(): Unit = redisServer.destroy()
|
||||
|
||||
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[RedisBasedMessageQueue]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,10 +3,10 @@ package akka.actor.mailbox
|
|||
import akka.actor.Actor
|
||||
import akka.cluster.zookeeper._
|
||||
import org.I0Itec.zkclient._
|
||||
import akka.dispatch.MessageDispatcher
|
||||
import akka.actor.ActorRef
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.util.duration._
|
||||
import akka.dispatch.{ Mailbox, MessageDispatcher }
|
||||
|
||||
object ZooKeeperBasedMailboxSpec {
|
||||
val config = """
|
||||
|
|
@ -31,7 +31,7 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe
|
|||
}
|
||||
|
||||
var zkServer: ZkServer = _
|
||||
|
||||
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[ZooKeeperBasedMessageQueue]
|
||||
override def atStartup() {
|
||||
zkServer = AkkaZooKeeper.startLocalServer(dataPath, logPath)
|
||||
super.atStartup()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue