rewrote the persistent storage with support for unit-of-work and new multiverse stm

This commit is contained in:
jboner 2009-09-10 01:33:01 +02:00
parent e4a4451533
commit dfc08f559d
21 changed files with 1275 additions and 3835 deletions

View file

@ -9,11 +9,14 @@ import java.io.{Flushable, Closeable}
import util.Logging
import util.Helpers._
import serialization.Serializer
import akka.Config.config
import Config.config
import org.apache.cassandra.db.ColumnFamily
import org.apache.cassandra.service._
import org.apache.commons.pool._
import org.apache.commons.pool.impl._
import org.apache.thrift.transport._
import org.apache.thrift.protocol._
@ -238,3 +241,89 @@ object Protocol {
object SimpleJSON extends Protocol(new TSimpleJSONProtocol.Factory)
object JSON extends Protocol(new TJSONProtocol.Factory)
}
trait Pool[T] extends java.io.Closeable {
def borrowObject: T
def returnObject(t: T): Unit
def invalidateObject(t: T): Unit
def addObject: Unit
def getNumIdle: Int
def getNumActive: Int
def clear: Unit
def setFactory(factory: PoolItemFactory[T]): Unit
}
trait PoolFactory[T] {
def createPool: Pool[T]
}
trait PoolItemFactory[T] {
def makeObject: T
def destroyObject(t: T): Unit
def validateObject(t: T): Boolean
def activateObject(t: T): Unit
def passivateObject(t: T): Unit
}
trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] {
val impl: OP
override def borrowObject: T = impl.borrowObject.asInstanceOf[T]
override def returnObject(t: T) = impl.returnObject(t)
override def invalidateObject(t: T) = impl.invalidateObject(t)
override def addObject = impl.addObject
override def getNumIdle: Int = impl.getNumIdle
override def getNumActive: Int = impl.getNumActive
override def clear: Unit = impl.clear
override def close: Unit = impl.close
override def setFactory(factory: PoolItemFactory[T]) = impl.setFactory(toPoolableObjectFactory(factory))
def toPoolableObjectFactory[T](pif: PoolItemFactory[T]) = new PoolableObjectFactory {
def makeObject: Object = pif.makeObject.asInstanceOf[Object]
def destroyObject(o: Object): Unit = pif.destroyObject(o.asInstanceOf[T])
def validateObject(o: Object): Boolean = pif.validateObject(o.asInstanceOf[T])
def activateObject(o: Object): Unit = pif.activateObject(o.asInstanceOf[T])
def passivateObject(o: Object): Unit = pif.passivateObject(o.asInstanceOf[T])
}
}
object StackPool {
def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory))
}
def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle)
}
def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle,initIdleCapacity)
}
}
object SoftRefPool {
def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,SoftReferenceObjectPool] {
val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory))
}
def apply[T](factory: PoolItemFactory[T], initSize: Int) = new PoolBridge[T,SoftReferenceObjectPool] {
val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory),initSize)
}
}
trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] {
def createTransport: T
def makeObject: T = createTransport
def destroyObject(transport: T): Unit = transport.close
def validateObject(transport: T) = transport.isOpen
def activateObject(transport: T): Unit = if( !transport.isOpen ) transport.open else ()
def passivateObject(transport: T): Unit = transport.flush
}
case class SocketProvider(val host: String, val port: Int) extends TransportFactory[TSocket] {
def createTransport = {
val t = new TSocket(host, port)
t.open
t
}
}

View file

