diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 5bd4c7749f..e61d7a613e 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -33,10 +33,15 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb implicit val mailboxBSONSer = BSONSerializableMailbox implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! - val mongoConfig = config.getList("akka.mailbox.actor.mailbox.mongodb")// need an explicit definition in akka-conf + val URI_CONFIG_KEY = "akka.actor.mailbox.mongodb.uri" + val WRITE_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.write" + val READ_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.read" + val mongoURI = config.getString(URI_CONFIG_KEY) + val writeTimeout = config.getInt(WRITE_TIMEOUT_KEY, 3000) + val readTimeout = config.getInt(READ_TIMEOUT_KEY, 3000) + - @volatile private var db = connect() - private val collName = "akka_mailbox.%s".format(name) + @volatile private var mongo = connect() def enqueue(msg: MessageInvocation) = { EventHandler.debug(this, @@ -44,8 +49,8 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb /* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */ val durableMessage = MongoDurableMessage(ownerAddress, msg.receiver, msg.message, msg.channel) // todo - do we need to filter the actor name at all for safe collection naming? - val result = new DefaultPromise[Boolean](10000) // give the write 10 seconds to succeed ... should we wait infinitely (does akka allow it?) - db.insert(collName)(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)] => wr match { + val result = new DefaultPromise[Boolean](writeTimeout) + mongo.insert(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)] => wr match { case Right((oid, wr)) => result.completeWithResult(true) case Left(t) => result.completeWithException(t) }}) @@ -61,8 +66,8 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb * TODO - Should we have a specific query in place? Which way do we sort? * TODO - Error handling version! */ - val msgInvocation = new DefaultPromise[MessageInvocation](10000) - db.findAndRemove(collName)(Document.empty) { doc: Option[MongoDurableMessage] => doc match { + val msgInvocation = new DefaultPromise[MessageInvocation](readTimeout) + mongo.findAndRemove(Document.empty) { doc: Option[MongoDurableMessage] => doc match { case Some(msg) => { EventHandler.debug(this, "\nDEQUEUING message in mongo-based mailbox [%s]".format(msg)) @@ -81,8 +86,8 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb } def size: Int = { - val count = new DefaultPromise[Int](10000) - db.count(collName)()(count.completeWithResult) + val count = new DefaultPromise[Int](readTimeout) + mongo.count()(count.completeWithResult) count.as[Int].getOrElse(-1) } @@ -90,9 +95,26 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb def isEmpty: Boolean = size == 0 //TODO review find other solution, this will be very expensive private[akka] def connect() = { + require(mongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(URI_CONFIG_KEY)) + EventHandler.info(this, + "\nCONNECTING mongodb { uri : [%s] } ".format(mongoURI)) + val _dbh = MongoConnection.fromURI(mongoURI.get) match { + case (conn, None, None) => { + throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'") + } + case (conn, Some(db), Some(coll)) => { + EventHandler.warning(this, + "\nCollection name (%s) specified in MongoURI Config will be used as a prefix for mailbox names".format(coll.name)) + db("%s.%s".format(coll.name, name)) + } + case (conn, Some(db), None) => { + db("mailbox.%s".format(name)) + } + case default => throw new IllegalArgumentException("Illegal or unexpected response from Mongo Connection URI Parser: %s".format(default)) + } EventHandler.debug(this, - "\nCONNECTING mongodb { config: [%s] } ".format(mongoConfig)) - MongoConnection("localhost", 27017)("akka") + "\nCONNECTED to mongodb { dbh: '%s | %s'} ".format(_dbh, _dbh.name)) + _dbh } private def withErrorHandling[T](body: => T): T = { @@ -100,7 +122,7 @@ class MongoBasedNaiveMailbox(val owner: ActorRef) extends DurableExecutableMailb body } catch { case e: Exception => { - db = connect() + mongo = connect() body } case e => { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index eec01dec0e..eb8b8262c0 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -15,7 +15,7 @@ akka { time-unit = "seconds" # Time unit for all timeout properties throughout the config event-handlers = ["akka.event.EventHandler$DefaultListener"] # Event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT) - event-handler-level = "DEBUG" # Options: ERROR, WARNING, INFO, DEBUG + event-handler-level = "INFO" # Options: ERROR, WARNING, INFO, DEBUG # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up # Can be used to bootstrap your application(s) @@ -132,9 +132,13 @@ akka { } mongodb { - hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance - port = 27017 # default port - dbname = "akka" # database name + # Any specified collection name will be used as a prefix for collections that use durable mongo mailboxes + uri = "mongodb://localhost/akka.mailbox" # Follow Mongo URI Spec - http://www.mongodb.org/display/DOCS/Connections + # Configurable timeouts for certain ops + timeout { + read = 3000 # number of milliseconds to wait for a read to succeed before timing out the future + write = 3000 # number of milliseconds to wait for a write to succeed before timing out the future + } } zookeeper {