diff --git a/akka-http/src/main/scala/AkkaCometServlet.scala b/akka-http/src/main/scala/AkkaCometServlet.scala index 16e271ba71..a56346d8be 100644 --- a/akka-http/src/main/scala/AkkaCometServlet.scala +++ b/akka-http/src/main/scala/AkkaCometServlet.scala @@ -89,16 +89,14 @@ class AkkaServlet extends AtmosphereServlet { addAtmosphereHandler("/*", servlet, new AkkaBroadcaster) } - lazy val akkaCometResolver: CometSupportResolver = { + lazy val akkaCometResolver: CometSupportResolver = new DefaultCometSupportResolver(config) { import scala.collection.JavaConversions._ - new DefaultCometSupportResolver(config) { - lazy val desiredCometSupport = - Option(getInitParameter("cometSupport")) filter testClassExists map newCometSupport + lazy val desiredCometSupport = + Option(getInitParameter("cometSupport")) filter testClassExists map newCometSupport - override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] = - desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault)) - } + override def resolve(useNativeIfPossible : Boolean, useBlockingAsDefault : Boolean) : CometSupport[_ <: AtmosphereResource[_,_]] = + desiredCometSupport.getOrElse(super.resolve(useNativeIfPossible, useBlockingAsDefault)) } override def createCometSupportResolver() = akkaCometResolver diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index e75fd9581c..9d98095045 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.persistence.common import se.scalablesolutions.akka.stm._ import se.scalablesolutions.akka.stm.TransactionManagement.transaction import se.scalablesolutions.akka.util.Logging +import collection.mutable.ArraySeq // FIXME move to 'stm' package + add message with more info class NoTransactionInScopeException extends RuntimeException @@ -47,26 +48,38 @@ trait Storage { type ElementType def newMap: PersistentMap[ElementType, ElementType] + def newVector: PersistentVector[ElementType] + def newRef: PersistentRef[ElementType] + def newQueue: PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException def getMap(id: String): PersistentMap[ElementType, ElementType] + def getVector(id: String): PersistentVector[ElementType] + def getRef(id: String): PersistentRef[ElementType] + def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException def newMap(id: String): PersistentMap[ElementType, ElementType] + def newVector(id: String): PersistentVector[ElementType] + def newRef(id: String): PersistentRef[ElementType] + def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException } @@ -90,7 +103,7 @@ private[akka] object PersistentMap { * @author Jonas Bonér */ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] - with Transactional with Committable with Abortable with Logging { + with Transactional with Committable with Abortable with Logging { //Import Ops import PersistentMap._ @@ -118,7 +131,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] protected def clearDistinctKeys = keysInCurrentTx.clear protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] = - appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true)) + appendOnlyTxLog filter (e => e.key.map(equal(_, key)).getOrElse(true)) // need to get current value considering the underlying storage as well as the transaction log protected def getCurrentValue(key: K): Option[V] = { @@ -129,7 +142,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] // get the snapshot from the underlying store for this key val underlying = try { storage.getMapStorageEntryFor(uuid, key) - } catch { case e: Exception => None } + } catch {case e: Exception => None} if (txEntries.isEmpty) underlying else txEntries.last match { @@ -146,12 +159,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] case None => Map.empty[K, V] case Some(v) => Map((key, v)) } - txEntries.foreach {case LogEntry(k, v, o) => o match { - case PUT => m.put(k.get, v.get) - case REM => m -= k.get - case UPD => m.update(k.get, v.get) - case CLR => Map.empty[K, V] - }} + txEntries.foreach { + case LogEntry(k, v, o) => o match { + case PUT => m.put(k.get, v.get) + case REM => m -= k.get + case UPD => m.update(k.get, v.get) + case CLR => Map.empty[K, V] + } + } m get key } @@ -159,12 +174,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] val storage: MapStorageBackend[K, V] def commit = { - appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match { - case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) - case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) - case REM => storage.removeMapStorageFor(uuid, k.get) - case CLR => storage.removeMapStorageFor(uuid) - }} + appendOnlyTxLog.foreach { + case LogEntry(k, v, o) => o match { + case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case REM => storage.removeMapStorageFor(uuid, k.get) + case CLR => storage.removeMapStorageFor(uuid) + } + } appendOnlyTxLog.clear clearDistinctKeys @@ -180,8 +197,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] this } - override def +=(kv : (K,V)) = { - put(kv._1,kv._2) + override def +=(kv: (K, V)) = { + put(kv._1, kv._2) this } @@ -230,10 +247,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] case Seq() => // current tx doesn't use this storage.getMapStorageEntryFor(uuid, key).isDefined // check storage case txs => // present in log - val lastOp = txs.last.op + val lastOp = txs.last.op lastOp != REM && lastOp != CLR // last entry cannot be a REM - } - } catch { case e: Exception => false } + } + } catch {case e: Exception => false} protected def existsInStorage(key: K): Option[V] = try { storage.getMapStorageEntryFor(uuid, key) @@ -243,33 +260,33 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] override def size: Int = try { // partition key set affected in current tx into those which r added & which r deleted - val (keysAdded, keysRemoved) = keysInCurrentTx.map { + val (keysAdded, keysRemoved) = keysInCurrentTx.map { case (kseq, k) => ((kseq, k), getCurrentValue(k)) }.partition(_._2.isDefined) // keys which existed in storage but removed in current tx - val inStorageRemovedInTx = - keysRemoved.keySet - .map(_._2) - .filter(k => existsInStorage(k).isDefined) - .size + val inStorageRemovedInTx = + keysRemoved.keySet + .map(_._2) + .filter(k => existsInStorage(k).isDefined) + .size // all keys in storage - val keysInStorage = - storage.getMapStorageFor(uuid) - .map { case (k, v) => toEquals(k) } - .toSet + val keysInStorage = + storage.getMapStorageFor(uuid) + .map {case (k, v) => toEquals(k)} + .toSet // (keys that existed UNION keys added ) - (keys removed) (keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx - } catch { - case e: Exception => 0 + } catch { + case e: Exception => 0 } // get must consider underlying storage & current uncommitted tx log override def get(key: K): Option[V] = getCurrentValue(key) - def iterator: Iterator[Tuple2[K, V]] + def iterator: Iterator[Tuple2[K, V]] private def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException @@ -277,38 +294,50 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] } } +object PersistentMapBinary { + object COrdering { + //frontend + implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] { + def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) = + ArrayOrdering.compare(o1.toArray, o2.toArray) + } + //backend + implicit object ArrayOrdering extends Ordering[Array[Byte]] { + def compare(o1: Array[Byte], o2: Array[Byte]) = + new String(o1) compare new String(o2) + } + } +} + trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { import scala.collection.mutable.ArraySeq type T = ArraySeq[Byte] + def toEquals(k: Array[Byte]) = ArraySeq(k: _*) + override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2 - object COrdering { - implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] { - def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) = - new String(o1.toArray) compare new String(o2.toArray) - } - } + import scala.collection.immutable.{TreeMap, SortedMap} private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = { - import COrdering._ + import PersistentMapBinary.COrdering._ // need ArraySeq for ordering - val fromStorage = - TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*) + val fromStorage = + TreeMap(storage.getMapStorageFor(uuid).map {case (k, v) => (ArraySeq(k: _*), v)}: _*) - val (keysAdded, keysRemoved) = keysInCurrentTx.map { + val (keysAdded, keysRemoved) = keysInCurrentTx.map { case (_, k) => (k, getCurrentValue(k)) }.partition(_._2.isDefined) - val inStorageRemovedInTx = - keysRemoved.keySet - .filter(k => existsInStorage(k).isDefined) - .map(k => ArraySeq(k: _*)) + val inStorageRemovedInTx = + keysRemoved.keySet + .filter(k => existsInStorage(k).isDefined) + .map(k => ArraySeq(k: _*)) - (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, v) => (ArraySeq(k: _*), v.get) } + (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map {case (k, v) => (ArraySeq(k: _*), v.get)} } override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try { @@ -317,51 +346,53 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { if (newMap isEmpty) List[(Array[Byte], Array[Byte])]() val startKey = - start match { - case Some(bytes) => Some(ArraySeq(bytes: _*)) - case None => None - } + start match { + case Some(bytes) => Some(ArraySeq(bytes: _*)) + case None => None + } val endKey = - finish match { - case Some(bytes) => Some(ArraySeq(bytes: _*)) - case None => None - } + finish match { + case Some(bytes) => Some(ArraySeq(bytes: _*)) + case None => None + } ((startKey, endKey, count): @unchecked) match { case ((Some(s), Some(e), _)) => newMap.range(s, e) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList case ((Some(s), None, c)) if c > 0 => newMap.from(s) - .iterator - .take(count) - .map(e => (e._1.toArray, e._2)) - .toList + .iterator + .take(count) + .map(e => (e._1.toArray, e._2)) + .toList case ((Some(s), None, _)) => newMap.from(s) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList case ((None, Some(e), _)) => newMap.until(e) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList } - } catch { case e: Exception => Nil } + } catch {case e: Exception => Nil} - override def iterator: Iterator[(Array[Byte], Array[Byte])] = { + override def iterator: Iterator[(Array[Byte], Array[Byte])] = { new Iterator[(Array[Byte], Array[Byte])] { private var elements = replayAllKeys + override def next: (Array[Byte], Array[Byte]) = synchronized { val (k, v) = elements.head elements = elements.tail (k.toArray, v) } - override def hasNext: Boolean = synchronized { !elements.isEmpty } + + override def hasNext: Boolean = synchronized {!elements.isEmpty} } } } @@ -394,7 +425,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa val storage: VectorStorageBackend[T] def commit = { - for(entry <- appendOnlyTxLog) { + for (entry <- appendOnlyTxLog) { (entry: @unchecked) match { case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v) case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v) @@ -412,7 +443,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa import scala.collection.mutable.ArrayBuffer var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*) - for(entry <- appendOnlyTxLog) { + for (entry <- appendOnlyTxLog) { (entry: @unchecked) match { case LogEntry(_, Some(v), ADD) => elemsStorage += v case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v) @@ -446,11 +477,11 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa val curr = replay val s = if (start.isDefined) start.get else 0 val cnt = - if (finish.isDefined) { - val f = finish.get - if (f >= s) (f - s) else count - } - else count + if (finish.isDefined) { + val f = finish.get + if (f >= s) (f - s) else count + } + else count if (s == 0 && cnt == 0) List().toIndexedSeq else curr.slice(s, s + cnt).toIndexedSeq } @@ -519,12 +550,12 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { } } - private[akka] object PersistentQueue { - //Operations for PersistentQueue - sealed trait QueueOp - case object ENQ extends QueueOp - case object DEQ extends QueueOp - } +private[akka] object PersistentQueue { + //Operations for PersistentQueue + sealed trait QueueOp + case object ENQ extends QueueOp + case object DEQ extends QueueOp +} /** * Implementation of PersistentQueue for every concrete @@ -552,7 +583,7 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { * @author Debasish Ghosh */ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] - with Transactional with Committable with Abortable with Logging { + with Transactional with Committable with Abortable with Logging { //Import Ops import PersistentQueue._ @@ -575,11 +606,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] val storage: QueueStorageBackend[A] def commit = { - enqueuedNDequeuedEntries.toList.foreach { e => - e._2 match { - case ENQ => storage.enqueue(uuid, e._1.get) - case DEQ => storage.dequeue(uuid) - } + enqueuedNDequeuedEntries.toList.foreach { + e => + e._2 match { + case ENQ => storage.enqueue(uuid, e._1.get) + case DEQ => storage.dequeue(uuid) + } } if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) { storage.remove(uuid) @@ -635,7 +667,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] override def size: Int = try { storage.size(uuid) + localQ.get.length - } catch { case e: Exception => 0 } + } catch {case e: Exception => 0} override def isEmpty: Boolean = size == 0 @@ -644,10 +676,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] enqueue(elem) this } + def ++=(elems: Iterator[A]) = { enqueue(elems.toList: _*) this } + def ++=(elems: Iterable[A]): Unit = this ++= elems.iterator override def dequeueFirst(p: A => Boolean): Option[A] = @@ -670,24 +704,24 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] *