@ -17,6 +17,8 @@ import org.apache.cassandra.service._
import org.apache.thrift.transport._
import org.apache.thrift.protocol._
import scala.collection.mutable.ArrayBuffer
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -62,11 +64,12 @@ object CassandraStorage extends MapStorage
StackPool(SocketProvider(CASSANDRA_SERVER_HOSTNAME, CASSANDRA_SERVER_PORT)),
protocol,
CONSISTENCY_LEVEL)
// ===============================================================
// For Ref
// ===============================================================
override def insertRefStorageFor(name: String, element: AnyRef) = {
def insertRefStorageFor(name: String, element: AnyRef) = {
sessions.withSession {
_ ++| (name,
new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY),
@ -76,7 +79,7 @@ object CassandraStorage extends MapStorage
}
}
override def getRefStorageFor(name: String): Option[AnyRef] = {
def getRefStorageFor(name: String): Option[AnyRef] = {
try {
val column: Option[Column] = sessions.withSession {
_ | (name, new ColumnPath(REF_COLUMN_PARENT.getColumn_family, null, REF_KEY))
@ -94,7 +97,7 @@ object CassandraStorage extends MapStorage
// For Vector
// ===============================================================
override def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
sessions.withSession {
_ ++| (name,
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(getVectorStorageSizeFor(name))),
@ -104,10 +107,20 @@ object CassandraStorage extends MapStorage
}
}
override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
}
override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = {
sessions.withSession {
_ ++| (name,
new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)),
serializer.out(elem),
System.currentTimeMillis,
CONSISTENCY_LEVEL)
}
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
val column: Option[Column] = sessions.withSession {
_ | (name, new ColumnPath(VECTOR_COLUMN_PARENT.getColumn_family, null, intToBytes(index)))
}
@ -115,7 +128,7 @@ object CassandraStorage extends MapStorage
else throw new NoSuchElementException("No element for vector [" + name + "] and index [" + index + "]")
}
override def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = {
val startBytes = if (start.isDefined) intToBytes(start.get) else null
val finishBytes = if (finish.isDefined) intToBytes(finish.get) else null
val columns: List[Column] = sessions.withSession {
@ -126,10 +139,12 @@ object CassandraStorage extends MapStorage
count,
CONSISTENCY_LEVEL)
}
columns.map(column => serializer.in(column.value, None))
val buffer = new ArrayBuffer[AnyRef]
for (elem <- columns.map(column => serializer.in(column.value, None))) buffer.append(elem)
buffer
}
override def getVectorStorageSizeFor(name: String): Int = {
def getVectorStorageSizeFor(name: String): Int = {
sessions.withSession {
_ |# (name, VECTOR_COLUMN_PARENT)
}
@ -139,7 +154,7 @@ object CassandraStorage extends MapStorage
// For Map
// ===============================================================
override def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = {
def insertMapStorageEntryFor(name: String, key: AnyRef, element: AnyRef) = {
sessions.withSession {
_ ++| (name,
new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)),
@ -149,7 +164,7 @@ object CassandraStorage extends MapStorage
}
}
override def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = {
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) = {
val cf2columns: java.util.Map[String, java.util.List[Column]] = new java.util.HashMap
for (entry <- entries) {
val columns: java.util.List[Column] = new java.util.ArrayList
@ -161,7 +176,7 @@ object CassandraStorage extends MapStorage
}
}
override def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
try {
val column: Option[Column] = sessions.withSession {
_ | (name, new ColumnPath(MAP_COLUMN_PARENT.getColumn_family, null, serializer.out(key)))
@ -175,7 +190,7 @@ object CassandraStorage extends MapStorage
}
}
override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
throw new UnsupportedOperationException
/*
val columns = server.get_columns_since(name, MAP_COLUMN_FAMILY, -1)
@ -187,15 +202,15 @@ object CassandraStorage extends MapStorage
*/
}
override def getMapStorageSizeFor(name: String): Int = {
def getMapStorageSizeFor(name: String): Int = {
sessions.withSession {
_ |# (name, MAP_COLUMN_PARENT)
}
}
override def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
def removeMapStorageFor(name: String): Unit = removeMapStorageFor(name, null)
override def removeMapStorageFor(name: String, key: AnyRef): Unit = {
def removeMapStorageFor(name: String, key: AnyRef): Unit = {
val keyBytes = if (key == null) null else serializer.out(key)
sessions.withSession {
_ -- (name,
@ -205,7 +220,7 @@ object CassandraStorage extends MapStorage
}
}
override def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int):
List[Tuple2[AnyRef, AnyRef]] = {
val startBytes = if (start.isDefined) serializer.out(start.get) else null
val finishBytes = if (finish.isDefined) serializer.out(finish.get) else null

View file

@ -1,287 +1,261 @@
package se.scalablesolutions.akka.state
import akka.util.Logging
import serialization.{Serializer}
import akka.Config.config
import sjson.json.Serializer._
import com.mongodb._
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
/**
* A module for supporting MongoDB based persistence.
* <p/>
* The module offers functionality for:
* <li>Persistent Maps</li>
* <li>Persistent Vectors</li>
* <li>Persistent Refs</li>
* <p/>
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging {
// enrich with null safe findOne
class RichDBCollection(value: DBCollection) {
def findOneNS(o: DBObject): Option[DBObject] = {
value.findOne(o) match {
case null => None
case x => Some(x)
}
}
}
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
val KEY = "key"
val VALUE = "value"
val COLLECTION = "akka_coll"
val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
val coll = db.getCollection(COLLECTION)
// FIXME: make this pluggable
private[this] val serializer = SJSON
override def insertMapStorageEntryFor(name: String,
key: AnyRef, value: AnyRef) {
insertMapStorageEntriesFor(name, List((key, value)))
}
override def insertMapStorageEntriesFor(name: String,
entries: List[Tuple2[AnyRef, AnyRef]]) {
import java.util.{Map, HashMap}
val m: Map[AnyRef, AnyRef] = new HashMap
for ((k, v) <- entries) {
m.put(k, serializer.out(v))
}
nullSafeFindOne(name) match {
case None =>
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
case Some(dbo) => {
// collate the maps
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
o.putAll(m)
// remove existing reference
removeMapStorageFor(name)
// and insert
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o))
}
}
}
override def removeMapStorageFor(name: String) = {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
}
override def removeMapStorageFor(name: String, key: AnyRef) = {
nullSafeFindOne(name) match {
case None =>
case Some(dbo) => {
val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
orig.remove(key.asInstanceOf[String])
// remove existing reference
removeMapStorageFor(name)
// and insert
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
}
}
}
override def getMapStorageEntryFor(name: String,
key: AnyRef): Option[AnyRef] = {
getValueForKey(name, key.asInstanceOf[String])
}
override def getMapStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
}
}
override def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
val m =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]]
val vals =
for(s <- n)
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
override def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef],
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
val m =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
/**
* <tt>count</tt> is the max number of results to return. Start with
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
* you hit <tt>finish</tt> or <tt>count</tt>.
*/
val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
val cnt =
if (finish.isDefined) {
val f = finish.get.asInstanceOf[Int]
if (f >= s) Math.min(count, (f - s)) else count
}
else count
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
val vals =
for(s <- n)
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
try {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in[AnyRef](
dbo.get(VALUE)
.asInstanceOf[JMap[String, AnyRef]]
.get(key).asInstanceOf[Array[Byte]]))
}
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
override def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
val q = new BasicDBObject
q.put(KEY, name)
val currentList =
coll.findOneNS(q) match {
case None =>
new JArrayList[AnyRef]
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
}
if (!currentList.isEmpty) {
// record exists
// remove before adding
coll.remove(q)
}
// add to the current list
elements.map(serializer.out(_)).foreach(currentList.add(_))
coll.insert(
new BasicDBObject()
.append(KEY, name)
.append(VALUE, currentList)
)
}
override def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
insertVectorStorageEntriesFor(name, List(element))
}
override def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
serializer.in[AnyRef](
o.get(index).asInstanceOf[Array[Byte]])
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
override def getVectorStorageRangeFor(name: String,
start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
// pick the subrange and make a Scala list
val l =
List(o.subList(start.get, start.get + count).toArray: _*)
for(e <- l)
yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
} catch {
case e =>
throw new Predef.NoSuchElementException(e.getMessage)
}
}
override def getVectorStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
}
}
private def nullSafeFindOne(name: String): Option[DBObject] = {
val o = new BasicDBObject
o.put(KEY, name)
coll.findOneNS(o)
}
override def insertRefStorageFor(name: String, element: AnyRef) = {
nullSafeFindOne(name) match {
case None =>
case Some(dbo) => {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
}
}
coll.insert(
new BasicDBObject()
.append(KEY, name)
.append(VALUE, serializer.out(element)))
}
override def getRefStorageFor(name: String): Option[AnyRef] = {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]]))
}
}
}
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.state
import akka.util.Logging
import serialization.{Serializer}
import Config.config
import sjson.json.Serializer._
import com.mongodb._
import scala.collection.mutable.ArrayBuffer
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
/**
* A module for supporting MongoDB based persistence.
* <p/>
* The module offers functionality for:
* <li>Persistent Maps</li>
* <li>Persistent Vectors</li>
* <li>Persistent Refs</li>
* <p/>
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
object MongoStorage extends MapStorage with VectorStorage with RefStorage with Logging {
// enrich with null safe findOne
class RichDBCollection(value: DBCollection) {
def findOneNS(o: DBObject): Option[DBObject] = {
value.findOne(o) match {
case null => None
case x => Some(x)
}
}
}
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
val KEY = "key"
val VALUE = "value"
val COLLECTION = "akka_coll"
val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT, MONGODB_SERVER_DBNAME)
val coll = db.getCollection(COLLECTION)
// FIXME: make this pluggable
private[this] val serializer = SJSON
def insertMapStorageEntryFor(name: String,
key: AnyRef, value: AnyRef) {
insertMapStorageEntriesFor(name, List((key, value)))
}
def insertMapStorageEntriesFor(name: String,
entries: List[Tuple2[AnyRef, AnyRef]]) {
import java.util.{Map, HashMap}
val m: Map[AnyRef, AnyRef] = new HashMap
for ((k, v) <- entries) m.put(k, serializer.out(v))
nullSafeFindOne(name) match {
case None =>
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
case Some(dbo) => {
// collate the maps
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
o.putAll(m)
// remove existing reference
removeMapStorageFor(name)
// and insert
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, o))
}
}
}
def removeMapStorageFor(name: String) = {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
}
def removeMapStorageFor(name: String, key: AnyRef) = {
nullSafeFindOne(name) match {
case None =>
case Some(dbo) => {
val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
orig.remove(key.asInstanceOf[String])
// remove existing reference
removeMapStorageFor(name)
// and insert
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
}
}
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] = {
getValueForKey(name, key.asInstanceOf[String])
}
def getMapStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
}
}
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
val m = nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]]
val vals = for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef],
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
val m = nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
/**
* <tt>count</tt> is the max number of results to return. Start with
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
* you hit <tt>finish</tt> or <tt>count</tt>.
*/
val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
val cnt =
if (finish.isDefined) {
val f = finish.get.asInstanceOf[Int]
if (f >= s) Math.min(count, (f - s)) else count
}
else count
val n = List(m.keySet.toArray: _*).asInstanceOf[List[String]].sort((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
val vals = for(s <- n) yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
}
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
try {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].get(key).asInstanceOf[Array[Byte]]))
}
} catch {
case e => throw new Predef.NoSuchElementException(e.getMessage)
}
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = throw new UnsupportedOperationException("The updateVectorStorageEntryFor method is not yet implemented for MongoDB")
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
val q = new BasicDBObject
q.put(KEY, name)
val currentList =
coll.findOneNS(q) match {
case None =>
new JArrayList[AnyRef]
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
}
if (!currentList.isEmpty) {
// record exists
// remove before adding
coll.remove(q)
}
// add to the current list
elements.map(serializer.out(_)).foreach(currentList.add(_))
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, currentList))
}
def insertVectorStorageEntryFor(name: String, element: AnyRef) = insertVectorStorageEntriesFor(name, List(element))
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
val o = nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
serializer.in[AnyRef](o.get(index).asInstanceOf[Array[Byte]])
} catch {
case e => throw new Predef.NoSuchElementException(e.getMessage)
}
}
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] = {
try {
val o = nullSafeFindOne(name) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
// pick the subrange and make a Scala list
val l = List(o.subList(start.get, start.get + count).toArray: _*)
val buffer = new ArrayBuffer[AnyRef]
for (elem <- l.map(e => serializer.in[AnyRef](e.asInstanceOf[Array[Byte]]))) buffer.append(elem)
buffer
} catch {
case e => throw new Predef.NoSuchElementException(e.getMessage)
}
}
def getVectorStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
}
}
private def nullSafeFindOne(name: String): Option[DBObject] = {
val o = new BasicDBObject
o.put(KEY, name)
coll.findOneNS(o)
}
def insertRefStorageFor(name: String, element: AnyRef) = {
nullSafeFindOne(name) match {
case None =>
case Some(dbo) => {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
}
}
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, serializer.out(element)))
}
def getRefStorageFor(name: String): Option[AnyRef] = {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]]))
}
}
}

