/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.state
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.Config.config
import sjson.json.Serializer._
import com.mongodb._
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
/**
* A module for supporting MongoDB based persistence.
*
* The module offers functionality for:
* Persistent Maps
* Persistent Vectors
* Persistent Refs
*
* @author Debasish Ghosh
*/
private[akka] object MongoStorageBackend extends
MapStorageBackend[AnyRef, AnyRef] with
VectorStorageBackend[AnyRef] with
RefStorageBackend[AnyRef] with
Logging {
// enrich with null safe findOne
class RichDBCollection(value: DBCollection) {
def findOneNS(o: DBObject): Option[DBObject] = {
value.findOne(o) match {
case null => None
case x => Some(x)
}
}
}
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
val KEY = "key"
val VALUE = "value"
val COLLECTION = "akka_coll"
val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
val coll = db.getCollection(COLLECTION)
// FIXME: make this pluggable
private[this] val serializer = SJSON
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) {
insertMapStorageEntriesFor(name, List((key, value)))
}
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) {
import java.util.{Map, HashMap}
val m: Map[AnyRef, AnyRef] = new HashMap
for ((k, v) <- entries) {
m.put(k, serializer.out(v))
}
nullSafeFindOne(name) match {
case None =>
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
case Some(dbo) => {
// collate the maps
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
o.putAll(m)
// remove existing reference
removeMapStorageFor(name)
// and insert
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o))
}
}
}
def removeMapStorageFor(name: String): Unit = {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
}
def removeMapStorageFor(name: String, key: AnyRef): Unit = {
nullSafeFindOne(name) match {
case None =>
case Some(dbo) => {
val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
if (key.isInstanceOf[List[_]]) {
val keys = key.asInstanceOf[List[_]]
keys.foreach(k => orig.remove(k.asInstanceOf[String]))
} else {
orig.remove(key.asInstanceOf[String])
}
// remove existing reference
removeMapStorageFor(name)
// and insert
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
}
}
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
getValueForKey(name, key.asInstanceOf[String])
def getMapStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
}
}
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
val m =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]]
val vals =
for(s <- n)
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef],
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
val m =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
/**
* count is the max number of results to return. Start with
* start or 0 (if start is not defined) and go until
* you hit finish or count.
*/
val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
val cnt =
if (finish.isDefined) {
val f = finish.get.asInstanceOf[Int]
if (f >= s) Math.min(count, (f - s)) else count
}
else count
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
val vals =
for(s <- n)
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
try {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in[AnyRef](
dbo.get(VALUE)
.asInstanceOf[JMap[String, AnyRef]]
.get(key).asInstanceOf[Array[Byte]]))
}
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
val q = new BasicDBObject
q.put(KEY, name)
val currentList =
coll.findOneNS(q) match {
case None =>
new JArrayList[AnyRef]
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
}
if (!currentList.isEmpty) {
// record exists
// remove before adding
coll.remove(q)
}
// add to the current list
elements.map(serializer.out(_)).foreach(currentList.add(_))
coll.insert(
new BasicDBObject()
.append(KEY, name)
.append(VALUE, currentList)
)
}
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
insertVectorStorageEntriesFor(name, List(element))
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
serializer.in[AnyRef](
o.get(index).asInstanceOf[Array[Byte]])
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
def getVectorStorageRangeFor(name: String,
start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
// pick the subrange and make a Scala list
val l =
List(o.subList(start.get, start.get + count).toArray: _*)
for(e <- l)
yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
// FIXME implement updateVectorStorageEntryFor
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException
def getVectorStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
}
}
private def nullSafeFindOne(name: String): Option[DBObject] = {
val o = new BasicDBObject
o.put(KEY, name)
coll.findOneNS(o)
}
def insertRefStorageFor(name: String, element: AnyRef) = {
nullSafeFindOne(name) match {
case None =>
case Some(dbo) => {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
}
}
coll.insert(
new BasicDBObject()
.append(KEY, name)
.append(VALUE, serializer.out(element)))
}
def getRefStorageFor(name: String): Option[AnyRef] = {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]]))
}
}
}