diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala
index be5fc4f4c7..0c6f239ef7 100644
--- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala
+++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala
@@ -29,7 +29,7 @@ object CassandraStorage extends Storage {
*
* @author Jonas Bonér
*/
-class CassandraPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
+class CassandraPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = CassandraStorageBackend
}
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index ccaf7518f1..4d9ff48a60 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -7,9 +7,11 @@ package se.scalablesolutions.akka.persistence.common
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.stm.TransactionManagement.transaction
import se.scalablesolutions.akka.util.Logging
-import se.scalablesolutions.akka.AkkaException
-class StorageException(message: String) extends AkkaException(message)
+// FIXME move to 'stm' package + add message with more info
+class NoTransactionInScopeException extends RuntimeException
+
+class StorageException(message: String) extends RuntimeException(message)
/**
* Example Scala usage.
@@ -80,24 +82,90 @@ trait Storage {
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
- protected val newAndUpdatedEntries = TransactionalMap[K, V]()
- protected val removedEntries = TransactionalVector[K]()
protected val shouldClearOnCommit = Ref[Boolean]()
+ // operations on the Map
+ trait Op
+ case object GET extends Op
+ case object PUT extends Op
+ case object REM extends Op
+ case object UPD extends Op
+
+ // append only log: records all mutating operations
+ protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
+
+ case class LogEntry(key: K, value: Option[V], op: Op)
+
+ // need to override in subclasses e.g. "sameElements" for Array[Byte]
+ def equal(k1: K, k2: K): Boolean = k1 == k2
+
+ // Seqable type that's required for maintaining the log of distinct keys affected in current transaction
+ type T <: Equals
+
+ // converts key K to the Seqable type Equals
+ def toEquals(k: K): T
+
+ // keys affected in the current transaction
+ protected val keysInCurrentTx = TransactionalMap[T, K]()
+
+ protected def addToListOfKeysInTx(key: K): Unit =
+ keysInCurrentTx += (toEquals(key), key)
+
+ protected def clearDistinctKeys = keysInCurrentTx.clear
+
+ protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
+ appendOnlyTxLog filter(e => equal(e.key, key))
+
+ // need to get current value considering the underlying storage as well as the transaction log
+ protected def getCurrentValue(key: K): Option[V] = {
+
+ // get all mutating entries for this key for this tx
+ val txEntries = filterTxLogByKey(key)
+
+ // get the snapshot from the underlying store for this key
+ val underlying = try {
+ storage.getMapStorageEntryFor(uuid, key)
+ } catch { case e: Exception => None }
+
+ if (txEntries.isEmpty) underlying
+ else replay(txEntries, key, underlying)
+ }
+
+ // replay all tx entries for key k with seed = initial
+ private def replay(txEntries: IndexedSeq[LogEntry], key: K, initial: Option[V]): Option[V] = {
+ import scala.collection.mutable._
+
+ val m = initial match {
+ case None => Map.empty[K, V]
+ case Some(v) => Map((key, v))
+ }
+ txEntries.foreach {case LogEntry(k, v, o) => o match {
+ case PUT => m.put(k, v.get)
+ case REM => m -= k
+ case UPD => m.update(k, v.get)
+ }}
+ m get key
+ }
+
// to be concretized in subclasses
val storage: MapStorageBackend[K, V]
def commit = {
- if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
- removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key))
- storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList)
- newAndUpdatedEntries.clear
- removedEntries.clear
+ // if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
+
+ appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
+ case PUT => storage.insertMapStorageEntryFor(uuid, k, v.get)
+ case UPD => storage.insertMapStorageEntryFor(uuid, k, v.get)
+ case REM => storage.removeMapStorageFor(uuid, k)
+ }}
+
+ appendOnlyTxLog.clear
+ clearDistinctKeys
}
def abort = {
- newAndUpdatedEntries.clear
- removedEntries.clear
+ appendOnlyTxLog.clear
+ clearDistinctKeys
shouldClearOnCommit.swap(false)
}
@@ -118,68 +186,84 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def put(key: K, value: V): Option[V] = {
register
- newAndUpdatedEntries.put(key, value)
+ val curr = getCurrentValue(key)
+ appendOnlyTxLog add LogEntry(key, Some(value), PUT)
+ addToListOfKeysInTx(key)
+ curr
}
override def update(key: K, value: V) = {
register
- newAndUpdatedEntries.update(key, value)
+ val curr = getCurrentValue(key)
+ appendOnlyTxLog add LogEntry(key, Some(value), UPD)
+ addToListOfKeysInTx(key)
+ curr
}
override def remove(key: K) = {
register
- removedEntries.add(key)
- newAndUpdatedEntries.get(key)
+ val curr = getCurrentValue(key)
+ appendOnlyTxLog add LogEntry(key, None, REM)
+ addToListOfKeysInTx(key)
+ curr
}
- def slice(start: Option[K], count: Int): List[Tuple2[K, V]] =
+ def slice(start: Option[K], count: Int): List[(K, V)] =
slice(start, None, count)
- def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try {
- storage.getMapStorageRangeFor(uuid, start, finish, count)
- } catch { case e: Exception => Nil }
+ def slice(start: Option[K], finish: Option[K], count: Int): List[(K, V)]
override def clear = {
register
+ appendOnlyTxLog.clear
+ clearDistinctKeys
shouldClearOnCommit.swap(true)
}
override def contains(key: K): Boolean = try {
- newAndUpdatedEntries.contains(key) ||
- storage.getMapStorageEntryFor(uuid, key).isDefined
+ filterTxLogByKey(key) match {
+ case Seq() => // current tx doesn't use this
+ storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
+ case txs => // present in log
+ txs.last.op != REM // last entry cannot be a REM
+ }
} catch { case e: Exception => false }
+ protected def existsInStorage(key: K): Option[V] = try {
+ storage.getMapStorageEntryFor(uuid, key)
+ } catch {
+ case e: Exception => None
+ }
+
override def size: Int = try {
- storage.getMapStorageSizeFor(uuid)
- } catch { case e: Exception => 0 }
+ // partition key set affected in current tx into those which r added & which r deleted
+ val (keysAdded, keysRemoved) = keysInCurrentTx.map {
+ case (kseq, k) => ((kseq, k), getCurrentValue(k))
+ }.partition(_._2.isDefined)
- override def get(key: K): Option[V] = {
- if (newAndUpdatedEntries.contains(key)) {
- newAndUpdatedEntries.get(key)
- }
- else try {
- storage.getMapStorageEntryFor(uuid, key)
- } catch { case e: Exception => None }
+ // keys which existed in storage but removed in current tx
+ val inStorageRemovedInTx =
+ keysRemoved.keySet
+ .map(_._2)
+ .filter(k => existsInStorage(k).isDefined)
+ .size
+
+ // all keys in storage
+ val keysInStorage =
+ storage.getMapStorageFor(uuid)
+ .map { case (k, v) => toEquals(k) }
+ .toSet
+
+ // (keys that existed UNION keys added ) - (keys removed)
+ (keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx
+ } catch {
+ case e: Exception => 0
}
- def iterator = elements
+ // get must consider underlying storage & current uncommitted tx log
+ override def get(key: K): Option[V] = getCurrentValue(key)
- override def elements: Iterator[Tuple2[K, V]] = {
- new Iterator[Tuple2[K, V]] {
- private val originalList: List[Tuple2[K, V]] = try {
- storage.getMapStorageFor(uuid)
- } catch {
- case e: Throwable => Nil
- }
- private var elements = newAndUpdatedEntries.toList union originalList.reverse
- override def next: Tuple2[K, V]= synchronized {
- val element = elements.head
- elements = elements.tail
- element
- }
- override def hasNext: Boolean = synchronized { !elements.isEmpty }
- }
- }
+ def iterator: Iterator[Tuple2[K, V]]
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
@@ -187,6 +271,95 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
}
}
+trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
+ import scala.collection.mutable.ArraySeq
+
+ type T = ArraySeq[Byte]
+ def toEquals(k: Array[Byte]) = ArraySeq(k: _*)
+ override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
+
+ object COrdering {
+ implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
+ def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
+ new String(o1.toArray) compare new String(o2.toArray)
+ }
+ }
+
+ import scala.collection.immutable.{TreeMap, SortedMap}
+ private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = {
+ import COrdering._
+
+ // need ArraySeq for ordering
+ val fromStorage =
+ TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*)
+
+ val (keysAdded, keysRemoved) = keysInCurrentTx.map {
+ case (_, k) => (k, getCurrentValue(k))
+ }.partition(_._2.isDefined)
+
+ val inStorageRemovedInTx =
+ keysRemoved.keySet
+ .filter(k => existsInStorage(k).isDefined)
+ .map(k => ArraySeq(k: _*))
+
+ (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, Some(v)) => (ArraySeq(k: _*), v) }
+ }
+
+ override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try {
+ val newMap = replayAllKeys
+
+ if (newMap isEmpty) List[(Array[Byte], Array[Byte])]()
+
+ val startKey =
+ start match {
+ case Some(bytes) => Some(ArraySeq(bytes: _*))
+ case None => None
+ }
+
+ val endKey =
+ finish match {
+ case Some(bytes) => Some(ArraySeq(bytes: _*))
+ case None => None
+ }
+
+ ((startKey, endKey, count): @unchecked) match {
+ case ((Some(s), Some(e), _)) =>
+ newMap.range(s, e)
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
+ case ((Some(s), None, c)) if c > 0 =>
+ newMap.from(s)
+ .iterator
+ .take(count)
+ .map(e => (e._1.toArray, e._2))
+ .toList
+ case ((Some(s), None, _)) =>
+ newMap.from(s)
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
+ case ((None, Some(e), _)) =>
+ newMap.until(e)
+ .toList
+ .map(e => (e._1.toArray, e._2))
+ .toList
+ }
+ } catch { case e: Exception => Nil }
+
+ override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
+ new Iterator[(Array[Byte], Array[Byte])] {
+ private var elements = replayAllKeys
+ override def next: (Array[Byte], Array[Byte]) = synchronized {
+ val (k, v) = elements.head
+ elements = elements.tail
+ (k.toArray, v)
+ }
+ override def hasNext: Boolean = synchronized { !elements.isEmpty }
+ }
+ }
+}
+
/**
* Implements a template for a concrete persistent transactional vector based storage.
*
@@ -198,42 +371,83 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
protected val removedElems = TransactionalVector[T]()
protected val shouldClearOnCommit = Ref[Boolean]()
+ // operations on the Vector
+ trait Op
+ case object ADD extends Op
+ case object UPD extends Op
+ case object POP extends Op
+
+ // append only log: records all mutating operations
+ protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
+
+ case class LogEntry(index: Option[Int], value: Option[T], op: Op)
+
+ // need to override in subclasses e.g. "sameElements" for Array[Byte]
+ def equal(v1: T, v2: T): Boolean = v1 == v2
+
val storage: VectorStorageBackend[T]
def commit = {
- for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
- for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
- newElems.clear
- updatedElems.clear
+ for(entry <- appendOnlyTxLog) {
+ entry match {
+ case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
+ case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
+ case LogEntry(_, _, POP) => //..
+ }
+ }
+ appendOnlyTxLog.clear
}
def abort = {
- newElems.clear
- updatedElems.clear
- removedElems.clear
+ appendOnlyTxLog.clear
shouldClearOnCommit.swap(false)
}
+ private def replay: List[T] = {
+ import scala.collection.mutable.ArrayBuffer
+ var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*)
+
+ for(entry <- appendOnlyTxLog) {
+ entry match {
+ case LogEntry(_, Some(v), ADD) => elemsStorage += v
+ case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v)
+ case LogEntry(_, _, POP) => elemsStorage = elemsStorage.drop(1)
+ }
+ }
+ elemsStorage.toList.reverse
+ }
+
def +(elem: T) = add(elem)
def add(elem: T) = {
register
- newElems + elem
+ appendOnlyTxLog + LogEntry(None, Some(elem), ADD)
}
def apply(index: Int): T = get(index)
def get(index: Int): T = {
- if (newElems.size > index) newElems(index)
- else storage.getVectorStorageEntryFor(uuid, index)
+ if (appendOnlyTxLog.isEmpty) {
+ storage.getVectorStorageEntryFor(uuid, index)
+ } else {
+ val curr = replay
+ curr(index)
+ }
}
override def slice(start: Int, finish: Int): IndexedSeq[T] = slice(Some(start), Some(finish))
def slice(start: Option[Int], finish: Option[Int], count: Int = 0): IndexedSeq[T] = {
- val buffer = new scala.collection.mutable.ArrayBuffer[T]
- storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_))
- buffer
+ val curr = replay
+ val s = if (start.isDefined) start.get else 0
+ val cnt =
+ if (finish.isDefined) {
+ val f = finish.get
+ if (f >= s) (f - s) else count
+ }
+ else count
+ if (s == 0 && cnt == 0) List().toIndexedSeq
+ else curr.slice(s, s + cnt).toIndexedSeq
}
/**
@@ -241,12 +455,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
*/
def pop: T = {
register
+ appendOnlyTxLog + LogEntry(None, None, POP)
throw new UnsupportedOperationException("PersistentVector::pop is not implemented")
}
def update(index: Int, newElem: T) = {
register
- storage.updateVectorStorageEntryFor(uuid, index, newElem)
+ appendOnlyTxLog + LogEntry(Some(index), Some(newElem), UPD)
}
override def first: T = get(0)
@@ -260,7 +475,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
}
}
- def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
+ def length: Int = replay.length
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
diff --git a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala
index 79cacfeb07..83e47e3ba5 100644
--- a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala
+++ b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala
@@ -29,7 +29,7 @@ object MongoStorage extends Storage {
*
* @author Debasish Ghosh
*/
-class MongoPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
+class MongoPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = MongoStorageBackend
}
diff --git a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
index 847c226630..d51ff17dab 100644
--- a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
+++ b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala
@@ -64,51 +64,60 @@ private[akka] object MongoStorageBackend extends
coll.remove(q)
}
- private def queryFor[T](name: String)(body: (MongoDBObject, MongoDBObject) => T): T = {
- val q: DBObject = MongoDBObject(KEY -> name)
- val dbo = coll.findOne(q).getOrElse { throw new NoSuchElementException(name + " not present") }
- body(q, dbo)
+
+ private def queryFor[T](name: String)(body: (MongoDBObject, Option[DBObject]) => T): T = {
+ val q = MongoDBObject(KEY -> name)
+ body(q, coll.findOne(q))
}
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = queryFor(name) { (q, dbo) =>
- dbo -= new String(key)
- coll.update(q, dbo, true, false)
+ dbo.foreach { d =>
+ d -= new String(key)
+ coll.update(q, d, true, false)
+ }
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = queryFor(name) { (q, dbo) =>
- dbo.get(new String(key)).asInstanceOf[Option[Array[Byte]]]
+ dbo.map { d =>
+ Option(d.get(new String(key))).asInstanceOf[Option[Array[Byte]]]
+ }.getOrElse(None)
}
def getMapStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
- dbo.size - 2 // need to exclude object id and our KEY
+ dbo.map { d =>
+ d.size - 2 // need to exclude object id and our KEY
+ }.getOrElse(0)
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) =>
- for {
- (k, v) <- dbo.toList
- if k != "_id" && k != KEY
- } yield (k.getBytes, v.asInstanceOf[Array[Byte]])
+ dbo.map { d =>
+ for {
+ (k, v) <- d.toList
+ if k != "_id" && k != KEY
+ } yield (k.getBytes, v.asInstanceOf[Array[Byte]])
+ }.getOrElse(List.empty[(Array[Byte], Array[Byte])])
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
finish: Option[Array[Byte]],
count: Int): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) =>
- // get all keys except the special ones
- val keys =
- dbo.keySet
- .toList
- .filter(k => k != "_id" && k != KEY)
- .sortWith(_ < _)
+ dbo.map { d =>
+ // get all keys except the special ones
+ val keys = d.keys
+ .filter(k => k != "_id" && k != KEY)
+ .toList
+ .sortWith(_ < _)
- // if the supplied start is not defined, get the head of keys
- val s = start.map(new String(_)).getOrElse(keys.head)
+ // if the supplied start is not defined, get the head of keys
+ val s = start.map(new String(_)).getOrElse(keys.head)
- // if the supplied finish is not defined, get the last element of keys
- val f = finish.map(new String(_)).getOrElse(keys.last)
+ // if the supplied finish is not defined, get the last element of keys
+ val f = finish.map(new String(_)).getOrElse(keys.last)
- // slice from keys: both ends inclusive
- val ks = keys.slice(keys.indexOf(s), scala.math.min(count, keys.indexOf(f) + 1))
- ks.map(k => (k.getBytes, dbo.get(k).get.asInstanceOf[Array[Byte]]))
+ // slice from keys: both ends inclusive
+ val ks = keys.slice(keys.indexOf(s), scala.math.min(count, keys.indexOf(f) + 1))
+ ks.map(k => (k.getBytes, d.get(k).asInstanceOf[Array[Byte]]))
+ }.getOrElse(List.empty[(Array[Byte], Array[Byte])])
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
@@ -147,12 +156,16 @@ private[akka] object MongoStorageBackend extends
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = queryFor(name) { (q, dbo) =>
- dbo += ((index.toString, elem))
- coll.update(q, dbo, true, false)
+ dbo.foreach { d =>
+ d += ((index.toString, elem))
+ coll.update(q, d, true, false)
+ }
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = queryFor(name) { (q, dbo) =>
- dbo(index.toString).asInstanceOf[Array[Byte]]
+ dbo.map { d =>
+ d(index.toString).asInstanceOf[Array[Byte]]
+ }.getOrElse(Array.empty[Byte])
}
/**
@@ -162,24 +175,26 @@ private[akka] object MongoStorageBackend extends
* if start == 0 and finish == 0, return an empty collection
*/
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = queryFor(name) { (q, dbo) =>
- val ls = dbo.filter { case (k, v) => k != KEY && k != "_id" }
+ dbo.map { d =>
+ val ls = d.filter { case (k, v) => k != KEY && k != "_id" }
.toSeq
.sortWith((e1, e2) => (e1._1.toInt < e2._1.toInt))
.map(_._2)
- val st = start.getOrElse(0)
- val cnt =
- if (finish.isDefined) {
- val f = finish.get
- if (f >= st) (f - st) else count
- }
- else count
- if (st == 0 && cnt == 0) List()
- ls.slice(st, st + cnt).asInstanceOf[List[Array[Byte]]]
+ val st = start.getOrElse(0)
+ val cnt =
+ if (finish.isDefined) {
+ val f = finish.get
+ if (f >= st) (f - st) else count
+ }
+ else count
+ if (st == 0 && cnt == 0) List()
+ ls.slice(st, st + cnt).asInstanceOf[List[Array[Byte]]]
+ }.getOrElse(List.empty[Array[Byte]])
}
def getVectorStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
- dbo.size - 2
+ dbo.map { d => d.size - 2 }.getOrElse(0)
}
def insertRefStorageFor(name: String, element: Array[Byte]) = {
@@ -201,11 +216,9 @@ private[akka] object MongoStorageBackend extends
}
}
- def getRefStorageFor(name: String): Option[Array[Byte]] = try {
- queryFor(name) { (q, dbo) =>
- dbo.get(REF).asInstanceOf[Option[Array[Byte]]]
- }
- } catch {
- case e: java.util.NoSuchElementException => None
+ def getRefStorageFor(name: String): Option[Array[Byte]] = queryFor(name) { (q, dbo) =>
+ dbo.map { d =>
+ Option(d.get(REF)).asInstanceOf[Option[Array[Byte]]]
+ }.getOrElse(None)
}
}
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
index fb2034b6c1..e9576cc152 100644
--- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
@@ -46,7 +46,7 @@ class MongoStorageSpec extends
new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
- evaluating { getMapStorageEntryFor("t2", "torvalds".getBytes) } should produce [NoSuchElementException]
+ getMapStorageEntryFor("t2", "torvalds".getBytes) should equal(None)
getMapStorageFor("t1").map { case (k, v) => (new String(k), new String(v)) } should equal (l)
@@ -54,7 +54,7 @@ class MongoStorageSpec extends
getMapStorageSizeFor("t1") should equal(2)
removeMapStorageFor("t1")
- evaluating { getMapStorageSizeFor("t1") } should produce [NoSuchElementException]
+ getMapStorageSizeFor("t1") should equal(0)
}
it("should do proper range queries") {
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
new file mode 100644
index 0000000000..3b160c8c50
--- /dev/null
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
@@ -0,0 +1,347 @@
+package se.scalablesolutions.akka.persistence.mongo
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
+import se.scalablesolutions.akka.config.OneForOneStrategy
+import Actor._
+import se.scalablesolutions.akka.stm.global._
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util.Logging
+
+import MongoStorageBackend._
+
+case class GET(k: String)
+case class SET(k: String, v: String)
+case class REM(k: String)
+case class CONTAINS(k: String)
+case object MAP_SIZE
+case class MSET(kvs: List[(String, String)])
+case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
+case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
+case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
+case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
+
+case class VADD(v: String)
+case class VUPD(i: Int, v: String)
+case class VUPD_AND_ABORT(i: Int, v: String)
+case class VGET(i: Int)
+case object VSIZE
+case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
+case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
+
+object Storage {
+ class MongoSampleMapStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val FOO_MAP = "akka.sample.map"
+
+ private var fooMap = atomic { MongoStorage.getMap(FOO_MAP) }
+
+ def receive = {
+ case SET(k, v) =>
+ atomic {
+ fooMap += (k.getBytes, v.getBytes)
+ }
+ self.reply((k, v))
+
+ case GET(k) =>
+ val v = atomic {
+ fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found")
+ }
+ self.reply(v)
+
+ case REM(k) =>
+ val v = atomic {
+ fooMap -= k.getBytes
+ }
+ self.reply(k)
+
+ case CONTAINS(k) =>
+ val v = atomic {
+ fooMap contains k.getBytes
+ }
+ self.reply(v)
+
+ case MAP_SIZE =>
+ val v = atomic {
+ fooMap.size
+ }
+ self.reply(v)
+
+ case MSET(kvs) => atomic {
+ kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes) }
+ }
+ self.reply(kvs.size)
+
+ case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+
+ ks2rem.foreach {k =>
+ fooMap -= k.getBytes
+ }}
+ self.reply(fooMap.size)
+
+ case CLEAR_AFTER_PUT(kvs2add) => atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.clear
+ }
+ self.reply(true)
+
+ case PUT_WITH_SLICE(kvs2add, from, cnt) =>
+ val v = atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+
+ case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
+ val v = atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ ks2rem.foreach {k =>
+ fooMap -= k.getBytes
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+ }
+ }
+
+ class MongoSampleVectorStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val FOO_VECTOR = "akka.sample.vector"
+
+ private var fooVector = atomic { MongoStorage.getVector(FOO_VECTOR) }
+
+ def receive = {
+ case VADD(v) =>
+ val size =
+ atomic {
+ fooVector + v.getBytes
+ fooVector length
+ }
+ self.reply(size)
+
+ case VGET(index) =>
+ val ind =
+ atomic {
+ fooVector get index
+ }
+ self.reply(ind)
+
+ case VGET_AFTER_VADD(vs, is) =>
+ val els =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
+ }
+ self.reply(els)
+
+ case VUPD_AND_ABORT(index, value) =>
+ val l =
+ atomic {
+ fooVector.update(index, value.getBytes)
+ // force fail
+ fooVector get 100
+ }
+ self.reply(index)
+
+ case VADD_WITH_SLICE(vs, s, c) =>
+ val l =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ fooVector.slice(Some(s), None, c)
+ }
+ self.reply(l.map(new String(_)))
+ }
+ }
+}
+
+import Storage._
+
+@RunWith(classOf[JUnitRunner])
+class MongoTicket343Spec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterAll with
+ BeforeAndAfterEach {
+
+
+ override def beforeAll {
+ MongoStorageBackend.drop
+ println("** destroyed database")
+ }
+
+ override def beforeEach {
+ MongoStorageBackend.drop
+ println("** destroyed database")
+ }
+
+ override def afterEach {
+ MongoStorageBackend.drop
+ println("** destroyed database")
+ }
+
+ describe("Ticket 343 Issue #1") {
+ it("remove after put should work within the same transaction") {
+ val proc = actorOf[MongoSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ val rem = List("a", "debasish")
+ (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
+
+ (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
+ (proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
+
+ (proc !! GET("b")).getOrElse("b not found") should equal("2")
+
+ (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
+ (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #2") {
+ it("clear after put should work within the same transaction") {
+ val proc = actorOf[MongoSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #3") {
+ it("map size should change after the transaction") {
+ val proc = actorOf[MongoSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+ proc.stop
+ }
+ }
+
+ describe("slice test") {
+ it("should pass") {
+ val proc = actorOf[MongoSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ // (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
+
+ (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #4") {
+ it("vector get should not ignore elements that were in vector before transaction") {
+
+ val proc = actorOf[MongoSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
+ new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu")
+ new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu")
+ new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish")
+
+ // now add 3 more and do gets in the same transaction
+ (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #6") {
+ it("vector update should not ignore transaction") {
+ val proc = actorOf[MongoSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ evaluating {
+ (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
+ } should produce [Exception]
+
+ // update aborts and hence values will remain unchanged
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #5") {
+ it("vector slice() should not ignore elements added in current transaction") {
+ val proc = actorOf[MongoSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ // slice with no new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
+
+ // slice with new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
+ proc.stop
+ }
+ }
+}
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
index c92761beea..1eca775567 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
@@ -36,7 +36,7 @@ object RedisStorage extends Storage {
*
* @author Debasish Ghosh
*/
-class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
+class RedisPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = RedisStorageBackend
}
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala
new file mode 100644
index 0000000000..de236b9a5a
--- /dev/null
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala
@@ -0,0 +1,351 @@
+package se.scalablesolutions.akka.persistence.redis
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Actor}
+import se.scalablesolutions.akka.config.OneForOneStrategy
+import Actor._
+import se.scalablesolutions.akka.persistence.common.PersistentVector
+import se.scalablesolutions.akka.stm.global._
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util.Logging
+
+import RedisStorageBackend._
+
+case class GET(k: String)
+case class SET(k: String, v: String)
+case class REM(k: String)
+case class CONTAINS(k: String)
+case object MAP_SIZE
+case class MSET(kvs: List[(String, String)])
+case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
+case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
+case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
+case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
+
+case class VADD(v: String)
+case class VUPD(i: Int, v: String)
+case class VUPD_AND_ABORT(i: Int, v: String)
+case class VGET(i: Int)
+case object VSIZE
+case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
+case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
+
+object Storage {
+ class RedisSampleMapStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val FOO_MAP = "akka.sample.map"
+
+ private var fooMap = atomic { RedisStorage.getMap(FOO_MAP) }
+
+ def receive = {
+ case SET(k, v) =>
+ atomic {
+ fooMap += (k.getBytes, v.getBytes)
+ }
+ self.reply((k, v))
+
+ case GET(k) =>
+ val v = atomic {
+ fooMap.get(k.getBytes)
+ }
+ self.reply(v.collect {case byte => new String(byte)}.getOrElse(k + " Not found"))
+
+ case REM(k) =>
+ val v = atomic {
+ fooMap -= k.getBytes
+ }
+ self.reply(k)
+
+ case CONTAINS(k) =>
+ val v = atomic {
+ fooMap contains k.getBytes
+ }
+ self.reply(v)
+
+ case MAP_SIZE =>
+ val v = atomic {
+ fooMap.size
+ }
+ self.reply(v)
+
+ case MSET(kvs) =>
+ atomic {
+ kvs.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ }
+ self.reply(kvs.size)
+
+ case REMOVE_AFTER_PUT(kvs2add, ks2rem) =>
+ val v =
+ atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+
+ ks2rem.foreach {k =>
+ fooMap -= k.getBytes
+ }
+ fooMap.size
+ }
+ self.reply(v)
+
+ case CLEAR_AFTER_PUT(kvs2add) =>
+ atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.clear
+ }
+ self.reply(true)
+
+ case PUT_WITH_SLICE(kvs2add, from, cnt) =>
+ val v =
+ atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+
+ case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
+ val v =
+ atomic {
+ kvs2add.foreach {kv =>
+ fooMap += (kv._1.getBytes, kv._2.getBytes)
+ }
+ ks2rem.foreach {k =>
+ fooMap -= k.getBytes
+ }
+ fooMap.slice(Some(from.getBytes), cnt)
+ }
+ self.reply(v: List[(Array[Byte], Array[Byte])])
+ }
+ }
+
+ class RedisSampleVectorStorage extends Actor {
+ self.lifeCycle = Some(LifeCycle(Permanent))
+ val FOO_VECTOR = "akka.sample.vector"
+
+ private var fooVector = atomic { RedisStorage.getVector(FOO_VECTOR) }
+
+ def receive = {
+ case VADD(v) =>
+ val size =
+ atomic {
+ fooVector + v.getBytes
+ fooVector length
+ }
+ self.reply(size)
+
+ case VGET(index) =>
+ val ind =
+ atomic {
+ fooVector get index
+ }
+ self.reply(ind)
+
+ case VGET_AFTER_VADD(vs, is) =>
+ val els =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
+ }
+ self.reply(els)
+
+ case VUPD_AND_ABORT(index, value) =>
+ val l =
+ atomic {
+ fooVector.update(index, value.getBytes)
+ // force fail
+ fooVector get 100
+ }
+ self.reply(index)
+
+ case VADD_WITH_SLICE(vs, s, c) =>
+ val l =
+ atomic {
+ vs.foreach(fooVector + _.getBytes)
+ fooVector.slice(Some(s), None, c)
+ }
+ self.reply(l.map(new String(_)))
+ }
+ }
+}
+
+import Storage._
+
+@RunWith(classOf[JUnitRunner])
+class RedisTicket343Spec extends
+ Spec with
+ ShouldMatchers with
+ BeforeAndAfterAll with
+ BeforeAndAfterEach {
+
+ override def beforeAll {
+ flushDB
+ println("** destroyed database")
+ }
+
+ override def afterEach {
+ flushDB
+ println("** destroyed database")
+ }
+
+ describe("Ticket 343 Issue #1") {
+ it("remove after put should work within the same transaction") {
+ val proc = actorOf[RedisSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ val rem = List("a", "debasish")
+ (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
+
+ (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
+ (proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
+
+ (proc !! GET("b")).getOrElse("b not found") should equal("2")
+
+ (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
+ (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #2") {
+ it("clear after put should work within the same transaction") {
+ val proc = actorOf[RedisSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+ (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
+
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #3") {
+ it("map size should change after the transaction") {
+ val proc = actorOf[RedisSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+ (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+ (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+ proc.stop
+ }
+ }
+
+ describe("slice test") {
+ it("should pass") {
+ val proc = actorOf[RedisSampleMapStorage]
+ proc.start
+
+ (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+ (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+ (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+ (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+ (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
+
+ (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #4") {
+ it("vector get should not ignore elements that were in vector before transaction") {
+ val proc = actorOf[RedisSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
+ new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu")
+ new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu")
+ new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish")
+
+ // now add 3 more and do gets in the same transaction
+ (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #6") {
+ it("vector update should not ignore transaction") {
+ val proc = actorOf[RedisSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ evaluating {
+ (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
+ } should produce [Exception]
+
+ // update aborts and hence values will remain unchanged
+ new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan")
+ proc.stop
+ }
+ }
+
+ describe("Ticket 343 Issue #5") {
+ it("vector slice() should not ignore elements added in current transaction") {
+ val proc = actorOf[RedisSampleVectorStorage]
+ proc.start
+
+ // add 4 elements in separate transactions
+ (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+ (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+ (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+ (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+ // slice with no new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
+
+ // slice with new elements added in current transaction
+ (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
+ proc.stop
+ }
+ }
+}
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 0eb289a290..0ee2b18108 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -482,7 +482,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val commons_codec = Dependencies.commons_codec
val redis = Dependencies.redis
- override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
+ // override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
// -------------------------------------------------------------------------------------------------------------------