diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index e75fd9581c..08d42c9148 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.persistence.common import se.scalablesolutions.akka.stm._ import se.scalablesolutions.akka.stm.TransactionManagement.transaction import se.scalablesolutions.akka.util.Logging +import collection.mutable.ArraySeq // FIXME move to 'stm' package + add message with more info class NoTransactionInScopeException extends RuntimeException @@ -47,26 +48,38 @@ trait Storage { type ElementType def newMap: PersistentMap[ElementType, ElementType] + def newVector: PersistentVector[ElementType] + def newRef: PersistentRef[ElementType] + def newQueue: PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException def getMap(id: String): PersistentMap[ElementType, ElementType] + def getVector(id: String): PersistentVector[ElementType] + def getRef(id: String): PersistentRef[ElementType] + def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException def newMap(id: String): PersistentMap[ElementType, ElementType] + def newVector(id: String): PersistentVector[ElementType] + def newRef(id: String): PersistentRef[ElementType] + def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException } @@ -90,7 +103,7 @@ private[akka] object PersistentMap { * @author Jonas Bonér */ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] - with Transactional with Committable with Abortable with Logging { + with Transactional with Committable with Abortable with Logging { //Import Ops import PersistentMap._ @@ -118,7 +131,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] protected def clearDistinctKeys = keysInCurrentTx.clear protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] = - appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true)) + appendOnlyTxLog filter (e => e.key.map(equal(_, key)).getOrElse(true)) // need to get current value considering the underlying storage as well as the transaction log protected def getCurrentValue(key: K): Option[V] = { @@ -129,7 +142,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] // get the snapshot from the underlying store for this key val underlying = try { storage.getMapStorageEntryFor(uuid, key) - } catch { case e: Exception => None } + } catch {case e: Exception => None} if (txEntries.isEmpty) underlying else txEntries.last match { @@ -146,12 +159,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] case None => Map.empty[K, V] case Some(v) => Map((key, v)) } - txEntries.foreach {case LogEntry(k, v, o) => o match { - case PUT => m.put(k.get, v.get) - case REM => m -= k.get - case UPD => m.update(k.get, v.get) - case CLR => Map.empty[K, V] - }} + txEntries.foreach { + case LogEntry(k, v, o) => o match { + case PUT => m.put(k.get, v.get) + case REM => m -= k.get + case UPD => m.update(k.get, v.get) + case CLR => Map.empty[K, V] + } + } m get key } @@ -159,12 +174,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] val storage: MapStorageBackend[K, V] def commit = { - appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match { - case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) - case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) - case REM => storage.removeMapStorageFor(uuid, k.get) - case CLR => storage.removeMapStorageFor(uuid) - }} + appendOnlyTxLog.foreach { + case LogEntry(k, v, o) => o match { + case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case REM => storage.removeMapStorageFor(uuid, k.get) + case CLR => storage.removeMapStorageFor(uuid) + } + } appendOnlyTxLog.clear clearDistinctKeys @@ -180,8 +197,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] this } - override def +=(kv : (K,V)) = { - put(kv._1,kv._2) + override def +=(kv: (K, V)) = { + put(kv._1, kv._2) this } @@ -230,10 +247,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] case Seq() => // current tx doesn't use this storage.getMapStorageEntryFor(uuid, key).isDefined // check storage case txs => // present in log - val lastOp = txs.last.op + val lastOp = txs.last.op lastOp != REM && lastOp != CLR // last entry cannot be a REM - } - } catch { case e: Exception => false } + } + } catch {case e: Exception => false} protected def existsInStorage(key: K): Option[V] = try { storage.getMapStorageEntryFor(uuid, key) @@ -243,33 +260,33 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] override def size: Int = try { // partition key set affected in current tx into those which r added & which r deleted - val (keysAdded, keysRemoved) = keysInCurrentTx.map { + val (keysAdded, keysRemoved) = keysInCurrentTx.map { case (kseq, k) => ((kseq, k), getCurrentValue(k)) }.partition(_._2.isDefined) // keys which existed in storage but removed in current tx - val inStorageRemovedInTx = - keysRemoved.keySet - .map(_._2) - .filter(k => existsInStorage(k).isDefined) - .size + val inStorageRemovedInTx = + keysRemoved.keySet + .map(_._2) + .filter(k => existsInStorage(k).isDefined) + .size // all keys in storage - val keysInStorage = - storage.getMapStorageFor(uuid) - .map { case (k, v) => toEquals(k) } - .toSet + val keysInStorage = + storage.getMapStorageFor(uuid) + .map {case (k, v) => toEquals(k)} + .toSet // (keys that existed UNION keys added ) - (keys removed) (keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx - } catch { - case e: Exception => 0 + } catch { + case e: Exception => 0 } // get must consider underlying storage & current uncommitted tx log override def get(key: K): Option[V] = getCurrentValue(key) - def iterator: Iterator[Tuple2[K, V]] + def iterator: Iterator[Tuple2[K, V]] private def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException @@ -277,38 +294,50 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] } } +object PersistentMapBinary { + object COrdering { + //frontend + implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] { + def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) = + ArrayOrdering.compare(o1.toArray, o2.toArray) + } + //backend + implicit object ArrayOrdering extends Ordering[Array[Byte]] { + def compare(o1: Array[Byte], o2: Array[Byte]) = + new String(o1) compare new String(o2) + } + } +} + trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { import scala.collection.mutable.ArraySeq type T = ArraySeq[Byte] + def toEquals(k: Array[Byte]) = ArraySeq(k: _*) + override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2 - object COrdering { - implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] { - def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) = - new String(o1.toArray) compare new String(o2.toArray) - } - } + import scala.collection.immutable.{TreeMap, SortedMap} private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = { - import COrdering._ + import PersistentMapBinary.COrdering._ // need ArraySeq for ordering - val fromStorage = - TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*) + val fromStorage = + TreeMap(storage.getMapStorageFor(uuid).map {case (k, v) => (ArraySeq(k: _*), v)}: _*) - val (keysAdded, keysRemoved) = keysInCurrentTx.map { + val (keysAdded, keysRemoved) = keysInCurrentTx.map { case (_, k) => (k, getCurrentValue(k)) }.partition(_._2.isDefined) - val inStorageRemovedInTx = - keysRemoved.keySet - .filter(k => existsInStorage(k).isDefined) - .map(k => ArraySeq(k: _*)) + val inStorageRemovedInTx = + keysRemoved.keySet + .filter(k => existsInStorage(k).isDefined) + .map(k => ArraySeq(k: _*)) - (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, v) => (ArraySeq(k: _*), v.get) } + (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map {case (k, v) => (ArraySeq(k: _*), v.get)} } override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try { @@ -317,51 +346,53 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { if (newMap isEmpty) List[(Array[Byte], Array[Byte])]() val startKey = - start match { - case Some(bytes) => Some(ArraySeq(bytes: _*)) - case None => None - } + start match { + case Some(bytes) => Some(ArraySeq(bytes: _*)) + case None => None + } val endKey = - finish match { - case Some(bytes) => Some(ArraySeq(bytes: _*)) - case None => None - } + finish match { + case Some(bytes) => Some(ArraySeq(bytes: _*)) + case None => None + } ((startKey, endKey, count): @unchecked) match { case ((Some(s), Some(e), _)) => newMap.range(s, e) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList case ((Some(s), None, c)) if c > 0 => newMap.from(s) - .iterator - .take(count) - .map(e => (e._1.toArray, e._2)) - .toList + .iterator + .take(count) + .map(e => (e._1.toArray, e._2)) + .toList case ((Some(s), None, _)) => newMap.from(s) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList case ((None, Some(e), _)) => newMap.until(e) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList } - } catch { case e: Exception => Nil } + } catch {case e: Exception => Nil} - override def iterator: Iterator[(Array[Byte], Array[Byte])] = { + override def iterator: Iterator[(Array[Byte], Array[Byte])] = { new Iterator[(Array[Byte], Array[Byte])] { private var elements = replayAllKeys + override def next: (Array[Byte], Array[Byte]) = synchronized { val (k, v) = elements.head elements = elements.tail (k.toArray, v) } - override def hasNext: Boolean = synchronized { !elements.isEmpty } + + override def hasNext: Boolean = synchronized {!elements.isEmpty} } } } @@ -394,7 +425,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa val storage: VectorStorageBackend[T] def commit = { - for(entry <- appendOnlyTxLog) { + for (entry <- appendOnlyTxLog) { (entry: @unchecked) match { case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v) case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v) @@ -412,7 +443,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa import scala.collection.mutable.ArrayBuffer var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*) - for(entry <- appendOnlyTxLog) { + for (entry <- appendOnlyTxLog) { (entry: @unchecked) match { case LogEntry(_, Some(v), ADD) => elemsStorage += v case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v) @@ -446,11 +477,11 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa val curr = replay val s = if (start.isDefined) start.get else 0 val cnt = - if (finish.isDefined) { - val f = finish.get - if (f >= s) (f - s) else count - } - else count + if (finish.isDefined) { + val f = finish.get + if (f >= s) (f - s) else count + } + else count if (s == 0 && cnt == 0) List().toIndexedSeq else curr.slice(s, s + cnt).toIndexedSeq } @@ -519,12 +550,12 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { } } - private[akka] object PersistentQueue { - //Operations for PersistentQueue - sealed trait QueueOp - case object ENQ extends QueueOp - case object DEQ extends QueueOp - } +private[akka] object PersistentQueue { + //Operations for PersistentQueue + sealed trait QueueOp + case object ENQ extends QueueOp + case object DEQ extends QueueOp +} /** * Implementation of PersistentQueue for every concrete @@ -552,7 +583,7 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { * @author Debasish Ghosh */ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] - with Transactional with Committable with Abortable with Logging { + with Transactional with Committable with Abortable with Logging { //Import Ops import PersistentQueue._ @@ -575,11 +606,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] val storage: QueueStorageBackend[A] def commit = { - enqueuedNDequeuedEntries.toList.foreach { e => - e._2 match { - case ENQ => storage.enqueue(uuid, e._1.get) - case DEQ => storage.dequeue(uuid) - } + enqueuedNDequeuedEntries.toList.foreach { + e => + e._2 match { + case ENQ => storage.enqueue(uuid, e._1.get) + case DEQ => storage.dequeue(uuid) + } } if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) { storage.remove(uuid) @@ -635,7 +667,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] override def size: Int = try { storage.size(uuid) + localQ.get.length - } catch { case e: Exception => 0 } + } catch {case e: Exception => 0} override def isEmpty: Boolean = size == 0 @@ -644,10 +676,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] enqueue(elem) this } + def ++=(elems: Iterator[A]) = { enqueue(elems.toList: _*) this } + def ++=(elems: Iterable[A]): Unit = this ++= elems.iterator override def dequeueFirst(p: A => Boolean): Option[A] = @@ -670,24 +704,24 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] *

* zscore can be implemented in a variety of ways by the calling class: *

- * trait ZScorable {
+ * trait ZScorable       {
  *   def toZScore: Float
  * }
  *
- * class Foo extends ZScorable {
+ * class Foo extends ZScorable       {
  *   //.. implemnetation
  * }
  * 
* Or we can also use views: *
- * class Foo {
+ * class Foo       {
  *   //..
  * }
  *
- * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
- *   def toZScore = {
+ * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable       {
+ *   def toZScore =       {
  *     //..
- *   }
+ * }
  * }
  * 
* @@ -696,7 +730,6 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] * @author */ trait PersistentSortedSet[A] extends Transactional with Committable with Abortable { - protected val newElems = TransactionalMap[A, Float]() protected val removedElems = TransactionalVector[A]() @@ -729,8 +762,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab } private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match { - case Some(s) => Some(s.toFloat) - case None => None + case Some(s) => Some(s.toFloat) + case None => None } def contains(elem: A): Boolean = { @@ -758,8 +791,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab def compare(that: (A, Float)) = x._2 compare that._2 } - implicit def ordering = new scala.math.Ordering[(A,Float)] { - def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2 + implicit def ordering = new scala.math.Ordering[(A, Float)] { + def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2 } @@ -773,9 +806,9 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab // -1 means the last element, -2 means the second last val s = if (start < 0) start + l else start val e = - if (end < 0) end + l - else if (end >= l) (l - 1) - else end + if (end < 0) end + l + else if (end >= l) (l - 1) + else end // slice is open at the end, we need a closed end range ts.iterator.slice(s, e + 1).toList } diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala index 38c2e9e45a..f2203cd282 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -8,7 +8,9 @@ import org.scalatest.matchers.ShouldMatchers import se.scalablesolutions.akka.util.Logging import org.scalatest.{BeforeAndAfterEach, Spec} import scala.util.Random -import collection.immutable.{HashMap, HashSet} +import collection.immutable.{TreeMap, HashMap, HashSet} +import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + /** * Implementation Compatibility test for PersistentMap backend implementations. @@ -106,38 +108,44 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter - it("should return all the key value pairs in the map (in the correct order?) when getMapStorageFor(name) is called") { + it("should return all the key value pairs in the map in the correct order when getMapStorageFor(name) is called") { val mapName = "allTest" val rand = new Random(3).nextInt(100) - val entries = (1 to rand).toList.map { + var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering) + (1 to rand).foreach { index => - (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes) + entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes) } - storage.insertMapStorageEntriesFor(mapName, entries) + storage.insertMapStorageEntriesFor(mapName, entries.toList) val retrieved = storage.getMapStorageFor(mapName) retrieved.size should be(rand) entries.size should be(rand) + val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} val retrievedMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} entryMap should equal(retrievedMap) - //Should the ordering of key-vals returned be enforced? - //ordered by key? - //using what comaparison? + (0 until rand).foreach { + i: Int => { + new String(entries.toList(i)._1) should be(new String(retrieved(i)._1)) + } + } + + } + + it("should return all the key->value pairs that exist in the map that are between start and end, up to count pairs when getMapStorageRangeFor is called") { + //implement if this method will be used } it("should not throw an exception when size is called on a non existent map?") { storage.getMapStorageSizeFor("nonExistent") should be(0) } - it("should behave properly when getMapStorageRange is called?") { - //No current code calls getMapStorageRangeFor - } - + } } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 20b9804ed4..da8fe9c1b6 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -17,9 +17,10 @@ import voldemort.versioning.Versioned import collection.JavaConversions import java.nio.ByteBuffer import collection.Map -import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap} import collection.mutable.{Set, HashSet, ArrayBuffer} import java.util.{Properties, Map => JMap} +import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ +import collection.immutable._ /* RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores @@ -54,11 +55,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val vectorSizeIndex = getIndexedBytes(-1) val queueHeadIndex = getIndexedBytes(-1) val queueTailIndex = getIndexedBytes(-2) - - - implicit val byteOrder = new Ordering[Array[Byte]] { - override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) - } + //explicit implicit :) + implicit val ordering = ArrayOrdering def getRefStorageFor(name: String): Option[Array[Byte]] = { @@ -90,17 +88,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with mapKey => getKey(name, mapKey) })) - val buf = new ArrayBuffer[(Array[Byte], Array[Byte])](all.size) + var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering) JavaConversions.asMap(all).foreach { (entry) => { entry match { - case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => { - buf += key -> versioned.getValue + case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => { + returned += getMapKeyFromKey(name, namePlusKey) -> versioned.getValue } } } } - buf.toList + returned.toList } def getMapStorageSizeFor(name: String): Int = { @@ -263,7 +261,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with try { queueClient.delete(key) } catch { - //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around + //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue") } } @@ -332,6 +330,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with IntSerializer.fromBytes(indexBytes) } + def getMapKeyFromKey(owner: String, key: Array[Byte]): Array[Byte] = { + val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length + val mapkey = new Array[Byte](mapKeyLength) + System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength) + mapkey + } + def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties @@ -450,6 +455,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { + import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + var set = new TreeSet[Array[Byte]] if (bytes.length > IntSerializer.bytesPerInt) { var pos = 0 diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala index 76bb989ac9..b283cad692 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala @@ -5,7 +5,7 @@ import org.scalatest.matchers.ShouldMatchers import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ -import se.scalablesolutions.akka.actor.{newUuid,Uuid} +import se.scalablesolutions.akka.actor.{newUuid, Uuid} import collection.immutable.TreeSet import VoldemortStorageBackendSuite._ @@ -84,4 +84,18 @@ class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers } + test("Persistent Maps work as expected") { + atomic { + val map = VoldemortStorage.getMap("map") + map.put("mapTest".getBytes, null) + } + + atomic { + val map = VoldemortStorage.getMap("map") + map.get("mapTest".getBytes).get should be(null) + } + + + } + } \ No newline at end of file