diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala index 2021c1d3aa..395d0ef269 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -142,8 +142,13 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter } - it("should return Some(null), not None, for a key that has had the value null set?") { - + it("should return Some(null), not None, for a key that has had the value null set and None for a key with no value set") { + val mapName = "nullTest" + val key = "key".getBytes + storage.insertMapStorageEntryFor(mapName, key, null) + storage.getMapStorageEntryFor(mapName, key).get should be(null) + storage.removeMapStorageFor(mapName, key) + storage.getMapStorageEntryFor(mapName, key) should be(None) } it("should not throw an exception when size is called on a non existent map?") { diff --git a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala index 61730a42d3..3eb89e3db5 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala @@ -111,6 +111,13 @@ trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAft storage.remove(queue) storage.size(queue) should be(0) } + + it("should accept null as a value to enqueue and return Some(null) when that value is dequeued") { + val queue = "nullTest" + storage.enqueue(queue, null).get should be(1) + storage.dequeue(queue).get should be(null) + storage.dequeue(queue) should be(None) + } } } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala index 96451bc063..37902cf7c9 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala @@ -41,6 +41,12 @@ trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter val value = name.getBytes storage.getRefStorageFor(name) should be(None) } + + it("Should return None, not Some(null) when getRefStorageFor is called when null has been set") { + val name = "RefStorageTest #3" + storage.insertRefStorageFor(name, null) + storage.getRefStorageFor(name) should be(None) + } } } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala index c88795cc23..e677f8fe66 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala @@ -86,14 +86,36 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf val values = (0 to rand).toList.map {i: Int => vector + "value" + i} storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map {b: Array[Byte] => new String(b)}) + (0 to drand).foreach { + i: Int => { + val value: String = vector + "value" + (rand - i) + log.debug(value) + List(value) should be(storage.getVectorStorageRangeFor(vector, Some(i), None, 1).map {b: Array[Byte] => new String(b)}) + } + } } it("should behave properly when the range used in getVectorStorageRangeFor has indexes outside the current size of the vector") { //what is proper? } - it("shoud behave properly when getStorageEntry for a non existent entry?") { + it("shoud return null when getStorageEntry is called on a null entry") { //What is proper? + val vector = "nullTest" + storage.insertVectorStorageEntryFor(vector, null) + storage.getVectorStorageEntryFor(vector, 0) should be(null) + } + + it("shoud throw a Storage exception when there is an attempt to retrieve an index larger than the Vector") { + val vector = "tooLargeRetrieve" + storage.insertVectorStorageEntryFor(vector, null) + evaluating {storage.getVectorStorageEntryFor(vector, 9)} should produce[StorageException] + } + + it("shoud throw a Storage exception when there is an attempt to update an index larger than the Vector") { + val vector = "tooLargeUpdate" + storage.insertVectorStorageEntryFor(vector, null) + evaluating {storage.updateVectorStorageEntryFor(vector, 9, null)} should produce[StorageException] } } diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala index 4e237267a5..2a9c3c5717 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala @@ -15,14 +15,17 @@ object VoldemortStorage extends Storage { def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString) def newVector: PersistentVector[ElementType] = newVector(newUuid.toString) def newRef: PersistentRef[ElementType] = newRef(newUuid.toString) + override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString) def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) def getVector(id: String): PersistentVector[ElementType] = newVector(id) def getRef(id: String): PersistentRef[ElementType] = newRef(id) + override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id) def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id) def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id) def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id) + override def newQueue(id:String): PersistentQueue[ElementType] = new VoldemortPersistentQueue(id) } @@ -41,3 +44,8 @@ class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] { val uuid = id val storage = VoldemortStorageBackend } + +class VoldemortPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] { + val uuid = id + val storage = VoldemortStorageBackend +} diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 5cbe0097df..abc7855d9c 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -50,6 +50,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with var queueClient: StoreClient[Array[Byte], Array[Byte]] = null initStoreClients + val nullMapValueHeader = 0x00.byteValue + val nullMapValue: Array[Byte] = Array(nullMapValueHeader) + val notNullMapValueHeader: Byte = 0xff.byteValue val underscoreBytesUTF8 = "_".getBytes("UTF-8") val mapKeysIndex = getIndexedBytes(-1) val vectorSizeIndex = getIndexedBytes(-1) @@ -61,14 +64,14 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getRefStorageFor(name: String): Option[Array[Byte]] = { val result: Array[Byte] = refClient.getValue(name) - result match { - case null => None - case _ => Some(result) - } + Option(result) } def insertRefStorageFor(name: String, element: Array[Byte]) = { - refClient.put(name, element) + element match { + case null => refClient.delete(name) + case _ => refClient.put(name, element) + } } def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { @@ -93,7 +96,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with (entry) => { entry match { case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => { - returned += getMapKeyFromKey(name, namePlusKey) -> versioned.getValue + returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(versioned.getValue) } } } @@ -110,7 +113,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val result: Array[Byte] = mapClient.getValue(getKey(name, key)) result match { case null => None - case _ => Some(result) + case _ => Some(getMapValueFromStored(result)) } } @@ -132,7 +135,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { - mapClient.put(getKey(name, key), value) + mapClient.put(getKey(name, key), getStoredMapValue(value)) var keys = getMapKeys(name) keys += key putMapKeys(name, keys) @@ -141,7 +144,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { val newKeys = entries.map { case (key, value) => { - mapClient.put(getKey(name, key), value) + mapClient.put(getKey(name, key), getStoredMapValue(value)) key } } @@ -167,18 +170,21 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { val size = getVectorStorageSizeFor(name) val st = start.getOrElse(0) - val cnt = + var cnt = if (finish.isDefined) { val f = finish.get if (f >= st) (f - st) else count } else { count } + if (cnt > (size - st)) { + cnt = size - st + } val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { - index => getIndexedKey(name, index) - }.reverse //read backwards + index => getIndexedKey(name, (size - 1) - index) + } //read backwards val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq)) @@ -200,18 +206,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { val size = getVectorStorageSizeFor(name) - if (size > 0) { + if (size > 0 && index < size) { vectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) } else { - Array.empty[Byte] //is this what to return? + throw new StorageException("In Vector:" + name + " No such Index:" + index) } } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { val size = getVectorStorageSizeFor(name) - vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem) - if (size < index + 1) { - vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1)) + if (size > 0 && index < size) { + elem match { + case null => vectorClient.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) + case _ => vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem) + } + } else { + throw new StorageException("In Vector:" + name + " No such Index:" + index) } } @@ -219,7 +229,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with var size = getVectorStorageSizeFor(name) elements.foreach { element => - vectorClient.put(getIndexedKey(name, size), element) + if (element != null) { + vectorClient.put(getIndexedKey(name, size), element) + } size += 1 } vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) @@ -281,7 +293,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val mdata = getQueueMetadata(name) if (mdata.canEnqueue) { val key = getIndexedKey(name, mdata.tail) - queueClient.put(key, item) + item match { + case null => queueClient.delete(key) + case _ => queueClient.put(key, item) + } queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue)) Some(mdata.size + 1) } else { @@ -344,6 +359,32 @@ MapStorageBackend[Array[Byte], Array[Byte]] with mapkey } + //wrapper for null + def getStoredMapValue(value: Array[Byte]): Array[Byte] = { + value match { + case null => nullMapValue + case value => { + val stored = new Array[Byte](value.length + 1) + stored(0) = notNullMapValueHeader + System.arraycopy(value, 0, stored, 1, value.length) + stored + } + } + } + + def getMapValueFromStored(value: Array[Byte]): Array[Byte] = { + + if (value(0) == nullMapValueHeader) { + null + } else if (value(0) == notNullMapValueHeader) { + val returned = new Array[Byte](value.length - 1) + System.arraycopy(value, 1, returned, 0, value.length - 1) + returned + } else { + throw new StorageException("unknown header byte on map value:" + value(0)) + } + } + def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala deleted file mode 100644 index b283cad692..0000000000 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala +++ /dev/null @@ -1,101 +0,0 @@ -package se.scalablesolutions.akka.persistence.voldemort - -import org.scalatest.FunSuite -import org.scalatest.matchers.ShouldMatchers -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ -import se.scalablesolutions.akka.actor.{newUuid, Uuid} -import collection.immutable.TreeSet -import VoldemortStorageBackendSuite._ - -import se.scalablesolutions.akka.stm._ -import se.scalablesolutions.akka.stm.global._ -import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.persistence.common._ -import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.config.Config.config - -@RunWith(classOf[JUnitRunner]) -class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging { - test("persistentRefs work as expected") { - val name = newUuid.toString - val one = "one".getBytes - atomic { - val ref = VoldemortStorage.getRef(name) - ref.isDefined should be(false) - ref.swap(one) - ref.get match { - case Some(bytes) => bytes should be(one) - case None => true should be(false) - } - } - val two = "two".getBytes - atomic { - val ref = VoldemortStorage.getRef(name) - ref.isDefined should be(true) - ref.swap(two) - ref.get match { - case Some(bytes) => bytes should be(two) - case None => true should be(false) - } - } - } - - - test("Persistent Vectors function as expected") { - val name = newUuid.toString - val one = "one".getBytes - val two = "two".getBytes - atomic { - val vec = VoldemortStorage.getVector(name) - vec.add(one) - } - atomic { - val vec = VoldemortStorage.getVector(name) - vec.size should be(1) - vec.add(two) - } - atomic { - val vec = VoldemortStorage.getVector(name) - - vec.get(0) should be(one) - vec.get(1) should be(two) - vec.size should be(2) - vec.update(0, two) - } - - atomic { - val vec = VoldemortStorage.getVector(name) - vec.get(0) should be(two) - vec.get(1) should be(two) - vec.size should be(2) - vec.update(0, Array.empty[Byte]) - vec.update(1, Array.empty[Byte]) - } - - atomic { - val vec = VoldemortStorage.getVector(name) - vec.get(0) should be(Array.empty[Byte]) - vec.get(1) should be(Array.empty[Byte]) - vec.size should be(2) - } - - - } - - test("Persistent Maps work as expected") { - atomic { - val map = VoldemortStorage.getMap("map") - map.put("mapTest".getBytes, null) - } - - atomic { - val map = VoldemortStorage.getMap("map") - map.get("mapTest".getBytes).get should be(null) - } - - - } - -} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 8ac3d306c4..b28ea90171 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -103,10 +103,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb vectorClient.delete(getKey(key, vectorSizeIndex)) vectorClient.delete(getIndexedKey(key, 0)) vectorClient.delete(getIndexedKey(key, 1)) - getVectorStorageEntryFor(key, 0) should be(empty) - getVectorStorageEntryFor(key, 1) should be(empty) - getVectorStorageRangeFor(key, None, None, 1).head should be(empty) - + insertVectorStorageEntryFor(key, value) //again insertVectorStorageEntryFor(key, value)