wip: adding storage for MongoDB and refactoring common storage logic into template methods

This commit is contained in:
debasishg 2009-08-12 17:01:11 +05:30
parent c6e24518ae
commit b4bb7a057d
4 changed files with 335 additions and 30 deletions

View file

@ -20,7 +20,7 @@ import org.apache.thrift.protocol._
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object CassandraStorage extends Logging { object CassandraStorage extends MapStorage with VectorStorage with Logging {
val KEYSPACE = "akka" val KEYSPACE = "akka"
val MAP_COLUMN_PARENT = new ColumnParent("map", null) val MAP_COLUMN_PARENT = new ColumnParent("map", null)
val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null) val VECTOR_COLUMN_PARENT = new ColumnParent("vector", null)
@ -112,7 +112,7 @@ object CassandraStorage extends Logging {
// For Vector // For Vector
// =============================================================== // ===============================================================
def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) { override def insertVectorStorageEntryFor(name: String, element: AnyRef) = if (sessions.isDefined) {
sessions.get.withSession { sessions.get.withSession {
_ ++| (name, _ ++| (name,
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))), new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
@ -122,7 +122,10 @@ object CassandraStorage extends Logging {
} }
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) { override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
}
override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = if (sessions.isDefined) {
val column: Option[Column] = sessions.get.withSession { val column: Option[Column] = sessions.get.withSession {
_ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index))) _ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
} }
@ -130,7 +133,7 @@ object CassandraStorage extends Logging {
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]") else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) { override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = if (sessions.isDefined) {
val startBytes = if (start.isDefined) intToBytes(start.get) else null val startBytes = if (start.isDefined) intToBytes(start.get) else null
val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
val columns: List[Column] = sessions.get.withSession { val columns: List[Column] = sessions.get.withSession {
@ -144,7 +147,7 @@ object CassandraStorage extends Logging {
columns.map(column => serializer.in(column.value, None)) columns.map(column => serializer.in(column.value, None))
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) { override def getVectorStorageSizeFor(name: String): Int = if (sessions.isDefined) {
sessions.get.withSession { sessions.get.withSession {
_ |# (name, VECTOR_COLUMN_PARENT) _ |# (name, VECTOR_COLUMN_PARENT)
} }
@ -154,7 +157,7 @@ object CassandraStorage extends Logging {
// For Map // For Map
// =============================================================== // ===============================================================
def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) { override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = if (sessions.isDefined) {
sessions.get.withSession { sessions.get.withSession {
_ ++| (name, _ ++| (name,
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)), new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
@ -164,7 +167,7 @@ object CassandraStorage extends Logging {
} }
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) { override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = if (sessions.isDefined) {
val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
for (entry <- entries) { for (entry <- entries) {
val columns: java.util.List[Column] = new java.util.ArrayList val columns: java.util.List[Column] = new java.util.ArrayList
@ -176,7 +179,7 @@ object CassandraStorage extends Logging {
} }
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) { override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = if (sessions.isDefined) {
try { try {
val column: Option[Column] = sessions.get.withSession { val column: Option[Column] = sessions.get.withSession {
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key))) _ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
@ -190,7 +193,7 @@ object CassandraStorage extends Logging {
} }
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) { override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
throw new UnsupportedOperationException throw new UnsupportedOperationException
/* /*
val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1) val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
@ -202,15 +205,15 @@ object CassandraStorage extends Logging {
*/ */
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) { override def getMapStorageSizeFor(name: String): Int = if (sessions.isDefined) {
sessions.get.withSession { sessions.get.withSession {
_ |# (name, MAP_COLUMN_PARENT) _ |# (name, MAP_COLUMN_PARENT)
} }
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null) override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) { override def removeMapStorageFor(name: String, key: AnyRef): Unit = if (sessions.isDefined) {
val keyBytes = if (key == null) null else serializer.out(key) val keyBytes = if (key == null) null else serializer.out(key)
sessions.get.withSession { sessions.get.withSession {
_ -- (name, _ -- (name,
@ -220,7 +223,7 @@ object CassandraStorage extends Logging {
} }
} else throw new IllegalStateException("CassandraStorage is not started") } else throw new IllegalStateException("CassandraStorage is not started")
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) { List[Tuple2[AnyRef, AnyRef]] = if (sessions.isDefined) {
val startBytes = if (start.isDefined) serializer.out(start.get) else null val startBytes = if (start.isDefined) serializer.out(start.get) else null
val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null

View file

@ -0,0 +1,226 @@
package se.scalablesolutions.akka.kernel.state
import com.mongodb._
import se.scalablesolutions.akka.kernel.util.Logging
import serialization.{Serializer}
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
object MongoStorage extends MapStorage
with VectorStorage with Logging {
// enrich with null safe findOne
class RichDBCollection(value: DBCollection) {
def findOneNS(o: DBObject): Option[DBObject] = {
value.findOne(o) match {
case null => None
case x => Some(x)
}
}
}
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
val KEY = "key"
val VALUE = "value"
val db = new Mongo("mydb"); // @fixme: need to externalize
val COLLECTION = "akka_coll"
val coll = db.getCollection(COLLECTION)
// @fixme: make this pluggable
private[this] val serializer: Serializer = Serializer.ScalaJSON
override def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) {
insertMapStorageEntriesFor(name, List((key, value)))
}
override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) {
import java.util.{Map, HashMap}
val m: Map[AnyRef, AnyRef] = new HashMap
for ((k, v) <- entries) {
m.put(k, serializer.out(v))
}
nullSafeFindOne(name) match {
case None =>
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
case Some(dbo) => {
// collate the maps
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
o.putAll(m)
// remove existing reference
removeMapStorageFor(name)
// and insert
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o))
}
}
}
override def removeMapStorageFor(name: String) = {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
}
override def removeMapStorageFor(name: String, key: AnyRef) = {
}
override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
try {
getValueForKey(name, key.asInstanceOf[String])
} catch {
case e =>
e.printStackTrace
None
}
}
override def getMapStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
}
}
override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
val m =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]]
val vals =
for(s <- n)
yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
override def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef],
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
val m =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
val s = start.get.asInstanceOf[Int]
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]].slice(s, s + count)
val vals =
for(s <- n)
yield (s, serializer.in(m.get(s).asInstanceOf[Array[Byte]], None))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
try {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in(
dbo.get(VALUE)
.asInstanceOf[JMap[String, AnyRef]]
.get(key).asInstanceOf[Array[Byte]], None))
}
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
val q = new BasicDBObject
q.put(KEY, name)
val currentList =
coll.findOneNS(q) match {
case None =>
new JArrayList[AnyRef]
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
}
if (!currentList.isEmpty) {
// record exists
// remove before adding
coll.remove(q)
}
// add to the current list
elements.map(serializer.out(_)).foreach(currentList.add(_))
coll.insert(
new BasicDBObject()
.append(KEY, name)
.append(VALUE, currentList)
)
}
override def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
insertVectorStorageEntriesFor(name, List(element))
}
override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
serializer.in(
o.get(index).asInstanceOf[Array[Byte]],
None
)
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
// pick the subrange and make a Scala list
val l =
List(o.subList(start.get, start.get + count).toArray: _*)
for(e <- l)
yield serializer.in(e.asInstanceOf[Array[Byte]], None)
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
override def getVectorStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
}
}
private def nullSafeFindOne(name: String): Option[DBObject] = {
val o = new BasicDBObject
o.put(KEY, name)
coll.findOneNS(o)
}
}