View file

@ -38,23 +38,23 @@ object PersistentState extends PersistentState
* </pre>
*/
class PersistentState {
def newMap(config: PersistentStorageConfig): TransactionalMap[AnyRef, AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
case MongoStorageConfig() => new MongoPersistentTransactionalMap
def newMap(config: PersistentStorageConfig): PersistentMap[AnyRef, AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentMap
case MongoStorageConfig() => new MongoPersistentMap
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
def newVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
case MongoStorageConfig() => new MongoPersistentTransactionalVector
def newVector(config: PersistentStorageConfig): PersistentVector[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentVector
case MongoStorageConfig() => new MongoPersistentVector
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
def newRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
case MongoStorageConfig() => new MongoPersistentTransactionalRef
def newRef(config: PersistentStorageConfig): PersistentRef[AnyRef] = config match {
case CassandraStorageConfig() => new CassandraPersistentRef
case MongoStorageConfig() => new MongoPersistentRef
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
@ -68,32 +68,10 @@ class PersistentState {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
// FIXME: need to handle remove in another changeSet
protected[akka] val changeSet = new HashMap[K, V]
def getRange(start: Option[AnyRef], count: Int)
// ---- For Transactional ----
override def begin = {}
override def rollback = changeSet.clear
// ---- For scala.collection.mutable.Map ----
override def put(key: K, value: V): Option[V] = {
verifyTransaction
changeSet += key -> value
None // always return None to speed up writes (else need to go to DB to get
}
override def -=(key: K) = remove(key)
override def update(key: K, value: V) = put(key, value)
}
trait PersistentMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V]
/**
* Implementation of <tt>PersistentTransactionalMap</tt> for every concrete
* Implementation of <tt>PersistentMap</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
*
* Subclasses just need to provide the actual concrete instance for the
@ -101,86 +79,65 @@ abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class TemplatePersistentTransactionalMap extends PersistentTransactionalMap[AnyRef, AnyRef] {
trait TemplatePersistentMap extends PersistentMap[AnyRef, AnyRef] {
protected val newAndUpdatedEntries = TransactionalState.newMap[AnyRef, AnyRef]
protected val removedEntries = TransactionalState.newMap[AnyRef, AnyRef]
protected val shouldClearOnCommit = TransactionalRef[Boolean](false)
// to be concretized in subclasses
val storage: MapStorage
override def remove(key: AnyRef) = {
verifyTransaction
if (changeSet.contains(key)) changeSet -= key
else storage.removeMapStorageFor(uuid, key)
def commit = {
storage.removeMapStorageFor(uuid, removedEntries.toList)
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
if (shouldClearOnCommit.isDefined & shouldClearOnCommit.get.get) storage.removeMapStorageFor(uuid)
newAndUpdatedEntries.clear
removedEntries.clear
}
override def getRange(start: Option[AnyRef], count: Int) =
getRange(start, None, count)
def -=(key: AnyRef) = remove(key)
def getRange(start: Option[AnyRef], finish: Option[AnyRef], count: Int) = {
verifyTransaction
try {
def +=(key: AnyRef, value: AnyRef) = put(key, value)
override def put(key: AnyRef, value: AnyRef): Option[AnyRef] = newAndUpdatedEntries.put(key, value)
override def update(key: AnyRef, value: AnyRef) = newAndUpdatedEntries.update(key, value)
def remove(key: AnyRef) = removedEntries.remove(key)
def slice(start: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = slice(start, None, count)
def slice(start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]] = try {
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch {
case e: Exception => Nil
}
}
} catch { case e: Exception => Nil }
// ---- For Transactional ----
override def commit = {
storage.insertMapStorageEntriesFor(uuid, changeSet.toList)
changeSet.clear
}
override def clear = shouldClearOnCommit.swap(true)
// ---- Overriding scala.collection.mutable.Map behavior ----
override def clear = {
verifyTransaction
try {
storage.removeMapStorageFor(uuid)
} catch {
case e: Exception => {}
}
}
override def contains(key: AnyRef): Boolean = {
try {
verifyTransaction
override def contains(key: AnyRef): Boolean = try {
newAndUpdatedEntries.contains(key) ||
storage.getMapStorageEntryFor(uuid, key).isDefined
} catch {
case e: Exception => false
}
}
} catch { case e: Exception => false }
override def size: Int = {
verifyTransaction
try {
storage.getMapStorageSizeFor(uuid)
} catch {
case e: Exception => 0
}
}
override def size: Int = try {
storage.getMapStorageSizeFor(uuid) + newAndUpdatedEntries.size
} catch { case e: Exception => 0 }
// ---- For scala.collection.mutable.Map ----
override def get(key: AnyRef): Option[AnyRef] = {
verifyTransaction
// if (changeSet.contains(key)) changeSet.get(key)
// else {
val result = try {
if (newAndUpdatedEntries.contains(key)) newAndUpdatedEntries.get(key)
else try {
storage.getMapStorageEntryFor(uuid, key)
} catch {
case e: Exception => None
}
result
//}
} catch { case e: Exception => None }
}
override def elements: Iterator[Tuple2[AnyRef, AnyRef]] = {
//verifyTransaction
new Iterator[Tuple2[AnyRef, AnyRef]] {
private val originalList: List[Tuple2[AnyRef, AnyRef]] = try {
storage.getMapStorageFor(uuid)
} catch {
case e: Throwable => Nil
}
private var elements = originalList.reverse
// FIXME how to deal with updated entries, these should be replaced in the originalList not just added
private var elements = newAndUpdatedEntries.toList ::: originalList.reverse
override def next: Tuple2[AnyRef, AnyRef]= synchronized {
val element = elements.head
elements = elements.tail
@ -197,7 +154,7 @@ abstract class TemplatePersistentTransactionalMap extends PersistentTransactiona
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class CassandraPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
class CassandraPersistentMap extends TemplatePersistentMap {
val storage = CassandraStorage
}
@ -206,7 +163,7 @@ class CassandraPersistentTransactionalMap extends TemplatePersistentTransactiona
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap {
class MongoPersistentMap extends TemplatePersistentMap {
val storage = MongoStorage
}
@ -218,69 +175,62 @@ class MongoPersistentTransactionalMap extends TemplatePersistentTransactionalMap
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class PersistentTransactionalVector[T] extends TransactionalVector[T] {
// FIXME: need to handle remove in another changeSet
protected[akka] val changeSet = new ArrayBuffer[T]
// ---- For Transactional ----
override def begin = {}
override def rollback = changeSet.clear
// ---- For TransactionalVector ----
override def add(value: T) = {
verifyTransaction
changeSet += value
}
}
trait PersistentVector[T] extends Transactional with RandomAccessSeq[T]
/**
* Implements a template for a concrete persistent transactional vector based storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
abstract class TemplatePersistentTransactionalVector extends PersistentTransactionalVector[AnyRef] {
trait TemplatePersistentVector extends PersistentVector[AnyRef] {
protected val newElems = TransactionalState.newVector[AnyRef]
protected val updatedElems = TransactionalState.newMap[Int, AnyRef]
protected val removedElems = TransactionalState.newVector[AnyRef]
protected val shouldClearOnCommit = TransactionalRef[Boolean](false)
val storage: VectorStorage
// ---- For TransactionalVector ----
override def get(index: Int): AnyRef = {
verifyTransaction
if (changeSet.size > index) changeSet(index)
def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
newElems.clear
updatedElems.clear
}
def apply(index: Int): AnyRef = get(index)
def get(index: Int): AnyRef = {
if (newElems.size > index) newElems(index)
else storage.getVectorStorageEntryFor(uuid, index)
}
override def getRange(start: Int, count: Int): List[AnyRef] =
getRange(Some(start), None, count)
override def slice(start: Int, count: Int): RandomAccessSeq[AnyRef] = slice(Some(start), None, count)
def getRange(start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
verifyTransaction
def slice(start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef] =
storage.getVectorStorageRangeFor(uuid, start, finish, count)
}
override def length: Int = {
verifyTransaction
storage.getVectorStorageSizeFor(uuid)
}
/**
* Removes the <i>tail</i> element of this vector.
*/
// FIXME
def pop: AnyRef = throw new UnsupportedOperationException("need to implement persistent vector pop")
override def apply(index: Int): AnyRef = get(index)
// FIXME
def update(index: Int, newElem: AnyRef) = storage.updateVectorStorageEntryFor(uuid, index, newElem)
override def first: AnyRef = get(0)
override def last: AnyRef = {
verifyTransaction
val l = length
if (l == 0) throw new NoSuchElementException("Vector is empty")
get(length - 1)
if (newElems.length != 0) newElems.last
else {
val len = length
if (len == 0) throw new NoSuchElementException("Vector is empty")
get(len - 1)
}
}
// ---- For Transactional ----
override def commit = {
// FIXME: should use batch function once the bug is resolved
for (element <- changeSet) storage.insertVectorStorageEntryFor(uuid, element)
changeSet.clear
}
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
}
/**
@ -288,7 +238,7 @@ abstract class TemplatePersistentTransactionalVector extends PersistentTransacti
*
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
*/
class CassandraPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
class CassandraPersistentVector extends TemplatePersistentVector {
val storage = CassandraStorage
}
@ -297,38 +247,37 @@ class CassandraPersistentTransactionalVector extends TemplatePersistentTransacti
*
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
*/
class MongoPersistentTransactionalVector extends TemplatePersistentTransactionalVector {
class MongoPersistentVector extends TemplatePersistentVector {
val storage = MongoStorage
}
abstract class TemplatePersistentTransactionalRef extends TransactionalRef[AnyRef] {
trait PersistentRef[T] extends Transactional
trait TemplatePersistentRef extends PersistentRef[AnyRef] {
protected val ref = new TransactionalRef[AnyRef]
val storage: RefStorage
override def commit = if (ref.isDefined) {
def commit = if (ref.isDefined) {
storage.insertRefStorageFor(uuid, ref.get)
ref = None
ref.swap(null)
}
override def rollback = ref = None
def get: Option[AnyRef] = if (ref.isDefined) ref.get else storage.getRefStorageFor(uuid)
override def get: Option[AnyRef] = {
verifyTransaction
storage.getRefStorageFor(uuid)
}
def isDefined: Boolean = ref.isDefined || storage.getRefStorageFor(uuid).isDefined
override def isDefined: Boolean = get.isDefined
override def getOrElse(default: => AnyRef): AnyRef = {
val ref = get
if (ref.isDefined) ref
def getOrElse(default: => AnyRef): AnyRef = {
val current = get
if (current.isDefined) current
else default
}
}
class CassandraPersistentTransactionalRef extends TemplatePersistentTransactionalRef {
class CassandraPersistentRef extends TemplatePersistentRef {
val storage = CassandraStorage
}
class MongoPersistentTransactionalRef extends TemplatePersistentTransactionalRef {
class MongoPersistentRef extends TemplatePersistentRef {
val storage = MongoStorage
}

View file

@ -1,96 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.state
import org.apache.commons.pool._
import org.apache.commons.pool.impl._
import org.apache.thrift.transport._
trait Pool[T] extends java.io.Closeable {
def borrowObject: T
def returnObject(t: T): Unit
def invalidateObject(t: T): Unit
def addObject: Unit
def getNumIdle: Int
def getNumActive: Int
def clear: Unit
def setFactory(factory: PoolItemFactory[T]): Unit
}
trait PoolFactory[T] {
def createPool: Pool[T]
}
trait PoolItemFactory[T] {
def makeObject: T
def destroyObject(t: T): Unit
def validateObject(t: T): Boolean
def activateObject(t: T): Unit
def passivateObject(t: T): Unit
}
trait PoolBridge[T, OP <: ObjectPool] extends Pool[T] {
val impl: OP
override def borrowObject: T = impl.borrowObject.asInstanceOf[T]
override def returnObject(t: T) = impl.returnObject(t)
override def invalidateObject(t: T) = impl.invalidateObject(t)
override def addObject = impl.addObject
override def getNumIdle: Int = impl.getNumIdle
override def getNumActive: Int = impl.getNumActive
override def clear: Unit = impl.clear
override def close: Unit = impl.close
override def setFactory(factory: PoolItemFactory[T]) = impl.setFactory(toPoolableObjectFactory(factory))
def toPoolableObjectFactory[T](pif: PoolItemFactory[T]) = new PoolableObjectFactory {
def makeObject: Object = pif.makeObject.asInstanceOf[Object]
def destroyObject(o: Object): Unit = pif.destroyObject(o.asInstanceOf[T])
def validateObject(o: Object): Boolean = pif.validateObject(o.asInstanceOf[T])
def activateObject(o: Object): Unit = pif.activateObject(o.asInstanceOf[T])
def passivateObject(o: Object): Unit = pif.passivateObject(o.asInstanceOf[T])
}
}
object StackPool {
def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory))
}
def apply[T](factory: PoolItemFactory[T], maxIdle: Int) = new PoolBridge[T,StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle)
}
def apply[T](factory: PoolItemFactory[T], maxIdle: Int, initIdleCapacity: Int) = new PoolBridge[T,StackObjectPool] {
val impl = new StackObjectPool(toPoolableObjectFactory(factory),maxIdle,initIdleCapacity)
}
}
object SoftRefPool {
def apply[T](factory: PoolItemFactory[T]) = new PoolBridge[T,SoftReferenceObjectPool] {
val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory))
}
def apply[T](factory: PoolItemFactory[T], initSize: Int) = new PoolBridge[T,SoftReferenceObjectPool] {
val impl = new SoftReferenceObjectPool(toPoolableObjectFactory(factory),initSize)
}
}
trait TransportFactory[T <: TTransport] extends PoolItemFactory[T] {
def createTransport: T
def makeObject: T = createTransport
def destroyObject(transport: T): Unit = transport.close
def validateObject(transport: T) = transport.isOpen
def activateObject(transport: T): Unit = if( !transport.isOpen ) transport.open else ()
def passivateObject(transport: T): Unit = transport.flush
}
case class SocketProvider(val host: String, val port: Int) extends TransportFactory[TSocket] {
def createTransport = {
val t = new TSocket(host, port)
t.open
t
}
}

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.state
// abstracts persistence storage
@ -15,12 +19,13 @@ trait MapStorage extends Storage {
def getMapStorageRangeFor(name: String, start: Option[AnyRef], finish: Option[AnyRef], count: Int): List[Tuple2[AnyRef, AnyRef]]
}
// for vectors
// for Vectors
trait VectorStorage extends Storage {
def insertVectorStorageEntryFor(name: String, element: AnyRef)
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef])
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef)
def getVectorStorageEntryFor(name: String, index: Int): AnyRef
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[AnyRef]
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): RandomAccessSeq[AnyRef]
def getVectorStorageSizeFor(name: String): Int
}