diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala
index be5fc4f4c7..0c6f239ef7 100644
--- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala
+++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala
@@ -29,7 +29,7 @@ object CassandraStorage extends Storage {
*
* @author Jonas Bonér
*/
-class CassandraPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
+class CassandraPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = CassandraStorageBackend
}
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index ccaf7518f1..4d9ff48a60 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -7,9 +7,11 @@ package se.scalablesolutions.akka.persistence.common
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.stm.TransactionManagement.transaction
import se.scalablesolutions.akka.util.Logging
-import se.scalablesolutions.akka.AkkaException
-class StorageException(message: String) extends AkkaException(message)
+// FIXME move to 'stm' package + add message with more info
+class NoTransactionInScopeException extends RuntimeException
+
+class StorageException(message: String) extends RuntimeException(message)
/**
* Example Scala usage.
@@ -80,24 +82,90 @@ trait Storage {
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
- protected val newAndUpdatedEntries = TransactionalMap[K, V]()
- protected val removedEntries = TransactionalVector[K]()
protected val shouldClearOnCommit = Ref[Boolean]()
+ // operations on the Map
+ trait Op
+ case object GET extends Op
+ case object PUT extends Op
+ case object REM extends Op
+ case object UPD extends Op
+
+ // append only log: records all mutating operations
+ protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
+
+ case class LogEntry(key: K, value: Option[V], op: Op)
+
+ // need to override in subclasses e.g. "sameElements" for Array[Byte]
+ def equal(k1: K, k2: K): Boolean = k1 == k2
+
+ // Seqable type that's required for maintaining the log of distinct keys affected in current transaction
+ type T <: Equals
+
+ // converts key K to the Seqable type Equals
+ def toEquals(k: K): T
+
+ // keys affected in the current transaction
+ protected val keysInCurrentTx = TransactionalMap[T, K]()
+
+ protected def addToListOfKeysInTx(key: K): Unit =
+ keysInCurrentTx += (toEquals(key), key)
+
+ protected def clearDistinctKeys = keysInCurrentTx.clear
+
+ protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
+ appendOnlyTxLog filter(e => equal(e.key, key))
+
+ // need to get current value considering the underlying storage as well as the transaction log
+ protected def getCurrentValue(key: K): Option[V] = {
+
+ // get all mutating entries for this key for this tx
+ val txEntries = filterTxLogByKey(key)
+
+ // get the snapshot from the underlying store for this key
+ val underlying = try {
+ storage.getMapStorageEntryFor(uuid, key)
+ } catch { case e: Exception => None }
+
+ if (txEntries.isEmpty) underlying
+ else replay(txEntries, key, underlying)
+ }
+
+ // replay all tx entries for key k with seed = initial
+ private def replay(txEntries: IndexedSeq[LogEntry], key: K, initial: Option[V]): Option[V] = {
+ import scala.collection.mutable._
+
+ val m = initial match {
+ case None => Map.empty[K, V]
+ case Some(v) => Map((key, v))
+ }
+ txEntries.foreach {case LogEntry(k, v, o) => o match {
+ case PUT => m.put(k, v.get)
+ case REM => m -= k
+ case UPD => m.update(k, v.get)
+ }}
+ m get key
+ }
+
// to be concretized in subclasses
val storage: MapStorageBackend[K, V]
def commit = {
- if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
- removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key))
- storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
- newAndUpdatedEntries.clear
- removedEntries.clear
+ // if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
+
+ appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
+ case PUT => storage.insertMapStorageEntryFor(uuid, k, v.get)
+ case UPD => storage.insertMapStorageEntryFor(uuid, k, v.get)
+ case REM => storage.removeMapStorageFor(uuid, k)
+ }}
+
+ appendOnlyTxLog.clear
+ clearDistinctKeys
}
def abort = {
- newAndUpdatedEntries.clear
- removedEntries.clear
+ appendOnlyTxLog.clear
+ clearDistinctKeys
shouldClearOnCommit.swap(false)
}
@@ -118,68 +186,84 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def put(key: K, value: V): Option[V] = {
register
- newAndUpdatedEntries.put(key, value)
+ val curr = getCurrentValue(key)
+ appendOnlyTxLog add LogEntry(key, Some(value), PUT)
+ addToListOfKeysInTx(key)
+ curr
}
override def update(key: K, value: V) = {
register
- newAndUpdatedEntries.update(key, value)
+ val curr = getCurrentValue(key)
+ appendOnlyTxLog add LogEntry(key, Some(value), UPD)
+ addToListOfKeysInTx(key)
+ curr
}
override def remove(key: K) = {
register
- removedEntries.add(key)
- newAndUpdatedEntries.get(key)
+ val curr = getCurrentValue(key)
+ appendOnlyTxLog add LogEntry(key, None, REM)
+ addToListOfKeysInTx(key)
+ curr
}
- def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
+ def slice(start: Option[K], count: Int): List[(K, V)] =
slice(start, None, count)
- def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try {
- storage.getMapStorageRangeFor(uuid, start, finish, count)
- } catch { case e: Exception => Nil }
+ def slice(start: Option[K], finish: Option[K], count: Int): List[(K, V)]
override def clear = {
register
+ appendOnlyTxLog.clear
+ clearDistinctKeys
shouldClearOnCommit.swap(true)
}
override def contains(key: K): Boolean = try {
- newAndUpdatedEntries.contains(key) ||
- storage.getMapStorageEntryFor(uuid, key).isDefined
+ filterTxLogByKey(key) match {
+ case Seq() => // current tx doesn't use this
+ storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
+ case txs => // present in log
+ txs.last.op != REM // last entry cannot be a REM
+ }
} catch { case e: Exception => false }
+ protected def existsInStorage(key: K): Option[V] = try {
+ storage.getMapStorageEntryFor(uuid, key)
+ } catch {
+ case e: Exception => None
+ }
+
override def size: Int = try {
- storage.getMapStorageSizeFor(uuid)
- } catch { case e: Exception => 0 }
+ // partition key set affected in current tx into those which r added & which r deleted
+ val (keysAdded, keysRemoved) = keysInCurrentTx.map {
+ case (kseq, k) => ((kseq, k), getCurrentValue(k))
+ }.partition(_._2.isDefined)
- override def get(key: K): Option[V] = {
- if (newAndUpdatedEntries.contains(key)) {
- newAndUpdatedEntries.get(key)
- }
- else try {
- storage.getMapStorageEntryFor(uuid, key)
- } catch { case e: Exception => None }
+ // keys which existed in storage but removed in current tx
+ val inStorageRemovedInTx =
+ keysRemoved.keySet
+ .map(_._2)
+ .filter(k => existsInStorage(k).isDefined)
+ .size
+
+ // all keys in storage
+ val keysInStorage =
+ storage.getMapStorageFor(uuid)
+ .map { case (k, v) => toEquals(k) }
+ .toSet
+
+ // (keys that existed UNION keys added ) - (keys removed)
+ (keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx
+ } catch {
+ case e: Exception => 0
}
- def iterator = elements
+ // get must consider underlying storage & current uncommitted tx log
+ override def get(key: K): Option[V] = getCurrentValue(key)
- override def elements: Iterator[Tuple2[K, V]] = {
- new Iterator[Tuple2[K, V]] {
- private val originalList: List[Tuple2[K, V]] = try {
- storage.getMapStorageFor(uuid)
- } catch {
- case e: Throwable => Nil
- }
- private var elements = newAndUpdatedEntries.toList union originalList.reverse
- override def next: Tuple2[K, V]= synchronized {
- val element = elements.head
- elements = elements.tail
- element
- }
- override def hasNext: Boolean = synchronized { !elements.isEmpty }
- }
- }
+ def iterator: Iterator[Tuple2[K, V]]
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
@@ -187,6 +271,95 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
}
}
+trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
+ import scala.collection.mutable.ArraySeq
+
+ type T = ArraySeq[Byte]
+ def toEquals(k: Array[Byte]) = ArraySeq(k: _*)
+ override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
+
+ object COrdering {
+ implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
+ def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
+ new String(o1.toArray) compare new String(o2.toArray)
+ }
+ }
+
+ import scala.collection.immutable.{TreeMap, SortedMap}
+ private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = {
+ import COrdering._
+
+ // need ArraySeq for ordering
+ val fromStorage =
+ TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*)
+
+ val (keysAdded, keysRemoved) = keysInCurrentTx.map {
+ case (_, k) => (k, getCurrentValue(k))
+ }.partition(_._2.isDefined)
+
+ val inStorageRemovedInTx =
+ keysRemoved.keySet
+ .filter(k => existsInStorage(k).isDefined)
+ .map(k => ArraySeq(k: _*))
+
+ (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, Some(v)) => (ArraySeq(k: _*), v) }
+ }
+
+ override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try {
+ val newMap = replayAllKeys
+
+ if (newMap isEmpty) List[(Array[Byte], Array[Byte])]()
+
+ val startKey =
+ start match {
+ case Some(bytes) => Some(ArraySeq(bytes: _*))
+ case None => None
+ }
+
+ val endKey =
+ finish match {
+ case Some(bytes) => Some(ArraySeq(bytes: _*))
+ case None => None
+ }
+
+ ((startKey, endKey, count): @unchecked) match {
+ case ((Some(s), Some(e), _)) =>
+ newMap.range(s, e)
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
+ case ((Some(s), None, c)) if c > 0 =>
+ newMap.from(s)
+ .iterator
+ .take(count)
+ .map(e => (e._1.toArray, e._2))
+ .toList
+ case ((Some(s), None, _)) =>
+ newMap.from(s)
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
+ case ((None, Some(e), _)) =>
+ newMap.until(e)
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
+ }
+ } catch { case e: Exception => Nil }
+
+ override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
+ new Iterator[(Array[Byte], Array[Byte])] {
+ private var elements = replayAllKeys
+ override def next: (Array[Byte], Array[Byte]) = synchronized {
+ val (k, v) = elements.head
+ elements = elements.tail
+ (k.toArray, v)
+ }
+ override def hasNext: Boolean = synchronized { !elements.isEmpty }
+ }
+ }
+}
+
/**
* Implements a template for a concrete persistent transactional vector based storage.
*
@@ -198,42 +371,83 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
protected val removedElems = TransactionalVector[T]()
protected val shouldClearOnCommit = Ref[Boolean]()
+ // operations on the Vector
+ trait Op
+ case object ADD extends Op
+ case object UPD extends Op
+ case object POP extends Op
+
+ // append only log: records all mutating operations
+ protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
+
+ case class LogEntry(index: Option[Int], value: Option[T], op: Op)
+
+ // need to override in subclasses e.g. "sameElements" for Array[Byte]
+ def equal(v1: T, v2: T): Boolean = v1 == v2
+
val storage: VectorStorageBackend[T]
def commit = {
- for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
- for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
- newElems.clear
- updatedElems.clear
+ for(entry <- appendOnlyTxLog) {
+ entry match {
+ case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
+ case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
+ case LogEntry(_, _, POP) => //..
+ }
+ }
+ appendOnlyTxLog.clear
}
def abort = {
- newElems.clear
- updatedElems.clear
- removedElems.clear
+ appendOnlyTxLog.clear
shouldClearOnCommit.swap(false)
}
+ private def replay: List[T] = {
+ import scala.collection.mutable.ArrayBuffer
+ var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*)
+
+ for(entry <- appendOnlyTxLog) {
+ entry match {
+ case LogEntry(_, Some(v), ADD) => elemsStorage += v
+ case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v)
+ case LogEntry(_, _, POP) => elemsStorage = elemsStorage.drop(1)
+ }
+ }
+ elemsStorage.toList.reverse
+ }
+
def +(elem: T) = add(elem)
def add(elem: T) = {
register
- newElems + elem
+ appendOnlyTxLog + LogEntry(None, Some(elem), ADD)
}
def apply(index: Int): T = get(index)
def get(index: Int): T = {
- if (newElems.size > index) newElems(index)
- else storage.getVectorStorageEntryFor(uuid, index)
+ if (appendOnlyTxLog.isEmpty) {
+ storage.getVectorStorageEntryFor(uuid, index)
+ } else {
+ val curr = replay
+ curr(index)
+ }
}
override def slice(start: Int, finish: Int): IndexedSeq[T] = slice(Some(start), Some(finish))
def slice(start: Option[Int], finish: Option[Int], count: Int = 0): IndexedSeq[T] = {
- val buffer = new scala.collection.mutable.ArrayBuffer[T]
- storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
- buffer
+ val curr = replay
+ val s = if (start.isDefined) start.get else 0
+ val cnt =
+ if (finish.isDefined) {
+ val f = finish.get
+ if (f >= s) (f - s) else count
+ }
+ else count
+ if (s == 0 && cnt == 0) List().toIndexedSeq
+ else curr.slice(s, s + cnt).toIndexedSeq
}
/**
@@ -241,12 +455,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
*/
def pop: T = {
register
+ appendOnlyTxLog + LogEntry(None, None, POP)
throw new UnsupportedOperationException("PersistentVector::pop is not implemented")
}
def update(index: Int, newElem: T) = {
register
- storage.updateVectorStorageEntryFor(uuid, index, newElem)
+ appendOnlyTxLog + LogEntry(Some(index), Some(newElem), UPD)
}
override def first: T = get(0)
@@ -260,7 +475,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
}
}
- def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
+ def length: Int = replay.length
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
diff --git a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala
index 98776253a5..83e47e3ba5 100644
--- a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala
+++ b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala
@@ -9,7 +9,7 @@ import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.UUID
object MongoStorage extends Storage {
- type ElementType = AnyRef
+ type ElementType = Array[Byte]
def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString)
def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
@@ -29,7 +29,7 @@ object MongoStorage extends Storage {
*
* @author Debasish Ghosh
*/
-class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] {
+class MongoPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = MongoStorageBackend
}
@@ -40,12 +40,12 @@ class MongoPersistentMap(id: String) extends PersistentMap[AnyRef, AnyRef] {
*
* @author Debaissh Ghosh
*/
-class MongoPersistentVector(id: String) extends PersistentVector[AnyRef] {
+class MongoPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
val uuid = id
val storage = MongoStorageBackend
}
-class MongoPersistentRef(id: String) extends PersistentRef[AnyRef] {
+class MongoPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
val storage = MongoStorageBackend
}
diff --git a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
index 950165567d..01d8ababce 100644
--- a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
+++ b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
@@ -9,13 +9,8 @@ import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
-import sjson.json.Serializer._
-
import java.util.NoSuchElementException
-
-import com.mongodb._
-
-import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
+import com.novus.casbah.mongodb.Imports._
/**
* A module for supporting MongoDB based persistence.
@@ -28,294 +23,208 @@ import java.util.{Map=>JMap, List=>JList, ArrayList=>JArrayList}
* @author Debasish Ghosh
*/
private[akka] object MongoStorageBackend extends
- MapStorageBackend[AnyRef, AnyRef] with
- VectorStorageBackend[AnyRef] with
- RefStorageBackend[AnyRef] with
+ MapStorageBackend[Array[Byte], Array[Byte]] with
+ VectorStorageBackend[Array[Byte]] with
+ RefStorageBackend[Array[Byte]] with
Logging {
- // enrich with null safe findOne
- class RichDBCollection(value: DBCollection) {
- def findOneNS(o: DBObject): Option[DBObject] = {
- value.findOne(o) match {
- case null => None
- case x => Some(x)
- }
- }
- }
-
- implicit def enrichDBCollection(c: DBCollection) = new RichDBCollection(c)
-
- val KEY = "key"
- val VALUE = "value"
+ val KEY = "__key"
+ val REF = "__ref"
val COLLECTION = "akka_coll"
- val MONGODB_SERVER_HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
- val MONGODB_SERVER_DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
- val MONGODB_SERVER_PORT = config.getInt("akka.storage.mongodb.port", 27017)
+ val HOSTNAME = config.getString("akka.storage.mongodb.hostname", "127.0.0.1")
+ val DBNAME = config.getString("akka.storage.mongodb.dbname", "testdb")
+ val PORT = config.getInt("akka.storage.mongodb.port", 27017)
- val db = new Mongo(MONGODB_SERVER_HOSTNAME, MONGODB_SERVER_PORT)
- val coll = db.getDB(MONGODB_SERVER_DBNAME).getCollection(COLLECTION)
+ val db: MongoDB = MongoConnection(HOSTNAME, PORT)(DBNAME)
+ val coll: MongoCollection = db(COLLECTION)
- private[this] val serializer = SJSON
+ def drop() { db.dropDatabase() }
- def insertMapStorageEntryFor(name: String, key: AnyRef, value: AnyRef) {
+ def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) {
insertMapStorageEntriesFor(name, List((key, value)))
}
- def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[AnyRef, AnyRef]]) {
- import java.util.{Map, HashMap}
-
- val m: Map[AnyRef, AnyRef] = new HashMap
- for ((k, v) <- entries) {
- m.put(k, serializer.out(v))
- }
-
- nullSafeFindOne(name) match {
- case None =>
- coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, m))
- case Some(dbo) => {
- // collate the maps
- val o = dbo.get(VALUE).asInstanceOf[Map[AnyRef, AnyRef]]
- o.putAll(m)
-
- val newdbo = new BasicDBObject().append(KEY, name).append(VALUE, o)
- coll.update(new BasicDBObject().append(KEY, name), newdbo, true, false)
+ def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) {
+ db.safely { db =>
+ val q: DBObject = MongoDBObject(KEY -> name)
+ coll.findOne(q) match {
+ case Some(dbo) =>
+ entries.foreach { case (k, v) => dbo += new String(k) -> v }
+ db.safely { db => coll.update(q, dbo, true, false) }
+ case None =>
+ val builder = MongoDBObject.newBuilder
+ builder += KEY -> name
+ entries.foreach { case (k, v) => builder += new String(k) -> v }
+ coll += builder.result.asDBObject
}
}
}
def removeMapStorageFor(name: String): Unit = {
- val q = new BasicDBObject
- q.put(KEY, name)
- coll.remove(q)
+ val q: DBObject = MongoDBObject(KEY -> name)
+ db.safely { db => coll.remove(q) }
}
- def removeMapStorageFor(name: String, key: AnyRef): Unit = {
- nullSafeFindOne(name) match {
- case None =>
- case Some(dbo) => {
- val orig = dbo.get(VALUE).asInstanceOf[DBObject].toMap
- if (key.isInstanceOf[List[_]]) {
- val keys = key.asInstanceOf[List[_]]
- keys.foreach(k => orig.remove(k.asInstanceOf[String]))
- } else {
- orig.remove(key.asInstanceOf[String])
- }
- // remove existing reference
- removeMapStorageFor(name)
- // and insert
- coll.insert(new BasicDBObject().append(KEY, name).append(VALUE, orig))
- }
+ private def queryFor[T](name: String)(body: (MongoDBObject, Option[DBObject]) => T): T = {
+ val q = MongoDBObject(KEY -> name)
+ body(q, coll.findOne(q))
+ }
+
+ def removeMapStorageFor(name: String, key: Array[Byte]): Unit = queryFor(name) { (q, dbo) =>
+ dbo.foreach { d =>
+ d -= new String(key)
+ db.safely { db => coll.update(q, d, true, false) }
}
}
- def getMapStorageEntryFor(name: String, key: AnyRef): Option[AnyRef] =
- getValueForKey(name, key.asInstanceOf[String])
-
- def getMapStorageSizeFor(name: String): Int = {
- nullSafeFindOne(name) match {
- case None => 0
- case Some(dbo) =>
- dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]].keySet.size
- }
+ def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = queryFor(name) { (q, dbo) =>
+ dbo.map { d =>
+ d.getAs[Array[Byte]](new String(key))
+ }.getOrElse(None)
}
- def getMapStorageFor(name: String): List[Tuple2[AnyRef, AnyRef]] = {
- val m =
- nullSafeFindOne(name) match {
- case None =>
- throw new NoSuchElementException(name + " not present")
- case Some(dbo) =>
- dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
- }
- val n =
- List(m.keySet.toArray: _*).asInstanceOf[List[String]]
- val vals =
- for(s <- n)
- yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
- vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
+ def getMapStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
+ dbo.map { d =>
+ d.size - 2 // need to exclude object id and our KEY
+ }.getOrElse(0)
}
- def getMapStorageRangeFor(name: String, start: Option[AnyRef],
- finish: Option[AnyRef],
- count: Int): List[Tuple2[AnyRef, AnyRef]] = {
- val m =
- nullSafeFindOne(name) match {
- case None =>
- throw new NoSuchElementException(name + " not present")
- case Some(dbo) =>
- dbo.get(VALUE).asInstanceOf[JMap[String, AnyRef]]
- }
-
- /**
- * count is the max number of results to return. Start with
- * start or 0 (if start is not defined) and go until
- * you hit finish or count.
- */
- val s = if (start.isDefined) start.get.asInstanceOf[Int] else 0
- val cnt =
- if (finish.isDefined) {
- val f = finish.get.asInstanceOf[Int]
- if (f >= s) math.min(count, (f - s)) else count
- }
- else count
-
- val n =
- List(m.keySet.toArray: _*).asInstanceOf[List[String]].sortWith((e1, e2) => (e1 compareTo e2) < 0).slice(s, s + cnt)
- val vals =
- for(s <- n)
- yield (s, serializer.in[AnyRef](m.get(s).asInstanceOf[Array[Byte]]))
- vals.asInstanceOf[List[Tuple2[String, AnyRef]]]
+ def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) =>
+ dbo.map { d =>
+ for {
+ (k, v) <- d.toList
+ if k != "_id" && k != KEY
+ } yield (k.getBytes, v.asInstanceOf[Array[Byte]])
+ }.getOrElse(List.empty[(Array[Byte], Array[Byte])])
}
- private def getValueForKey(name: String, key: String): Option[AnyRef] = {
- try {
- nullSafeFindOne(name) match {
- case None => None
- case Some(dbo) =>
- Some(serializer.in[AnyRef](
- dbo.get(VALUE)
- .asInstanceOf[JMap[String, AnyRef]]
- .get(key).asInstanceOf[Array[Byte]]))
- }
- } catch {
- case e =>
- throw new NoSuchElementException(e.getMessage)
- }
+ def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
+ finish: Option[Array[Byte]],
+ count: Int): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) =>
+ dbo.map { d =>
+ // get all keys except the special ones
+ val keys = d.keys
+ .filter(k => k != "_id" && k != KEY)
+ .toList
+ .sortWith(_ < _)
+
+ // if the supplied start is not defined, get the head of keys
+ val s = start.map(new String(_)).getOrElse(keys.head)
+
+ // if the supplied finish is not defined, get the last element of keys
+ val f = finish.map(new String(_)).getOrElse(keys.last)
+
+ // slice from keys: both ends inclusive
+ val ks = keys.slice(keys.indexOf(s), scala.math.min(count, keys.indexOf(f) + 1))
+ ks.map(k => (k.getBytes, d.getAs[Array[Byte]](k).get))
+ }.getOrElse(List.empty[(Array[Byte], Array[Byte])])
}
- def insertVectorStorageEntriesFor(name: String, elements: List[AnyRef]) = {
- val q = new BasicDBObject
- q.put(KEY, name)
-
- val currentList =
- coll.findOneNS(q) match {
- case None =>
- new JArrayList[AnyRef]
- case Some(dbo) =>
- dbo.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
- }
- if (!currentList.isEmpty) {
- // record exists
- // remove before adding
- coll.remove(q)
- }
-
- // add to the current list
- elements.map(serializer.out(_)).foreach(currentList.add(_))
-
- coll.insert(
- new BasicDBObject()
- .append(KEY, name)
- .append(VALUE, currentList)
- )
- }
-
- def insertVectorStorageEntryFor(name: String, element: AnyRef) = {
+ def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
insertVectorStorageEntriesFor(name, List(element))
}
- def getVectorStorageEntryFor(name: String, index: Int): AnyRef = {
- try {
- val o =
- nullSafeFindOne(name) match {
- case None =>
- throw new NoSuchElementException(name + " not present")
+ def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
+ // lookup with name
+ val q: DBObject = MongoDBObject(KEY -> name)
- case Some(dbo) =>
- dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
+ db.safely { db =>
+ coll.findOne(q) match {
+ // exists : need to update
+ case Some(dbo) =>
+ dbo -= KEY
+ dbo -= "_id"
+ val listBuilder = MongoDBList.newBuilder
+
+ // expensive!
+ listBuilder ++= (elements ++ dbo.toSeq.sortWith((e1, e2) => (e1._1.toInt < e2._1.toInt)).map(_._2))
+
+ val builder = MongoDBObject.newBuilder
+ builder += KEY -> name
+ builder ++= listBuilder.result
+ coll.update(q, builder.result.asDBObject, true, false)
+
+ // new : just add
+ case None =>
+ val listBuilder = MongoDBList.newBuilder
+ listBuilder ++= elements
+
+ val builder = MongoDBObject.newBuilder
+ builder += KEY -> name
+ builder ++= listBuilder.result
+ coll += builder.result.asDBObject
}
- serializer.in[AnyRef](
- o.get(index).asInstanceOf[Array[Byte]])
- } catch {
- case e =>
- throw new NoSuchElementException(e.getMessage)
}
}
- def getVectorStorageRangeFor(name: String,
- start: Option[Int], finish: Option[Int], count: Int): List[AnyRef] = {
- try {
- val o =
- nullSafeFindOne(name) match {
- case None =>
- throw new NoSuchElementException(name + " not present")
+ def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = queryFor(name) { (q, dbo) =>
+ dbo.foreach { d =>
+ d += ((index.toString, elem))
+ db.safely { db => coll.update(q, d, true, false) }
+ }
+ }
- case Some(dbo) =>
- dbo.get(VALUE).asInstanceOf[JList[AnyRef]]
- }
+ def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = queryFor(name) { (q, dbo) =>
+ dbo.map { d =>
+ d(index.toString).asInstanceOf[Array[Byte]]
+ }.getOrElse(Array.empty[Byte])
+ }
- val s = if (start.isDefined) start.get else 0
+ /**
+ * if start and finish both are defined, ignore count and
+ * report the range [start, finish)
+ * if start is not defined, assume start = 0
+ * if start == 0 and finish == 0, return an empty collection
+ */
+ def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = queryFor(name) { (q, dbo) =>
+ dbo.map { d =>
+ val ls = d.filter { case (k, v) => k != KEY && k != "_id" }
+ .toSeq
+ .sortWith((e1, e2) => (e1._1.toInt < e2._1.toInt))
+ .map(_._2)
+
+ val st = start.getOrElse(0)
val cnt =
if (finish.isDefined) {
val f = finish.get
- if (f >= s) (f - s) else count
+ if (f >= st) (f - st) else count
}
else count
-
- // pick the subrange and make a Scala list
- val l =
- List(o.subList(s, s + cnt).toArray: _*)
-
- for(e <- l)
- yield serializer.in[AnyRef](e.asInstanceOf[Array[Byte]])
- } catch {
- case e =>
- throw new NoSuchElementException(e.getMessage)
- }
+ if (st == 0 && cnt == 0) List()
+ ls.slice(st, st + cnt).asInstanceOf[List[Array[Byte]]]
+ }.getOrElse(List.empty[Array[Byte]])
}
- def updateVectorStorageEntryFor(name: String, index: Int, elem: AnyRef) = {
- val q = new BasicDBObject
- q.put(KEY, name)
-
- val dbobj =
- coll.findOneNS(q) match {
- case None =>
- throw new NoSuchElementException(name + " not present")
- case Some(dbo) => dbo
- }
- val currentList = dbobj.get(VALUE).asInstanceOf[JArrayList[AnyRef]]
- currentList.set(index, serializer.out(elem))
- coll.update(q,
- new BasicDBObject().append(KEY, name).append(VALUE, currentList))
+ def getVectorStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
+ dbo.map { d => d.size - 2 }.getOrElse(0)
}
- def getVectorStorageSizeFor(name: String): Int = {
- nullSafeFindOne(name) match {
- case None => 0
- case Some(dbo) =>
- dbo.get(VALUE).asInstanceOf[JList[AnyRef]].size
- }
- }
+ def insertRefStorageFor(name: String, element: Array[Byte]) = {
+ // lookup with name
+ val q: DBObject = MongoDBObject(KEY -> name)
- private def nullSafeFindOne(name: String): Option[DBObject] = {
- val o = new BasicDBObject
- o.put(KEY, name)
- coll.findOneNS(o)
- }
+ db.safely { db =>
+ coll.findOne(q) match {
+ // exists : need to update
+ case Some(dbo) =>
+ dbo += ((REF, element))
+ coll.update(q, dbo, true, false)
- def insertRefStorageFor(name: String, element: AnyRef) = {
- nullSafeFindOne(name) match {
- case None =>
- case Some(dbo) => {
- val q = new BasicDBObject
- q.put(KEY, name)
- coll.remove(q)
+ // not found : make one
+ case None =>
+ val builder = MongoDBObject.newBuilder
+ builder += KEY -> name
+ builder += REF -> element
+ coll += builder.result.asDBObject
}
}
- coll.insert(
- new BasicDBObject()
- .append(KEY, name)
- .append(VALUE, serializer.out(element)))
}
- def getRefStorageFor(name: String): Option[AnyRef] = {
- nullSafeFindOne(name) match {
- case None => None
- case Some(dbo) =>
- Some(serializer.in[AnyRef](dbo.get(VALUE).asInstanceOf[Array[Byte]]))
- }
+ def getRefStorageFor(name: String): Option[Array[Byte]] = queryFor(name) { (q, dbo) =>
+ dbo.map { d =>
+ d.getAs[Array[Byte]](REF)
+ }.getOrElse(None)
}
}
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
index 1acc9ee97d..01f735b254 100644
--- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala
@@ -1,32 +1,19 @@
package se.scalablesolutions.akka.persistence.mongo
-import org.junit.{Test, Before}
-import org.junit.Assert._
-import org.scalatest.junit.JUnitSuite
-
-import _root_.dispatch.json.{JsNumber, JsValue}
-import _root_.dispatch.json.Js._
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef}
import Actor._
-/**
- * A persistent actor based on MongoDB storage.
- *
- * Demonstrates a bank account operation consisting of messages that:
- * checks balance Balance
- * debits amountDebit
- * debits multiple amountsMultiDebit
- * credits amountCredit
- *
- * Needs a running Mongo server.
- * @author Debasish Ghosh
- */
case class Balance(accountNo: String)
-case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
-case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
-case class Credit(accountNo: String, amount: BigInt)
+case class Debit(accountNo: String, amount: Int, failer: ActorRef)
+case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
+case class Credit(accountNo: String, amount: Int)
case class Log(start: Int, finish: Int)
case object LogSize
@@ -35,63 +22,65 @@ class BankAccountActor extends Transactor {
private lazy val accountState = MongoStorage.newMap
private lazy val txnLog = MongoStorage.newVector
+ import sjson.json.DefaultProtocol._
+ import sjson.json.JsonSerialization._
+
def receive: Receive = {
// check balance
case Balance(accountNo) =>
- txnLog.add("Balance:" + accountNo)
- self.reply(accountState.get(accountNo).get)
+ txnLog.add(("Balance:" + accountNo).getBytes)
+ self.reply(
+ accountState.get(accountNo.getBytes)
+ .map(frombinary[Int](_))
+ .getOrElse(0))
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
- txnLog.add("Debit:" + accountNo + " " + amount)
+ txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
+ val m = accountState.get(accountNo.getBytes)
+ .map(frombinary[Int](_))
+ .getOrElse(0)
+
+ accountState.put(accountNo.getBytes, tobinary(m - amount))
+ if (amount > m) failer !! "Failure"
- val m: BigInt =
- accountState.get(accountNo) match {
- case Some(JsNumber(n)) =>
- BigInt(n.asInstanceOf[BigDecimal].intValue)
- case None => 0
- }
- accountState.put(accountNo, (m - amount))
- if (amount > m)
- failer !! "Failure"
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
case MultiDebit(accountNo, amounts, failer) =>
- txnLog.add("MultiDebit:" + accountNo + " " + amounts.map(_.intValue).foldLeft(0)(_ + _))
+ val sum = amounts.foldRight(0)(_ + _)
+ txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
- val m: BigInt =
- accountState.get(accountNo) match {
- case Some(JsNumber(n)) => BigInt(n.toString)
- case None => 0
+ val m = accountState.get(accountNo.getBytes)
+ .map(frombinary[Int](_))
+ .getOrElse(0)
+
+ var cbal = m
+ amounts.foreach { amount =>
+ accountState.put(accountNo.getBytes, tobinary(m - amount))
+ cbal = cbal - amount
+ if (cbal < 0) failer !! "Failure"
}
- var bal: BigInt = 0
- amounts.foreach {amount =>
- bal = bal + amount
- accountState.put(accountNo, (m - bal))
- }
- if (bal > m) failer !! "Failure"
- self.reply(m - bal)
+
+ self.reply(m - sum)
// credit amount
case Credit(accountNo, amount) =>
- txnLog.add("Credit:" + accountNo + " " + amount)
+ txnLog.add(("Credit:" + accountNo + " " + amount).getBytes)
+ val m = accountState.get(accountNo.getBytes)
+ .map(frombinary[Int](_))
+ .getOrElse(0)
+
+ accountState.put(accountNo.getBytes, tobinary(m + amount))
- val m: BigInt =
- accountState.get(accountNo) match {
- case Some(JsNumber(n)) =>
- BigInt(n.asInstanceOf[BigDecimal].intValue)
- case None => 0
- }
- accountState.put(accountNo, (m + amount))
self.reply(m + amount)
case LogSize =>
- self.reply(txnLog.length.asInstanceOf[AnyRef])
+ self.reply(txnLog.length)
case Log(start, finish) =>
- self.reply(txnLog.slice(start, finish))
+ self.reply(txnLog.slice(start, finish).map(new String(_)))
}
}
@@ -102,82 +91,71 @@ class BankAccountActor extends Transactor {
}
}
-class MongoPersistentActorSpec extends JUnitSuite {
- @Test
- def testSuccessfulDebit = {
- val bactor = actorOf[BankAccountActor]
- bactor.start
- val failer = actorOf[PersistentFailerActor]
- failer.start
- bactor !! Credit("a-123", 5000)
- bactor !! Debit("a-123", 3000, failer)
+@RunWith(classOf[JUnitRunner])
+class MongoPersistentActorSpec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterEach {
- val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
- assertEquals(BigInt(2000), BigInt(b.intValue))
-
- bactor !! Credit("a-123", 7000)
-
- val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
- assertEquals(BigInt(9000), BigInt(b1.intValue))
-
- bactor !! Debit("a-123", 8000, failer)
-
- val JsNumber(b2) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
- assertEquals(BigInt(1000), BigInt(b2.intValue))
-
- assert(7 == (bactor !! LogSize).get.asInstanceOf[Int])
-
- import scala.collection.mutable.ArrayBuffer
- assert((bactor !! Log(0, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 7)
- assert((bactor !! Log(0, 0)).get.asInstanceOf[ArrayBuffer[String]].size == 0)
- assert((bactor !! Log(1, 2)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
- assert((bactor !! Log(6, 7)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
- assert((bactor !! Log(0, 1)).get.asInstanceOf[ArrayBuffer[String]].size == 1)
+ override def beforeEach {
+ MongoStorageBackend.drop
}
- @Test
- def testUnsuccessfulDebit = {
- val bactor = actorOf[BankAccountActor]
- bactor.start
- bactor !! Credit("a-123", 5000)
-
- val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
- assertEquals(BigInt(5000), BigInt(b.intValue))
-
- val failer = actorOf[PersistentFailerActor]
- failer.start
- try {
- bactor !! Debit("a-123", 7000, failer)
- fail("should throw exception")
- } catch { case e: RuntimeException => {}}
-
- val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
- assertEquals(BigInt(5000), BigInt(b1.intValue))
-
- // should not count the failed one
- assert(3 == (bactor !! LogSize).get.asInstanceOf[Int])
+ override def afterEach {
+ MongoStorageBackend.drop
}
- @Test
- def testUnsuccessfulMultiDebit = {
- val bactor = actorOf[BankAccountActor]
- bactor.start
- bactor !! Credit("a-123", 5000)
+ describe("successful debit") {
+ it("should debit successfully") {
+ val bactor = actorOf[BankAccountActor]
+ bactor.start
+ val failer = actorOf[PersistentFailerActor]
+ failer.start
+ bactor !! Credit("a-123", 5000)
+ bactor !! Debit("a-123", 3000, failer)
- val JsNumber(b) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
- assertEquals(BigInt(5000), BigInt(b.intValue))
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
- val failer = actorOf[PersistentFailerActor]
- failer.start
- try {
- bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
- fail("should throw exception")
- } catch { case e: RuntimeException => {}}
+ bactor !! Credit("a-123", 7000)
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
- val JsNumber(b1) = (bactor !! Balance("a-123")).get.asInstanceOf[JsValue]
- assertEquals(BigInt(5000), BigInt(b1.intValue))
+ bactor !! Debit("a-123", 8000, failer)
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
- // should not count the failed one
- assert(3 == (bactor !! LogSize).get.asInstanceOf[Int])
+ (bactor !! LogSize).get.asInstanceOf[Int] should equal(7)
+ (bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7)
+ }
+ }
+
+ describe("unsuccessful debit") {
+ it("debit should fail") {
+ val bactor = actorOf[BankAccountActor]
+ bactor.start
+ val failer = actorOf[PersistentFailerActor]
+ failer.start
+ bactor !! Credit("a-123", 5000)
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
+ evaluating {
+ bactor !! Debit("a-123", 7000, failer)
+ } should produce [Exception]
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
+ (bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
+ }
+ }
+
+ describe("unsuccessful multidebit") {
+ it("multidebit should fail") {
+ val bactor = actorOf[BankAccountActor]
+ bactor.start
+ val failer = actorOf[PersistentFailerActor]
+ failer.start
+ bactor !! Credit("a-123", 5000)
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
+ evaluating {
+ bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
+ } should produce [Exception]
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
+ (bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
+ }
}
}
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
index e518b28d66..e9576cc152 100644
--- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
@@ -1,364 +1,158 @@
package se.scalablesolutions.akka.persistence.mongo
-import org.junit.{Test, Before}
-import org.junit.Assert._
-import org.scalatest.junit.JUnitSuite
-import _root_.dispatch.json._
-import _root_.dispatch.json.Js._
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
import java.util.NoSuchElementException
-@scala.reflect.BeanInfo case class Foo(no: Int, name: String)
-class MongoStorageSpec extends JUnitSuite {
+@RunWith(classOf[JUnitRunner])
+class MongoStorageSpec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterEach {
- val changeSetV = new scala.collection.mutable.ArrayBuffer[AnyRef]
- val changeSetM = new scala.collection.mutable.HashMap[AnyRef, AnyRef]
-
- @Before def initialize() = {
- MongoStorageBackend.coll.drop
+ override def beforeEach {
+ MongoStorageBackend.drop
}
- @Test
- def testVectorInsertForTransactionId = {
- changeSetV += "debasish" // string
- changeSetV += List(1, 2, 3) // Scala List
- changeSetV += List(100, 200)
- MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
- assertEquals(
- 3,
- MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
- changeSetV.clear
-
- // changeSetV should be reinitialized
- changeSetV += List(12, 23, 45)
- changeSetV += "maulindu"
- MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
- assertEquals(
- 5,
- MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
-
- // add more to the same changeSetV
- changeSetV += "ramanendu"
- changeSetV += Map(1 -> "dg", 2 -> "mc")
-
- // add for a diff transaction
- MongoStorageBackend.insertVectorStorageEntriesFor("U-A2", changeSetV.toList)
- assertEquals(
- 4,
- MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
-
- // previous transaction change set should remain same
- assertEquals(
- 5,
- MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
-
- // test single element entry
- MongoStorageBackend.insertVectorStorageEntryFor("U-A1", Map(1->1, 2->4, 3->9))
- assertEquals(
- 6,
- MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
+ override def afterEach {
+ MongoStorageBackend.drop
}
- @Test
- def testVectorFetchForKeys = {
+ describe("persistent maps") {
+ it("should insert with single key and value") {
+ import MongoStorageBackend._
- // initially everything 0
- assertEquals(
- 0,
- MongoStorageBackend.getVectorStorageSizeFor("U-A2"))
-
- assertEquals(
- 0,
- MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
-
- // get some stuff
- changeSetV += "debasish"
- changeSetV += List(BigDecimal(12), BigDecimal(13), BigDecimal(14))
- MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
-
- assertEquals(
- 2,
- MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
-
- val JsString(str) = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
- assertEquals("debasish", str)
-
- val l = MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsValue]
- val num_list = list ! num
- val num_list(l0) = l
- assertEquals(List(12, 13, 14), l0)
-
- changeSetV.clear
- changeSetV += Map(1->1, 2->4, 3->9)
- changeSetV += BigInt(2310)
- changeSetV += List(100, 200, 300)
- MongoStorageBackend.insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
-
- assertEquals(
- 5,
- MongoStorageBackend.getVectorStorageSizeFor("U-A1"))
-
- val r =
- MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(1), None, 3)
-
- assertEquals(3, r.size)
- val lr = r(0).asInstanceOf[JsValue]
- val num_list(l1) = lr
- assertEquals(List(12, 13, 14), l1)
- }
-
- @Test
- def testVectorFetchForNonExistentKeys = {
- try {
- MongoStorageBackend.getVectorStorageEntryFor("U-A1", 1)
- fail("should throw an exception")
- } catch {case e: NoSuchElementException => {}}
-
- try {
- MongoStorageBackend.getVectorStorageRangeFor("U-A1", Some(2), None, 12)
- fail("should throw an exception")
- } catch {case e: NoSuchElementException => {}}
- }
-
- @Test
- def testVectorUpdateForTransactionId = {
- import MongoStorageBackend._
-
- changeSetV += "debasish" // string
- changeSetV += List(1, 2, 3) // Scala List
- changeSetV += List(100, 200)
-
- insertVectorStorageEntriesFor("U-A1", changeSetV.toList)
- assertEquals(3, getVectorStorageSizeFor("U-A1"))
- updateVectorStorageEntryFor("U-A1", 0, "maulindu")
- val JsString(str) = getVectorStorageEntryFor("U-A1", 0).asInstanceOf[JsString]
- assertEquals("maulindu", str)
-
- updateVectorStorageEntryFor("U-A1", 1, Map("1"->"dg", "2"->"mc"))
- val JsObject(m) = getVectorStorageEntryFor("U-A1", 1).asInstanceOf[JsObject]
- assertEquals(m.keySet.size, 2)
- }
-
- @Test
- def testMapInsertForTransactionId = {
- fillMap
-
- // add some more to changeSet
- changeSetM += "5" -> Foo(12, "dg")
- changeSetM += "6" -> java.util.Calendar.getInstance.getTime
-
- // insert all into Mongo
- MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
- assertEquals(
- 6,
- MongoStorageBackend.getMapStorageSizeFor("U-M1"))
-
- // individual insert api
- MongoStorageBackend.insertMapStorageEntryFor("U-M1", "7", "akka")
- MongoStorageBackend.insertMapStorageEntryFor("U-M1", "8", List(23, 25))
- assertEquals(
- 8,
- MongoStorageBackend.getMapStorageSizeFor("U-M1"))
-
- // add the same changeSet for another transaction
- MongoStorageBackend.insertMapStorageEntriesFor("U-M2", changeSetM.toList)
- assertEquals(
- 6,
- MongoStorageBackend.getMapStorageSizeFor("U-M2"))
-
- // the first transaction should remain the same
- assertEquals(
- 8,
- MongoStorageBackend.getMapStorageSizeFor("U-M1"))
- changeSetM.clear
- }
-
- @Test
- def testMapContents = {
- fillMap
- MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
- MongoStorageBackend.getMapStorageEntryFor("U-M1", "2") match {
- case Some(x) => {
- val JsString(str) = x.asInstanceOf[JsValue]
- assertEquals("peter", str)
- }
- case None => fail("should fetch peter")
- }
- MongoStorageBackend.getMapStorageEntryFor("U-M1", "4") match {
- case Some(x) => {
- val num_list = list ! num
- val num_list(l0) = x.asInstanceOf[JsValue]
- assertEquals(3, l0.size)
- }
- case None => fail("should fetch list")
- }
- MongoStorageBackend.getMapStorageEntryFor("U-M1", "3") match {
- case Some(x) => {
- val num_list = list ! num
- val num_list(l0) = x.asInstanceOf[JsValue]
- assertEquals(2, l0.size)
- }
- case None => fail("should fetch list")
+ insertMapStorageEntryFor("t1", "odersky".getBytes, "scala".getBytes)
+ insertMapStorageEntryFor("t1", "gosling".getBytes, "java".getBytes)
+ insertMapStorageEntryFor("t1", "stroustrup".getBytes, "c++".getBytes)
+ getMapStorageSizeFor("t1") should equal(3)
+ new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
+ new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java")
+ new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++")
+ getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
}
- // get the entire map
- val l: List[Tuple2[AnyRef, AnyRef]] =
- MongoStorageBackend.getMapStorageFor("U-M1")
+ it("should insert with multiple keys and values") {
+ import MongoStorageBackend._
- assertEquals(4, l.size)
- assertTrue(l.map(_._1).contains("1"))
- assertTrue(l.map(_._1).contains("2"))
- assertTrue(l.map(_._1).contains("3"))
- assertTrue(l.map(_._1).contains("4"))
+ val l = List(("stroustrup", "c++"), ("odersky", "scala"), ("gosling", "java"))
+ insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
+ getMapStorageSizeFor("t1") should equal(3)
+ new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++")
+ new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java")
+ new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
+ getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
- val JsString(str) = l.filter(_._1 == "2").head._2
- assertEquals(str, "peter")
+ getMapStorageEntryFor("t2", "torvalds".getBytes) should equal(None)
- // trying to fetch for a non-existent transaction will throw
- try {
- MongoStorageBackend.getMapStorageFor("U-M2")
- fail("should throw an exception")
- } catch {case e: NoSuchElementException => {}}
+ getMapStorageFor("t1").map { case (k, v) => (new String(k), new String(v)) } should equal (l)
- changeSetM.clear
- }
+ removeMapStorageFor("t1", "gosling".getBytes)
+ getMapStorageSizeFor("t1") should equal(2)
- @Test
- def testMapContentsByRange = {
- fillMap
- changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
- MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
-
- // specify start and count
- val l: List[Tuple2[AnyRef, AnyRef]] =
- MongoStorageBackend.getMapStorageRangeFor(
- "U-M1", Some(Integer.valueOf(2)), None, 3)
-
- assertEquals(3, l.size)
- assertEquals("3", l(0)._1.asInstanceOf[String])
- val lst = l(0)._2.asInstanceOf[JsValue]
- val num_list = list ! num
- val num_list(l0) = lst
- assertEquals(List(100, 200), l0)
- assertEquals("4", l(1)._1.asInstanceOf[String])
- val ls = l(1)._2.asInstanceOf[JsValue]
- val num_list(l1) = ls
- assertEquals(List(10, 20, 30), l1)
-
- // specify start, finish and count where finish - start == count
- assertEquals(3,
- MongoStorageBackend.getMapStorageRangeFor(
- "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(5)), 3).size)
-
- // specify start, finish and count where finish - start > count
- assertEquals(3,
- MongoStorageBackend.getMapStorageRangeFor(
- "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(9)), 3).size)
-
- // do not specify start or finish
- assertEquals(3,
- MongoStorageBackend.getMapStorageRangeFor(
- "U-M1", None, None, 3).size)
-
- // specify finish and count
- assertEquals(3,
- MongoStorageBackend.getMapStorageRangeFor(
- "U-M1", None, Some(Integer.valueOf(3)), 3).size)
-
- // specify start, finish and count where finish < start
- assertEquals(3,
- MongoStorageBackend.getMapStorageRangeFor(
- "U-M1", Some(Integer.valueOf(2)), Some(Integer.valueOf(1)), 3).size)
-
- changeSetM.clear
- }
-
- @Test
- def testMapStorageRemove = {
- fillMap
- changeSetM += "5" -> Map(1 -> "dg", 2 -> "mc")
-
- MongoStorageBackend.insertMapStorageEntriesFor("U-M1", changeSetM.toList)
- assertEquals(5,
- MongoStorageBackend.getMapStorageSizeFor("U-M1"))
-
- // remove key "3"
- MongoStorageBackend.removeMapStorageFor("U-M1", "3")
- assertEquals(4,
- MongoStorageBackend.getMapStorageSizeFor("U-M1"))
-
- try {
- MongoStorageBackend.getMapStorageEntryFor("U-M1", "3")
- fail("should throw exception")
- } catch { case e => {}}
-
- // remove key "4"
- MongoStorageBackend.removeMapStorageFor("U-M1", "4")
- assertEquals(3,
- MongoStorageBackend.getMapStorageSizeFor("U-M1"))
-
- // remove key "2"
- MongoStorageBackend.removeMapStorageFor("U-M1", "2")
- assertEquals(2,
- MongoStorageBackend.getMapStorageSizeFor("U-M1"))
-
- // remove the whole stuff
- MongoStorageBackend.removeMapStorageFor("U-M1")
-
- try {
- MongoStorageBackend.getMapStorageFor("U-M1")
- fail("should throw exception")
- } catch { case e: NoSuchElementException => {}}
-
- changeSetM.clear
- }
-
- private def fillMap = {
- changeSetM += "1" -> "john"
- changeSetM += "2" -> "peter"
- changeSetM += "3" -> List(100, 200)
- changeSetM += "4" -> List(10, 20, 30)
- changeSetM
- }
-
- @Test
- def testRefStorage = {
- MongoStorageBackend.getRefStorageFor("U-R1") match {
- case None =>
- case Some(o) => fail("should be None")
+ removeMapStorageFor("t1")
+ getMapStorageSizeFor("t1") should equal(0)
}
- val m = Map("1"->1, "2"->4, "3"->9)
- MongoStorageBackend.insertRefStorageFor("U-R1", m)
- MongoStorageBackend.getRefStorageFor("U-R1") match {
- case None => fail("should not be empty")
- case Some(r) => {
- val a = r.asInstanceOf[JsValue]
- val m1 = Symbol("1") ? num
- val m2 = Symbol("2") ? num
- val m3 = Symbol("3") ? num
+ it("should do proper range queries") {
+ import MongoStorageBackend._
+ val l = List(
+ ("bjarne stroustrup", "c++"),
+ ("martin odersky", "scala"),
+ ("james gosling", "java"),
+ ("yukihiro matsumoto", "ruby"),
+ ("slava pestov", "factor"),
+ ("rich hickey", "clojure"),
+ ("ola bini", "ioke"),
+ ("dennis ritchie", "c"),
+ ("larry wall", "perl"),
+ ("guido van rossum", "python"),
+ ("james strachan", "groovy"))
+ insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) })
+ getMapStorageSizeFor("t1") should equal(l.size)
+ getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1))
+ getMapStorageRangeFor("t1", None, None, 5).map { case (k, v) => (new String(k), new String(v)) }.size should equal(5)
+ }
+ }
- val m1(n1) = a
- val m2(n2) = a
- val m3(n3) = a
+ describe("persistent vectors") {
+ it("should insert a single value") {
+ import MongoStorageBackend._
- assertEquals(n1, 1)
- assertEquals(n2, 4)
- assertEquals(n3, 9)
- }
+ insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
+ insertVectorStorageEntryFor("t1", "james gosling".getBytes)
+ new String(getVectorStorageEntryFor("t1", 0)) should equal("james gosling")
+ new String(getVectorStorageEntryFor("t1", 1)) should equal("martin odersky")
}
- // insert another one
- // the previous one should be replaced
- val b = List("100", "jonas")
- MongoStorageBackend.insertRefStorageFor("U-R1", b)
- MongoStorageBackend.getRefStorageFor("U-R1") match {
- case None => fail("should not be empty")
- case Some(r) => {
- val a = r.asInstanceOf[JsValue]
- val str_lst = list ! str
- val str_lst(l) = a
- assertEquals(b, l)
- }
+ it("should insert multiple values") {
+ import MongoStorageBackend._
+
+ insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
+ insertVectorStorageEntryFor("t1", "james gosling".getBytes)
+ insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
+ new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini")
+ new String(getVectorStorageEntryFor("t1", 1)) should equal("james strachan")
+ new String(getVectorStorageEntryFor("t1", 2)) should equal("dennis ritchie")
+ new String(getVectorStorageEntryFor("t1", 3)) should equal("james gosling")
+ new String(getVectorStorageEntryFor("t1", 4)) should equal("martin odersky")
+ }
+
+ it("should fetch a range of values") {
+ import MongoStorageBackend._
+
+ insertVectorStorageEntryFor("t1", "martin odersky".getBytes)
+ insertVectorStorageEntryFor("t1", "james gosling".getBytes)
+ getVectorStorageSizeFor("t1") should equal(2)
+ insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes))
+ getVectorStorageRangeFor("t1", None, None, 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
+ getVectorStorageRangeFor("t1", Some(0), Some(5), 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky"))
+ getVectorStorageRangeFor("t1", Some(2), Some(5), 100).map(new String(_)) should equal(List("dennis ritchie", "james gosling", "martin odersky"))
+
+ getVectorStorageSizeFor("t1") should equal(5)
+ }
+
+ it("should insert and query complex structures") {
+ import MongoStorageBackend._
+ import sjson.json.DefaultProtocol._
+ import sjson.json.JsonSerialization._
+
+ // a list[AnyRef] should be added successfully
+ val l = List("ola bini".getBytes, tobinary(List(100, 200, 300)), tobinary(List(1, 2, 3)))
+
+ // for id = t1
+ insertVectorStorageEntriesFor("t1", l)
+ new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini")
+ frombinary[List[Int]](getVectorStorageEntryFor("t1", 1)) should equal(List(100, 200, 300))
+ frombinary[List[Int]](getVectorStorageEntryFor("t1", 2)) should equal(List(1, 2, 3))
+
+ getVectorStorageSizeFor("t1") should equal(3)
+
+ // some more for id = t1
+ val m = List(tobinary(Map(1 -> "dg", 2 -> "mc", 3 -> "nd")), tobinary(List("martin odersky", "james gosling")))
+ insertVectorStorageEntriesFor("t1", m)
+
+ // size should add up
+ getVectorStorageSizeFor("t1") should equal(5)
+
+ // now for a diff id
+ insertVectorStorageEntriesFor("t2", l)
+ getVectorStorageSizeFor("t2") should equal(3)
+ }
+ }
+
+ describe("persistent refs") {
+ it("should insert a ref") {
+ import MongoStorageBackend._
+
+ insertRefStorageFor("t1", "martin odersky".getBytes)
+ new String(getRefStorageFor("t1").get) should equal("martin odersky")
+ insertRefStorageFor("t1", "james gosling".getBytes)
+ new String(getRefStorageFor("t1").get) should equal("james gosling")
+ getRefStorageFor("t2") should equal(None)
}
}
}
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
new file mode 100644
index 0000000000..3b160c8c50
--- /dev/null
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
@@ -0,0 +1,347 @@
+package se.scalablesolutions.akka.persistence.mongo
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
+import se.scalablesolutions.akka.config.OneForOneStrategy
+import Actor._
+import se.scalablesolutions.akka.stm.global._
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util.Logging
+
+import MongoStorageBackend._
+
+case class GET(k: String)
+case class SET(k: String, v: String)
+case class REM(k: String)
+case class CONTAINS(k: String)
+case object MAP_SIZE
+case class MSET(kvs: List[(String, String)])
+case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
+case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
+case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
+case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
+
+case class VADD(v: String)
+case class VUPD(i: Int, v: String)
+case class VUPD_AND_ABORT(i: Int, v: String)
+case class VGET(i: Int)
+case object VSIZE
+case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
+case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
+
+object Storage {
+ class MongoSampleMapStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val FOO_MAP = "akka.sample.map"
+
+ private var fooMap = atomic { MongoStorage.getMap(FOO_MAP) }
+
+ def receive = {
+ case SET(k, v) =>
+ atomic {
+ fooMap += (k.getBytes, v.getBytes)
+ }
+ self.reply((k, v))
+
+ case GET(k) =>
+ val v = atomic {
+ fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found")
+ }
+ self.reply(v)
+
+ case REM(k) =>
+ val v = atomic {
+ fooMap -= k.getBytes
+ }
+ self.reply(k)
+
+ case CONTAINS(k) =>
+ val v = atomic {
+ fooMap contains k.getBytes
+ }
+ self.reply(v)
+
+ case MAP_SIZE =>
+ val v = atomic {
+ fooMap.size
+ }
+ self.reply(v)
+
+ case MSET(kvs) => atomic {
+ kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes) }
+ }
+ self.reply(kvs.size)
+
+ case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+
+ ks2rem.foreach {k =>
+ fooMap -= k.getBytes
+ }}
+ self.reply(fooMap.size)
+
+ case CLEAR_AFTER_PUT(kvs2add) => atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.clear
+ }
+ self.reply(true)
+
+ case PUT_WITH_SLICE(kvs2add, from, cnt) =>
+ val v = atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+
+ case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
+ val v = atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ ks2rem.foreach {k =>
+ fooMap -= k.getBytes
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+ }
+ }
+
+ class MongoSampleVectorStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val FOO_VECTOR = "akka.sample.vector"
+
+ private var fooVector = atomic { MongoStorage.getVector(FOO_VECTOR) }
+
+ def receive = {
+ case VADD(v) =>
+ val size =
+ atomic {
+ fooVector + v.getBytes
+ fooVector length
+ }
+ self.reply(size)
+
+ case VGET(index) =>
+ val ind =
+ atomic {
+ fooVector get index
+ }
+ self.reply(ind)
+
+ case VGET_AFTER_VADD(vs, is) =>
+ val els =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
+ }
+ self.reply(els)
+
+ case VUPD_AND_ABORT(index, value) =>
+ val l =
+ atomic {
+ fooVector.update(index, value.getBytes)
+ // force fail
+ fooVector get 100
+ }
+ self.reply(index)
+
+ case VADD_WITH_SLICE(vs, s, c) =>
+ val l =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ fooVector.slice(Some(s), None, c)
+ }
+ self.reply(l.map(new String(_)))
+ }
+ }
+}
+
+import Storage._
+
+@RunWith(classOf[JUnitRunner])
+class MongoTicket343Spec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterAll with
+ BeforeAndAfterEach {
+
+
+ override def beforeAll {
+ MongoStorageBackend.drop
+ println("** destroyed database")
+ }
+
+ override def beforeEach {
+ MongoStorageBackend.drop
+ println("** destroyed database")
+ }
+
+ override def afterEach {
+ MongoStorageBackend.drop
+ println("** destroyed database")
+ }
+
+ describe("Ticket 343 Issue #1") {
+ it("remove after put should work within the same transaction") {
+ val proc = actorOf[MongoSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ val rem = List("a", "debasish")
+ (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
+
+ (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
+ (proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
+
+ (proc !! GET("b")).getOrElse("b not found") should equal("2")
+
+ (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
+ (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #2") {
+ it("clear after put should work within the same transaction") {
+ val proc = actorOf[MongoSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #3") {
+ it("map size should change after the transaction") {
+ val proc = actorOf[MongoSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+ proc.stop
+ }
+ }
+
+ describe("slice test") {
+ it("should pass") {
+ val proc = actorOf[MongoSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ // (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
+
+ (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #4") {
+ it("vector get should not ignore elements that were in vector before transaction") {
+
+ val proc = actorOf[MongoSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
+ new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu")
+ new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu")
+ new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish")
+
+ // now add 3 more and do gets in the same transaction
+ (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #6") {
+ it("vector update should not ignore transaction") {
+ val proc = actorOf[MongoSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ evaluating {
+ (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
+ } should produce [Exception]
+
+ // update aborts and hence values will remain unchanged
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #5") {
+ it("vector slice() should not ignore elements added in current transaction") {
+ val proc = actorOf[MongoSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ // slice with no new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
+
+ // slice with new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
+ proc.stop
+ }
+ }
+}
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
index c92761beea..1eca775567 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
@@ -36,7 +36,7 @@ object RedisStorage extends Storage {
*
* @author Debasish Ghosh
*/
-class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
+class RedisPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = RedisStorageBackend
}
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
index 9200393ef9..61595ec21f 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
@@ -96,12 +96,12 @@ private [akka] object RedisStorageBackend extends
* both parts of the key need to be based64 encoded since there can be spaces within each of them
*/
private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling {
- "%s:%s".format(name, byteArrayToString(key))
+ "%s:%s".format(name, new String(key))
}
private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling {
val nk = redisKey.split(':')
- (nk(0), stringToByteArray(nk(1)))
+ (nk(0), nk(1).getBytes)
}
private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling {
@@ -124,27 +124,22 @@ private [akka] object RedisStorageBackend extends
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling {
- db.get(makeRedisKey(name, key)) match {
- case None =>
- throw new NoSuchElementException(new String(key) + " not present")
- case Some(s) => Some(stringToByteArray(s))
+ db.get(makeRedisKey(name, key))
+ .map(stringToByteArray(_))
+ .orElse(throw new NoSuchElementException(new String(key) + " not present"))
}
- }
def getMapStorageSizeFor(name: String): Int = withErrorHandling {
- db.keys("%s:*".format(name)) match {
- case None => 0
- case Some(keys) => keys.length
- }
+ db.keys("%s:*".format(name)).map(_.length).getOrElse(0)
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling {
- db.keys("%s:*".format(name)) match {
- case None =>
- throw new NoSuchElementException(name + " not present")
- case Some(keys) =>
+ db.keys("%s:*".format(name))
+ .map { keys =>
keys.map(key => (makeKeyFromRedisKey(key.get)._2, stringToByteArray(db.get(key.get).get))).toList
- }
+ }.getOrElse {
+ throw new NoSuchElementException(name + " not present")
+ }
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
@@ -207,12 +202,11 @@ private [akka] object RedisStorageBackend extends
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling {
- db.lindex(name, index) match {
- case None =>
+ db.lindex(name, index)
+ .map(stringToByteArray(_))
+ .getOrElse {
throw new NoSuchElementException(name + " does not have element at " + index)
- case Some(e) =>
- stringToByteArray(e)
- }
+ }
}
/**
@@ -252,11 +246,11 @@ private [akka] object RedisStorageBackend extends
}
def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling {
- db.get(name) match {
- case None =>
+ db.get(name)
+ .map(stringToByteArray(_))
+ .orElse {
throw new NoSuchElementException(name + " not present")
- case Some(s) => Some(stringToByteArray(s))
- }
+ }
}
// add to the end of the queue
@@ -266,11 +260,11 @@ private [akka] object RedisStorageBackend extends
// pop from the front of the queue
def dequeue(name: String): Option[Array[Byte]] = withErrorHandling {
- db.lpop(name) match {
- case None =>
+ db.lpop(name)
+ .map(stringToByteArray(_))
+ .orElse {
throw new NoSuchElementException(name + " not present")
- case Some(s) => Some(stringToByteArray(s))
- }
+ }
}
// get the size of the queue
@@ -302,26 +296,19 @@ private [akka] object RedisStorageBackend extends
// completely delete the queue
def remove(name: String): Boolean = withErrorHandling {
- db.del(name) match {
- case Some(1) => true
- case _ => false
- }
+ db.del(name).map { case 1 => true }.getOrElse(false)
}
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
- db.zadd(name, zscore, byteArrayToString(item)) match {
- case Some(1) => true
- case _ => false
- }
+ db.zadd(name, zscore, byteArrayToString(item))
+ .map { case 1 => true }.getOrElse(false)
}
// remove item from sorted set identified by name
def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
- db.zrem(name, byteArrayToString(item)) match {
- case Some(1) => true
- case _ => false
- }
+ db.zrem(name, byteArrayToString(item))
+ .map { case 1 => true }.getOrElse(false)
}
// cardinality of the set identified by name
@@ -330,29 +317,23 @@ private [akka] object RedisStorageBackend extends
}
def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling {
- db.zscore(name, byteArrayToString(item)) match {
- case Some(s) => Some(s.toFloat)
- case None => None
- }
+ db.zscore(name, byteArrayToString(item)).map(_.toFloat)
}
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling {
- db.zrange(name, start.toString, end.toString, RedisClient.ASC, false) match {
- case None =>
+ db.zrange(name, start.toString, end.toString, RedisClient.ASC, false)
+ .map(_.map(e => stringToByteArray(e.get)))
+ .getOrElse {
throw new NoSuchElementException(name + " not present")
- case Some(s) =>
- s.map(e => stringToByteArray(e.get))
- }
+ }
}
def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling {
- db.zrangeWithScore(
- name, start.toString, end.toString, RedisClient.ASC) match {
- case None =>
- throw new NoSuchElementException(name + " not present")
- case Some(l) =>
- l.map{ case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) }
- }
+ db.zrangeWithScore(name, start.toString, end.toString, RedisClient.ASC)
+ .map(_.map { case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) })
+ .getOrElse {
+ throw new NoSuchElementException(name + " not present")
+ }
}
def flushDB = withErrorHandling(db.flushdb)
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala
new file mode 100644
index 0000000000..de236b9a5a
--- /dev/null
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala
@@ -0,0 +1,351 @@
+package se.scalablesolutions.akka.persistence.redis
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Actor}
+import se.scalablesolutions.akka.config.OneForOneStrategy
+import Actor._
+import se.scalablesolutions.akka.persistence.common.PersistentVector
+import se.scalablesolutions.akka.stm.global._
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util.Logging
+
+import RedisStorageBackend._
+
+case class GET(k: String)
+case class SET(k: String, v: String)
+case class REM(k: String)
+case class CONTAINS(k: String)
+case object MAP_SIZE
+case class MSET(kvs: List[(String, String)])
+case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
+case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
+case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
+case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
+
+case class VADD(v: String)
+case class VUPD(i: Int, v: String)
+case class VUPD_AND_ABORT(i: Int, v: String)
+case class VGET(i: Int)
+case object VSIZE
+case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
+case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
+
+object Storage {
+ class RedisSampleMapStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val FOO_MAP = "akka.sample.map"
+
+ private var fooMap = atomic { RedisStorage.getMap(FOO_MAP) }
+
+ def receive = {
+ case SET(k, v) =>
+ atomic {
+ fooMap += (k.getBytes, v.getBytes)
+ }
+ self.reply((k, v))
+
+ case GET(k) =>
+ val v = atomic {
+ fooMap.get(k.getBytes)
+ }
+ self.reply(v.collect {case byte => new String(byte)}.getOrElse(k + " Not found"))
+
+ case REM(k) =>
+ val v = atomic {
+ fooMap -= k.getBytes
+ }
+ self.reply(k)
+
+ case CONTAINS(k) =>
+ val v = atomic {
+ fooMap contains k.getBytes
+ }
+ self.reply(v)
+
+ case MAP_SIZE =>
+ val v = atomic {
+ fooMap.size
+ }
+ self.reply(v)
+
+ case MSET(kvs) =>
+ atomic {
+ kvs.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ }
+ self.reply(kvs.size)
+
+ case REMOVE_AFTER_PUT(kvs2add, ks2rem) =>
+ val v =
+ atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+
+ ks2rem.foreach {k =>
+ fooMap -= k.getBytes
+ }
+ fooMap.size
+ }
+ self.reply(v)
+
+ case CLEAR_AFTER_PUT(kvs2add) =>
+ atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.clear
+ }
+ self.reply(true)
+
+ case PUT_WITH_SLICE(kvs2add, from, cnt) =>
+ val v =
+ atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+
+ case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
+ val v =
+ atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ ks2rem.foreach {k =>
+ fooMap -= k.getBytes
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+ }
+ }
+
+ class RedisSampleVectorStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val FOO_VECTOR = "akka.sample.vector"
+
+ private var fooVector = atomic { RedisStorage.getVector(FOO_VECTOR) }
+
+ def receive = {
+ case VADD(v) =>
+ val size =
+ atomic {
+ fooVector + v.getBytes
+ fooVector length
+ }
+ self.reply(size)
+
+ case VGET(index) =>
+ val ind =
+ atomic {
+ fooVector get index
+ }
+ self.reply(ind)
+
+ case VGET_AFTER_VADD(vs, is) =>
+ val els =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
+ }
+ self.reply(els)
+
+ case VUPD_AND_ABORT(index, value) =>
+ val l =
+ atomic {
+ fooVector.update(index, value.getBytes)
+ // force fail
+ fooVector get 100
+ }
+ self.reply(index)
+
+ case VADD_WITH_SLICE(vs, s, c) =>
+ val l =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ fooVector.slice(Some(s), None, c)
+ }
+ self.reply(l.map(new String(_)))
+ }
+ }
+}
+
+import Storage._
+
+@RunWith(classOf[JUnitRunner])
+class RedisTicket343Spec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterAll with
+ BeforeAndAfterEach {
+
+ override def beforeAll {
+ flushDB
+ println("** destroyed database")
+ }
+
+ override def afterEach {
+ flushDB
+ println("** destroyed database")
+ }
+
+ describe("Ticket 343 Issue #1") {
+ it("remove after put should work within the same transaction") {
+ val proc = actorOf[RedisSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ val rem = List("a", "debasish")
+ (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
+
+ (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
+ (proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
+
+ (proc !! GET("b")).getOrElse("b not found") should equal("2")
+
+ (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
+ (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #2") {
+ it("clear after put should work within the same transaction") {
+ val proc = actorOf[RedisSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #3") {
+ it("map size should change after the transaction") {
+ val proc = actorOf[RedisSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+ proc.stop
+ }
+ }
+
+ describe("slice test") {
+ it("should pass") {
+ val proc = actorOf[RedisSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
+
+ (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #4") {
+ it("vector get should not ignore elements that were in vector before transaction") {
+ val proc = actorOf[RedisSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
+ new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu")
+ new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu")
+ new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish")
+
+ // now add 3 more and do gets in the same transaction
+ (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #6") {
+ it("vector update should not ignore transaction") {
+ val proc = actorOf[RedisSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ evaluating {
+ (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
+ } should produce [Exception]
+
+ // update aborts and hence values will remain unchanged
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #5") {
+ it("vector slice() should not ignore elements added in current transaction") {
+ val proc = actorOf[RedisSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ // slice with no new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
+
+ // slice with new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
+ proc.stop
+ }
+ }
+}
diff --git a/embedded-repo/org/scala-tools/time/2.8.0-0.2-SNAPSHOT/time-2.8.0-0.2-SNAPSHOT.jar b/embedded-repo/org/scala-tools/time/2.8.0-0.2-SNAPSHOT/time-2.8.0-0.2-SNAPSHOT.jar
new file mode 100644
index 0000000000..038768fe14
Binary files /dev/null and b/embedded-repo/org/scala-tools/time/2.8.0-0.2-SNAPSHOT/time-2.8.0-0.2-SNAPSHOT.jar differ
diff --git a/embedded-repo/sjson/json/sjson/0.8-SNAPSHOT-2.8.0/sjson-0.8-SNAPSHOT-2.8.0.jar b/embedded-repo/sjson/json/sjson/0.8-SNAPSHOT-2.8.0/sjson-0.8-SNAPSHOT-2.8.0.jar
new file mode 100644
index 0000000000..1542632a82
Binary files /dev/null and b/embedded-repo/sjson/json/sjson/0.8-SNAPSHOT-2.8.0/sjson-0.8-SNAPSHOT-2.8.0.jar differ
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 393b531dd6..0ee2b18108 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -49,6 +49,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
+ lazy val CasbahRepoSnapshots = MavenRepository("Casbah Snapshot Repo", "http://repo.bumnetworks.com/snapshots/")
+ lazy val CasbahRepoReleases = MavenRepository("Casbah Snapshot Repo", "http://repo.bumnetworks.com/releases/")
}
// -------------------------------------------------------------------------------------------------------------------
@@ -75,6 +77,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository)
+ lazy val casbahSnapshot = ModuleConfiguration("com.novus",CasbahRepoSnapshots)
+ lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases)
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
// -------------------------------------------------------------------------------------------------------------------
@@ -166,6 +170,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile"
+ lazy val casbah = "com.novus" % "casbah_2.8.0" % "1.0.8.5" % "compile"
+
+ lazy val time = "org.scala-tools" % "time" % "2.8.0-SNAPSHOT-0.2-SNAPSHOT" % "compile"
+
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive
lazy val netty = "org.jboss.netty" % "netty" % "3.2.2.Final" % "compile"
@@ -180,7 +188,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
- lazy val sjson = "sjson.json" % "sjson" % "0.7-2.8.0" % "compile"
+ lazy val sjson = "sjson.json" % "sjson" % "0.8-SNAPSHOT-2.8.0" % "compile"
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile"
@@ -474,7 +482,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val commons_codec = Dependencies.commons_codec
val redis = Dependencies.redis
- override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
+ // override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
// -------------------------------------------------------------------------------------------------------------------
@@ -483,8 +491,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
class AkkaMongoProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val mongo = Dependencies.mongo
+ val casbah = Dependencies.casbah
- override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
+ // override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
// -------------------------------------------------------------------------------------------------------------------