From 8494f8b1fc1d981083873bf39ce4190b6a40eb7e Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Fri, 10 Sep 2010 23:08:25 +0530 Subject: [PATCH] changes for ticket #343. Test harness runs for both Redis and Mongo --- .../src/main/scala/CassandraStorage.scala | 2 +- .../src/main/scala/Storage.scala | 339 +++++++++++++---- .../src/main/scala/MongoStorage.scala | 2 +- .../src/main/scala/MongoStorageBackend.scala | 103 ++--- .../src/test/scala/MongoStorageSpec.scala | 4 +- .../src/test/scala/MongoTicket343Spec.scala | 347 +++++++++++++++++ .../src/main/scala/RedisStorage.scala | 2 +- .../src/test/scala/RedisTicket343Spec.scala | 351 ++++++++++++++++++ project/build/AkkaProject.scala | 2 +- 9 files changed, 1039 insertions(+), 113 deletions(-) create mode 100644 akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala create mode 100644 akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala index be5fc4f4c7..0c6f239ef7 100644 --- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala +++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraStorage.scala @@ -29,7 +29,7 @@ object CassandraStorage extends Storage { * * @author Jonas Bonér */ -class CassandraPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] { +class CassandraPersistentMap(id: String) extends PersistentMapBinary { val uuid = id val storage = CassandraStorageBackend } diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index ccaf7518f1..4d9ff48a60 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -7,9 +7,11 @@ package se.scalablesolutions.akka.persistence.common import se.scalablesolutions.akka.stm._ import se.scalablesolutions.akka.stm.TransactionManagement.transaction import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.AkkaException -class StorageException(message: String) extends AkkaException(message) +// FIXME move to 'stm' package + add message with more info +class NoTransactionInScopeException extends RuntimeException + +class StorageException(message: String) extends RuntimeException(message) /** * Example Scala usage. @@ -80,24 +82,90 @@ trait Storage { */ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] with Transactional with Committable with Abortable with Logging { - protected val newAndUpdatedEntries = TransactionalMap[K, V]() - protected val removedEntries = TransactionalVector[K]() protected val shouldClearOnCommit = Ref[Boolean]() + // operations on the Map + trait Op + case object GET extends Op + case object PUT extends Op + case object REM extends Op + case object UPD extends Op + + // append only log: records all mutating operations + protected val appendOnlyTxLog = TransactionalVector[LogEntry]() + + case class LogEntry(key: K, value: Option[V], op: Op) + + // need to override in subclasses e.g. 
"sameElements" for Array[Byte] + def equal(k1: K, k2: K): Boolean = k1 == k2 + + // Seqable type that's required for maintaining the log of distinct keys affected in current transaction + type T <: Equals + + // converts key K to the Seqable type Equals + def toEquals(k: K): T + + // keys affected in the current transaction + protected val keysInCurrentTx = TransactionalMap[T, K]() + + protected def addToListOfKeysInTx(key: K): Unit = + keysInCurrentTx += (toEquals(key), key) + + protected def clearDistinctKeys = keysInCurrentTx.clear + + protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] = + appendOnlyTxLog filter(e => equal(e.key, key)) + + // need to get current value considering the underlying storage as well as the transaction log + protected def getCurrentValue(key: K): Option[V] = { + + // get all mutating entries for this key for this tx + val txEntries = filterTxLogByKey(key) + + // get the snapshot from the underlying store for this key + val underlying = try { + storage.getMapStorageEntryFor(uuid, key) + } catch { case e: Exception => None } + + if (txEntries.isEmpty) underlying + else replay(txEntries, key, underlying) + } + + // replay all tx entries for key k with seed = initial + private def replay(txEntries: IndexedSeq[LogEntry], key: K, initial: Option[V]): Option[V] = { + import scala.collection.mutable._ + + val m = initial match { + case None => Map.empty[K, V] + case Some(v) => Map((key, v)) + } + txEntries.foreach {case LogEntry(k, v, o) => o match { + case PUT => m.put(k, v.get) + case REM => m -= k + case UPD => m.update(k, v.get) + }} + m get key + } + // to be concretized in subclasses val storage: MapStorageBackend[K, V] def commit = { - if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid) - removedEntries.toList.foreach(key => storage.removeMapStorageFor(uuid, key)) - storage.insertMapStorageEntriesFor(uuid, newAndUpdatedEntries.toList) - newAndUpdatedEntries.clear - removedEntries.clear + // if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid) + + appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match { + case PUT => storage.insertMapStorageEntryFor(uuid, k, v.get) + case UPD => storage.insertMapStorageEntryFor(uuid, k, v.get) + case REM => storage.removeMapStorageFor(uuid, k) + }} + + appendOnlyTxLog.clear + clearDistinctKeys } def abort = { - newAndUpdatedEntries.clear - removedEntries.clear + appendOnlyTxLog.clear + clearDistinctKeys shouldClearOnCommit.swap(false) } @@ -118,68 +186,84 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] override def put(key: K, value: V): Option[V] = { register - newAndUpdatedEntries.put(key, value) + val curr = getCurrentValue(key) + appendOnlyTxLog add LogEntry(key, Some(value), PUT) + addToListOfKeysInTx(key) + curr } override def update(key: K, value: V) = { register - newAndUpdatedEntries.update(key, value) + val curr = getCurrentValue(key) + appendOnlyTxLog add LogEntry(key, Some(value), UPD) + addToListOfKeysInTx(key) + curr } override def remove(key: K) = { register - removedEntries.add(key) - newAndUpdatedEntries.get(key) + val curr = getCurrentValue(key) + appendOnlyTxLog add LogEntry(key, None, REM) + addToListOfKeysInTx(key) + curr } - def slice(start: Option[K], count: Int): List[Tuple2[K, V]] = + def slice(start: Option[K], count: Int): List[(K, V)] = slice(start, None, count) - def slice(start: Option[K], finish: Option[K], count: Int): List[Tuple2[K, V]] = try { - 
storage.getMapStorageRangeFor(uuid, start, finish, count)
-  } catch { case e: Exception => Nil }
+  def slice(start: Option[K], finish: Option[K], count: Int): List[(K, V)]
 
   override def clear = {
     register
+    appendOnlyTxLog.clear
+    clearDistinctKeys
     shouldClearOnCommit.swap(true)
   }
 
   override def contains(key: K): Boolean = try {
-    newAndUpdatedEntries.contains(key) ||
-    storage.getMapStorageEntryFor(uuid, key).isDefined
+    filterTxLogByKey(key) match {
+      case Seq() => // key untouched in the current tx
+        storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
+      case txs => // key present in the log: contained iff the last op is not a REM
+        txs.last.op != REM
+    }
   } catch { case e: Exception => false }
 
+  protected def existsInStorage(key: K): Option[V] = try {
+    storage.getMapStorageEntryFor(uuid, key)
+  } catch {
+    case e: Exception => None
+  }
+
   override def size: Int = try {
-    storage.getMapStorageSizeFor(uuid)
-  } catch { case e: Exception => 0 }
+    // partition the key set affected in the current tx into added vs. removed
+    val (keysAdded, keysRemoved) = keysInCurrentTx.map {
+      case (kseq, k) => ((kseq, k), getCurrentValue(k))
+    }.partition(_._2.isDefined)
 
-  override def get(key: K): Option[V] = {
-    if (newAndUpdatedEntries.contains(key)) {
-      newAndUpdatedEntries.get(key)
-    }
-    else try {
-      storage.getMapStorageEntryFor(uuid, key)
-    } catch { case e: Exception => None }
+    // keys which existed in storage but were removed in the current tx
+    val inStorageRemovedInTx =
+      keysRemoved.keySet
+        .map(_._2)
+        .filter(k => existsInStorage(k).isDefined)
+        .size
+
+    // all keys in storage
+    val keysInStorage =
+      storage.getMapStorageFor(uuid)
+        .map { case (k, v) => toEquals(k) }
+        .toSet
+
+    // (keys in storage UNION keys added) - (keys removed)
+    (keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx
+  } catch {
+    case e: Exception => 0
   }
 
-  def iterator = elements
+  // get must consider the underlying storage as well as the current uncommitted tx log
+  override def get(key: K): Option[V] = getCurrentValue(key)
 
-  override def elements: Iterator[Tuple2[K, V]] = {
-    new Iterator[Tuple2[K, V]] {
-      private val originalList: List[Tuple2[K, V]] = try {
-        storage.getMapStorageFor(uuid)
-      } catch {
-        case e: Throwable => Nil
-      }
-      private var elements = newAndUpdatedEntries.toList union originalList.reverse
-      override def next: Tuple2[K, V] = synchronized {
-        val element = elements.head
-        elements = elements.tail
-        element
-      }
-      override def hasNext: Boolean = synchronized { !elements.isEmpty }
-    }
-  }
+  def iterator: Iterator[Tuple2[K, V]]
 
   private def register = {
     if (transaction.get.isEmpty) throw new NoTransactionInScopeException
@@ -187,6 +271,95 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
   }
 }
 
+trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
+  import scala.collection.mutable.ArraySeq
+
+  type T = ArraySeq[Byte]
+  def toEquals(k: Array[Byte]) = ArraySeq(k: _*)
+  override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
+
+  object COrdering {
+    implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] {
+      def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) =
+        new String(o1.toArray) compare new String(o2.toArray)
+    }
+  }
+
+  import scala.collection.immutable.{TreeMap, SortedMap}
+  private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = {
+    import COrdering._
+
+    // wrap keys in ArraySeq: Array[Byte] has no structural equality or ordering
+    val fromStorage =
+      TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*)
+
+    val (keysAdded, keysRemoved) = keysInCurrentTx.map {
+      case (_, k) => (k, getCurrentValue(k))
+    }.partition(_._2.isDefined)
+
+    // keys which existed in storage but were removed in the current tx
+    val inStorageRemovedInTx =
+      keysRemoved.keySet
+        .filter(k => existsInStorage(k).isDefined)
+        .map(k => ArraySeq(k: _*))
+
+    (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, Some(v)) => (ArraySeq(k: _*), v) }
+  }
+
+  override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try {
+    val newMap = replayAllKeys
+
+    if (newMap.isEmpty) List[(Array[Byte], Array[Byte])]()
+    else {
+      val startKey =
+        start match {
+          case Some(bytes) => Some(ArraySeq(bytes: _*))
+          case None => None
+        }
+
+      val endKey =
+        finish match {
+          case Some(bytes) => Some(ArraySeq(bytes: _*))
+          case None => None
+        }
+
+      ((startKey, endKey, count): @unchecked) match {
+        case ((Some(s), Some(e), _)) =>
+          newMap.range(s, e)
+            .toList
+            .map(e => (e._1.toArray, e._2))
+        case ((Some(s), None, c)) if c > 0 =>
+          newMap.from(s)
+            .iterator
+            .take(count)
+            .map(e => (e._1.toArray, e._2))
+            .toList
+        case ((Some(s), None, _)) =>
+          newMap.from(s)
+            .toList
+            .map(e => (e._1.toArray, e._2))
+        case ((None, Some(e), _)) =>
+          newMap.until(e)
+            .toList
+            .map(e => (e._1.toArray, e._2))
+      }
+    }
+  } catch { case e: Exception => Nil }
+
+  override def iterator: Iterator[(Array[Byte], Array[Byte])] = {
+    new Iterator[(Array[Byte], Array[Byte])] {
+      private var elements = replayAllKeys
+      override def next: (Array[Byte], Array[Byte]) = synchronized {
+        val (k, v) = elements.head
+        elements = elements.tail
+        (k.toArray, v)
+      }
+      override def hasNext: Boolean = synchronized { !elements.isEmpty }
+    }
+  }
+}
+
 /**
  * Implements a template for a concrete persistent transactional vector based storage.
  *
  * @author Jonas Bonér
 */
@@ -198,42 +371,83 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
   protected val removedElems = TransactionalVector[T]()
   protected val shouldClearOnCommit = Ref[Boolean]()
 
+  // operations on the Vector
+  trait Op
+  case object ADD extends Op
+  case object UPD extends Op
+  case object POP extends Op
+
+  // append only log: records all mutating operations
+  protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
+
+  case class LogEntry(index: Option[Int], value: Option[T], op: Op)
+
+  // need to override in subclasses e.g. "sameElements" for Array[Byte]
+  def equal(v1: T, v2: T): Boolean = v1 == v2
+
   val storage: VectorStorageBackend[T]
 
   def commit = {
-    for (element <- newElems) storage.insertVectorStorageEntryFor(uuid, element)
-    for (entry <- updatedElems) storage.updateVectorStorageEntryFor(uuid, entry._1, entry._2)
-    newElems.clear
-    updatedElems.clear
+    for (entry <- appendOnlyTxLog) {
+      entry match {
+        case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
+        case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
+        case LogEntry(_, _, POP) => // no-op: pop logs the entry but then throws, so POP never reaches a successful commit
+ } + } + appendOnlyTxLog.clear } def abort = { - newElems.clear - updatedElems.clear - removedElems.clear + appendOnlyTxLog.clear shouldClearOnCommit.swap(false) } + private def replay: List[T] = { + import scala.collection.mutable.ArrayBuffer + var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*) + + for(entry <- appendOnlyTxLog) { + entry match { + case LogEntry(_, Some(v), ADD) => elemsStorage += v + case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v) + case LogEntry(_, _, POP) => elemsStorage = elemsStorage.drop(1) + } + } + elemsStorage.toList.reverse + } + def +(elem: T) = add(elem) def add(elem: T) = { register - newElems + elem + appendOnlyTxLog + LogEntry(None, Some(elem), ADD) } def apply(index: Int): T = get(index) def get(index: Int): T = { - if (newElems.size > index) newElems(index) - else storage.getVectorStorageEntryFor(uuid, index) + if (appendOnlyTxLog.isEmpty) { + storage.getVectorStorageEntryFor(uuid, index) + } else { + val curr = replay + curr(index) + } } override def slice(start: Int, finish: Int): IndexedSeq[T] = slice(Some(start), Some(finish)) def slice(start: Option[Int], finish: Option[Int], count: Int = 0): IndexedSeq[T] = { - val buffer = new scala.collection.mutable.ArrayBuffer[T] - storage.getVectorStorageRangeFor(uuid, start, finish, count).foreach(buffer.append(_)) - buffer + val curr = replay + val s = if (start.isDefined) start.get else 0 + val cnt = + if (finish.isDefined) { + val f = finish.get + if (f >= s) (f - s) else count + } + else count + if (s == 0 && cnt == 0) List().toIndexedSeq + else curr.slice(s, s + cnt).toIndexedSeq } /** @@ -241,12 +455,13 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa */ def pop: T = { register + appendOnlyTxLog + LogEntry(None, None, POP) throw new UnsupportedOperationException("PersistentVector::pop is not implemented") } def update(index: Int, newElem: T) = { register - storage.updateVectorStorageEntryFor(uuid, index, newElem) + appendOnlyTxLog + LogEntry(Some(index), Some(newElem), UPD) } override def first: T = get(0) @@ -260,7 +475,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa } } - def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length + def length: Int = replay.length private def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException diff --git a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala index 79cacfeb07..83e47e3ba5 100644 --- a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala +++ b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorage.scala @@ -29,7 +29,7 @@ object MongoStorage extends Storage { * * @author Debasish Ghosh */ -class MongoPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] { +class MongoPersistentMap(id: String) extends PersistentMapBinary { val uuid = id val storage = MongoStorageBackend } diff --git a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala index 847c226630..d51ff17dab 100644 --- a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala +++ b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala @@ -64,51 +64,60 @@ private[akka] object 
MongoStorageBackend extends coll.remove(q) } - private def queryFor[T](name: String)(body: (MongoDBObject, MongoDBObject) => T): T = { - val q: DBObject = MongoDBObject(KEY -> name) - val dbo = coll.findOne(q).getOrElse { throw new NoSuchElementException(name + " not present") } - body(q, dbo) + + private def queryFor[T](name: String)(body: (MongoDBObject, Option[DBObject]) => T): T = { + val q = MongoDBObject(KEY -> name) + body(q, coll.findOne(q)) } def removeMapStorageFor(name: String, key: Array[Byte]): Unit = queryFor(name) { (q, dbo) => - dbo -= new String(key) - coll.update(q, dbo, true, false) + dbo.foreach { d => + d -= new String(key) + coll.update(q, d, true, false) + } } def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = queryFor(name) { (q, dbo) => - dbo.get(new String(key)).asInstanceOf[Option[Array[Byte]]] + dbo.map { d => + Option(d.get(new String(key))).asInstanceOf[Option[Array[Byte]]] + }.getOrElse(None) } def getMapStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) => - dbo.size - 2 // need to exclude object id and our KEY + dbo.map { d => + d.size - 2 // need to exclude object id and our KEY + }.getOrElse(0) } def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) => - for { - (k, v) <- dbo.toList - if k != "_id" && k != KEY - } yield (k.getBytes, v.asInstanceOf[Array[Byte]]) + dbo.map { d => + for { + (k, v) <- d.toList + if k != "_id" && k != KEY + } yield (k.getBytes, v.asInstanceOf[Array[Byte]]) + }.getOrElse(List.empty[(Array[Byte], Array[Byte])]) } def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = queryFor(name) { (q, dbo) => - // get all keys except the special ones - val keys = - dbo.keySet - .toList - .filter(k => k != "_id" && k != KEY) - .sortWith(_ < _) + dbo.map { d => + // get all keys except the special ones + val keys = d.keys + .filter(k => k != "_id" && k != KEY) + .toList + .sortWith(_ < _) - // if the supplied start is not defined, get the head of keys - val s = start.map(new String(_)).getOrElse(keys.head) + // if the supplied start is not defined, get the head of keys + val s = start.map(new String(_)).getOrElse(keys.head) - // if the supplied finish is not defined, get the last element of keys - val f = finish.map(new String(_)).getOrElse(keys.last) + // if the supplied finish is not defined, get the last element of keys + val f = finish.map(new String(_)).getOrElse(keys.last) - // slice from keys: both ends inclusive - val ks = keys.slice(keys.indexOf(s), scala.math.min(count, keys.indexOf(f) + 1)) - ks.map(k => (k.getBytes, dbo.get(k).get.asInstanceOf[Array[Byte]])) + // slice from keys: both ends inclusive + val ks = keys.slice(keys.indexOf(s), scala.math.min(count, keys.indexOf(f) + 1)) + ks.map(k => (k.getBytes, d.get(k).asInstanceOf[Array[Byte]])) + }.getOrElse(List.empty[(Array[Byte], Array[Byte])]) } def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { @@ -147,12 +156,16 @@ private[akka] object MongoStorageBackend extends } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = queryFor(name) { (q, dbo) => - dbo += ((index.toString, elem)) - coll.update(q, dbo, true, false) + dbo.foreach { d => + d += ((index.toString, elem)) + coll.update(q, d, true, false) + } } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = queryFor(name) { (q, dbo) => - dbo(index.toString).asInstanceOf[Array[Byte]] + dbo.map { d 
=>
+      d(index.toString).asInstanceOf[Array[Byte]]
+    }.getOrElse(Array.empty[Byte])
   }
 
   /**
@@ -162,24 +175,26 @@ private[akka] object MongoStorageBackend extends
    * if start == 0 and finish == 0, return an empty collection
    */
   def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = queryFor(name) { (q, dbo) =>
-    val ls = dbo.filter { case (k, v) => k != KEY && k != "_id" }
+    dbo.map { d =>
+      val ls = d.filter { case (k, v) => k != KEY && k != "_id" }
         .toSeq
         .sortWith((e1, e2) => (e1._1.toInt < e2._1.toInt))
         .map(_._2)
 
-    val st = start.getOrElse(0)
-    val cnt =
-      if (finish.isDefined) {
-        val f = finish.get
-        if (f >= st) (f - st) else count
-      }
-      else count
-    if (st == 0 && cnt == 0) List()
-    ls.slice(st, st + cnt).asInstanceOf[List[Array[Byte]]]
+      val st = start.getOrElse(0)
+      val cnt =
+        if (finish.isDefined) {
+          val f = finish.get
+          if (f >= st) (f - st) else count
+        }
+        else count
+      if (st == 0 && cnt == 0) List()
+      else ls.slice(st, st + cnt).asInstanceOf[List[Array[Byte]]]
+    }.getOrElse(List.empty[Array[Byte]])
   }
 
   def getVectorStorageSizeFor(name: String): Int = queryFor(name) { (q, dbo) =>
-    dbo.size - 2
+    dbo.map { d => d.size - 2 }.getOrElse(0)
   }
 
   def insertRefStorageFor(name: String, element: Array[Byte]) = {
@@ -201,11 +216,9 @@ private[akka] object MongoStorageBackend extends
     }
   }
 
-  def getRefStorageFor(name: String): Option[Array[Byte]] = try {
-    queryFor(name) { (q, dbo) =>
-      dbo.get(REF).asInstanceOf[Option[Array[Byte]]]
-    }
-  } catch {
-    case e: java.util.NoSuchElementException => None
+  def getRefStorageFor(name: String): Option[Array[Byte]] = queryFor(name) { (q, dbo) =>
+    dbo.map { d =>
+      Option(d.get(REF)).asInstanceOf[Option[Array[Byte]]]
+    }.getOrElse(None)
   }
 }
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
index fb2034b6c1..e9576cc152 100644
--- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoStorageSpec.scala
@@ -46,7 +46,7 @@ class MongoStorageSpec extends
     new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala")
     getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None)
 
-    evaluating { getMapStorageEntryFor("t2", "torvalds".getBytes) } should produce [NoSuchElementException]
+    getMapStorageEntryFor("t2", "torvalds".getBytes) should equal(None)
 
     getMapStorageFor("t1").map { case (k, v) =>
       (new String(k), new String(v)) } should equal (l)
@@ -54,7 +54,7 @@ class MongoStorageSpec extends
     getMapStorageSizeFor("t1") should equal(2)
 
     removeMapStorageFor("t1")
-    evaluating { getMapStorageSizeFor("t1") } should produce [NoSuchElementException]
+    getMapStorageSizeFor("t1") should equal(0)
   }
 
   it("should do proper range queries") {
diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
new file mode 100644
index 0000000000..3b160c8c50
--- /dev/null
+++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala
@@ -0,0 +1,347 @@
+package se.scalablesolutions.akka.persistence.mongo
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Actor, 
ActorRef} +import se.scalablesolutions.akka.config.OneForOneStrategy +import Actor._ +import se.scalablesolutions.akka.stm.global._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.util.Logging + +import MongoStorageBackend._ + +case class GET(k: String) +case class SET(k: String, v: String) +case class REM(k: String) +case class CONTAINS(k: String) +case object MAP_SIZE +case class MSET(kvs: List[(String, String)]) +case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String]) +case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)]) +case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int) +case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int) + +case class VADD(v: String) +case class VUPD(i: Int, v: String) +case class VUPD_AND_ABORT(i: Int, v: String) +case class VGET(i: Int) +case object VSIZE +case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int]) +case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int) + +object Storage { + class MongoSampleMapStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val FOO_MAP = "akka.sample.map" + + private var fooMap = atomic { MongoStorage.getMap(FOO_MAP) } + + def receive = { + case SET(k, v) => + atomic { + fooMap += (k.getBytes, v.getBytes) + } + self.reply((k, v)) + + case GET(k) => + val v = atomic { + fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found") + } + self.reply(v) + + case REM(k) => + val v = atomic { + fooMap -= k.getBytes + } + self.reply(k) + + case CONTAINS(k) => + val v = atomic { + fooMap contains k.getBytes + } + self.reply(v) + + case MAP_SIZE => + val v = atomic { + fooMap.size + } + self.reply(v) + + case MSET(kvs) => atomic { + kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes) } + } + self.reply(kvs.size) + + case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + + ks2rem.foreach {k => + fooMap -= k.getBytes + }} + self.reply(fooMap.size) + + case CLEAR_AFTER_PUT(kvs2add) => atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + fooMap.clear + } + self.reply(true) + + case PUT_WITH_SLICE(kvs2add, from, cnt) => + val v = atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + fooMap.slice(Some(from.getBytes), cnt) + } + self.reply(v: List[(Array[Byte], Array[Byte])]) + + case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) => + val v = atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + ks2rem.foreach {k => + fooMap -= k.getBytes + } + fooMap.slice(Some(from.getBytes), cnt) + } + self.reply(v: List[(Array[Byte], Array[Byte])]) + } + } + + class MongoSampleVectorStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val FOO_VECTOR = "akka.sample.vector" + + private var fooVector = atomic { MongoStorage.getVector(FOO_VECTOR) } + + def receive = { + case VADD(v) => + val size = + atomic { + fooVector + v.getBytes + fooVector length + } + self.reply(size) + + case VGET(index) => + val ind = + atomic { + fooVector get index + } + self.reply(ind) + + case VGET_AFTER_VADD(vs, is) => + val els = + atomic { + vs.foreach(fooVector + _.getBytes) + (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_)) + } + self.reply(els) + + case VUPD_AND_ABORT(index, value) => + val l = + atomic { + 
fooVector.update(index, value.getBytes) + // force fail + fooVector get 100 + } + self.reply(index) + + case VADD_WITH_SLICE(vs, s, c) => + val l = + atomic { + vs.foreach(fooVector + _.getBytes) + fooVector.slice(Some(s), None, c) + } + self.reply(l.map(new String(_))) + } + } +} + +import Storage._ + +@RunWith(classOf[JUnitRunner]) +class MongoTicket343Spec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll with + BeforeAndAfterEach { + + + override def beforeAll { + MongoStorageBackend.drop + println("** destroyed database") + } + + override def beforeEach { + MongoStorageBackend.drop + println("** destroyed database") + } + + override def afterEach { + MongoStorageBackend.drop + println("** destroyed database") + } + + describe("Ticket 343 Issue #1") { + it("remove after put should work within the same transaction") { + val proc = actorOf[MongoSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + + (proc !! GET("dg")).getOrElse("Get failed") should equal("1") + (proc !! GET("mc")).getOrElse("Get failed") should equal("2") + (proc !! GET("nd")).getOrElse("Get failed") should equal("3") + + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + val add = List(("a", "1"), ("b", "2"), ("c", "3")) + val rem = List("a", "debasish") + (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5) + + (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found") + (proc !! GET("a")).getOrElse("a not found") should equal("a Not found") + + (proc !! GET("b")).getOrElse("b not found") should equal("2") + + (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true) + (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5) + proc.stop + } + } + + describe("Ticket 343 Issue #2") { + it("clear after put should work within the same transaction") { + val proc = actorOf[MongoSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + val add = List(("a", "1"), ("b", "2"), ("c", "3")) + (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true) + + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + proc.stop + } + } + + describe("Ticket 343 Issue #3") { + it("map size should change after the transaction") { + val proc = actorOf[MongoSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + (proc !! GET("dg")).getOrElse("Get failed") should equal("1") + (proc !! GET("mc")).getOrElse("Get failed") should equal("2") + (proc !! 
GET("nd")).getOrElse("Get failed") should equal("3") + proc.stop + } + } + + describe("slice test") { + it("should pass") { + val proc = actorOf[MongoSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + // (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10"))) + + (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3"))) + proc.stop + } + } + + describe("Ticket 343 Issue #4") { + it("vector get should not ignore elements that were in vector before transaction") { + + val proc = actorOf[MongoSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan") + new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu") + new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu") + new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish") + + // now add 3 more and do gets in the same transaction + (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu")) + proc.stop + } + } + + describe("Ticket 343 Issue #6") { + it("vector update should not ignore transaction") { + val proc = actorOf[MongoSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + evaluating { + (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed") + } should produce [Exception] + + // update aborts and hence values will remain unchanged + new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan") + proc.stop + } + } + + describe("Ticket 343 Issue #5") { + it("vector slice() should not ignore elements added in current transaction") { + val proc = actorOf[MongoSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! 
VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + // slice with no new elements added in current transaction + (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish")) + + // slice with new elements added in current transaction + (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a")) + proc.stop + } + } +} diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala index c92761beea..1eca775567 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala @@ -36,7 +36,7 @@ object RedisStorage extends Storage { * * @author Debasish Ghosh */ -class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] { +class RedisPersistentMap(id: String) extends PersistentMapBinary { val uuid = id val storage = RedisStorageBackend } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala new file mode 100644 index 0000000000..de236b9a5a --- /dev/null +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala @@ -0,0 +1,351 @@ +package se.scalablesolutions.akka.persistence.redis + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.actor.{Actor} +import se.scalablesolutions.akka.config.OneForOneStrategy +import Actor._ +import se.scalablesolutions.akka.persistence.common.PersistentVector +import se.scalablesolutions.akka.stm.global._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.util.Logging + +import RedisStorageBackend._ + +case class GET(k: String) +case class SET(k: String, v: String) +case class REM(k: String) +case class CONTAINS(k: String) +case object MAP_SIZE +case class MSET(kvs: List[(String, String)]) +case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String]) +case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)]) +case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int) +case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int) + +case class VADD(v: String) +case class VUPD(i: Int, v: String) +case class VUPD_AND_ABORT(i: Int, v: String) +case class VGET(i: Int) +case object VSIZE +case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int]) +case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int) + +object Storage { + class RedisSampleMapStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val FOO_MAP = "akka.sample.map" + + private var fooMap = atomic { RedisStorage.getMap(FOO_MAP) } + + def receive = { + case SET(k, v) => + atomic { + fooMap += (k.getBytes, v.getBytes) + } + self.reply((k, v)) + + case GET(k) => + val v = atomic { + fooMap.get(k.getBytes) + } + self.reply(v.collect {case byte => new String(byte)}.getOrElse(k + " Not found")) + + case REM(k) => + val v = atomic { + fooMap -= k.getBytes + } + self.reply(k) + + case CONTAINS(k) => + val v = atomic { + fooMap contains 
k.getBytes + } + self.reply(v) + + case MAP_SIZE => + val v = atomic { + fooMap.size + } + self.reply(v) + + case MSET(kvs) => + atomic { + kvs.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + } + self.reply(kvs.size) + + case REMOVE_AFTER_PUT(kvs2add, ks2rem) => + val v = + atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + + ks2rem.foreach {k => + fooMap -= k.getBytes + } + fooMap.size + } + self.reply(v) + + case CLEAR_AFTER_PUT(kvs2add) => + atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + fooMap.clear + } + self.reply(true) + + case PUT_WITH_SLICE(kvs2add, from, cnt) => + val v = + atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + fooMap.slice(Some(from.getBytes), cnt) + } + self.reply(v: List[(Array[Byte], Array[Byte])]) + + case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) => + val v = + atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + ks2rem.foreach {k => + fooMap -= k.getBytes + } + fooMap.slice(Some(from.getBytes), cnt) + } + self.reply(v: List[(Array[Byte], Array[Byte])]) + } + } + + class RedisSampleVectorStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val FOO_VECTOR = "akka.sample.vector" + + private var fooVector = atomic { RedisStorage.getVector(FOO_VECTOR) } + + def receive = { + case VADD(v) => + val size = + atomic { + fooVector + v.getBytes + fooVector length + } + self.reply(size) + + case VGET(index) => + val ind = + atomic { + fooVector get index + } + self.reply(ind) + + case VGET_AFTER_VADD(vs, is) => + val els = + atomic { + vs.foreach(fooVector + _.getBytes) + (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_)) + } + self.reply(els) + + case VUPD_AND_ABORT(index, value) => + val l = + atomic { + fooVector.update(index, value.getBytes) + // force fail + fooVector get 100 + } + self.reply(index) + + case VADD_WITH_SLICE(vs, s, c) => + val l = + atomic { + vs.foreach(fooVector + _.getBytes) + fooVector.slice(Some(s), None, c) + } + self.reply(l.map(new String(_))) + } + } +} + +import Storage._ + +@RunWith(classOf[JUnitRunner]) +class RedisTicket343Spec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll with + BeforeAndAfterEach { + + override def beforeAll { + flushDB + println("** destroyed database") + } + + override def afterEach { + flushDB + println("** destroyed database") + } + + describe("Ticket 343 Issue #1") { + it("remove after put should work within the same transaction") { + val proc = actorOf[RedisSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + + (proc !! GET("dg")).getOrElse("Get failed") should equal("1") + (proc !! GET("mc")).getOrElse("Get failed") should equal("2") + (proc !! GET("nd")).getOrElse("Get failed") should equal("3") + + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + val add = List(("a", "1"), ("b", "2"), ("c", "3")) + val rem = List("a", "debasish") + (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5) + + (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found") + (proc !! 
GET("a")).getOrElse("a not found") should equal("a Not found") + + (proc !! GET("b")).getOrElse("b not found") should equal("2") + + (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true) + (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5) + proc.stop + } + } + + describe("Ticket 343 Issue #2") { + it("clear after put should work within the same transaction") { + val proc = actorOf[RedisSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + val add = List(("a", "1"), ("b", "2"), ("c", "3")) + (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true) + + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + proc.stop + } + } + + describe("Ticket 343 Issue #3") { + it("map size should change after the transaction") { + val proc = actorOf[RedisSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + (proc !! GET("dg")).getOrElse("Get failed") should equal("1") + (proc !! GET("mc")).getOrElse("Get failed") should equal("2") + (proc !! GET("nd")).getOrElse("Get failed") should equal("3") + proc.stop + } + } + + describe("slice test") { + it("should pass") { + val proc = actorOf[RedisSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10"))) + + (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3"))) + proc.stop + } + } + + describe("Ticket 343 Issue #4") { + it("vector get should not ignore elements that were in vector before transaction") { + val proc = actorOf[RedisSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan") + new String((proc !! 
VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu") + new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu") + new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish") + + // now add 3 more and do gets in the same transaction + (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu")) + proc.stop + } + } + + describe("Ticket 343 Issue #6") { + it("vector update should not ignore transaction") { + val proc = actorOf[RedisSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + evaluating { + (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed") + } should produce [Exception] + + // update aborts and hence values will remain unchanged + new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan") + proc.stop + } + } + + describe("Ticket 343 Issue #5") { + it("vector slice() should not ignore elements added in current transaction") { + val proc = actorOf[RedisSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + // slice with no new elements added in current transaction + (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish")) + + // slice with new elements added in current transaction + (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a")) + proc.stop + } + } +} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 0eb289a290..0ee2b18108 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -482,7 +482,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val commons_codec = Dependencies.commons_codec val redis = Dependencies.redis - override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil + // override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil } // -------------------------------------------------------------------------------------------------------------------
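
Note on the map changes: the patch drops the old two-collection bookkeeping (newAndUpdatedEntries / removedEntries) in favour of a single append-only transaction log that is replayed over the stored snapshot on every read. That is what makes a remove-after-put inside one transaction work (issue #1 of ticket #343). Below is a minimal, self-contained sketch of the replay idea; the names (TxLogReplaySketch, replay) are illustrative only, not part of the patch or of Akka.

object TxLogReplaySketch {
  sealed trait Op
  case object PUT extends Op
  case object UPD extends Op
  case object REM extends Op

  case class LogEntry[K, V](key: K, value: Option[V], op: Op)

  // replay the uncommitted log on top of the committed snapshot
  def replay[K, V](snapshot: Map[K, V], log: Seq[LogEntry[K, V]]): Map[K, V] =
    log.foldLeft(snapshot) { (m, e) =>
      e.op match {
        case PUT | UPD => m + (e.key -> e.value.get)
        case REM       => m - e.key
      }
    }

  def main(args: Array[String]): Unit = {
    val stored = Map("debasish" -> "anshinsoft")   // committed before the tx
    val log = Seq[LogEntry[String, String]](
      LogEntry("a", Some("1"), PUT),
      LogEntry("a", None, REM),                    // remove after put, same tx
      LogEntry("debasish", None, REM))             // remove a pre-tx key, same tx
    assert(replay(stored, log).isEmpty)            // both removes are honoured
  }
}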
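
Note on the vector changes: the backing stores return vector elements newest-first (the Ticket343 specs expect VGET(0) == "nilanjan" after adding "debasish" through "nilanjan"), so PersistentVector.replay reverses the stored range into insertion order, applies the ADD/UPD entries from the transaction log, and reverses back. A self-contained sketch of that round-trip follows, with illustrative names only (VectorReplaySketch is not an Akka API):

object VectorReplaySketch {
  sealed trait Op
  case object ADD extends Op
  case object UPD extends Op

  case class LogEntry[T](index: Option[Int], value: Option[T], op: Op)

  def replay[T](stored: List[T], log: Seq[LogEntry[T]]): List[T] = {
    val buf = scala.collection.mutable.ArrayBuffer(stored.reverse: _*) // oldest-first
    log.foreach {
      case LogEntry(_, Some(v), ADD)       => buf += v
      case LogEntry(Some(i), Some(v), UPD) => buf.update(i, v)
      case _                               => () // POP omitted: pop is unsupported in the patch
    }
    buf.toList.reverse // back to newest-first, matching get(0) semantics
  }

  def main(args: Array[String]): Unit = {
    val stored = List("nilanjan", "ramanendu", "maulindu", "debasish") // newest-first
    val tx = Seq[LogEntry[String]](
      LogEntry(None, Some("a"), ADD),
      LogEntry(None, Some("b"), ADD),
      LogEntry(None, Some("c"), ADD))
    val curr = replay(stored, tx)
    // same shape as the VGET_AFTER_VADD expectation: indices 0, 2, 4 yield "c", "a", "ramanendu"
    assert(curr(0) == "c" && curr(2) == "a" && curr(4) == "ramanendu")
  }
}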
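
Note on PersistentMapBinary: Array[Byte] has reference equality and no Ordering, so it cannot serve directly as a map key. That is why the trait introduces the Seq-able key type (type T <: Equals, toEquals) and wraps every key in an ArraySeq[Byte], which compares structurally and can be ordered (COrdering sorts by the String form). A short demonstration of the difference (illustrative code, not part of the patch):

object BinaryKeySketch {
  import scala.collection.mutable.ArraySeq

  def main(args: Array[String]): Unit = {
    val k1 = "abc".getBytes
    val k2 = "abc".getBytes
    assert(k1 != k2)                             // arrays compare by reference
    assert(ArraySeq(k1: _*) == ArraySeq(k2: _*)) // wrapped keys compare structurally
  }
}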