Migrate to Hammersmith (com.mongodb.async) URI Format based URLs; new

config options
    - Mongo URI config options
    - Configurable timeout values for Read and Write
This commit is contained in:
Brendan W. McAdams 2011-07-11 20:02:14 -04:00
parent 5092adc2e0
commit 2a56322814
2 changed files with 42 additions and 16 deletions

View file

@ -33,10 +33,15 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb
implicit val mailboxBSONSer = BSONSerializableMailbox
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
val mongoConfig = config.getList("akka.mailbox.actor.mailbox.mongodb")// need an explicit definition in akka-conf
val URI_CONFIG_KEY = "akka.actor.mailbox.mongodb.uri"
val WRITE_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.write"
val READ_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.read"
val mongoURI = config.getString(URI_CONFIG_KEY)
val writeTimeout = config.getInt(WRITE_TIMEOUT_KEY, 3000)
val readTimeout = config.getInt(READ_TIMEOUT_KEY, 3000)
@volatile private var db = connect()
private val collName = "akka_mailbox.%s".format(name)
@volatile private var mongo = connect()
def enqueue(msg: MessageInvocation) = {
EventHandler.debug(this,
@ -44,8 +49,8 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb
/* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */
val durableMessage = MongoDurableMessage(ownerAddress, msg.receiver, msg.message, msg.channel)
// todo - do we need to filter the actor name at all for safe collection naming?
val result = new DefaultPromise[Boolean](10000) // give the write 10 seconds to succeed ... should we wait infinitely (does akka allow it?)
db.insert(collName)(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)] => wr match {
val result = new DefaultPromise[Boolean](writeTimeout)
mongo.insert(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)] => wr match {
case Right((oid, wr)) => result.completeWithResult(true)
case Left(t) => result.completeWithException(t)
}})
@ -61,8 +66,8 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb
* TODO - Should we have a specific query in place? Which way do we sort?
* TODO - Error handling version!
*/
val msgInvocation = new DefaultPromise[MessageInvocation](10000)
db.findAndRemove(collName)(Document.empty) { doc: Option[MongoDurableMessage] => doc match {
val msgInvocation = new DefaultPromise[MessageInvocation](readTimeout)
mongo.findAndRemove(Document.empty) { doc: Option[MongoDurableMessage] => doc match {
case Some(msg) => {
EventHandler.debug(this,
"\nDEQUEUING message in mongo-based mailbox [%s]".format(msg))
@ -81,8 +86,8 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb
}
def size: Int = {
val count = new DefaultPromise[Int](10000)
db.count(collName)()(count.completeWithResult)
val count = new DefaultPromise[Int](readTimeout)
mongo.count()(count.completeWithResult)
count.as[Int].getOrElse(-1)
}
@ -90,9 +95,26 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb
def isEmpty: Boolean = size == 0 //TODO review find other solution, this will be very expensive
private[akka] def connect() = {
require(mongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(URI_CONFIG_KEY))
EventHandler.info(this,
"\nCONNECTING mongodb { uri : [%s] } ".format(mongoURI))
val _dbh = MongoConnection.fromURI(mongoURI.get) match {
case (conn, None, None) => {
throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'")
}
case (conn, Some(db), Some(coll)) => {
EventHandler.warning(this,
"\nCollection name (%s) specified in MongoURI Config will be used as a prefix for mailbox names".format(coll.name))
db("%s.%s".format(coll.name, name))
}
case (conn, Some(db), None) => {
db("mailbox.%s".format(name))
}
case default => throw new IllegalArgumentException("Illegal or unexpected response from Mongo Connection URI Parser: %s".format(default))
}
EventHandler.debug(this,
"\nCONNECTING mongodb { config: [%s] } ".format(mongoConfig))
MongoConnection("localhost", 27017)("akka")
"\nCONNECTED to mongodb { dbh: '%s | %s'} ".format(_dbh, _dbh.name))
_dbh
}
private def withErrorHandling[T](body: => T): T = {
@ -100,7 +122,7 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb
body
} catch {
case e: Exception => {
db = connect()
mongo = connect()
body
}
case e => {

View file

@ -15,7 +15,7 @@ akka {
time-unit = "seconds" # Time unit for all timeout properties throughout the config
event-handlers = ["akka.event.EventHandler$DefaultListener"] # Event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG
event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
# Can be used to bootstrap your application(s)
@ -132,9 +132,13 @@ akka {
}
mongodb {
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
port = 27017 # default port
dbname = "akka" # database name
# Any specified collection name will be used as a prefix for collections that use durable mongo mailboxes
uri = "mongodb://localhost/akka.mailbox" # Follow Mongo URI Spec - http://www.mongodb.org/display/DOCS/Connections
# Configurable timeouts for certain ops
timeout {
read = 3000 # number of milliseconds to wait for a read to succeed before timing out the future
write = 3000 # number of milliseconds to wait for a write to succeed before timing out the future
}
}
zookeeper {