Refactor mongodb module to confirm to Redis and Cassandra. Issue #430
This commit is contained in:
parent
ba7801c5dc
commit
44c0d5b075
7 changed files with 392 additions and 721 deletions
|
|
@ -9,7 +9,7 @@ import se.scalablesolutions.akka.persistence.common._
|
|||
import se.scalablesolutions.akka.util.UUID
|
||||
|
||||
object MongoStorage extends Storage {
|
||||
type ElementType = AnyRef
|
||||
type ElementType = Array[Byte]
|
||||
|
||||
def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString)
|
||||
def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
|
||||
|
|
@ -29,7 +29,7 @@ object MongoStorage extends Storage {
|
|||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] {
|
||||
class MongoPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = MongoStorageBackend
|
||||
}
|
||||
|
|
@ -40,12 +40,12 @@ class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] {
|
|||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debaissh Ghosh</a>
|
||||
*/
|
||||
class MongoPersistentVector(id: String) extends PersistentVector[AnyRef] {
|
||||
class MongoPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = MongoStorageBackend
|
||||
}
|
||||
|
||||
class MongoPersistentRef(id: String) extends PersistentRef[AnyRef] {
|
||||
class MongoPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = MongoStorageBackend
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,13 +9,8 @@ import se.scalablesolutions.akka.persistence.common._
|
|||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
|
||||
import sjson.json.Serializer._
|
||||
|
||||
import java.util.NoSuchElementException
|
||||
|
||||
import com.mongodb._
|
||||
|
||||
import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
|
||||
import com.novus.casbah.mongodb.Imports._
|
||||
|
||||
/**
|
||||
* A module for supporting MongoDB based persistence.
|
||||
|
|
@ -28,294 +23,189 @@ import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
|
|||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
private[akka] object MongoStorageBackend extends
|
||||
MapStorageBackend[AnyRef, AnyRef] with
|
||||
VectorStorageBackend[AnyRef] with
|
||||
RefStorageBackend[AnyRef] with
|
||||
MapStorageBackend[Array[Byte], Array[Byte]] with
|
||||
VectorStorageBackend[Array[Byte]] with
|
||||
RefStorageBackend[Array[Byte]] with
|
||||
Logging {
|
||||
|
||||
// enrich with null safe findOne
|
||||
class RichDBCollection(value: DBCollection) {
|
||||
def findOneNS(o: DBObject): Option[DBObject] = {
|
||||
value.findOne(o) match {
|
||||
case null => None
|
||||
case x => Some(x)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
|
||||
|
||||
val KEY = "key"
|
||||
val VALUE = "value"
|
||||
val KEY = "__key"
|
||||
val REF = "__ref"
|
||||
val COLLECTION = "akka_coll"
|
||||
|
||||
val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
|
||||
val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
|
||||
val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
|
||||
val HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
|
||||
val DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
|
||||
val PORT = config.getInt("akka.storage.mongodb.port", 27017)
|
||||
|
||||
val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT)
|
||||
val coll = db.getDB(MONGODB_SERVER_DBNAME).getCollection(COLLECTION)
|
||||
val db: MongoDB = MongoConnection(HOSTNAME, PORT)(DBNAME)
|
||||
val coll: MongoCollection = db(COLLECTION)
|
||||
|
||||
private[this] val serializer = SJSON
|
||||
def drop() { db.dropDatabase() }
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) {
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) {
|
||||
insertMapStorageEntriesFor(name, List((key, value)))
|
||||
}
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) {
|
||||
import java.util.{Map, HashMap}
|
||||
|
||||
val m: Map[AnyRef, AnyRef] = new HashMap
|
||||
for ((k, v) <- entries) {
|
||||
m.put(k, serializer.out(v))
|
||||
}
|
||||
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
|
||||
case Some(dbo) => {
|
||||
// collate the maps
|
||||
val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
|
||||
o.putAll(m)
|
||||
|
||||
val newdbo = new BasicDBObject().append(KEY, name).append(VALUE, o)
|
||||
coll.update(new BasicDBObject().append(KEY, name), newdbo, true, false)
|
||||
}
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) {
|
||||
val q: DBObject = MongoDBObject(KEY -> name)
|
||||
coll.findOne(q) match {
|
||||
case Some(dbo) =>
|
||||
entries.foreach { case (k, v) => dbo += new String(k) -> v }
|
||||
coll.update(q, dbo, true, false)
|
||||
case None =>
|
||||
val builder = MongoDBObject.newBuilder
|
||||
builder += KEY -> name
|
||||
entries.foreach { case (k, v) => builder += new String(k) -> v }
|
||||
coll += builder.result.asDBObject
|
||||
}
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String): Unit = {
|
||||
val q = new BasicDBObject
|
||||
q.put(KEY, name)
|
||||
val q: DBObject = MongoDBObject(KEY -> name)
|
||||
coll.remove(q)
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String, key: AnyRef): Unit = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case Some(dbo) => {
|
||||
val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
|
||||
if (key.isInstanceOf[List[_]]) {
|
||||
val keys = key.asInstanceOf[List[_]]
|
||||
keys.foreach(k => orig.remove(k.asInstanceOf[String]))
|
||||
} else {
|
||||
orig.remove(key.asInstanceOf[String])
|
||||
}
|
||||
|
||||
// remove existing reference
|
||||
removeMapStorageFor(name)
|
||||
// and insert
|
||||
coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
|
||||
}
|
||||
}
|
||||
private def queryFor[T](name: String)(body: (MongoDBObject, MongoDBObject) => T): T = {
|
||||
val q: DBObject = MongoDBObject(KEY -> name)
|
||||
val dbo = coll.findOne(q).getOrElse { throw new NoSuchElementException(name + " not present") }
|
||||
body(q, dbo)
|
||||
}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
|
||||
getValueForKey(name, key.asInstanceOf[String])
|
||||
|
||||
def getMapStorageSizeFor(name: String): Int = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => 0
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
|
||||
}
|
||||
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = queryFor(name) { (q, dbo) =>
|
||||
dbo -= new String(key)
|
||||
coll.update(q, dbo, true, false)
|
||||
}
|
||||
|
||||
def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
|
||||
val m =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
throw new NoSuchElementException(name + " not present")
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
|
||||
}
|
||||
val n =
|
||||
List(m.keySet.toArray: _*).asInstanceOf[List[String]]
|
||||
val vals =
|
||||
for(s <- n)
|
||||
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
|
||||
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
|
||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = queryFor(name) { (q, dbo) =>
|
||||
dbo.get(new String(key)).asInstanceOf[Option[Array[Byte]]]
|
||||
}
|
||||
|
||||
def getMapStorageRangeFor(name: String, start: Option[AnyRef],
|
||||
finish: Option[AnyRef],
|
||||
count: Int): List[Tuple2[AnyRef, AnyRef]] = {
|
||||
val m =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
throw new NoSuchElementException(name + " not present")
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
|
||||
}
|
||||
|
||||
/**
|
||||
* <tt>count</tt> is the max number of results to return. Start with
|
||||
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
|
||||
* you hit <tt>finish</tt> or <tt>count</tt>.
|
||||
*/
|
||||
val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
|
||||
val cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get.asInstanceOf[Int]
|
||||
if (f >= s) math.min(count, (f - s)) else count
|
||||
}
|
||||
else count
|
||||
|
||||
val n =
|
||||
List(m.keySet.toArray: _*).asInstanceOf[List[String]].sortWith((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
|
||||
val vals =
|
||||
for(s <- n)
|
||||
yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
|
||||
vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
|
||||
def getMapStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
|
||||
dbo.size - 2 // need to exclude object id and our KEY
|
||||
}
|
||||
|
||||
private def getValueForKey(name: String, key: String): Option[AnyRef] = {
|
||||
try {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => None
|
||||
case Some(dbo) =>
|
||||
Some(serializer.in[AnyRef](
|
||||
dbo.get(VALUE)
|
||||
.asInstanceOf[JMap[String, AnyRef]]
|
||||
.get(key).asInstanceOf[Array[Byte]]))
|
||||
}
|
||||
} catch {
|
||||
case e =>
|
||||
throw new NoSuchElementException(e.getMessage)
|
||||
}
|
||||
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) =>
|
||||
for {
|
||||
(k, v) <- dbo.toList
|
||||
if k != "_id" && k != KEY
|
||||
} yield (k.getBytes, v.asInstanceOf[Array[Byte]])
|
||||
}
|
||||
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
|
||||
val q = new BasicDBObject
|
||||
q.put(KEY, name)
|
||||
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
|
||||
finish: Option[Array[Byte]],
|
||||
count: Int): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) =>
|
||||
// get all keys except the special ones
|
||||
val keys =
|
||||
dbo.keySet
|
||||
.toList
|
||||
.filter(k => k != "_id" && k != KEY)
|
||||
.sortWith(_ < _)
|
||||
|
||||
val currentList =
|
||||
coll.findOneNS(q) match {
|
||||
case None =>
|
||||
new JArrayList[AnyRef]
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
|
||||
}
|
||||
if (!currentList.isEmpty) {
|
||||
// record exists
|
||||
// remove before adding
|
||||
coll.remove(q)
|
||||
}
|
||||
// if the supplied start is not defined, get the head of keys
|
||||
val s = start.map(new String(_)).getOrElse(keys.head)
|
||||
|
||||
// add to the current list
|
||||
elements.map(serializer.out(_)).foreach(currentList.add(_))
|
||||
// if the supplied finish is not defined, get the last element of keys
|
||||
val f = finish.map(new String(_)).getOrElse(keys.last)
|
||||
|
||||
coll.insert(
|
||||
new BasicDBObject()
|
||||
.append(KEY, name)
|
||||
.append(VALUE, currentList)
|
||||
)
|
||||
// slice from keys: both ends inclusive
|
||||
val ks = keys.slice(keys.indexOf(s), scala.math.min(count, keys.indexOf(f) + 1))
|
||||
ks.map(k => (k.getBytes, dbo.get(k).get.asInstanceOf[Array[Byte]]))
|
||||
}
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
|
||||
insertVectorStorageEntriesFor(name, List(element))
|
||||
}
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
|
||||
try {
|
||||
val o =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
throw new NoSuchElementException(name + " not present")
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
|
||||
// lookup with name
|
||||
val q: DBObject = MongoDBObject(KEY -> name)
|
||||
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
|
||||
coll.findOne(q) match {
|
||||
// exists : need to update
|
||||
case Some(dbo) =>
|
||||
dbo -= KEY
|
||||
dbo -= "_id"
|
||||
val listBuilder = MongoDBList.newBuilder
|
||||
|
||||
// expensive!
|
||||
listBuilder ++= (elements ++ dbo.toSeq.sortWith((e1, e2) => (e1._1.toInt < e2._1.toInt)).map(_._2))
|
||||
|
||||
val builder = MongoDBObject.newBuilder
|
||||
builder += KEY -> name
|
||||
builder ++= listBuilder.result
|
||||
coll.update(q, builder.result.asDBObject, true, false)
|
||||
|
||||
// new : just add
|
||||
case None =>
|
||||
val listBuilder = MongoDBList.newBuilder
|
||||
listBuilder ++= elements
|
||||
|
||||
val builder = MongoDBObject.newBuilder
|
||||
builder += KEY -> name
|
||||
builder ++= listBuilder.result
|
||||
coll += builder.result.asDBObject
|
||||
}
|
||||
}
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = queryFor(name) { (q, dbo) =>
|
||||
dbo += ((index.toString, elem))
|
||||
coll.update(q, dbo, true, false)
|
||||
}
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = queryFor(name) { (q, dbo) =>
|
||||
dbo(index.toString).asInstanceOf[Array[Byte]]
|
||||
}
|
||||
|
||||
/**
|
||||
* if <tt>start</tt> and <tt>finish</tt> both are defined, ignore <tt>count</tt> and
|
||||
* report the range [start, finish)
|
||||
* if <tt>start</tt> is not defined, assume <tt>start</tt> = 0
|
||||
* if <tt>start</tt> == 0 and <tt>finish</tt> == 0, return an empty collection
|
||||
*/
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = queryFor(name) { (q, dbo) =>
|
||||
val ls = dbo.filter { case (k, v) => k != KEY && k != "_id" }
|
||||
.toSeq
|
||||
.sortWith((e1, e2) => (e1._1.toInt < e2._1.toInt))
|
||||
.map(_._2)
|
||||
|
||||
val st = start.getOrElse(0)
|
||||
val cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get
|
||||
if (f >= st) (f - st) else count
|
||||
}
|
||||
serializer.in[AnyRef](
|
||||
o.get(index).asInstanceOf[Array[Byte]])
|
||||
} catch {
|
||||
case e =>
|
||||
throw new NoSuchElementException(e.getMessage)
|
||||
else count
|
||||
if (st == 0 && cnt == 0) List()
|
||||
ls.slice(st, st + cnt).asInstanceOf[List[Array[Byte]]]
|
||||
}
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
|
||||
dbo.size - 2
|
||||
}
|
||||
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) = {
|
||||
// lookup with name
|
||||
val q: DBObject = MongoDBObject(KEY -> name)
|
||||
|
||||
coll.findOne(q) match {
|
||||
// exists : need to update
|
||||
case Some(dbo) =>
|
||||
dbo += ((REF, element))
|
||||
coll.update(q, dbo, true, false)
|
||||
|
||||
// not found : make one
|
||||
case None =>
|
||||
val builder = MongoDBObject.newBuilder
|
||||
builder += KEY -> name
|
||||
builder += REF -> element
|
||||
coll += builder.result.asDBObject
|
||||
}
|
||||
}
|
||||
|
||||
def getVectorStorageRangeFor(name: String,
|
||||
start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
|
||||
try {
|
||||
val o =
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
throw new NoSuchElementException(name + " not present")
|
||||
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
|
||||
}
|
||||
|
||||
val s = if (start.isDefined) start.get else 0
|
||||
val cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get
|
||||
if (f >= s) (f - s) else count
|
||||
}
|
||||
else count
|
||||
|
||||
// pick the subrange and make a Scala list
|
||||
val l =
|
||||
List(o.subList(s, s + cnt).toArray: _*)
|
||||
|
||||
for(e <- l)
|
||||
yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
|
||||
} catch {
|
||||
case e =>
|
||||
throw new NoSuchElementException(e.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = {
|
||||
val q = new BasicDBObject
|
||||
q.put(KEY, name)
|
||||
|
||||
val dbobj =
|
||||
coll.findOneNS(q) match {
|
||||
case None =>
|
||||
throw new NoSuchElementException(name + " not present")
|
||||
case Some(dbo) => dbo
|
||||
}
|
||||
val currentList = dbobj.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
|
||||
currentList.set(index, serializer.out(elem))
|
||||
coll.update(q,
|
||||
new BasicDBObject().append(KEY, name).append(VALUE, currentList))
|
||||
}
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => 0
|
||||
case Some(dbo) =>
|
||||
dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
|
||||
}
|
||||
}
|
||||
|
||||
private def nullSafeFindOne(name: String): Option[DBObject] = {
|
||||
val o = new BasicDBObject
|
||||
o.put(KEY, name)
|
||||
coll.findOneNS(o)
|
||||
}
|
||||
|
||||
def insertRefStorageFor(name: String, element: AnyRef) = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None =>
|
||||
case Some(dbo) => {
|
||||
val q = new BasicDBObject
|
||||
q.put(KEY, name)
|
||||
coll.remove(q)
|
||||
}
|
||||
}
|
||||
coll.insert(
|
||||
new BasicDBObject()
|
||||
.append(KEY, name)
|
||||
.append(VALUE, serializer.out(element)))
|
||||
}
|
||||
|
||||
def getRefStorageFor(name: String): Option[AnyRef] = {
|
||||
nullSafeFindOne(name) match {
|
||||
case None => None
|
||||
case Some(dbo) =>
|
||||
Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]]))
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] = try {
|
||||
queryFor(name) { (q, dbo) =>
|
||||
dbo.get(REF).asInstanceOf[Option[Array[Byte]]]
|
||||
}
|
||||
} catch {
|
||||
case e: java.util.NoSuchElementException => None
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,32 +1,19 @@
|
|||
package se.scalablesolutions.akka.persistence.mongo
|
||||
|
||||
import org.junit.{Test, Before}
|
||||
import org.junit.Assert._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
import _root_.dispatch.json.{JsNumber, JsValue}
|
||||
import _root_.dispatch.json.Js._
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef}
|
||||
import Actor._
|
||||
|
||||
/**
|
||||
* A persistent actor based on MongoDB storage.
|
||||
* <p/>
|
||||
* Demonstrates a bank account operation consisting of messages that:
|
||||
* <li>checks balance <tt>Balance</tt></li>
|
||||
* <li>debits amount<tt>Debit</tt></li>
|
||||
* <li>debits multiple amounts<tt>MultiDebit</tt></li>
|
||||
* <li>credits amount<tt>Credit</tt></li>
|
||||
* <p/>
|
||||
* Needs a running Mongo server.
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
|
||||
case class Balance(accountNo: String)
|
||||
case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
|
||||
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
|
||||
case class Credit(accountNo: String, amount: BigInt)
|
||||
case class Debit(accountNo: String, amount: Int, failer: ActorRef)
|
||||
case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
|
||||
case class Credit(accountNo: String, amount: Int)
|
||||
case class Log(start: Int, finish: Int)
|
||||
case object LogSize
|
||||
|
||||
|
|
@ -35,63 +22,65 @@ class BankAccountActor extends Transactor {
|
|||
private lazy val accountState = MongoStorage.newMap
|
||||
private lazy val txnLog = MongoStorage.newVector
|
||||
|
||||
import sjson.json.DefaultProtocol._
|
||||
import sjson.json.JsonSerialization._
|
||||
|
||||
def receive: Receive = {
|
||||
// check balance
|
||||
case Balance(accountNo) =>
|
||||
txnLog.add("Balance:" + accountNo)
|
||||
self.reply(accountState.get(accountNo).get)
|
||||
txnLog.add(("Balance:" + accountNo).getBytes)
|
||||
self.reply(
|
||||
accountState.get(accountNo.getBytes)
|
||||
.map(frombinary[Int](_))
|
||||
.getOrElse(0))
|
||||
|
||||
// debit amount: can fail
|
||||
case Debit(accountNo, amount, failer) =>
|
||||
txnLog.add("Debit:" + accountNo + " " + amount)
|
||||
txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
|
||||
val m = accountState.get(accountNo.getBytes)
|
||||
.map(frombinary[Int](_))
|
||||
.getOrElse(0)
|
||||
|
||||
accountState.put(accountNo.getBytes, tobinary(m - amount))
|
||||
if (amount > m) failer !! "Failure"
|
||||
|
||||
val m: BigInt =
|
||||
accountState.get(accountNo) match {
|
||||
case Some(JsNumber(n)) =>
|
||||
BigInt(n.asInstanceOf[BigDecimal].intValue)
|
||||
case None => 0
|
||||
}
|
||||
accountState.put(accountNo, (m - amount))
|
||||
if (amount > m)
|
||||
failer !! "Failure"
|
||||
self.reply(m - amount)
|
||||
|
||||
// many debits: can fail
|
||||
// demonstrates true rollback even if multiple puts have been done
|
||||
case MultiDebit(accountNo, amounts, failer) =>
|
||||
txnLog.add("MultiDebit:" + accountNo + " " + amounts.map(_.intValue).foldLeft(0)(_ + _))
|
||||
val sum = amounts.foldRight(0)(_ + _)
|
||||
txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
|
||||
|
||||
val m: BigInt =
|
||||
accountState.get(accountNo) match {
|
||||
case Some(JsNumber(n)) => BigInt(n.toString)
|
||||
case None => 0
|
||||
val m = accountState.get(accountNo.getBytes)
|
||||
.map(frombinary[Int](_))
|
||||
.getOrElse(0)
|
||||
|
||||
var cbal = m
|
||||
amounts.foreach { amount =>
|
||||
accountState.put(accountNo.getBytes, tobinary(m - amount))
|
||||
cbal = cbal - amount
|
||||
if (cbal < 0) failer !! "Failure"
|
||||
}
|
||||
var bal: BigInt = 0
|
||||
amounts.foreach {amount =>
|
||||
bal = bal + amount
|
||||
accountState.put(accountNo, (m - bal))
|
||||
}
|
||||
if (bal > m) failer !! "Failure"
|
||||
self.reply(m - bal)
|
||||
|
||||
self.reply(m - sum)
|
||||
|
||||
// credit amount
|
||||
case Credit(accountNo, amount) =>
|
||||
txnLog.add("Credit:" + accountNo + " " + amount)
|
||||
txnLog.add(("Credit:" + accountNo + " " + amount).getBytes)
|
||||
val m = accountState.get(accountNo.getBytes)
|
||||
.map(frombinary[Int](_))
|
||||
.getOrElse(0)
|
||||
|
||||
accountState.put(accountNo.getBytes, tobinary(m + amount))
|
||||
|
||||
val m: BigInt =
|
||||
accountState.get(accountNo) match {
|
||||
case Some(JsNumber(n)) =>
|
||||
BigInt(n.asInstanceOf[BigDecimal].intValue)
|
||||
case None => 0
|
||||
}
|
||||
accountState.put(accountNo, (m + amount))
|
||||
self.reply(m + amount)
|
||||
|
||||
case LogSize =>
|
||||
self.reply(txnLog.length.asInstanceOf[AnyRef])
|
||||
self.reply(txnLog.length)
|
||||
|
||||
case Log(start, finish) =>
|
||||
self.reply(txnLog.slice(start, finish))
|
||||
self.reply(txnLog.slice(start, finish).map(new String(_)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -102,82 +91,71 @@ class BankAccountActor extends Transactor {
|
|||
}
|
||||
}
|
||||
|
||||
class MongoPersistentActorSpec extends JUnitSuite {
|
||||
@Test
|
||||
def testSuccessfulDebit = {
|
||||
val bactor = actorOf[BankAccountActor]
|
||||
bactor.start
|
||||
val failer = actorOf[PersistentFailerActor]
|
||||
failer.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
bactor !! Debit("a-123", 3000, failer)
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class MongoPersistentActorSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterEach {
|
||||
|
||||
val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
|
||||
assertEquals(BigInt(2000), BigInt(b.intValue))
|
||||
|
||||
bactor !! Credit("a-123", 7000)
|
||||
|
||||
val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
|
||||
assertEquals(BigInt(9000), BigInt(b1.intValue))
|
||||
|
||||
bactor !! Debit("a-123", 8000, failer)
|
||||
|
||||
val JsNumber(b2) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
|
||||
assertEquals(BigInt(1000), BigInt(b2.intValue))
|
||||
|
||||
assert(7 == (bactor !! LogSize).get.asInstanceOf[Int])
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
|
||||
assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
|
||||
assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
|
||||
assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
|
||||
assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
|
||||
override def beforeEach {
|
||||
MongoStorageBackend.drop
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUnsuccessfulDebit = {
|
||||
val bactor = actorOf[BankAccountActor]
|
||||
bactor.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
|
||||
val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
|
||||
assertEquals(BigInt(5000), BigInt(b.intValue))
|
||||
|
||||
val failer = actorOf[PersistentFailerActor]
|
||||
failer.start
|
||||
try {
|
||||
bactor !! Debit("a-123", 7000, failer)
|
||||
fail("should throw exception")
|
||||
} catch { case e: RuntimeException => {}}
|
||||
|
||||
val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
|
||||
assertEquals(BigInt(5000), BigInt(b1.intValue))
|
||||
|
||||
// should not count the failed one
|
||||
assert(3 == (bactor !! LogSize).get.asInstanceOf[Int])
|
||||
override def afterEach {
|
||||
MongoStorageBackend.drop
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUnsuccessfulMultiDebit = {
|
||||
val bactor = actorOf[BankAccountActor]
|
||||
bactor.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
describe("successful debit") {
|
||||
it("should debit successfully") {
|
||||
val bactor = actorOf[BankAccountActor]
|
||||
bactor.start
|
||||
val failer = actorOf[PersistentFailerActor]
|
||||
failer.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
bactor !! Debit("a-123", 3000, failer)
|
||||
|
||||
val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
|
||||
assertEquals(BigInt(5000), BigInt(b.intValue))
|
||||
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
|
||||
|
||||
val failer = actorOf[PersistentFailerActor]
|
||||
failer.start
|
||||
try {
|
||||
bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
|
||||
fail("should throw exception")
|
||||
} catch { case e: RuntimeException => {}}
|
||||
bactor !! Credit("a-123", 7000)
|
||||
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
|
||||
|
||||
val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
|
||||
assertEquals(BigInt(5000), BigInt(b1.intValue))
|
||||
bactor !! Debit("a-123", 8000, failer)
|
||||
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
|
||||
|
||||
// should not count the failed one
|
||||
assert(3 == (bactor !! LogSize).get.asInstanceOf[Int])
|
||||
(bactor !! LogSize).get.asInstanceOf[Int] should equal(7)
|
||||
(bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7)
|
||||
}
|
||||
}
|
||||
|
||||
describe("unsuccessful debit") {
|
||||
it("debit should fail") {
|
||||
val bactor = actorOf[BankAccountActor]
|
||||
bactor.start
|
||||
val failer = actorOf[PersistentFailerActor]
|
||||
failer.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
|
||||
evaluating {
|
||||
bactor !! Debit("a-123", 7000, failer)
|
||||
} should produce [Exception]
|
||||
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
|
||||
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
|
||||
}
|
||||
}
|
||||
|
||||
describe("unsuccessful multidebit") {
|
||||
it("multidebit should fail") {
|
||||
val bactor = actorOf[BankAccountActor]
|
||||
bactor.start
|
||||
val failer = actorOf[PersistentFailerActor]
|
||||
failer.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
|
||||
evaluating {
|
||||
bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
|
||||
} should produce [Exception]
|
||||
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
|
||||
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,364 +1,158 @@
|
|||
package se.scalablesolutions.akka.persistence.mongo
|
||||
|
||||
import org.junit.{Test, Before}
|
||||
import org.junit.Assert._
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import _root_.dispatch.json._
|
||||
import _root_.dispatch.json.Js._
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import java.util.NoSuchElementException
|
||||
|
||||
@scala.reflect.BeanInfo case class Foo(no: Int, name: String)
|
||||
class MongoStorageSpec extends JUnitSuite {
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class MongoStorageSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterEach {
|
||||
|
||||
val changeSetV = new scala.collection.mutable.ArrayBuffer[AnyRef]
|
||||
val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef]
|
||||
|
||||
@Before def initialize() = {
|
||||
MongoStorageBackend.coll.drop
|
||||
override def beforeEach {
|
||||
MongoStorageBackend.drop
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVectorInsertForTransactionId = {
|
||||
changeSetV += "debasish" // string
|
||||
changeSetV += List(1, 2, 3) // Scala List
|
||||
changeSetV += List(100, 200)
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
assertEquals(
|
||||
3,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
changeSetV.clear
|
||||
|
||||
// changeSetV should be reinitialized
|
||||
changeSetV += List(12, 23, 45)
|
||||
changeSetV += "maulindu"
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// add more to the same changeSetV
|
||||
changeSetV += "ramanendu"
|
||||
changeSetV += Map(1 -> "dg", 2 -> "mc")
|
||||
|
||||
// add for a diff transaction
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
|
||||
assertEquals(
|
||||
4,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
|
||||
|
||||
// previous transaction change set should remain same
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// test single element entry
|
||||
MongoStorageBackend.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
override def afterEach {
|
||||
MongoStorageBackend.drop
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVectorFetchForKeys = {
|
||||
describe("persistent maps") {
|
||||
it("should insert with single key and value") {
|
||||
import MongoStorageBackend._
|
||||
|
||||
// initially everything 0
|
||||
assertEquals(
|
||||
0,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
|
||||
|
||||
assertEquals(
|
||||
0,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
// get some stuff
|
||||
changeSetV += "debasish"
|
||||
changeSetV += List(BigDecimal(12), BigDecimal(13), BigDecimal(14))
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
|
||||
assertEquals(
|
||||
2,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
val JsString(str) = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
|
||||
assertEquals("debasish", str)
|
||||
|
||||
val l = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue]
|
||||
val num_list = list ! num
|
||||
val num_list(l0) = l
|
||||
assertEquals(List(12, 13, 14), l0)
|
||||
|
||||
changeSetV.clear
|
||||
changeSetV += Map(1->1, 2->4, 3->9)
|
||||
changeSetV += BigInt(2310)
|
||||
changeSetV += List(100, 200, 300)
|
||||
MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
|
||||
assertEquals(
|
||||
5,
|
||||
MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
|
||||
|
||||
val r =
|
||||
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
|
||||
|
||||
assertEquals(3, r.size)
|
||||
val lr = r(0).asInstanceOf[JsValue]
|
||||
val num_list(l1) = lr
|
||||
assertEquals(List(12, 13, 14), l1)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVectorFetchForNonExistentKeys = {
|
||||
try {
|
||||
MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1)
|
||||
fail("should throw an exception")
|
||||
} catch {case e: NoSuchElementException => {}}
|
||||
|
||||
try {
|
||||
MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
|
||||
fail("should throw an exception")
|
||||
} catch {case e: NoSuchElementException => {}}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testVectorUpdateForTransactionId = {
|
||||
import MongoStorageBackend._
|
||||
|
||||
changeSetV += "debasish" // string
|
||||
changeSetV += List(1, 2, 3) // Scala List
|
||||
changeSetV += List(100, 200)
|
||||
|
||||
insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
|
||||
assertEquals(3, getVectorStorageSizeFor("U-A1"))
|
||||
updateVectorStorageEntryFor("U-A1", 0, "maulindu")
|
||||
val JsString(str) = getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
|
||||
assertEquals("maulindu", str)
|
||||
|
||||
updateVectorStorageEntryFor("U-A1", 1, Map("1"->"dg", "2"->"mc"))
|
||||
val JsObject(m) = getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsObject]
|
||||
assertEquals(m.keySet.size, 2)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMapInsertForTransactionId = {
|
||||
fillMap
|
||||
|
||||
// add some more to changeSet
|
||||
changeSetM += "5" -> Foo(12, "dg")
|
||||
changeSetM += "6" -> java.util.Calendar.getInstance.getTime
|
||||
|
||||
// insert all into Mongo
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// individual insert api
|
||||
MongoStorageBackend.insertMapStorageEntryFor("U-M1", "7", "akka")
|
||||
MongoStorageBackend.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
|
||||
assertEquals(
|
||||
8,
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// add the same changeSet for another transaction
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
|
||||
assertEquals(
|
||||
6,
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M2"))
|
||||
|
||||
// the first transaction should remain the same
|
||||
assertEquals(
|
||||
8,
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
changeSetM.clear
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMapContents = {
|
||||
fillMap
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
MongoStorageBackend.getMapStorageEntryFor("U-M1", "2") match {
|
||||
case Some(x) => {
|
||||
val JsString(str) = x.asInstanceOf[JsValue]
|
||||
assertEquals("peter", str)
|
||||
}
|
||||
case None => fail("should fetch peter")
|
||||
}
|
||||
MongoStorageBackend.getMapStorageEntryFor("U-M1", "4") match {
|
||||
case Some(x) => {
|
||||
val num_list = list ! num
|
||||
val num_list(l0) = x.asInstanceOf[JsValue]
|
||||
assertEquals(3, l0.size)
|
||||
}
|
||||
case None => fail("should fetch list")
|
||||
}
|
||||
MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") match {
|
||||
case Some(x) => {
|
||||
val num_list = list ! num
|
||||
val num_list(l0) = x.asInstanceOf[JsValue]
|
||||
assertEquals(2, l0.size)
|
||||
}
|
||||
case None => fail("should fetch list")
|
||||
insertMapStorageEntryFor("t1", "odersky".getBytes, "scala".getBytes)
|
||||
insertMapStorageEntryFor("t1", "gosling".getBytes, "java".getBytes)
|
||||
insertMapStorageEntryFor("t1", "stroustrup".getBytes, "c++".getBytes)
|
||||
getMapStorageSizeFor("t1") should equal(3)
|
||||
new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
|
||||
new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java")
|
||||
new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++")
|
||||
getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
|
||||
}
|
||||
|
||||
// get the entire map
|
||||
val l: List[Tuple2[AnyRef, AnyRef]] =
|
||||
MongoStorageBackend.getMapStorageFor("U-M1")
|
||||
it("should insert with multiple keys and values") {
|
||||
import MongoStorageBackend._
|
||||
|
||||
assertEquals(4, l.size)
|
||||
assertTrue(l.map(_._1).contains("1"))
|
||||
assertTrue(l.map(_._1).contains("2"))
|
||||
assertTrue(l.map(_._1).contains("3"))
|
||||
assertTrue(l.map(_._1).contains("4"))
|
||||
val l = List(("stroustrup", "c++"), ("odersky", "scala"), ("gosling", "java"))
|
||||
insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
|
||||
getMapStorageSizeFor("t1") should equal(3)
|
||||
new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++")
|
||||
new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java")
|
||||
new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
|
||||
getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
|
||||
|
||||
val JsString(str) = l.filter(_._1 == "2").head._2
|
||||
assertEquals(str, "peter")
|
||||
evaluating { getMapStorageEntryFor("t2", "torvalds".getBytes) } should produce [NoSuchElementException]
|
||||
|
||||
// trying to fetch for a non-existent transaction will throw
|
||||
try {
|
||||
MongoStorageBackend.getMapStorageFor("U-M2")
|
||||
fail("should throw an exception")
|
||||
} catch {case e: NoSuchElementException => {}}
|
||||
getMapStorageFor("t1").map { case (k, v) => (new String(k), new String(v)) } should equal (l)
|
||||
|
||||
changeSetM.clear
|
||||
}
|
||||
removeMapStorageFor("t1", "gosling".getBytes)
|
||||
getMapStorageSizeFor("t1") should equal(2)
|
||||
|
||||
@Test
|
||||
def testMapContentsByRange = {
|
||||
fillMap
|
||||
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
|
||||
// specify start and count
|
||||
val l: List[Tuple2[AnyRef, AnyRef]] =
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), None, 3)
|
||||
|
||||
assertEquals(3, l.size)
|
||||
assertEquals("3", l(0)._1.asInstanceOf[String])
|
||||
val lst = l(0)._2.asInstanceOf[JsValue]
|
||||
val num_list = list ! num
|
||||
val num_list(l0) = lst
|
||||
assertEquals(List(100, 200), l0)
|
||||
assertEquals("4", l(1)._1.asInstanceOf[String])
|
||||
val ls = l(1)._2.asInstanceOf[JsValue]
|
||||
val num_list(l1) = ls
|
||||
assertEquals(List(10, 20, 30), l1)
|
||||
|
||||
// specify start, finish and count where finish - start == count
|
||||
assertEquals(3,
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size)
|
||||
|
||||
// specify start, finish and count where finish - start > count
|
||||
assertEquals(3,
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size)
|
||||
|
||||
// do not specify start or finish
|
||||
assertEquals(3,
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", None, None, 3).size)
|
||||
|
||||
// specify finish and count
|
||||
assertEquals(3,
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", None, Some(Integer.valueOf(3)), 3).size)
|
||||
|
||||
// specify start, finish and count where finish < start
|
||||
assertEquals(3,
|
||||
MongoStorageBackend.getMapStorageRangeFor(
|
||||
"U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size)
|
||||
|
||||
changeSetM.clear
|
||||
}
|
||||
|
||||
@Test
|
||||
def testMapStorageRemove = {
|
||||
fillMap
|
||||
changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
|
||||
|
||||
MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
|
||||
assertEquals(5,
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// remove key "3"
|
||||
MongoStorageBackend.removeMapStorageFor("U-M1", "3")
|
||||
assertEquals(4,
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
try {
|
||||
MongoStorageBackend.getMapStorageEntryFor("U-M1", "3")
|
||||
fail("should throw exception")
|
||||
} catch { case e => {}}
|
||||
|
||||
// remove key "4"
|
||||
MongoStorageBackend.removeMapStorageFor("U-M1", "4")
|
||||
assertEquals(3,
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// remove key "2"
|
||||
MongoStorageBackend.removeMapStorageFor("U-M1", "2")
|
||||
assertEquals(2,
|
||||
MongoStorageBackend.getMapStorageSizeFor("U-M1"))
|
||||
|
||||
// remove the whole stuff
|
||||
MongoStorageBackend.removeMapStorageFor("U-M1")
|
||||
|
||||
try {
|
||||
MongoStorageBackend.getMapStorageFor("U-M1")
|
||||
fail("should throw exception")
|
||||
} catch { case e: NoSuchElementException => {}}
|
||||
|
||||
changeSetM.clear
|
||||
}
|
||||
|
||||
private def fillMap = {
|
||||
changeSetM += "1" -> "john"
|
||||
changeSetM += "2" -> "peter"
|
||||
changeSetM += "3" -> List(100, 200)
|
||||
changeSetM += "4" -> List(10, 20, 30)
|
||||
changeSetM
|
||||
}
|
||||
|
||||
@Test
|
||||
def testRefStorage = {
|
||||
MongoStorageBackend.getRefStorageFor("U-R1") match {
|
||||
case None =>
|
||||
case Some(o) => fail("should be None")
|
||||
removeMapStorageFor("t1")
|
||||
evaluating { getMapStorageSizeFor("t1") } should produce [NoSuchElementException]
|
||||
}
|
||||
|
||||
val m = Map("1"->1, "2"->4, "3"->9)
|
||||
MongoStorageBackend.insertRefStorageFor("U-R1", m)
|
||||
MongoStorageBackend.getRefStorageFor("U-R1") match {
|
||||
case None => fail("should not be empty")
|
||||
case Some(r) => {
|
||||
val a = r.asInstanceOf[JsValue]
|
||||
val m1 = Symbol("1") ? num
|
||||
val m2 = Symbol("2") ? num
|
||||
val m3 = Symbol("3") ? num
|
||||
it("should do proper range queries") {
|
||||
import MongoStorageBackend._
|
||||
val l = List(
|
||||
("bjarne stroustrup", "c++"),
|
||||
("martin odersky", "scala"),
|
||||
("james gosling", "java"),
|
||||
("yukihiro matsumoto", "ruby"),
|
||||
("slava pestov", "factor"),
|
||||
("rich hickey", "clojure"),
|
||||
("ola bini", "ioke"),
|
||||
("dennis ritchie", "c"),
|
||||
("larry wall", "perl"),
|
||||
("guido van rossum", "python"),
|
||||
("james strachan", "groovy"))
|
||||
insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
|
||||
getMapStorageSizeFor("t1") should equal(l.size)
|
||||
getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1))
|
||||
getMapStorageRangeFor("t1", None, None, 5).map { case (k, v) => (new String(k), new String(v)) }.size should equal(5)
|
||||
}
|
||||
}
|
||||
|
||||
val m1(n1) = a
|
||||
val m2(n2) = a
|
||||
val m3(n3) = a
|
||||
describe("persistent vectors") {
|
||||
it("should insert a single value") {
|
||||
import MongoStorageBackend._
|
||||
|
||||
assertEquals(n1, 1)
|
||||
assertEquals(n2, 4)
|
||||
assertEquals(n3, 9)
|
||||
}
|
||||
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
|
||||
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
|
||||
new String(getVectorStorageEntryFor("t1", 0)) should equal("james gosling")
|
||||
new String(getVectorStorageEntryFor("t1", 1)) should equal("martin odersky")
|
||||
}
|
||||
|
||||
// insert another one
|
||||
// the previous one should be replaced
|
||||
val b = List("100", "jonas")
|
||||
MongoStorageBackend.insertRefStorageFor("U-R1", b)
|
||||
MongoStorageBackend.getRefStorageFor("U-R1") match {
|
||||
case None => fail("should not be empty")
|
||||
case Some(r) => {
|
||||
val a = r.asInstanceOf[JsValue]
|
||||
val str_lst = list ! str
|
||||
val str_lst(l) = a
|
||||
assertEquals(b, l)
|
||||
}
|
||||
it("should insert multiple values") {
|
||||
import MongoStorageBackend._
|
||||
|
||||
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
|
||||
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
|
||||
insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
|
||||
new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini")
|
||||
new String(getVectorStorageEntryFor("t1", 1)) should equal("james strachan")
|
||||
new String(getVectorStorageEntryFor("t1", 2)) should equal("dennis ritchie")
|
||||
new String(getVectorStorageEntryFor("t1", 3)) should equal("james gosling")
|
||||
new String(getVectorStorageEntryFor("t1", 4)) should equal("martin odersky")
|
||||
}
|
||||
|
||||
it("should fetch a range of values") {
|
||||
import MongoStorageBackend._
|
||||
|
||||
insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
|
||||
insertVectorStorageEntryFor("t1", "james gosling".getBytes)
|
||||
getVectorStorageSizeFor("t1") should equal(2)
|
||||
insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
|
||||
getVectorStorageRangeFor("t1", None, None, 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
|
||||
getVectorStorageRangeFor("t1", Some(0), Some(5), 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
|
||||
getVectorStorageRangeFor("t1", Some(2), Some(5), 100).map(new String(_)) should equal(List("dennis ritchie", "james gosling", "martin odersky"))
|
||||
|
||||
getVectorStorageSizeFor("t1") should equal(5)
|
||||
}
|
||||
|
||||
it("should insert and query complex structures") {
|
||||
import MongoStorageBackend._
|
||||
import sjson.json.DefaultProtocol._
|
||||
import sjson.json.JsonSerialization._
|
||||
|
||||
// a list[AnyRef] should be added successfully
|
||||
val l = List("ola bini".getBytes, tobinary(List(100, 200, 300)), tobinary(List(1, 2, 3)))
|
||||
|
||||
// for id = t1
|
||||
insertVectorStorageEntriesFor("t1", l)
|
||||
new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini")
|
||||
frombinary[List[Int]](getVectorStorageEntryFor("t1", 1)) should equal(List(100, 200, 300))
|
||||
frombinary[List[Int]](getVectorStorageEntryFor("t1", 2)) should equal(List(1, 2, 3))
|
||||
|
||||
getVectorStorageSizeFor("t1") should equal(3)
|
||||
|
||||
// some more for id = t1
|
||||
val m = List(tobinary(Map(1 -> "dg", 2 -> "mc", 3 -> "nd")), tobinary(List("martin odersky", "james gosling")))
|
||||
insertVectorStorageEntriesFor("t1", m)
|
||||
|
||||
// size should add up
|
||||
getVectorStorageSizeFor("t1") should equal(5)
|
||||
|
||||
// now for a diff id
|
||||
insertVectorStorageEntriesFor("t2", l)
|
||||
getVectorStorageSizeFor("t2") should equal(3)
|
||||
}
|
||||
}
|
||||
|
||||
describe("persistent refs") {
|
||||
it("should insert a ref") {
|
||||
import MongoStorageBackend._
|
||||
|
||||
insertRefStorageFor("t1", "martin odersky".getBytes)
|
||||
new String(getRefStorageFor("t1").get) should equal("martin odersky")
|
||||
insertRefStorageFor("t1", "james gosling".getBytes)
|
||||
new String(getRefStorageFor("t1").get) should equal("james gosling")
|
||||
getRefStorageFor("t2") should equal(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
|
|
@ -49,6 +49,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
lazy val CasbahRepoSnapshots = MavenRepository("Casbah Snapshot Repo", "http://repo.bumnetworks.com/snapshots/")
|
||||
lazy val CasbahRepoReleases = MavenRepository("Casbah Snapshot Repo", "http://repo.bumnetworks.com/releases/")
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
@ -75,6 +77,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
|
||||
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
|
||||
lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository)
|
||||
lazy val casbahSnapshot = ModuleConfiguration("com.novus",CasbahRepoSnapshots)
|
||||
lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases)
|
||||
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
@ -166,6 +170,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile"
|
||||
|
||||
lazy val casbah = "com.novus" % "casbah_2.8.0" % "1.0.8.5" % "compile"
|
||||
|
||||
lazy val time = "org.scala-tools" % "time" % "2.8.0-SNAPSHOT-0.2-SNAPSHOT" % "compile"
|
||||
|
||||
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive
|
||||
|
||||
lazy val netty = "org.jboss.netty" % "netty" % "3.2.2.Final" % "compile"
|
||||
|
|
@ -180,7 +188,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
|
||||
|
||||
lazy val sjson = "sjson.json" % "sjson" % "0.7-2.8.0" % "compile"
|
||||
lazy val sjson = "sjson.json" % "sjson" % "0.8-SNAPSHOT-2.8.0" % "compile"
|
||||
|
||||
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile"
|
||||
|
||||
|
|
@ -483,8 +491,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
|
||||
class AkkaMongoProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
|
||||
val mongo = Dependencies.mongo
|
||||
val casbah = Dependencies.casbah
|
||||
|
||||
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
|
||||
// override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue