Merge branch 'branch-343'

This commit is contained in:
Debasish Ghosh 2010-09-13 17:43:39 +05:30
commit 0d2ed3bf4e
13 changed files with 1414 additions and 830 deletions

View file

@ -29,7 +29,7 @@ object CassandraStorage extends Storage {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class CassandraPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
class CassandraPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = CassandraStorageBackend
}

View file

@ -7,9 +7,11 @@ package se.scalablesolutions.akka.persistence.common
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.stm.TransactionManagement.transaction
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.AkkaException
class StorageException(message: String) extends AkkaException(message)
// FIXME move to 'stm' package + add message with more info
class NoTransactionInScopeException extends RuntimeException
class StorageException(message: String) extends RuntimeException(message)
/**
* Example Scala usage.
@ -80,24 +82,90 @@ trait Storage {
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
protected val newAndUpdatedEntries = TransactionalMap[K, V]()
protected val removedEntries = TransactionalVector[K]()
protected val shouldClearOnCommit = Ref[Boolean]()
// operations on the Map
trait Op
case object GET extends Op
case object PUT extends Op
case object REM extends Op
case object UPD extends Op
// append only log: records all mutating operations
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
case class LogEntry(key: K, value: Option[V], op: Op)
// need to override in subclasses e.g. "sameElements" for Array[Byte]
def equal(k1: K, k2: K): Boolean = k1 == k2
// Seqable type that's required for maintaining the log of distinct keys affected in current transaction
type T <: Equals
// converts key K to the Seqable type Equals
def toEquals(k: K): T
// keys affected in the current transaction
protected val keysInCurrentTx = TransactionalMap[T, K]()
protected def addToListOfKeysInTx(key: K): Unit =
keysInCurrentTx += (toEquals(key), key)
protected def clearDistinctKeys = keysInCurrentTx.clear
protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
appendOnlyTxLog filter(e => equal(e.key, key))
// need to get current value considering the underlying storage as well as the transaction log
protected def getCurrentValue(key: K): Option[V] = {
// get all mutating entries for this key for this tx
val txEntries = filterTxLogByKey(key)
// get the snapshot from the underlying store for this key
val underlying = try {
storage.getMapStorageEntryFor(uuid, key)
} catch { case e: Exception => None }
if (txEntries.isEmpty) underlying
else replay(txEntries, key, underlying)
}
// replay all tx entries for key k with seed = initial
private def replay(txEntries: IndexedSeq[LogEntry], key: K, initial: Option[V]): Option[V] = {
import scala.collection.mutable._
val m = initial match {
case None => Map.empty[K, V]
case Some(v) => Map((key, v))
}
txEntries.foreach {case LogEntry(k, v, o) => o match {
case PUT => m.put(k, v.get)
case REM => m -= k
case UPD => m.update(k, v.get)
}}
m get key
}
// to be concretized in subclasses
val storage: MapStorageBackend[K, V]
def commit = {
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key))
storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
newAndUpdatedEntries.clear
removedEntries.clear
// if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
case PUT => storage.insertMapStorageEntryFor(uuid, k, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k, v.get)
case REM => storage.removeMapStorageFor(uuid, k)
}}
appendOnlyTxLog.clear
clearDistinctKeys
}
def abort = {
newAndUpdatedEntries.clear
removedEntries.clear
appendOnlyTxLog.clear
clearDistinctKeys
shouldClearOnCommit.swap(false)
}
@ -118,68 +186,84 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def put(key: K, value: V): Option[V] = {
register
newAndUpdatedEntries.put(key, value)
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, Some(value), PUT)
addToListOfKeysInTx(key)
curr
}
override def update(key: K, value: V) = {
register
newAndUpdatedEntries.update(key, value)
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, Some(value), UPD)
addToListOfKeysInTx(key)
curr
}
override def remove(key: K) = {
register
removedEntries.add(key)
newAndUpdatedEntries.get(key)
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, None, REM)
addToListOfKeysInTx(key)
curr
}
def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
def slice(start: Option[K], count: Int): List[(K, V)] =
slice(start, None, count)
def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try {
storage.getMapStorageRangeFor(uuid, start, finish, count)
} catch { case e: Exception => Nil }
def slice(start: Option[K], finish: Option[K], count: Int): List[(K, V)]
override def clear = {
register
appendOnlyTxLog.clear
clearDistinctKeys
shouldClearOnCommit.swap(true)
}
override def contains(key: K): Boolean = try {
newAndUpdatedEntries.contains(key) ||
storage.getMapStorageEntryFor(uuid, key).isDefined
filterTxLogByKey(key) match {
case Seq() => // current tx doesn't use this
storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
case txs => // present in log
txs.last.op != REM // last entry cannot be a REM
}
} catch { case e: Exception => false }
protected def existsInStorage(key: K): Option[V] = try {
storage.getMapStorageEntryFor(uuid, key)
} catch {
case e: Exception => None
}
override def size: Int = try {
storage.getMapStorageSizeFor(uuid)
} catch { case e: Exception => 0 }
// partition key set affected in current tx into those which r added & which r deleted
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
case (kseq, k) => ((kseq, k), getCurrentValue(k))
}.partition(_._2.isDefined)
override def get(key: K): Option[V] = {
if (newAndUpdatedEntries.contains(key)) {
newAndUpdatedEntries.get(key)
}
else try {
storage.getMapStorageEntryFor(uuid, key)
} catch { case e: Exception => None }
// keys which existed in storage but removed in current tx
val inStorageRemovedInTx =
keysRemoved.keySet
.map(_._2)
.filter(k => existsInStorage(k).isDefined)
.size
// all keys in storage
val keysInStorage =
storage.getMapStorageFor(uuid)
.map { case (k, v) => toEquals(k) }
.toSet
// (keys that existed UNION keys added ) - (keys removed)
(keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx
} catch {
case e: Exception => 0
}
def iterator = elements
// get must consider underlying storage & current uncommitted tx log
override def get(key: K): Option[V] = getCurrentValue(key)
override def elements: Iterator[Tuple2[K, V]] = {
new Iterator[Tuple2[K, V]] {
private val originalList: List[Tuple2[K, V]] = try {
storage.getMapStorageFor(uuid)
} catch {
case e: Throwable => Nil
}
private var elements = newAndUpdatedEntries.toList union originalList.reverse
override def next: Tuple2[K, V]= synchronized {
val element = elements.head
elements = elements.tail
element
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
}
}
def iterator: Iterator[Tuple2[K, V]]
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
@ -187,6 +271,95 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
}
}
trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
import scala.collection.mutable.ArraySeq
type T = ArraySeq[Byte]
def toEquals(k: Array[Byte]) = ArraySeq(k: _*)
override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
object COrdering {
implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
new String(o1.toArray) compare new String(o2.toArray)
}
}
import scala.collection.immutable.{TreeMap, SortedMap}
private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = {
import COrdering._
// need ArraySeq for ordering
val fromStorage =
TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*)
val (keysAdded, keysRemoved) = keysInCurrentTx.map {
case (_, k) => (k, getCurrentValue(k))
}.partition(_._2.isDefined)
val inStorageRemovedInTx =
keysRemoved.keySet
.filter(k => existsInStorage(k).isDefined)
.map(k => ArraySeq(k: _*))
(fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, Some(v)) => (ArraySeq(k: _*), v) }
}
override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try {
val newMap = replayAllKeys
if (newMap isEmpty) List[(Array[Byte], Array[Byte])]()
val startKey =
start match {
case Some(bytes) => Some(ArraySeq(bytes: _*))
case None => None
}
val endKey =
finish match {
case Some(bytes) => Some(ArraySeq(bytes: _*))
case None => None
}
((startKey, endKey, count): @unchecked) match {
case ((Some(s), Some(e), _)) =>
newMap.range(s, e)
.toList
.map(e => (e._1.toArray, e._2))
.toList
case ((Some(s), None, c)) if c > 0 =>
newMap.from(s)
.iterator
.take(count)
.map(e => (e._1.toArray, e._2))
.toList
case ((Some(s), None, _)) =>
newMap.from(s)
.toList
.map(e => (e._1.toArray, e._2))
.toList
case ((None, Some(e), _)) =>
newMap.until(e)
.toList
.map(e => (e._1.toArray, e._2))
.toList
}
} catch { case e: Exception => Nil }
override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
new Iterator[(Array[Byte], Array[Byte])] {
private var elements = replayAllKeys
override def next: (Array[Byte], Array[Byte]) = synchronized {
val (k, v) = elements.head
elements = elements.tail
(k.toArray, v)
}
override def hasNext: Boolean = synchronized { !elements.isEmpty }
}
}
}
/**
* Implements a template for a concrete persistent transactional vector based storage.
*
@ -198,42 +371,83 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
protected val removedElems = TransactionalVector[T]()
protected val shouldClearOnCommit = Ref[Boolean]()
// operations on the Vector
trait Op
case object ADD extends Op
case object UPD extends Op
case object POP extends Op
// append only log: records all mutating operations
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
case class LogEntry(index: Option[Int], value: Option[T], op: Op)
// need to override in subclasses e.g. "sameElements" for Array[Byte]
def equal(v1: T, v2: T): Boolean = v1 == v2
val storage: VectorStorageBackend[T]
def commit = {
for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
newElems.clear
updatedElems.clear
for(entry <- appendOnlyTxLog) {
entry match {
case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
case LogEntry(_, _, POP) => //..
}
}
appendOnlyTxLog.clear
}
def abort = {
newElems.clear
updatedElems.clear
removedElems.clear
appendOnlyTxLog.clear
shouldClearOnCommit.swap(false)
}
private def replay: List[T] = {
import scala.collection.mutable.ArrayBuffer
var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*)
for(entry <- appendOnlyTxLog) {
entry match {
case LogEntry(_, Some(v), ADD) => elemsStorage += v
case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v)
case LogEntry(_, _, POP) => elemsStorage = elemsStorage.drop(1)
}
}
elemsStorage.toList.reverse
}
def +(elem: T) = add(elem)
def add(elem: T) = {
register
newElems + elem
appendOnlyTxLog + LogEntry(None, Some(elem), ADD)
}
def apply(index: Int): T = get(index)
def get(index: Int): T = {
if (newElems.size > index) newElems(index)
else storage.getVectorStorageEntryFor(uuid, index)
if (appendOnlyTxLog.isEmpty) {
storage.getVectorStorageEntryFor(uuid, index)
} else {
val curr = replay
curr(index)
}
}
override def slice(start: Int, finish: Int): IndexedSeq[T] = slice(Some(start), Some(finish))
def slice(start: Option[Int], finish: Option[Int], count: Int = 0): IndexedSeq[T] = {
val buffer = new scala.collection.mutable.ArrayBuffer[T]
storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
buffer
val curr = replay
val s = if (start.isDefined) start.get else 0
val cnt =
if (finish.isDefined) {
val f = finish.get
if (f >= s) (f - s) else count
}
else count
if (s == 0 && cnt == 0) List().toIndexedSeq
else curr.slice(s, s + cnt).toIndexedSeq
}
/**
@ -241,12 +455,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
*/
def pop: T = {
register
appendOnlyTxLog + LogEntry(None, None, POP)
throw new UnsupportedOperationException("PersistentVector::pop is not implemented")
}
def update(index: Int, newElem: T) = {
register
storage.updateVectorStorageEntryFor(uuid, index, newElem)
appendOnlyTxLog + LogEntry(Some(index), Some(newElem), UPD)
}
override def first: T = get(0)
@ -260,7 +475,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
}
}
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
def length: Int = replay.length
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException

View file

@ -9,7 +9,7 @@ import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.UUID
object MongoStorage extends Storage {
type ElementType = AnyRef
type ElementType = Array[Byte]
def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString)
def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
@ -29,7 +29,7 @@ object MongoStorage extends Storage {
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] {
class MongoPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = MongoStorageBackend
}
@ -40,12 +40,12 @@ class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] {
*
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
*/
class MongoPersistentVector(id: String) extends PersistentVector[AnyRef] {
class MongoPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
val uuid = id
val storage = MongoStorageBackend
}
class MongoPersistentRef(id: String) extends PersistentRef[AnyRef] {
class MongoPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
val storage = MongoStorageBackend
}

View file

@ -9,13 +9,8 @@ import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
import sjson.json.Serializer._
import java.util.NoSuchElementException
import com.mongodb._
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
import com.novus.casbah.mongodb.Imports._
/**
* A module for supporting MongoDB based persistence.
@ -28,294 +23,208 @@ import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
private[akka] object MongoStorageBackend extends
MapStorageBackend[AnyRef, AnyRef] with
VectorStorageBackend[AnyRef] with
RefStorageBackend[AnyRef] with
MapStorageBackend[Array[Byte], Array[Byte]] with
VectorStorageBackend[Array[Byte]] with
RefStorageBackend[Array[Byte]] with
Logging {
// enrich with null safe findOne
class RichDBCollection(value: DBCollection) {
def findOneNS(o: DBObject): Option[DBObject] = {
value.findOne(o) match {
case null => None
case x => Some(x)
}
}
}
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
val KEY = "key"
val VALUE = "value"
val KEY = "__key"
val REF = "__ref"
val COLLECTION = "akka_coll"
val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
val HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
val DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
val PORT = config.getInt("akka.storage.mongodb.port", 27017)
val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT)
val coll = db.getDB(MONGODB_SERVER_DBNAME).getCollection(COLLECTION)
val db: MongoDB = MongoConnection(HOSTNAME, PORT)(DBNAME)
val coll: MongoCollection = db(COLLECTION)
private[this] val serializer = SJSON
def drop() { db.dropDatabase() }
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) {
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) {
insertMapStorageEntriesFor(name, List((key, value)))
}
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) {
import java.util.{Map, HashMap}
val m: Map[AnyRef, AnyRef] = new HashMap
for ((k, v) <- entries) {
m.put(k, serializer.out(v))
}
nullSafeFindOne(name) match {
case None =>
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
case Some(dbo) => {
// collate the maps
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
o.putAll(m)
val newdbo = new BasicDBObject().append(KEY, name).append(VALUE, o)
coll.update(new BasicDBObject().append(KEY, name), newdbo, true, false)
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) {
db.safely { db =>
val q: DBObject = MongoDBObject(KEY -> name)
coll.findOne(q) match {
case Some(dbo) =>
entries.foreach { case (k, v) => dbo += new String(k) -> v }
db.safely { db => coll.update(q, dbo, true, false) }
case None =>
val builder = MongoDBObject.newBuilder
builder += KEY -> name
entries.foreach { case (k, v) => builder += new String(k) -> v }
coll += builder.result.asDBObject
}
}
}
def removeMapStorageFor(name: String): Unit = {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
val q: DBObject = MongoDBObject(KEY -> name)
db.safely { db => coll.remove(q) }
}
def removeMapStorageFor(name: String, key: AnyRef): Unit = {
nullSafeFindOne(name) match {
case None =>
case Some(dbo) => {
val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
if (key.isInstanceOf[List[_]]) {
val keys = key.asInstanceOf[List[_]]
keys.foreach(k => orig.remove(k.asInstanceOf[String]))
} else {
orig.remove(key.asInstanceOf[String])
}
// remove existing reference
removeMapStorageFor(name)
// and insert
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
}
private def queryFor[T](name: String)(body: (MongoDBObject, Option[DBObject]) => T): T = {
val q = MongoDBObject(KEY -> name)
body(q, coll.findOne(q))
}
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = queryFor(name) { (q, dbo) =>
dbo.foreach { d =>
d -= new String(key)
db.safely { db => coll.update(q, d, true, false) }
}
}
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
getValueForKey(name, key.asInstanceOf[String])
def getMapStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = queryFor(name) { (q, dbo) =>
dbo.map { d =>
d.getAs[Array[Byte]](new String(key))
}.getOrElse(None)
}
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
val m =
nullSafeFindOne(name) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]]
val vals =
for(s <- n)
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
def getMapStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
dbo.map { d =>
d.size - 2 // need to exclude object id and our KEY
}.getOrElse(0)
}
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
finish: Option[AnyRef],
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
val m =
nullSafeFindOne(name) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
}
/**
* <tt>count</tt> is the max number of results to return. Start with
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
* you hit <tt>finish</tt> or <tt>count</tt>.
*/
val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
val cnt =
if (finish.isDefined) {
val f = finish.get.asInstanceOf[Int]
if (f >= s) math.min(count, (f - s)) else count
}
else count
val n =
List(m.keySet.toArray: _*).asInstanceOf[List[String]].sortWith((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
val vals =
for(s <- n)
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) =>
dbo.map { d =>
for {
(k, v) <- d.toList
if k != "_id" && k != KEY
} yield (k.getBytes, v.asInstanceOf[Array[Byte]])
}.getOrElse(List.empty[(Array[Byte], Array[Byte])])
}
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
try {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in[AnyRef](
dbo.get(VALUE)
.asInstanceOf[JMap[String, AnyRef]]
.get(key).asInstanceOf[Array[Byte]]))
}
} catch {
case e =>
throw new NoSuchElementException(e.getMessage)
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
finish: Option[Array[Byte]],
count: Int): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) =>
dbo.map { d =>
// get all keys except the special ones
val keys = d.keys
.filter(k => k != "_id" && k != KEY)
.toList
.sortWith(_ < _)
// if the supplied start is not defined, get the head of keys
val s = start.map(new String(_)).getOrElse(keys.head)
// if the supplied finish is not defined, get the last element of keys
val f = finish.map(new String(_)).getOrElse(keys.last)
// slice from keys: both ends inclusive
val ks = keys.slice(keys.indexOf(s), scala.math.min(count, keys.indexOf(f) + 1))
ks.map(k => (k.getBytes, d.getAs[Array[Byte]](k).get))
}.getOrElse(List.empty[(Array[Byte], Array[Byte])])
}
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
val q = new BasicDBObject
q.put(KEY, name)
val currentList =
coll.findOneNS(q) match {
case None =>
new JArrayList[AnyRef]
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
}
if (!currentList.isEmpty) {
// record exists
// remove before adding
coll.remove(q)
}
// add to the current list
elements.map(serializer.out(_)).foreach(currentList.add(_))
coll.insert(
new BasicDBObject()
.append(KEY, name)
.append(VALUE, currentList)
)
}
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
insertVectorStorageEntriesFor(name, List(element))
}
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
throw new NoSuchElementException(name + " not present")
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
// lookup with name
val q: DBObject = MongoDBObject(KEY -> name)
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
db.safely { db =>
coll.findOne(q) match {
// exists : need to update
case Some(dbo) =>
dbo -= KEY
dbo -= "_id"
val listBuilder = MongoDBList.newBuilder
// expensive!
listBuilder ++= (elements ++ dbo.toSeq.sortWith((e1, e2) => (e1._1.toInt < e2._1.toInt)).map(_._2))
val builder = MongoDBObject.newBuilder
builder += KEY -> name
builder ++= listBuilder.result
coll.update(q, builder.result.asDBObject, true, false)
// new : just add
case None =>
val listBuilder = MongoDBList.newBuilder
listBuilder ++= elements
val builder = MongoDBObject.newBuilder
builder += KEY -> name
builder ++= listBuilder.result
coll += builder.result.asDBObject
}
serializer.in[AnyRef](
o.get(index).asInstanceOf[Array[Byte]])
} catch {
case e =>
throw new NoSuchElementException(e.getMessage)
}
}
def getVectorStorageRangeFor(name: String,
start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
try {
val o =
nullSafeFindOne(name) match {
case None =>
throw new NoSuchElementException(name + " not present")
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = queryFor(name) { (q, dbo) =>
dbo.foreach { d =>
d += ((index.toString, elem))
db.safely { db => coll.update(q, d, true, false) }
}
}
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = queryFor(name) { (q, dbo) =>
dbo.map { d =>
d(index.toString).asInstanceOf[Array[Byte]]
}.getOrElse(Array.empty[Byte])
}
val s = if (start.isDefined) start.get else 0
/**
* if <tt>start</tt> and <tt>finish</tt> both are defined, ignore <tt>count</tt> and
* report the range [start, finish)
* if <tt>start</tt> is not defined, assume <tt>start</tt> = 0
* if <tt>start</tt> == 0 and <tt>finish</tt> == 0, return an empty collection
*/
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = queryFor(name) { (q, dbo) =>
dbo.map { d =>
val ls = d.filter { case (k, v) => k != KEY && k != "_id" }
.toSeq
.sortWith((e1, e2) => (e1._1.toInt < e2._1.toInt))
.map(_._2)
val st = start.getOrElse(0)
val cnt =
if (finish.isDefined) {
val f = finish.get
if (f >= s) (f - s) else count
if (f >= st) (f - st) else count
}
else count
// pick the subrange and make a Scala list
val l =
List(o.subList(s, s + cnt).toArray: _*)
for(e <- l)
yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
} catch {
case e =>
throw new NoSuchElementException(e.getMessage)
}
if (st == 0 && cnt == 0) List()
ls.slice(st, st + cnt).asInstanceOf[List[Array[Byte]]]
}.getOrElse(List.empty[Array[Byte]])
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = {
val q = new BasicDBObject
q.put(KEY, name)
val dbobj =
coll.findOneNS(q) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(dbo) => dbo
}
val currentList = dbobj.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
currentList.set(index, serializer.out(elem))
coll.update(q,
new BasicDBObject().append(KEY, name).append(VALUE, currentList))
def getVectorStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
dbo.map { d => d.size - 2 }.getOrElse(0)
}
def getVectorStorageSizeFor(name: String): Int = {
nullSafeFindOne(name) match {
case None => 0
case Some(dbo) =>
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
}
}
def insertRefStorageFor(name: String, element: Array[Byte]) = {
// lookup with name
val q: DBObject = MongoDBObject(KEY -> name)
private def nullSafeFindOne(name: String): Option[DBObject] = {
val o = new BasicDBObject
o.put(KEY, name)
coll.findOneNS(o)
}
db.safely { db =>
coll.findOne(q) match {
// exists : need to update
case Some(dbo) =>
dbo += ((REF, element))
coll.update(q, dbo, true, false)
def insertRefStorageFor(name: String, element: AnyRef) = {
nullSafeFindOne(name) match {
case None =>
case Some(dbo) => {
val q = new BasicDBObject
q.put(KEY, name)
coll.remove(q)
// not found : make one
case None =>
val builder = MongoDBObject.newBuilder
builder += KEY -> name
builder += REF -> element
coll += builder.result.asDBObject
}
}
coll.insert(
new BasicDBObject()
.append(KEY, name)
.append(VALUE, serializer.out(element)))
}
def getRefStorageFor(name: String): Option[AnyRef] = {
nullSafeFindOne(name) match {
case None => None
case Some(dbo) =>
Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]]))
}
def getRefStorageFor(name: String): Option[Array[Byte]] = queryFor(name) { (q, dbo) =>
dbo.map { d =>
d.getAs[Array[Byte]](REF)
}.getOrElse(None)
}
}

View file

@ -1,32 +1,19 @@
package se.scalablesolutions.akka.persistence.mongo
import org.junit.{Test, Before}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import _root_.dispatch.json.{JsNumber, JsValue}
import _root_.dispatch.json.Js._
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef}
import Actor._
/**
* A persistent actor based on MongoDB storage.
* <p/>
* Demonstrates a bank account operation consisting of messages that:
* <li>checks balance <tt>Balance</tt></li>
* <li>debits amount<tt>Debit</tt></li>
* <li>debits multiple amounts<tt>MultiDebit</tt></li>
* <li>credits amount<tt>Credit</tt></li>
* <p/>
* Needs a running Mongo server.
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
case class Credit(accountNo: String, amount: BigInt)
case class Debit(accountNo: String, amount: Int, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
case class Credit(accountNo: String, amount: Int)
case class Log(start: Int, finish: Int)
case object LogSize
@ -35,63 +22,65 @@ class BankAccountActor extends Transactor {
private lazy val accountState = MongoStorage.newMap
private lazy val txnLog = MongoStorage.newVector
import sjson.json.DefaultProtocol._
import sjson.json.JsonSerialization._
def receive: Receive = {
// check balance
case Balance(accountNo) =>
txnLog.add("Balance:" + accountNo)
self.reply(accountState.get(accountNo).get)
txnLog.add(("Balance:" + accountNo).getBytes)
self.reply(
accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0))
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
txnLog.add("Debit:" + accountNo + " " + amount)
txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m - amount))
if (amount > m) failer !! "Failure"
val m: BigInt =
accountState.get(accountNo) match {
case Some(JsNumber(n)) =>
BigInt(n.asInstanceOf[BigDecimal].intValue)
case None => 0
}
accountState.put(accountNo, (m - amount))
if (amount > m)
failer !! "Failure"
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
case MultiDebit(accountNo, amounts, failer) =>
txnLog.add("MultiDebit:" + accountNo + " " + amounts.map(_.intValue).foldLeft(0)(_ + _))
val sum = amounts.foldRight(0)(_ + _)
txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
val m: BigInt =
accountState.get(accountNo) match {
case Some(JsNumber(n)) => BigInt(n.toString)
case None => 0
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
var cbal = m
amounts.foreach { amount =>
accountState.put(accountNo.getBytes, tobinary(m - amount))
cbal = cbal - amount
if (cbal < 0) failer !! "Failure"
}
var bal: BigInt = 0
amounts.foreach {amount =>
bal = bal + amount
accountState.put(accountNo, (m - bal))
}
if (bal > m) failer !! "Failure"
self.reply(m - bal)
self.reply(m - sum)
// credit amount
case Credit(accountNo, amount) =>
txnLog.add("Credit:" + accountNo + " " + amount)
txnLog.add(("Credit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m + amount))
val m: BigInt =
accountState.get(accountNo) match {
case Some(JsNumber(n)) =>
BigInt(n.asInstanceOf[BigDecimal].intValue)
case None => 0
}
accountState.put(accountNo, (m + amount))
self.reply(m + amount)
case LogSize =>
self.reply(txnLog.length.asInstanceOf[AnyRef])
self.reply(txnLog.length)
case Log(start, finish) =>
self.reply(txnLog.slice(start, finish))
self.reply(txnLog.slice(start, finish).map(new String(_)))
}
}
@ -102,82 +91,71 @@ class BankAccountActor extends Transactor {
}
}
class MongoPersistentActorSpec extends JUnitSuite {
@Test
def testSuccessfulDebit = {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
bactor !! Debit("a-123", 3000, failer)
@RunWith(classOf[JUnitRunner])
class MongoPersistentActorSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterEach {
val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
assertEquals(BigInt(2000), BigInt(b.intValue))
bactor !! Credit("a-123", 7000)
val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
assertEquals(BigInt(9000), BigInt(b1.intValue))
bactor !! Debit("a-123", 8000, failer)
val JsNumber(b2) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
assertEquals(BigInt(1000), BigInt(b2.intValue))
assert(7 == (bactor !! LogSize).get.asInstanceOf[Int])
import scala.collection.mutable.ArrayBuffer
assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
override def beforeEach {
MongoStorageBackend.drop
}
@Test
def testUnsuccessfulDebit = {
val bactor = actorOf[BankAccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
assertEquals(BigInt(5000), BigInt(b.intValue))
val failer = actorOf[PersistentFailerActor]
failer.start
try {
bactor !! Debit("a-123", 7000, failer)
fail("should throw exception")
} catch { case e: RuntimeException => {}}
val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
assertEquals(BigInt(5000), BigInt(b1.intValue))
// should not count the failed one
assert(3 == (bactor !! LogSize).get.asInstanceOf[Int])
override def afterEach {
MongoStorageBackend.drop
}
@Test
def testUnsuccessfulMultiDebit = {
val bactor = actorOf[BankAccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
describe("successful debit") {
it("should debit successfully") {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
bactor !! Debit("a-123", 3000, failer)
val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
assertEquals(BigInt(5000), BigInt(b.intValue))
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
val failer = actorOf[PersistentFailerActor]
failer.start
try {
bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
fail("should throw exception")
} catch { case e: RuntimeException => {}}
bactor !! Credit("a-123", 7000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
assertEquals(BigInt(5000), BigInt(b1.intValue))
bactor !! Debit("a-123", 8000, failer)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
// should not count the failed one
assert(3 == (bactor !! LogSize).get.asInstanceOf[Int])
(bactor !! LogSize).get.asInstanceOf[Int] should equal(7)
(bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7)
}
}
describe("unsuccessful debit") {
it("debit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
bactor !! Debit("a-123", 7000, failer)
} should produce [Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
}
}
describe("unsuccessful multidebit") {
it("multidebit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
} should produce [Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
}
}
}

View file

@ -1,364 +1,158 @@
package se.scalablesolutions.akka.persistence.mongo
import org.junit.{Test, Before}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import _root_.dispatch.json._
import _root_.dispatch.json.Js._
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import java.util.NoSuchElementException
@scala.reflect.BeanInfo case class Foo(no: Int, name: String)
class MongoStorageSpec extends JUnitSuite {
@RunWith(classOf[JUnitRunner])
class MongoStorageSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterEach {
val changeSetV = new scala.collection.mutable.ArrayBuffer[AnyRef]
val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef]
@Before def initialize() = {
MongoStorageBackend.coll.drop
override def beforeEach {
MongoStorageBackend.drop
}
@Test
def testVectorInsertForTransactionId = {
changeSetV += "debasish" // string
changeSetV += List(1, 2, 3) // Scala List
changeSetV += List(100, 200)
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(
3,
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
changeSetV.clear
// changeSetV should be reinitialized
changeSetV += List(12, 23, 45)
changeSetV += "maulindu"
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(
5,
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
// add more to the same changeSetV
changeSetV += "ramanendu"
changeSetV += Map(1 -> "dg", 2 -> "mc")
// add for a diff transaction
MongoStorageBackend.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
assertEquals(
4,
MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
// previous transaction change set should remain same
assertEquals(
5,
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
// test single element entry
MongoStorageBackend.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
assertEquals(
6,
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
override def afterEach {
MongoStorageBackend.drop
}
@Test
def testVectorFetchForKeys = {
describe("persistent maps") {
it("should insert with single key and value") {
import MongoStorageBackend._
// initially everything 0
assertEquals(
0,
MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
assertEquals(
0,
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
// get some stuff
changeSetV += "debasish"
changeSetV += List(BigDecimal(12), BigDecimal(13), BigDecimal(14))
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(
2,
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
val JsString(str) = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
assertEquals("debasish", str)
val l = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue]
val num_list = list ! num
val num_list(l0) = l
assertEquals(List(12, 13, 14), l0)
changeSetV.clear
changeSetV += Map(1->1, 2->4, 3->9)
changeSetV += BigInt(2310)
changeSetV += List(100, 200, 300)
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(
5,
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
val r =
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
assertEquals(3, r.size)
val lr = r(0).asInstanceOf[JsValue]
val num_list(l1) = lr
assertEquals(List(12, 13, 14), l1)
}
@Test
def testVectorFetchForNonExistentKeys = {
try {
MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1)
fail("should throw an exception")
} catch {case e: NoSuchElementException => {}}
try {
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
fail("should throw an exception")
} catch {case e: NoSuchElementException => {}}
}
@Test
def testVectorUpdateForTransactionId = {
import MongoStorageBackend._
changeSetV += "debasish" // string
changeSetV += List(1, 2, 3) // Scala List
changeSetV += List(100, 200)
insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
assertEquals(3, getVectorStorageSizeFor("U-A1"))
updateVectorStorageEntryFor("U-A1", 0, "maulindu")
val JsString(str) = getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
assertEquals("maulindu", str)
updateVectorStorageEntryFor("U-A1", 1, Map("1"->"dg", "2"->"mc"))
val JsObject(m) = getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsObject]
assertEquals(m.keySet.size, 2)
}
@Test
def testMapInsertForTransactionId = {
fillMap
// add some more to changeSet
changeSetM += "5" -> Foo(12, "dg")
changeSetM += "6" -> java.util.Calendar.getInstance.getTime
// insert all into Mongo
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
assertEquals(
6,
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// individual insert api
MongoStorageBackend.insertMapStorageEntryFor("U-M1", "7", "akka")
MongoStorageBackend.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
assertEquals(
8,
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// add the same changeSet for another transaction
MongoStorageBackend.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
assertEquals(
6,
MongoStorageBackend.getMapStorageSizeFor("U-M2"))
// the first transaction should remain the same
assertEquals(
8,
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
changeSetM.clear
}
@Test
def testMapContents = {
fillMap
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
MongoStorageBackend.getMapStorageEntryFor("U-M1", "2") match {
case Some(x) => {
val JsString(str) = x.asInstanceOf[JsValue]
assertEquals("peter", str)
}
case None => fail("should fetch peter")
}
MongoStorageBackend.getMapStorageEntryFor("U-M1", "4") match {
case Some(x) => {
val num_list = list ! num
val num_list(l0) = x.asInstanceOf[JsValue]
assertEquals(3, l0.size)
}
case None => fail("should fetch list")
}
MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") match {
case Some(x) => {
val num_list = list ! num
val num_list(l0) = x.asInstanceOf[JsValue]
assertEquals(2, l0.size)
}
case None => fail("should fetch list")
insertMapStorageEntryFor("t1", "odersky".getBytes, "scala".getBytes)
insertMapStorageEntryFor("t1", "gosling".getBytes, "java".getBytes)
insertMapStorageEntryFor("t1", "stroustrup".getBytes, "c++".getBytes)
getMapStorageSizeFor("t1") should equal(3)
new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java")
new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++")
getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
}
// get the entire map
val l: List[Tuple2[AnyRef, AnyRef]] =
MongoStorageBackend.getMapStorageFor("U-M1")
it("should insert with multiple keys and values") {
import MongoStorageBackend._
assertEquals(4, l.size)
assertTrue(l.map(_._1).contains("1"))
assertTrue(l.map(_._1).contains("2"))
assertTrue(l.map(_._1).contains("3"))
assertTrue(l.map(_._1).contains("4"))
val l = List(("stroustrup", "c++"), ("odersky", "scala"), ("gosling", "java"))
insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
getMapStorageSizeFor("t1") should equal(3)
new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++")
new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java")
new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
val JsString(str) = l.filter(_._1 == "2").head._2
assertEquals(str, "peter")
getMapStorageEntryFor("t2", "torvalds".getBytes) should equal(None)
// trying to fetch for a non-existent transaction will throw
try {
MongoStorageBackend.getMapStorageFor("U-M2")
fail("should throw an exception")
} catch {case e: NoSuchElementException => {}}
getMapStorageFor("t1").map { case (k, v) => (new String(k), new String(v)) } should equal (l)
changeSetM.clear
}
removeMapStorageFor("t1", "gosling".getBytes)
getMapStorageSizeFor("t1") should equal(2)
@Test
def testMapContentsByRange = {
fillMap
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
// specify start and count
val l: List[Tuple2[AnyRef, AnyRef]] =
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", Some(Integer.valueOf(2)), None, 3)
assertEquals(3, l.size)
assertEquals("3", l(0)._1.asInstanceOf[String])
val lst = l(0)._2.asInstanceOf[JsValue]
val num_list = list ! num
val num_list(l0) = lst
assertEquals(List(100, 200), l0)
assertEquals("4", l(1)._1.asInstanceOf[String])
val ls = l(1)._2.asInstanceOf[JsValue]
val num_list(l1) = ls
assertEquals(List(10, 20, 30), l1)
// specify start, finish and count where finish - start == count
assertEquals(3,
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size)
// specify start, finish and count where finish - start > count
assertEquals(3,
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size)
// do not specify start or finish
assertEquals(3,
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", None, None, 3).size)
// specify finish and count
assertEquals(3,
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", None, Some(Integer.valueOf(3)), 3).size)
// specify start, finish and count where finish < start
assertEquals(3,
MongoStorageBackend.getMapStorageRangeFor(
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size)
changeSetM.clear
}
@Test
def testMapStorageRemove = {
fillMap
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
assertEquals(5,
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// remove key "3"
MongoStorageBackend.removeMapStorageFor("U-M1", "3")
assertEquals(4,
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
try {
MongoStorageBackend.getMapStorageEntryFor("U-M1", "3")
fail("should throw exception")
} catch { case e => {}}
// remove key "4"
MongoStorageBackend.removeMapStorageFor("U-M1", "4")
assertEquals(3,
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// remove key "2"
MongoStorageBackend.removeMapStorageFor("U-M1", "2")
assertEquals(2,
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
// remove the whole stuff
MongoStorageBackend.removeMapStorageFor("U-M1")
try {
MongoStorageBackend.getMapStorageFor("U-M1")
fail("should throw exception")
} catch { case e: NoSuchElementException => {}}
changeSetM.clear
}
private def fillMap = {
changeSetM += "1" -> "john"
changeSetM += "2" -> "peter"
changeSetM += "3" -> List(100, 200)
changeSetM += "4" -> List(10, 20, 30)
changeSetM
}
@Test
def testRefStorage = {
MongoStorageBackend.getRefStorageFor("U-R1") match {
case None =>
case Some(o) => fail("should be None")
removeMapStorageFor("t1")
getMapStorageSizeFor("t1") should equal(0)
}
val m = Map("1"->1, "2"->4, "3"->9)
MongoStorageBackend.insertRefStorageFor("U-R1", m)
MongoStorageBackend.getRefStorageFor("U-R1") match {
case None => fail("should not be empty")
case Some(r) => {
val a = r.asInstanceOf[JsValue]
val m1 = Symbol("1") ? num
val m2 = Symbol("2") ? num
val m3 = Symbol("3") ? num
it("should do proper range queries") {
import MongoStorageBackend._
val l = List(
("bjarne stroustrup", "c++"),
("martin odersky", "scala"),
("james gosling", "java"),
("yukihiro matsumoto", "ruby"),
("slava pestov", "factor"),
("rich hickey", "clojure"),
("ola bini", "ioke"),
("dennis ritchie", "c"),
("larry wall", "perl"),
("guido van rossum", "python"),
("james strachan", "groovy"))
insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
getMapStorageSizeFor("t1") should equal(l.size)
getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1))
getMapStorageRangeFor("t1", None, None, 5).map { case (k, v) => (new String(k), new String(v)) }.size should equal(5)
}
}
val m1(n1) = a
val m2(n2) = a
val m3(n3) = a
describe("persistent vectors") {
it("should insert a single value") {
import MongoStorageBackend._
assertEquals(n1, 1)
assertEquals(n2, 4)
assertEquals(n3, 9)
}
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
new String(getVectorStorageEntryFor("t1", 0)) should equal("james gosling")
new String(getVectorStorageEntryFor("t1", 1)) should equal("martin odersky")
}
// insert another one
// the previous one should be replaced
val b = List("100", "jonas")
MongoStorageBackend.insertRefStorageFor("U-R1", b)
MongoStorageBackend.getRefStorageFor("U-R1") match {
case None => fail("should not be empty")
case Some(r) => {
val a = r.asInstanceOf[JsValue]
val str_lst = list ! str
val str_lst(l) = a
assertEquals(b, l)
}
it("should insert multiple values") {
import MongoStorageBackend._
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini")
new String(getVectorStorageEntryFor("t1", 1)) should equal("james strachan")
new String(getVectorStorageEntryFor("t1", 2)) should equal("dennis ritchie")
new String(getVectorStorageEntryFor("t1", 3)) should equal("james gosling")
new String(getVectorStorageEntryFor("t1", 4)) should equal("martin odersky")
}
it("should fetch a range of values") {
import MongoStorageBackend._
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
getVectorStorageSizeFor("t1") should equal(2)
insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
getVectorStorageRangeFor("t1", None, None, 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
getVectorStorageRangeFor("t1", Some(0), Some(5), 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
getVectorStorageRangeFor("t1", Some(2), Some(5), 100).map(new String(_)) should equal(List("dennis ritchie", "james gosling", "martin odersky"))
getVectorStorageSizeFor("t1") should equal(5)
}
it("should insert and query complex structures") {
import MongoStorageBackend._
import sjson.json.DefaultProtocol._
import sjson.json.JsonSerialization._
// a list[AnyRef] should be added successfully
val l = List("ola bini".getBytes, tobinary(List(100, 200, 300)), tobinary(List(1, 2, 3)))
// for id = t1
insertVectorStorageEntriesFor("t1", l)
new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini")
frombinary[List[Int]](getVectorStorageEntryFor("t1", 1)) should equal(List(100, 200, 300))
frombinary[List[Int]](getVectorStorageEntryFor("t1", 2)) should equal(List(1, 2, 3))
getVectorStorageSizeFor("t1") should equal(3)
// some more for id = t1
val m = List(tobinary(Map(1 -> "dg", 2 -> "mc", 3 -> "nd")), tobinary(List("martin odersky", "james gosling")))
insertVectorStorageEntriesFor("t1", m)
// size should add up
getVectorStorageSizeFor("t1") should equal(5)
// now for a diff id
insertVectorStorageEntriesFor("t2", l)
getVectorStorageSizeFor("t2") should equal(3)
}
}
describe("persistent refs") {
it("should insert a ref") {
import MongoStorageBackend._
insertRefStorageFor("t1", "martin odersky".getBytes)
new String(getRefStorageFor("t1").get) should equal("martin odersky")
insertRefStorageFor("t1", "james gosling".getBytes)
new String(getRefStorageFor("t1").get) should equal("james gosling")
getRefStorageFor("t2") should equal(None)
}
}
}

View file

@ -0,0 +1,347 @@
package se.scalablesolutions.akka.persistence.mongo
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.OneForOneStrategy
import Actor._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import MongoStorageBackend._
case class GET(k: String)
case class SET(k: String, v: String)
case class REM(k: String)
case class CONTAINS(k: String)
case object MAP_SIZE
case class MSET(kvs: List[(String, String)])
case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
case class VADD(v: String)
case class VUPD(i: Int, v: String)
case class VUPD_AND_ABORT(i: Int, v: String)
case class VGET(i: Int)
case object VSIZE
case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
object Storage {
class MongoSampleMapStorage extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
val FOO_MAP = "akka.sample.map"
private var fooMap = atomic { MongoStorage.getMap(FOO_MAP) }
def receive = {
case SET(k, v) =>
atomic {
fooMap += (k.getBytes, v.getBytes)
}
self.reply((k, v))
case GET(k) =>
val v = atomic {
fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found")
}
self.reply(v)
case REM(k) =>
val v = atomic {
fooMap -= k.getBytes
}
self.reply(k)
case CONTAINS(k) =>
val v = atomic {
fooMap contains k.getBytes
}
self.reply(v)
case MAP_SIZE =>
val v = atomic {
fooMap.size
}
self.reply(v)
case MSET(kvs) => atomic {
kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes) }
}
self.reply(kvs.size)
case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
ks2rem.foreach {k =>
fooMap -= k.getBytes
}}
self.reply(fooMap.size)
case CLEAR_AFTER_PUT(kvs2add) => atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
fooMap.clear
}
self.reply(true)
case PUT_WITH_SLICE(kvs2add, from, cnt) =>
val v = atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
fooMap.slice(Some(from.getBytes), cnt)
}
self.reply(v: List[(Array[Byte], Array[Byte])])
case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
val v = atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
ks2rem.foreach {k =>
fooMap -= k.getBytes
}
fooMap.slice(Some(from.getBytes), cnt)
}
self.reply(v: List[(Array[Byte], Array[Byte])])
}
}
class MongoSampleVectorStorage extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
val FOO_VECTOR = "akka.sample.vector"
private var fooVector = atomic { MongoStorage.getVector(FOO_VECTOR) }
def receive = {
case VADD(v) =>
val size =
atomic {
fooVector + v.getBytes
fooVector length
}
self.reply(size)
case VGET(index) =>
val ind =
atomic {
fooVector get index
}
self.reply(ind)
case VGET_AFTER_VADD(vs, is) =>
val els =
atomic {
vs.foreach(fooVector + _.getBytes)
(is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
}
self.reply(els)
case VUPD_AND_ABORT(index, value) =>
val l =
atomic {
fooVector.update(index, value.getBytes)
// force fail
fooVector get 100
}
self.reply(index)
case VADD_WITH_SLICE(vs, s, c) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector.slice(Some(s), None, c)
}
self.reply(l.map(new String(_)))
}
}
}
import Storage._
@RunWith(classOf[JUnitRunner])
class MongoTicket343Spec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll with
BeforeAndAfterEach {
override def beforeAll {
MongoStorageBackend.drop
println("** destroyed database")
}
override def beforeEach {
MongoStorageBackend.drop
println("** destroyed database")
}
override def afterEach {
MongoStorageBackend.drop
println("** destroyed database")
}
describe("Ticket 343 Issue #1") {
it("remove after put should work within the same transaction") {
val proc = actorOf[MongoSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! GET("dg")).getOrElse("Get failed") should equal("1")
(proc !! GET("mc")).getOrElse("Get failed") should equal("2")
(proc !! GET("nd")).getOrElse("Get failed") should equal("3")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
val rem = List("a", "debasish")
(proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
(proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
(proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
(proc !! GET("b")).getOrElse("b not found") should equal("2")
(proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
(proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
proc.stop
}
}
describe("Ticket 343 Issue #2") {
it("clear after put should work within the same transaction") {
val proc = actorOf[MongoSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
proc.stop
}
}
describe("Ticket 343 Issue #3") {
it("map size should change after the transaction") {
val proc = actorOf[MongoSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
(proc !! GET("dg")).getOrElse("Get failed") should equal("1")
(proc !! GET("mc")).getOrElse("Get failed") should equal("2")
(proc !! GET("nd")).getOrElse("Get failed") should equal("3")
proc.stop
}
}
describe("slice test") {
it("should pass") {
val proc = actorOf[MongoSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
// (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
(proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
(proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
proc.stop
}
}
describe("Ticket 343 Issue #4") {
it("vector get should not ignore elements that were in vector before transaction") {
val proc = actorOf[MongoSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu")
new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu")
new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish")
// now add 3 more and do gets in the same transaction
(proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
proc.stop
}
}
describe("Ticket 343 Issue #6") {
it("vector update should not ignore transaction") {
val proc = actorOf[MongoSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
evaluating {
(proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
} should produce [Exception]
// update aborts and hence values will remain unchanged
new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
proc.stop
}
}
describe("Ticket 343 Issue #5") {
it("vector slice() should not ignore elements added in current transaction") {
val proc = actorOf[MongoSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
// slice with no new elements added in current transaction
(proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
// slice with new elements added in current transaction
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
proc.stop
}
}
}

View file

@ -36,7 +36,7 @@ object RedisStorage extends Storage {
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
class RedisPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = RedisStorageBackend
}

View file

@ -96,12 +96,12 @@ private [akka] object RedisStorageBackend extends
* <li>both parts of the key need to be based64 encoded since there can be spaces within each of them</li>
*/
private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling {
"%s:%s".format(name, byteArrayToString(key))
"%s:%s".format(name, new String(key))
}
private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling {
val nk = redisKey.split(':')
(nk(0), stringToByteArray(nk(1)))
(nk(0), nk(1).getBytes)
}
private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling {
@ -124,27 +124,22 @@ private [akka] object RedisStorageBackend extends
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling {
db.get(makeRedisKey(name, key)) match {
case None =>
throw new NoSuchElementException(new String(key) + " not present")
case Some(s) => Some(stringToByteArray(s))
db.get(makeRedisKey(name, key))
.map(stringToByteArray(_))
.orElse(throw new NoSuchElementException(new String(key) + " not present"))
}
}
def getMapStorageSizeFor(name: String): Int = withErrorHandling {
db.keys("%s:*".format(name)) match {
case None => 0
case Some(keys) => keys.length
}
db.keys("%s:*".format(name)).map(_.length).getOrElse(0)
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling {
db.keys("%s:*".format(name)) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(keys) =>
db.keys("%s:*".format(name))
.map { keys =>
keys.map(key => (makeKeyFromRedisKey(key.get)._2, stringToByteArray(db.get(key.get).get))).toList
}
}.getOrElse {
throw new NoSuchElementException(name + " not present")
}
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
@ -207,12 +202,11 @@ private [akka] object RedisStorageBackend extends
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling {
db.lindex(name, index) match {
case None =>
db.lindex(name, index)
.map(stringToByteArray(_))
.getOrElse {
throw new NoSuchElementException(name + " does not have element at " + index)
case Some(e) =>
stringToByteArray(e)
}
}
}
/**
@ -252,11 +246,11 @@ private [akka] object RedisStorageBackend extends
}
def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling {
db.get(name) match {
case None =>
db.get(name)
.map(stringToByteArray(_))
.orElse {
throw new NoSuchElementException(name + " not present")
case Some(s) => Some(stringToByteArray(s))
}
}
}
// add to the end of the queue
@ -266,11 +260,11 @@ private [akka] object RedisStorageBackend extends
// pop from the front of the queue
def dequeue(name: String): Option[Array[Byte]] = withErrorHandling {
db.lpop(name) match {
case None =>
db.lpop(name)
.map(stringToByteArray(_))
.orElse {
throw new NoSuchElementException(name + " not present")
case Some(s) => Some(stringToByteArray(s))
}
}
}
// get the size of the queue
@ -302,26 +296,19 @@ private [akka] object RedisStorageBackend extends
// completely delete the queue
def remove(name: String): Boolean = withErrorHandling {
db.del(name) match {
case Some(1) => true
case _ => false
}
db.del(name).map { case 1 => true }.getOrElse(false)
}
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zadd(name, zscore, byteArrayToString(item)) match {
case Some(1) => true
case _ => false
}
db.zadd(name, zscore, byteArrayToString(item))
.map { case 1 => true }.getOrElse(false)
}
// remove item from sorted set identified by name
def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zrem(name, byteArrayToString(item)) match {
case Some(1) => true
case _ => false
}
db.zrem(name, byteArrayToString(item))
.map { case 1 => true }.getOrElse(false)
}
// cardinality of the set identified by name
@ -330,29 +317,23 @@ private [akka] object RedisStorageBackend extends
}
def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling {
db.zscore(name, byteArrayToString(item)) match {
case Some(s) => Some(s.toFloat)
case None => None
}
db.zscore(name, byteArrayToString(item)).map(_.toFloat)
}
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling {
db.zrange(name, start.toString, end.toString, RedisClient.ASC, false) match {
case None =>
db.zrange(name, start.toString, end.toString, RedisClient.ASC, false)
.map(_.map(e => stringToByteArray(e.get)))
.getOrElse {
throw new NoSuchElementException(name + " not present")
case Some(s) =>
s.map(e => stringToByteArray(e.get))
}
}
}
def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling {
db.zrangeWithScore(
name, start.toString, end.toString, RedisClient.ASC) match {
case None =>
throw new NoSuchElementException(name + " not present")
case Some(l) =>
l.map{ case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) }
}
db.zrangeWithScore(name, start.toString, end.toString, RedisClient.ASC)
.map(_.map { case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) })
.getOrElse {
throw new NoSuchElementException(name + " not present")
}
}
def flushDB = withErrorHandling(db.flushdb)

View file

@ -0,0 +1,351 @@
package se.scalablesolutions.akka.persistence.redis
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor}
import se.scalablesolutions.akka.config.OneForOneStrategy
import Actor._
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import RedisStorageBackend._
case class GET(k: String)
case class SET(k: String, v: String)
case class REM(k: String)
case class CONTAINS(k: String)
case object MAP_SIZE
case class MSET(kvs: List[(String, String)])
case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
case class VADD(v: String)
case class VUPD(i: Int, v: String)
case class VUPD_AND_ABORT(i: Int, v: String)
case class VGET(i: Int)
case object VSIZE
case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
object Storage {
class RedisSampleMapStorage extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
val FOO_MAP = "akka.sample.map"
private var fooMap = atomic { RedisStorage.getMap(FOO_MAP) }
def receive = {
case SET(k, v) =>
atomic {
fooMap += (k.getBytes, v.getBytes)
}
self.reply((k, v))
case GET(k) =>
val v = atomic {
fooMap.get(k.getBytes)
}
self.reply(v.collect {case byte => new String(byte)}.getOrElse(k + " Not found"))
case REM(k) =>
val v = atomic {
fooMap -= k.getBytes
}
self.reply(k)
case CONTAINS(k) =>
val v = atomic {
fooMap contains k.getBytes
}
self.reply(v)
case MAP_SIZE =>
val v = atomic {
fooMap.size
}
self.reply(v)
case MSET(kvs) =>
atomic {
kvs.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
}
self.reply(kvs.size)
case REMOVE_AFTER_PUT(kvs2add, ks2rem) =>
val v =
atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
ks2rem.foreach {k =>
fooMap -= k.getBytes
}
fooMap.size
}
self.reply(v)
case CLEAR_AFTER_PUT(kvs2add) =>
atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
fooMap.clear
}
self.reply(true)
case PUT_WITH_SLICE(kvs2add, from, cnt) =>
val v =
atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
fooMap.slice(Some(from.getBytes), cnt)
}
self.reply(v: List[(Array[Byte], Array[Byte])])
case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
val v =
atomic {
kvs2add.foreach {kv =>
fooMap += (kv._1.getBytes, kv._2.getBytes)
}
ks2rem.foreach {k =>
fooMap -= k.getBytes
}
fooMap.slice(Some(from.getBytes), cnt)
}
self.reply(v: List[(Array[Byte], Array[Byte])])
}
}
class RedisSampleVectorStorage extends Actor {
self.lifeCycle = Some(LifeCycle(Permanent))
val FOO_VECTOR = "akka.sample.vector"
private var fooVector = atomic { RedisStorage.getVector(FOO_VECTOR) }
def receive = {
case VADD(v) =>
val size =
atomic {
fooVector + v.getBytes
fooVector length
}
self.reply(size)
case VGET(index) =>
val ind =
atomic {
fooVector get index
}
self.reply(ind)
case VGET_AFTER_VADD(vs, is) =>
val els =
atomic {
vs.foreach(fooVector + _.getBytes)
(is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
}
self.reply(els)
case VUPD_AND_ABORT(index, value) =>
val l =
atomic {
fooVector.update(index, value.getBytes)
// force fail
fooVector get 100
}
self.reply(index)
case VADD_WITH_SLICE(vs, s, c) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector.slice(Some(s), None, c)
}
self.reply(l.map(new String(_)))
}
}
}
import Storage._
@RunWith(classOf[JUnitRunner])
class RedisTicket343Spec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll with
BeforeAndAfterEach {
override def beforeAll {
flushDB
println("** destroyed database")
}
override def afterEach {
flushDB
println("** destroyed database")
}
describe("Ticket 343 Issue #1") {
it("remove after put should work within the same transaction") {
val proc = actorOf[RedisSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! GET("dg")).getOrElse("Get failed") should equal("1")
(proc !! GET("mc")).getOrElse("Get failed") should equal("2")
(proc !! GET("nd")).getOrElse("Get failed") should equal("3")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
val rem = List("a", "debasish")
(proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
(proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
(proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
(proc !! GET("b")).getOrElse("b not found") should equal("2")
(proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
(proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
proc.stop
}
}
describe("Ticket 343 Issue #2") {
it("clear after put should work within the same transaction") {
val proc = actorOf[RedisSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
proc.stop
}
}
describe("Ticket 343 Issue #3") {
it("map size should change after the transaction") {
val proc = actorOf[RedisSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
(proc !! GET("dg")).getOrElse("Get failed") should equal("1")
(proc !! GET("mc")).getOrElse("Get failed") should equal("2")
(proc !! GET("nd")).getOrElse("Get failed") should equal("3")
proc.stop
}
}
describe("slice test") {
it("should pass") {
val proc = actorOf[RedisSampleMapStorage]
proc.start
(proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
(proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
(proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
(proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
proc.stop
}
}
describe("Ticket 343 Issue #4") {
it("vector get should not ignore elements that were in vector before transaction") {
val proc = actorOf[RedisSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu")
new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu")
new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish")
// now add 3 more and do gets in the same transaction
(proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
proc.stop
}
}
describe("Ticket 343 Issue #6") {
it("vector update should not ignore transaction") {
val proc = actorOf[RedisSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
evaluating {
(proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
} should produce [Exception]
// update aborts and hence values will remain unchanged
new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
proc.stop
}
}
describe("Ticket 343 Issue #5") {
it("vector slice() should not ignore elements added in current transaction") {
val proc = actorOf[RedisSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
// slice with no new elements added in current transaction
(proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
// slice with new elements added in current transaction
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
proc.stop
}
}
}

View file

@ -49,6 +49,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
lazy val CasbahRepoSnapshots = MavenRepository("Casbah Snapshot Repo", "http://repo.bumnetworks.com/snapshots/")
lazy val CasbahRepoReleases = MavenRepository("Casbah Snapshot Repo", "http://repo.bumnetworks.com/releases/")
}
// -------------------------------------------------------------------------------------------------------------------
@ -75,6 +77,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository)
lazy val casbahSnapshot = ModuleConfiguration("com.novus",CasbahRepoSnapshots)
lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases)
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
// -------------------------------------------------------------------------------------------------------------------
@ -166,6 +170,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile"
lazy val casbah = "com.novus" % "casbah_2.8.0" % "1.0.8.5" % "compile"
lazy val time = "org.scala-tools" % "time" % "2.8.0-SNAPSHOT-0.2-SNAPSHOT" % "compile"
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive
lazy val netty = "org.jboss.netty" % "netty" % "3.2.2.Final" % "compile"
@ -180,7 +188,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
lazy val sjson = "sjson.json" % "sjson" % "0.7-2.8.0" % "compile"
lazy val sjson = "sjson.json" % "sjson" % "0.8-SNAPSHOT-2.8.0" % "compile"
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile"
@ -474,7 +482,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val commons_codec = Dependencies.commons_codec
val redis = Dependencies.redis
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
// override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
// -------------------------------------------------------------------------------------------------------------------
@ -483,8 +491,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
class AkkaMongoProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val mongo = Dependencies.mongo
val casbah = Dependencies.casbah
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
// override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
// -------------------------------------------------------------------------------------------------------------------