/** * Copyright (C) 2009 Scalable Solutions. */ package se.scalablesolutions.akka.state import akka.util.Logging import serialization.{Serializer} import Config.config import sjson.json.Serializer._ import com.mongodb._ import scala.collection.mutable.ArrayBuffer import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList} /** * A module for supporting MongoDB based persistence. *

* The module offers functionality for: *

  • Persistent Maps
  • *
  • Persistent Vectors
  • *
  • Persistent Refs
  • *

    * @author Debasish Ghosh */ object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging { // enrich with null safe findOne class RichDBCollection(value: DBCollection) { def findOneNS(o: DBObject): Option[DBObject] = { value.findOne(o) match { case null => None case x => Some(x) } } } implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c) val KEY = "key" val VALUE = "value" val COLLECTION = "akka_coll" val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1") val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb") val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017) val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME) val coll = db.getCollection(COLLECTION) // FIXME: make this pluggable private[this] val serializer = SJSON def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) { insertMapStorageEntriesFor(name, List((key, value))) } def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) { import java.util.{Map, HashMap} val m: Map[AnyRef, AnyRef] = new HashMap for ((k, v) <- entries) m.put(k, serializer.out(v)) nullSafeFindOne(name) match { case None => coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m)) case Some(dbo) => { // collate the maps val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]] o.putAll(m) // remove existing reference removeMapStorageFor(name) // and insert coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o)) } } } def removeMapStorageFor(name: String) = { val q = new BasicDBObject q.put(KEY, name) coll.remove(q) } def removeMapStorageFor(name: String, key: AnyRef) = { nullSafeFindOne(name) match { case None => case Some(dbo) => { val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap orig.remove(key.asInstanceOf[String]) // remove existing reference removeMapStorageFor(name) // and insert coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig)) } } } def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = { getValueForKey(name, key.asInstanceOf[String]) } def getMapStorageSizeFor(name: String): Int = { nullSafeFindOne(name) match { case None => 0 case Some(dbo) => dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size } } def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = { val m = nullSafeFindOne(name) match { case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] } val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]] val vals = for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) vals.asInstanceOf[List[Tuple2[String, AnyRef]]] } def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = { val m = nullSafeFindOne(name) match { case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]] } /** * count is the max number of results to return. Start with * start or 0 (if start is not defined) and go until * you hit finish or count. */ val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0 val cnt = if (finish.isDefined) { val f = finish.get.asInstanceOf[Int] if (f >= s) Math.min(count, (f - s)) else count } else count val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt) val vals = for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]])) vals.asInstanceOf[List[Tuple2[String, AnyRef]]] } private def getValueForKey(name: String, key: String): Option[AnyRef] = { try { nullSafeFindOne(name) match { case None => None case Some(dbo) => Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].get(key).asInstanceOf[Array[Byte]])) } } catch { case e => throw new Predef.NoSuchElementException(e.getMessage) } } def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException("The updateVectorStorageEntryFor method is not yet implemented for MongoDB") def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = { val q = new BasicDBObject q.put(KEY, name) val currentList = coll.findOneNS(q) match { case None => new JArrayList[AnyRef] case Some(dbo) => dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]] } if (!currentList.isEmpty) { // record exists // remove before adding coll.remove(q) } // add to the current list elements.map(serializer.out(_)).foreach(currentList.add(_)) coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, currentList)) } def insertVectorStorageEntryFor(name: String, element: AnyRef) = insertVectorStorageEntriesFor(name, List(element)) def getVectorStorageEntryFor(name: String, index: Int): AnyRef = { try { val o = nullSafeFindOne(name) match { case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => dbo.get(VALUE).asInstanceOf[JList[AnyRef]] } serializer.in[AnyRef](o.get(index).asInstanceOf[Array[Byte]]) } catch { case e => throw new Predef.NoSuchElementException(e.getMessage) } } def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = { try { val o = nullSafeFindOne(name) match { case None => throw new Predef.NoSuchElementException(name + " not present") case Some(dbo) => dbo.get(VALUE).asInstanceOf[JList[AnyRef]] } // pick the subrange and make a Scala list val l = List(o.subList(start.get, start.get + count).toArray: _*) val buffer = new ArrayBuffer[AnyRef] for (elem <- l.map(e => serializer.in[AnyRef](e.asInstanceOf[Array[Byte]]))) buffer.append(elem) buffer } catch { case e => throw new Predef.NoSuchElementException(e.getMessage) } } def getVectorStorageSizeFor(name: String): Int = { nullSafeFindOne(name) match { case None => 0 case Some(dbo) => dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size } } private def nullSafeFindOne(name: String): Option[DBObject] = { val o = new BasicDBObject o.put(KEY, name) coll.findOneNS(o) } def insertRefStorageFor(name: String, element: AnyRef) = { nullSafeFindOne(name) match { case None => case Some(dbo) => { val q = new BasicDBObject q.put(KEY, name) coll.remove(q) } } coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, serializer.out(element))) } def getRefStorageFor(name: String): Option[AnyRef] = { nullSafeFindOne(name) match { case None => None case Some(dbo) => Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]])) } } }