* zscore can be implemented in a variety of ways by the calling class: *

- * trait ZScorable {
+ * trait ZScorable        {
  *   def toZScore: Float
  * }
  *
- * class Foo extends ZScorable {
+ * class Foo extends ZScorable        {
  *   //.. implemnetation
  * }
  * 
* Or we can also use views: *
- * class Foo {
+ * class Foo        {
  *   //..
  * }
  *
- * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
- *   def toZScore = {
+ * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable        {
+ *   def toZScore =        {
  *     //..
- *   }
+ * }
  * }
  * 
* @@ -696,7 +730,6 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] * @author */ trait PersistentSortedSet[A] extends Transactional with Committable with Abortable { - protected val newElems = TransactionalMap[A, Float]() protected val removedElems = TransactionalVector[A]() @@ -729,8 +762,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab } private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match { - case Some(s) => Some(s.toFloat) - case None => None + case Some(s) => Some(s.toFloat) + case None => None } def contains(elem: A): Boolean = { @@ -758,8 +791,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab def compare(that: (A, Float)) = x._2 compare that._2 } - implicit def ordering = new scala.math.Ordering[(A,Float)] { - def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2 + implicit def ordering = new scala.math.Ordering[(A, Float)] { + def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2 } @@ -773,9 +806,9 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab // -1 means the last element, -2 means the second last val s = if (start < 0) start + l else start val e = - if (end < 0) end + l - else if (end >= l) (l - 1) - else end + if (end < 0) end + l + else if (end >= l) (l - 1) + else end // slice is open at the end, we need a closed end range ts.iterator.slice(s, e + 1).toList } diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala new file mode 100644 index 0000000000..395d0ef269 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -0,0 +1,161 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} +import scala.util.Random +import collection.immutable.{TreeMap, HashMap, HashSet} +import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + + +/** + * Implementation Compatibility test for PersistentMap backend implementations. + */ + +trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: MapStorageBackend[Array[Byte], Array[Byte]] + + def dropMaps: Unit + + override def beforeEach = { + log.info("beforeEach: dropping maps") + dropMaps + } + + override def afterEach = { + log.info("afterEach: dropping maps") + dropMaps + } + + + describe("A Properly functioning MapStorageBackend") { + it("should remove map storage properly") { + val mapName = "removeTest" + val mkey = "removeTestKey".getBytes + val value = "removeTestValue".getBytes + + storage.insertMapStorageEntryFor(mapName, mkey, value) + storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true) + storage.removeMapStorageFor(mapName, mkey) + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + + storage.insertMapStorageEntryFor(mapName, mkey, value) + storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true) + storage.removeMapStorageFor(mapName) + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + } + + it("should insert a single map storage element properly") { + val mapName = "insertSingleTest" + val mkey = "insertSingleTestKey".getBytes + val value = "insertSingleTestValue".getBytes + + storage.insertMapStorageEntryFor(mapName, mkey, value) + storage.getMapStorageEntryFor(mapName, mkey).get should be(value) + storage.removeMapStorageFor(mapName, mkey) + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + + storage.insertMapStorageEntryFor(mapName, mkey, value) + storage.getMapStorageEntryFor(mapName, mkey).get should be(value) + storage.removeMapStorageFor(mapName) + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + } + + + it("should insert multiple map storage elements properly") { + val mapName = "insertMultipleTest" + val rand = new Random(3).nextInt(100) + val entries = (1 to rand).toList.map { + index => + (("insertMultipleTestKey" + index).getBytes -> ("insertMutlipleTestValue" + index).getBytes) + } + + storage.insertMapStorageEntriesFor(mapName, entries) + entries foreach { + _ match { + case (mkey, value) => { + storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true) + storage.getMapStorageEntryFor(mapName, mkey).get should be(value) + } + } + } + storage.removeMapStorageFor(mapName) + entries foreach { + _ match { + case (mkey, value) => { + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + } + } + } + } + + + it("should accurately track the number of key value pairs in a map") { + val mapName = "sizeTest" + val rand = new Random(3).nextInt(100) + val entries = (1 to rand).toList.map { + index => + (("sizeTestKey" + index).getBytes -> ("sizeTestValue" + index).getBytes) + } + + storage.insertMapStorageEntriesFor(mapName, entries) + storage.getMapStorageSizeFor(mapName) should be(rand) + } + + + + it("should return all the key value pairs in the map in the correct order when getMapStorageFor(name) is called") { + val mapName = "allTest" + val rand = new Random(3).nextInt(100) + var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering) + (1 to rand).foreach { + index => + entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes) + } + + storage.insertMapStorageEntriesFor(mapName, entries.toList) + val retrieved = storage.getMapStorageFor(mapName) + retrieved.size should be(rand) + entries.size should be(rand) + + + + val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} + val retrievedMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} + + entryMap should equal(retrievedMap) + + (0 until rand).foreach { + i: Int => { + new String(entries.toList(i)._1) should be(new String(retrieved(i)._1)) + } + } + + } + + it("should return all the key->value pairs that exist in the map that are between start and end, up to count pairs when getMapStorageRangeFor is called") { + //implement if this method will be used + } + + + it("should return Some(null), not None, for a key that has had the value null set and None for a key with no value set") { + val mapName = "nullTest" + val key = "key".getBytes + storage.insertMapStorageEntryFor(mapName, key, null) + storage.getMapStorageEntryFor(mapName, key).get should be(null) + storage.removeMapStorageFor(mapName, key) + storage.getMapStorageEntryFor(mapName, key) should be(None) + } + + it("should not throw an exception when size is called on a non existent map?") { + storage.getMapStorageSizeFor("nonExistent") should be(0) + } + + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala new file mode 100644 index 0000000000..3eb89e3db5 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} +import scala.util.Random + +/** + * Implementation Compatibility test for PersistentQueue backend implementations. + */ + +trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: QueueStorageBackend[Array[Byte]] + + def dropQueues: Unit + + override def beforeEach = { + log.info("beforeEach: dropping queues") + dropQueues + } + + override def afterEach = { + log.info("afterEach: dropping queues") + dropQueues + } + + + + describe("A Properly functioning QueueStorage Backend") { + it("should enqueue properly when there is capacity in the queue") { + val queue = "enqueueTest" + val value = "enqueueTestValue".getBytes + storage.size(queue) should be(0) + storage.enqueue(queue, value).get should be(1) + storage.size(queue) should be(1) + } + + it("should return None when enqueue is called on a full queue?") { + + } + + it("should dequeue properly when the queue is not empty") { + val queue = "dequeueTest" + val value = "dequeueTestValue".getBytes + storage.size(queue) should be(0) + storage.enqueue(queue, value) + storage.size(queue) should be(1) + storage.dequeue(queue).get should be(value) + } + + it("should return None when dequeue is called on an empty queue") { + val queue = "dequeueTest2" + val value = "dequeueTestValue2".getBytes + storage.size(queue) should be(0) + storage.dequeue(queue) should be(None) + } + + it("should accurately reflect the size of the queue") { + val queue = "sizeTest" + val rand = new Random(3).nextInt(100) + val values = (1 to rand).toList.map {i: Int => ("sizeTestValue" + i).getBytes} + values.foreach {storage.enqueue(queue, _)} + storage.size(queue) should be(rand) + val drand = new Random(3).nextInt(rand) + (1 to drand).foreach { + i: Int => { + storage.dequeue(queue).isDefined should be(true) + storage.size(queue) should be(rand - i) + } + } + } + + it("should support peek properly") { + val queue = "sizeTest" + val rand = new Random(3).nextInt(100) + val values = (1 to rand).toList.map {i: Int => ("peekTestValue" + i)} + storage.remove(queue) + values.foreach {s: String => storage.enqueue(queue, s.getBytes)} + (1 to rand).foreach { + index => { + val peek = storage.peek(queue, 0, index).map {new String(_)} + peek.size should be(index) + values.dropRight(values.size - index).equals(peek) should be(true) + } + } + (0 until rand).foreach { + index => { + val peek = storage.peek(queue, index, rand - index).map {new String(_)} + peek.size should be(rand - index) + values.drop(index).equals(peek) should be(true) + } + } + + //Should we test counts greater than queue size? or greater than queue size - count??? + } + + it("should not throw an exception when remove is called on a non-existent queue") { + storage.remove("exceptionTest") + } + + it("should remove queue storage properly") { + val queue = "removeTest" + val rand = new Random(3).nextInt(100) + val values = (1 to rand).toList.map {i: Int => ("removeValue" + i).getBytes} + values.foreach {storage.enqueue(queue, _)} + storage.size(queue) should be(rand) + storage.remove(queue) + storage.size(queue) should be(0) + } + + it("should accept null as a value to enqueue and return Some(null) when that value is dequeued") { + val queue = "nullTest" + storage.enqueue(queue, null).get should be(1) + storage.dequeue(queue).get should be(null) + storage.dequeue(queue) should be(None) + } + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala new file mode 100644 index 0000000000..37902cf7c9 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} + +/** + * Implementation Compatibility test for PersistentRef backend implementations. + */ + +trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: RefStorageBackend[Array[Byte]] + + def dropRefs: Unit + + override def beforeEach = { + log.info("beforeEach: dropping refs") + dropRefs + } + + override def afterEach = { + log.info("afterEach: dropping refs") + dropRefs + } + + + describe("A Properly functioning RefStorageBackend") { + it("should successfully insert ref storage") { + val name = "RefStorageTest #1" + val value = name.getBytes + storage.insertRefStorageFor(name, value) + storage.getRefStorageFor(name).get should be(value) + } + + it("should return None when getRefStorage is called when no value has been inserted") { + val name = "RefStorageTest #2" + val value = name.getBytes + storage.getRefStorageFor(name) should be(None) + } + + it("Should return None, not Some(null) when getRefStorageFor is called when null has been set") { + val name = "RefStorageTest #3" + storage.insertRefStorageFor(name, null) + storage.getRefStorageFor(name) should be(None) + } + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala new file mode 100644 index 0000000000..2a9d3ab324 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} + +/** + * Implementation Compatibility test for PersistentSortedSet backend implementations. + */ + +trait SortedSetStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: SortedSetStorageBackend[Array[Byte]] + + def dropSortedSets: Unit + + override def beforeEach = { + log.info("beforeEach: dropping sorted sets") + dropSortedSets + } + + override def afterEach = { + log.info("afterEach: dropping sorted sets") + dropSortedSets + } + + + describe("A Properly functioning SortedSetStorageBackend Backend") { + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala new file mode 100644 index 0000000000..14eba7d4e3 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala @@ -0,0 +1,362 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.config.OneForOneStrategy +import Actor._ +import se.scalablesolutions.akka.stm.global._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.util.Logging +import StorageObj._ + + +case class GET(k: String) +case class SET(k: String, v: String) +case class REM(k: String) +case class CONTAINS(k: String) +case object MAP_SIZE +case class MSET(kvs: List[(String, String)]) +case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String]) +case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)]) +case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int) +case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int) + +case class VADD(v: String) +case class VUPD(i: Int, v: String) +case class VUPD_AND_ABORT(i: Int, v: String) +case class VGET(i: Int) +case object VSIZE +case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int]) +case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int) + + +object StorageObj { + var getMap: String => PersistentMap[Array[Byte], Array[Byte]] = _ + var getVector: String => PersistentVector[Array[Byte]] = _ + + class SampleMapStorage extends Actor { + self.lifeCycle = Permanent + val FOO_MAP = "akka.sample.map" + + private var fooMap = atomic {StorageObj.getMap(FOO_MAP)} + + def receive = { + case SET(k, v) => + atomic { + fooMap += (k.getBytes, v.getBytes) + } + self.reply((k, v)) + + case GET(k) => + val v = atomic { + fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found") + } + self.reply(v) + + case REM(k) => + val v = atomic { + fooMap -= k.getBytes + } + self.reply(k) + + case CONTAINS(k) => + val v = atomic { + fooMap contains k.getBytes + } + self.reply(v) + + case MAP_SIZE => + val v = atomic { + fooMap.size + } + self.reply(v) + + case MSET(kvs) => atomic { + kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes)} + } + self.reply(kvs.size) + + case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic { + kvs2add.foreach { + kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + + ks2rem.foreach { + k => + fooMap -= k.getBytes + } + } + self.reply(fooMap.size) + + case CLEAR_AFTER_PUT(kvs2add) => atomic { + kvs2add.foreach { + kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + fooMap.clear + } + self.reply(true) + + case PUT_WITH_SLICE(kvs2add, from, cnt) => + val v = atomic { + kvs2add.foreach { + kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + fooMap.slice(Some(from.getBytes), cnt) + } + self.reply(v: List[(Array[Byte], Array[Byte])]) + + case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) => + val v = atomic { + kvs2add.foreach { + kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + ks2rem.foreach { + k => + fooMap -= k.getBytes + } + fooMap.slice(Some(from.getBytes), cnt) + } + self.reply(v: List[(Array[Byte], Array[Byte])]) + } + } + + class SampleVectorStorage extends Actor { + self.lifeCycle = Permanent + val FOO_VECTOR = "akka.sample.vector" + + private var fooVector = atomic {StorageObj.getVector(FOO_VECTOR)} + + def receive = { + case VADD(v) => + val size = + atomic { + fooVector + v.getBytes + fooVector length + } + self.reply(size) + + case VGET(index) => + val ind = + atomic { + fooVector get index + } + self.reply(ind) + + case VGET_AFTER_VADD(vs, is) => + val els = + atomic { + vs.foreach(fooVector + _.getBytes) + (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_)) + } + self.reply(els) + + case VUPD_AND_ABORT(index, value) => + val l = + atomic { + fooVector.update(index, value.getBytes) + // force fail + fooVector get 100 + } + self.reply(index) + + case VADD_WITH_SLICE(vs, s, c) => + val l = + atomic { + vs.foreach(fooVector + _.getBytes) + fooVector.slice(Some(s), None, c) + } + self.reply(l.map(new String(_))) + } + } +} + + + +trait Ticket343Test extends +Spec with + ShouldMatchers with + BeforeAndAfterEach { + def getMap: String => PersistentMap[Array[Byte], Array[Byte]] + + def getVector: String => PersistentVector[Array[Byte]] + + + def dropMapsAndVectors: Unit + + override def beforeEach { + StorageObj.getMap = getMap + StorageObj.getVector = getVector + dropMapsAndVectors + println("** dropMapsAndVectors") + } + + override def afterEach { + dropMapsAndVectors + println("** dropMapsAndVectors") + } + + describe("Ticket 343 Issue #1") { + it("remove after put should work within the same transaction") { + val proc = actorOf[SampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + + (proc !! GET("dg")).getOrElse("Get failed") should equal("1") + (proc !! GET("mc")).getOrElse("Get failed") should equal("2") + (proc !! GET("nd")).getOrElse("Get failed") should equal("3") + + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + val add = List(("a", "1"), ("b", "2"), ("c", "3")) + val rem = List("a", "debasish") + (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5) + + (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found") + (proc !! GET("a")).getOrElse("a not found") should equal("a Not found") + + (proc !! GET("b")).getOrElse("b not found") should equal("2") + + (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true) + (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5) + proc.stop + } + } + + describe("Ticket 343 Issue #2") { + it("clear after put should work within the same transaction") { + val proc = actorOf[SampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + val add = List(("a", "1"), ("b", "2"), ("c", "3")) + (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true) + + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(0) + proc.stop + } + } + + describe("Ticket 343 Issue #3") { + it("map size should change after the transaction") { + val proc = actorOf[SampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + (proc !! GET("dg")).getOrElse("Get failed") should equal("1") + (proc !! GET("mc")).getOrElse("Get failed") should equal("2") + (proc !! GET("nd")).getOrElse("Get failed") should equal("3") + proc.stop + } + } + + describe("slice test") { + it("should pass") { + val proc = actorOf[SampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + // (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map {case (k, v) => (new String(k), new String(v))} should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10"))) + + (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map {case (k, v) => (new String(k), new String(v))} should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3"))) + proc.stop + } + } + + describe("Ticket 343 Issue #4") { + it("vector get should not ignore elements that were in vector before transaction") { + + val proc = actorOf[SampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan") + new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]]) should equal("ramanendu") + new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]]) should equal("maulindu") + new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]]) should equal("debasish") + + // now add 3 more and do gets in the same transaction + (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu")) + proc.stop + } + } + + describe("Ticket 343 Issue #6") { + it("vector update should not ignore transaction") { + val proc = actorOf[SampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + evaluating { + (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed") + } should produce[Exception] + + // update aborts and hence values will remain unchanged + new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan") + proc.stop + } + } + + describe("Ticket 343 Issue #5") { + it("vector slice() should not ignore elements added in current transaction") { + val proc = actorOf[SampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + // slice with no new elements added in current transaction + (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish")) + + // slice with new elements added in current transaction + (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a")) + proc.stop + } + } +} diff --git a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala new file mode 100644 index 0000000000..e677f8fe66 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} +import scala.util.Random + +/** + * Implementation Compatibility test for PersistentVector backend implementations. + */ + +trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: VectorStorageBackend[Array[Byte]] + + def dropVectors: Unit + + override def beforeEach = { + log.info("beforeEach: dropping vectors") + dropVectors + } + + override def afterEach = { + log.info("afterEach: dropping vectors") + dropVectors + } + + + + describe("A Properly functioning VectorStorageBackend") { + it("should insertVectorStorageEntry as a logical prepend operation to the existing list") { + val vector = "insertSingleTest" + val rand = new Random(3).nextInt(100) + val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + storage.getVectorStorageSizeFor(vector) should be(0) + values.foreach {s: String => storage.insertVectorStorageEntryFor(vector, s.getBytes)} + val shouldRetrieve = values.reverse + (0 to rand).foreach { + i: Int => { + shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i))) + } + } + } + + it("should insertVectorStorageEntries as a logical prepend operation to the existing list") { + val vector = "insertMultiTest" + val rand = new Random(3).nextInt(100) + val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + storage.getVectorStorageSizeFor(vector) should be(0) + storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) + val shouldRetrieve = values.reverse + (0 to rand).foreach { + i: Int => { + shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i))) + } + } + } + + it("should successfully update entries") { + val vector = "updateTest" + val rand = new Random(3).nextInt(100) + val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + val urand = new Random(3).nextInt(rand) + storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) + val toUpdate = "updated" + values.reverse(urand) + storage.updateVectorStorageEntryFor(vector, urand, toUpdate.getBytes) + toUpdate should be(new String(storage.getVectorStorageEntryFor(vector, urand))) + } + + it("should return the correct value from getVectorStorageFor") { + val vector = "getTest" + val rand = new Random(3).nextInt(100) + val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + val urand = new Random(3).nextInt(rand) + storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) + values.reverse(urand) should be(new String(storage.getVectorStorageEntryFor(vector, urand))) + } + + it("should return the correct values from getVectorStorageRangeFor") { + val vector = "getTest" + val rand = new Random(3).nextInt(100) + val drand = new Random(3).nextInt(rand) + val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) + values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map {b: Array[Byte] => new String(b)}) + (0 to drand).foreach { + i: Int => { + val value: String = vector + "value" + (rand - i) + log.debug(value) + List(value) should be(storage.getVectorStorageRangeFor(vector, Some(i), None, 1).map {b: Array[Byte] => new String(b)}) + } + } + } + + it("should behave properly when the range used in getVectorStorageRangeFor has indexes outside the current size of the vector") { + //what is proper? + } + + it("shoud return null when getStorageEntry is called on a null entry") { + //What is proper? + val vector = "nullTest" + storage.insertVectorStorageEntryFor(vector, null) + storage.getVectorStorageEntryFor(vector, 0) should be(null) + } + + it("shoud throw a Storage exception when there is an attempt to retrieve an index larger than the Vector") { + val vector = "tooLargeRetrieve" + storage.insertVectorStorageEntryFor(vector, null) + evaluating {storage.getVectorStorageEntryFor(vector, 9)} should produce[StorageException] + } + + it("shoud throw a Storage exception when there is an attempt to update an index larger than the Vector") { + val vector = "tooLargeUpdate" + storage.insertVectorStorageEntryFor(vector, null) + evaluating {storage.updateVectorStorageEntryFor(vector, 9, null)} should produce[StorageException] + } + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala index 4e237267a5..2a9c3c5717 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala @@ -15,14 +15,17 @@ object VoldemortStorage extends Storage { def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString) def newVector: PersistentVector[ElementType] = newVector(newUuid.toString) def newRef: PersistentRef[ElementType] = newRef(newUuid.toString) + override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString) def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) def getVector(id: String): PersistentVector[ElementType] = newVector(id) def getRef(id: String): PersistentRef[ElementType] = newRef(id) + override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id) def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id) def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id) def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id) + override def newQueue(id:String): PersistentQueue[ElementType] = new VoldemortPersistentQueue(id) } @@ -41,3 +44,8 @@ class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] { val uuid = id val storage = VoldemortStorageBackend } + +class VoldemortPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] { + val uuid = id + val storage = VoldemortStorageBackend +} diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 20b9804ed4..abc7855d9c 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -17,9 +17,10 @@ import voldemort.versioning.Versioned import collection.JavaConversions import java.nio.ByteBuffer import collection.Map -import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap} import collection.mutable.{Set, HashSet, ArrayBuffer} import java.util.{Properties, Map => JMap} +import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ +import collection.immutable._ /* RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores @@ -49,28 +50,28 @@ MapStorageBackend[Array[Byte], Array[Byte]] with var queueClient: StoreClient[Array[Byte], Array[Byte]] = null initStoreClients + val nullMapValueHeader = 0x00.byteValue + val nullMapValue: Array[Byte] = Array(nullMapValueHeader) + val notNullMapValueHeader: Byte = 0xff.byteValue val underscoreBytesUTF8 = "_".getBytes("UTF-8") val mapKeysIndex = getIndexedBytes(-1) val vectorSizeIndex = getIndexedBytes(-1) val queueHeadIndex = getIndexedBytes(-1) val queueTailIndex = getIndexedBytes(-2) - - - implicit val byteOrder = new Ordering[Array[Byte]] { - override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) - } + //explicit implicit :) + implicit val ordering = ArrayOrdering def getRefStorageFor(name: String): Option[Array[Byte]] = { val result: Array[Byte] = refClient.getValue(name) - result match { - case null => None - case _ => Some(result) - } + Option(result) } def insertRefStorageFor(name: String, element: Array[Byte]) = { - refClient.put(name, element) + element match { + case null => refClient.delete(name) + case _ => refClient.put(name, element) + } } def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { @@ -90,17 +91,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with mapKey => getKey(name, mapKey) })) - val buf = new ArrayBuffer[(Array[Byte], Array[Byte])](all.size) + var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering) JavaConversions.asMap(all).foreach { (entry) => { entry match { - case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => { - buf += key -> versioned.getValue + case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => { + returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(versioned.getValue) } } } } - buf.toList + returned.toList } def getMapStorageSizeFor(name: String): Int = { @@ -112,7 +113,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val result: Array[Byte] = mapClient.getValue(getKey(name, key)) result match { case null => None - case _ => Some(result) + case _ => Some(getMapValueFromStored(result)) } } @@ -134,7 +135,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { - mapClient.put(getKey(name, key), value) + mapClient.put(getKey(name, key), getStoredMapValue(value)) var keys = getMapKeys(name) keys += key putMapKeys(name, keys) @@ -143,7 +144,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { val newKeys = entries.map { case (key, value) => { - mapClient.put(getKey(name, key), value) + mapClient.put(getKey(name, key), getStoredMapValue(value)) key } } @@ -169,17 +170,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { val size = getVectorStorageSizeFor(name) val st = start.getOrElse(0) - val cnt = + var cnt = if (finish.isDefined) { val f = finish.get if (f >= st) (f - st) else count } else { count } - val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { - index => getIndexedKey(name, index) + if (cnt > (size - st)) { + cnt = size - st } + + val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { + index => getIndexedKey(name, (size - 1) - index) + } //read backwards + val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq)) var storage = new ArrayBuffer[Array[Byte]](seq.size) @@ -199,14 +205,23 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { - vectorClient.getValue(getIndexedKey(name, index), Array.empty[Byte]) + val size = getVectorStorageSizeFor(name) + if (size > 0 && index < size) { + vectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) + } else { + throw new StorageException("In Vector:" + name + " No such Index:" + index) + } } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { val size = getVectorStorageSizeFor(name) - vectorClient.put(getIndexedKey(name, index), elem) - if (size < index + 1) { - vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1)) + if (size > 0 && index < size) { + elem match { + case null => vectorClient.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) + case _ => vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem) + } + } else { + throw new StorageException("In Vector:" + name + " No such Index:" + index) } } @@ -214,7 +229,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with var size = getVectorStorageSizeFor(name) elements.foreach { element => - vectorClient.put(getIndexedKey(name, size), element) + if (element != null) { + vectorClient.put(getIndexedKey(name, size), element) + } size += 1 } vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) @@ -263,7 +280,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with try { queueClient.delete(key) } catch { - //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around + //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue") } } @@ -276,7 +293,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val mdata = getQueueMetadata(name) if (mdata.canEnqueue) { val key = getIndexedKey(name, mdata.tail) - queueClient.put(key, item) + item match { + case null => queueClient.delete(key) + case _ => queueClient.put(key, item) + } queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue)) Some(mdata.size + 1) } else { @@ -332,6 +352,39 @@ MapStorageBackend[Array[Byte], Array[Byte]] with IntSerializer.fromBytes(indexBytes) } + def getMapKeyFromKey(owner: String, key: Array[Byte]): Array[Byte] = { + val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length + val mapkey = new Array[Byte](mapKeyLength) + System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength) + mapkey + } + + //wrapper for null + def getStoredMapValue(value: Array[Byte]): Array[Byte] = { + value match { + case null => nullMapValue + case value => { + val stored = new Array[Byte](value.length + 1) + stored(0) = notNullMapValueHeader + System.arraycopy(value, 0, stored, 1, value.length) + stored + } + } + } + + def getMapValueFromStored(value: Array[Byte]): Array[Byte] = { + + if (value(0) == nullMapValueHeader) { + null + } else if (value(0) == notNullMapValueHeader) { + val returned = new Array[Byte](value.length - 1) + System.arraycopy(value, 1, returned, 0, value.length - 1) + returned + } else { + throw new StorageException("unknown header byte on map value:" + value(0)) + } + } + def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties @@ -450,6 +503,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { + import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + var set = new TreeSet[Array[Byte]] if (bytes.length > IntSerializer.bytesPerInt) { var pos = 0 diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala index ce87309fb9..d0f40f1a03 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala @@ -1,20 +1,20 @@ package se.scalablesolutions.akka.persistence.voldemort -import org.scalatest.matchers.ShouldMatchers import voldemort.server.{VoldemortServer, VoldemortConfig} -import org.scalatest.{Suite, BeforeAndAfterAll, FunSuite} +import org.scalatest.{Suite, BeforeAndAfterAll} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner -import voldemort.utils.Utils import java.io.File import se.scalablesolutions.akka.util.{Logging} import collection.JavaConversions import voldemort.store.memory.InMemoryStorageConfiguration +import voldemort.client.protocol.admin.{AdminClientConfig, AdminClient} + -@RunWith(classOf[JUnitRunner]) trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { this: Suite => var server: VoldemortServer = null + var admin: AdminClient = null override protected def beforeAll(): Unit = { @@ -28,6 +28,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { server = new VoldemortServer(config) server.start VoldemortStorageBackend.initStoreClients + admin = new AdminClient(VoldemortStorageBackend.clientConfig.getProperty(VoldemortStorageBackend.bootstrapUrlsProp), new AdminClientConfig) log.info("Started") } catch { case e => log.error(e, "Error Starting Voldemort") @@ -36,6 +37,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { } override protected def afterAll(): Unit = { + admin.stop server.stop } } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala deleted file mode 100644 index 76bb989ac9..0000000000 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala +++ /dev/null @@ -1,87 +0,0 @@ -package se.scalablesolutions.akka.persistence.voldemort - -import org.scalatest.FunSuite -import org.scalatest.matchers.ShouldMatchers -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ -import se.scalablesolutions.akka.actor.{newUuid,Uuid} -import collection.immutable.TreeSet -import VoldemortStorageBackendSuite._ - -import se.scalablesolutions.akka.stm._ -import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.persistence.common._ -import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.config.Config.config - -@RunWith(classOf[JUnitRunner]) -class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging { - test("persistentRefs work as expected") { - val name = newUuid.toString - val one = "one".getBytes - atomic { - val ref = VoldemortStorage.getRef(name) - ref.isDefined should be(false) - ref.swap(one) - ref.get match { - case Some(bytes) => bytes should be(one) - case None => true should be(false) - } - } - val two = "two".getBytes - atomic { - val ref = VoldemortStorage.getRef(name) - ref.isDefined should be(true) - ref.swap(two) - ref.get match { - case Some(bytes) => bytes should be(two) - case None => true should be(false) - } - } - } - - - test("Persistent Vectors function as expected") { - val name = newUuid.toString - val one = "one".getBytes - val two = "two".getBytes - atomic { - val vec = VoldemortStorage.getVector(name) - vec.add(one) - } - atomic { - val vec = VoldemortStorage.getVector(name) - vec.size should be(1) - vec.add(two) - } - atomic { - val vec = VoldemortStorage.getVector(name) - - vec.get(0) should be(one) - vec.get(1) should be(two) - vec.size should be(2) - vec.update(0, two) - } - - atomic { - val vec = VoldemortStorage.getVector(name) - vec.get(0) should be(two) - vec.get(1) should be(two) - vec.size should be(2) - vec.update(0, Array.empty[Byte]) - vec.update(1, Array.empty[Byte]) - } - - atomic { - val vec = VoldemortStorage.getVector(name) - vec.get(0) should be(Array.empty[Byte]) - vec.get(1) should be(Array.empty[Byte]) - vec.size should be(2) - } - - - } - -} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala new file mode 100644 index 0000000000..b9b3ea4ed1 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala @@ -0,0 +1,49 @@ +package se.scalablesolutions.akka.persistence.voldemort + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest} + +@RunWith(classOf[JUnitRunner]) +class VoldemortRefStorageBackendTest extends RefStorageBackendTest with EmbeddedVoldemort { + def dropRefs = { + admin.truncate(0, VoldemortStorageBackend.refStore) + } + + + def storage = VoldemortStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class VoldemortMapStorageBackendTest extends MapStorageBackendTest with EmbeddedVoldemort { + def dropMaps = { + admin.truncate(0, VoldemortStorageBackend.mapStore) + } + + + def storage = VoldemortStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class VoldemortVectorStorageBackendTest extends VectorStorageBackendTest with EmbeddedVoldemort { + def dropVectors = { + admin.truncate(0, VoldemortStorageBackend.vectorStore) + } + + + def storage = VoldemortStorageBackend +} + + +@RunWith(classOf[JUnitRunner]) +class VoldemortQueueStorageBackendTest extends QueueStorageBackendTest with EmbeddedVoldemort { + def dropQueues = { + admin.truncate(0, VoldemortStorageBackend.queueStore) + } + + + def storage = VoldemortStorageBackend +} + + diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 8ac3d306c4..b28ea90171 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -103,10 +103,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb vectorClient.delete(getKey(key, vectorSizeIndex)) vectorClient.delete(getIndexedKey(key, 0)) vectorClient.delete(getIndexedKey(key, 1)) - getVectorStorageEntryFor(key, 0) should be(empty) - getVectorStorageEntryFor(key, 1) should be(empty) - getVectorStorageRangeFor(key, None, None, 1).head should be(empty) - + insertVectorStorageEntryFor(key, value) //again insertVectorStorageEntryFor(key, value) diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala new file mode 100644 index 0000000000..b170f949cf --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.voldemort + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common._ + +@RunWith(classOf[JUnitRunner]) +class VoldemortTicket343Test extends Ticket343Test with EmbeddedVoldemort { + def dropMapsAndVectors: Unit = { + admin.truncate(0, VoldemortStorageBackend.mapStore) + admin.truncate(0, VoldemortStorageBackend.vectorStore) + } + + def getVector: (String) => PersistentVector[Array[Byte]] = VoldemortStorage.getVector + + def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = VoldemortStorage.getMap +} \ No newline at end of file diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index bbb4cd6a88..c21aba9267 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -595,7 +595,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val dbcp = Dependencies.dbcp val sjson = Dependencies.sjson_test - override def testOptions = createTestFilter( _.endsWith("Suite")) + override def testOptions = createTestFilter({ s:String=> s.endsWith("Suite") || s.endsWith("Test")}) }