View file

@ -16,6 +16,7 @@ abstract class PersistentStorageConfig extends TransactionalStateConfig
case class CassandraStorageConfig extends PersistentStorageConfig case class CassandraStorageConfig extends PersistentStorageConfig
case class TerracottaStorageConfig extends PersistentStorageConfig case class TerracottaStorageConfig extends PersistentStorageConfig
case class TokyoCabinetStorageConfig extends PersistentStorageConfig case class TokyoCabinetStorageConfig extends PersistentStorageConfig
case class MongoStorageConfig extends PersistentStorageConfig
/** /**
* Scala API. * Scala API.
@ -39,12 +40,14 @@ object TransactionalState extends TransactionalState
class TransactionalState { class TransactionalState {
def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match { def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
case MongoStorageConfig() => new MongoPersistentTransactionalMap
case TerracottaStorageConfig() => throw new UnsupportedOperationException case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
} }
def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match { def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalVector case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
case MongoStorageConfig() => new MongoPersistentTransactionalVector
case TerracottaStorageConfig() => throw new UnsupportedOperationException case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
} }
@ -194,16 +197,23 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
} }
/** /**
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage. * Implementation of <tt>PersistentTransactionalMap</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
*
* Subclasses just need to provide the actual concrete instance for the
* abstract val <tt>storage</tt>.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] { abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
// to be concretized in subclasses
val storage: MapStorage
override def remove(key: AnyRef) = { override def remove(key: AnyRef) = {
verifyTransaction verifyTransaction
if (changeSet.contains(key)) changeSet -= key if (changeSet.contains(key)) changeSet -= key
else CassandraStorage.removeMapStorageFor(uuid, key) else storage.removeMapStorageFor(uuid, key)
} }
override def getRange(start: Option[AnyRef], count: Int) = override def getRange(start: Option[AnyRef], count: Int) =
@ -212,7 +222,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = { def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
verifyTransaction verifyTransaction
try { try {
CassandraStorage.getMapStorageRangeFor(uuid, start, finish, count) storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch { } catch {
case e: Exception => Nil case e: Exception => Nil
} }
@ -220,7 +230,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
// ---- For Transactional ---- // ---- For Transactional ----
override def commit = { override def commit = {
CassandraStorage.insertMapStorageEntriesFor(uuid, changeSet.toList) storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
changeSet.clear changeSet.clear
} }
@ -228,7 +238,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
override def clear = { override def clear = {
verifyTransaction verifyTransaction
try { try {
CassandraStorage.removeMapStorageFor(uuid) storage.removeMapStorageFor(uuid)
} catch { } catch {
case e: Exception => {} case e: Exception => {}
} }
@ -237,7 +247,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
override def contains(key: AnyRef): Boolean = { override def contains(key: AnyRef): Boolean = {
try { try {
verifyTransaction verifyTransaction
CassandraStorage.getMapStorageEntryFor(uuid, key).isDefined storage.getMapStorageEntryFor(uuid, key).isDefined
} catch { } catch {
case e: Exception => false case e: Exception => false
} }
@ -246,7 +256,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
override def size: Int = { override def size: Int = {
verifyTransaction verifyTransaction
try { try {
CassandraStorage.getMapStorageSizeFor(uuid) storage.getMapStorageSizeFor(uuid)
} catch { } catch {
case e: Exception => 0 case e: Exception => 0
} }
@ -258,7 +268,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
// if (changeSet.contains(key)) changeSet.get(key) // if (changeSet.contains(key)) changeSet.get(key)
// else { // else {
val result = try { val result = try {
CassandraStorage.getMapStorageEntryFor(uuid, key) storage.getMapStorageEntryFor(uuid, key)
} catch { } catch {
case e: Exception => None case e: Exception => None
} }
@ -270,7 +280,7 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
//verifyTransaction //verifyTransaction
new Iterator[Tuple2[AnyRef, AnyRef]] { new Iterator[Tuple2[AnyRef, AnyRef]] {
private val originalList: List[Tuple2[AnyRef, AnyRef]] = try { private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
CassandraStorage.getMapStorageFor(uuid) storage.getMapStorageFor(uuid)
} catch { } catch {
case e: Throwable => Nil case e: Throwable => Nil
} }
@ -285,6 +295,25 @@ class CassandraPersistentTransactionalMap extends PersistentTransactionalMap[Any
} }
} }
/**
* Implements a persistent transactional map based on the Cassandra distributed P2P key-value storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
val storage = CassandraStorage
}
/**
* Implements a persistent transactional map based on the MongoDB distributed P2P key-value storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
val storage = MongoStorage
}
/** /**
* Base for all transactional vector implementations. * Base for all transactional vector implementations.
* *
@ -382,17 +411,19 @@ abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
} }
/** /**
* Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage. * Implements a template for a concrete persistent transactional vector based storage.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/ */
class CassandraPersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] { abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
val storage: VectorStorage
// ---- For TransactionalVector ---- // ---- For TransactionalVector ----
override def get(index: Int): AnyRef = { override def get(index: Int): AnyRef = {
verifyTransaction verifyTransaction
if (changeSet.size > index) changeSet(index) if (changeSet.size > index) changeSet(index)
else CassandraStorage.getVectorStorageEntryFor(uuid, index) else storage.getVectorStorageEntryFor(uuid, index)
} }
override def getRange(start: Int, count: Int): List[AnyRef] = override def getRange(start: Int, count: Int): List[AnyRef] =
@ -400,12 +431,12 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = { def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
verifyTransaction verifyTransaction
CassandraStorage.getVectorStorageRangeFor(uuid, start, finish, count) storage.getVectorStorageRangeFor(uuid, start, finish, count)
} }
override def length: Int = { override def length: Int = {
verifyTransaction verifyTransaction
CassandraStorage.getVectorStorageSizeFor(uuid) storage.getVectorStorageSizeFor(uuid)
} }
override def apply(index: Int): AnyRef = get(index) override def apply(index: Int): AnyRef = get(index)
@ -422,11 +453,29 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
// ---- For Transactional ---- // ---- For Transactional ----
override def commit = { override def commit = {
// FIXME: should use batch function once the bug is resolved // FIXME: should use batch function once the bug is resolved
for (element <- changeSet) CassandraStorage.insertVectorStorageEntryFor(uuid, element) for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element)
changeSet.clear changeSet.clear
} }
} }
/**
* Implements a persistent transactional vector based on the Cassandra distributed P2P key-value storage.
*
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
*/
class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
val storage = CassandraStorage
}
/**
* Implements a persistent transactional vector based on the MongoDB distributed P2P key-value storage.
*
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
*/
class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
val storage = MongoStorage
}
/** /**
* Implements a transactional reference. * Implements a transactional reference.
* *

View file

@ -0,0 +1,27 @@
package se.scalablesolutions.akka.kernel.state
// abstracts persistence storage
trait Storage {
}
// for Maps
trait MapStorage extends Storage {
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]])
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef)
def removeMapStorageFor(name: String)
def removeMapStorageFor(name: String, key: AnyRef)
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef]
def getMapStorageSizeFor(name: String): Int
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]]
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
}
// for vectors
trait VectorStorage extends Storage {
def insertVectorStorageEntryFor(name: String, element: AnyRef)
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef])
def getVectorStorageEntryFor(name: String, index: Int): AnyRef
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef]
def getVectorStorageSizeFor(name: String): Int
}