Added tests of proper null handling for Ref,Vector,Map,Queue and voldemort impl/tweak
This commit is contained in:
parent
3411ad6be1
commit
97a5c05b53
8 changed files with 112 additions and 127 deletions
|
|
@ -142,8 +142,13 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
|
|||
}
|
||||
|
||||
|
||||
it("should return Some(null), not None, for a key that has had the value null set?") {
|
||||
|
||||
it("should return Some(null), not None, for a key that has had the value null set and None for a key with no value set") {
|
||||
val mapName = "nullTest"
|
||||
val key = "key".getBytes
|
||||
storage.insertMapStorageEntryFor(mapName, key, null)
|
||||
storage.getMapStorageEntryFor(mapName, key).get should be(null)
|
||||
storage.removeMapStorageFor(mapName, key)
|
||||
storage.getMapStorageEntryFor(mapName, key) should be(None)
|
||||
}
|
||||
|
||||
it("should not throw an exception when size is called on a non existent map?") {
|
||||
|
|
|
|||
|
|
@ -111,6 +111,13 @@ trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAft
|
|||
storage.remove(queue)
|
||||
storage.size(queue) should be(0)
|
||||
}
|
||||
|
||||
it("should accept null as a value to enqueue and return Some(null) when that value is dequeued") {
|
||||
val queue = "nullTest"
|
||||
storage.enqueue(queue, null).get should be(1)
|
||||
storage.dequeue(queue).get should be(null)
|
||||
storage.dequeue(queue) should be(None)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -41,6 +41,12 @@ trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
|
|||
val value = name.getBytes
|
||||
storage.getRefStorageFor(name) should be(None)
|
||||
}
|
||||
|
||||
it("Should return None, not Some(null) when getRefStorageFor is called when null has been set") {
|
||||
val name = "RefStorageTest #3"
|
||||
storage.insertRefStorageFor(name, null)
|
||||
storage.getRefStorageFor(name) should be(None)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -86,14 +86,36 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf
|
|||
val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
|
||||
storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
|
||||
values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map {b: Array[Byte] => new String(b)})
|
||||
(0 to drand).foreach {
|
||||
i: Int => {
|
||||
val value: String = vector + "value" + (rand - i)
|
||||
log.debug(value)
|
||||
List(value) should be(storage.getVectorStorageRangeFor(vector, Some(i), None, 1).map {b: Array[Byte] => new String(b)})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
it("should behave properly when the range used in getVectorStorageRangeFor has indexes outside the current size of the vector") {
|
||||
//what is proper?
|
||||
}
|
||||
|
||||
it("shoud behave properly when getStorageEntry for a non existent entry?") {
|
||||
it("shoud return null when getStorageEntry is called on a null entry") {
|
||||
//What is proper?
|
||||
val vector = "nullTest"
|
||||
storage.insertVectorStorageEntryFor(vector, null)
|
||||
storage.getVectorStorageEntryFor(vector, 0) should be(null)
|
||||
}
|
||||
|
||||
it("shoud throw a Storage exception when there is an attempt to retrieve an index larger than the Vector") {
|
||||
val vector = "tooLargeRetrieve"
|
||||
storage.insertVectorStorageEntryFor(vector, null)
|
||||
evaluating {storage.getVectorStorageEntryFor(vector, 9)} should produce[StorageException]
|
||||
}
|
||||
|
||||
it("shoud throw a Storage exception when there is an attempt to update an index larger than the Vector") {
|
||||
val vector = "tooLargeUpdate"
|
||||
storage.insertVectorStorageEntryFor(vector, null)
|
||||
evaluating {storage.updateVectorStorageEntryFor(vector, 9, null)} should produce[StorageException]
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,14 +15,17 @@ object VoldemortStorage extends Storage {
|
|||
def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString)
|
||||
def newVector: PersistentVector[ElementType] = newVector(newUuid.toString)
|
||||
def newRef: PersistentRef[ElementType] = newRef(newUuid.toString)
|
||||
override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString)
|
||||
|
||||
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
||||
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
||||
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
||||
override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
|
||||
|
||||
def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id)
|
||||
def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id)
|
||||
def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id)
|
||||
override def newQueue(id:String): PersistentQueue[ElementType] = new VoldemortPersistentQueue(id)
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -41,3 +44,8 @@ class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
|
|||
val uuid = id
|
||||
val storage = VoldemortStorageBackend
|
||||
}
|
||||
|
||||
class VoldemortPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = VoldemortStorageBackend
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
var queueClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
initStoreClients
|
||||
|
||||
val nullMapValueHeader = 0x00.byteValue
|
||||
val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
|
||||
val notNullMapValueHeader: Byte = 0xff.byteValue
|
||||
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
|
||||
val mapKeysIndex = getIndexedBytes(-1)
|
||||
val vectorSizeIndex = getIndexedBytes(-1)
|
||||
|
|
@ -61,14 +64,14 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] = {
|
||||
val result: Array[Byte] = refClient.getValue(name)
|
||||
result match {
|
||||
case null => None
|
||||
case _ => Some(result)
|
||||
}
|
||||
Option(result)
|
||||
}
|
||||
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) = {
|
||||
refClient.put(name, element)
|
||||
element match {
|
||||
case null => refClient.delete(name)
|
||||
case _ => refClient.put(name, element)
|
||||
}
|
||||
}
|
||||
|
||||
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
|
||||
|
|
@ -93,7 +96,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
(entry) => {
|
||||
entry match {
|
||||
case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => {
|
||||
returned += getMapKeyFromKey(name, namePlusKey) -> versioned.getValue
|
||||
returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(versioned.getValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -110,7 +113,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
val result: Array[Byte] = mapClient.getValue(getKey(name, key))
|
||||
result match {
|
||||
case null => None
|
||||
case _ => Some(result)
|
||||
case _ => Some(getMapValueFromStored(result))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -132,7 +135,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
}
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
|
||||
mapClient.put(getKey(name, key), value)
|
||||
mapClient.put(getKey(name, key), getStoredMapValue(value))
|
||||
var keys = getMapKeys(name)
|
||||
keys += key
|
||||
putMapKeys(name, keys)
|
||||
|
|
@ -141,7 +144,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
|
||||
val newKeys = entries.map {
|
||||
case (key, value) => {
|
||||
mapClient.put(getKey(name, key), value)
|
||||
mapClient.put(getKey(name, key), getStoredMapValue(value))
|
||||
key
|
||||
}
|
||||
}
|
||||
|
|
@ -167,18 +170,21 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
|
||||
val size = getVectorStorageSizeFor(name)
|
||||
val st = start.getOrElse(0)
|
||||
val cnt =
|
||||
var cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get
|
||||
if (f >= st) (f - st) else count
|
||||
} else {
|
||||
count
|
||||
}
|
||||
if (cnt > (size - st)) {
|
||||
cnt = size - st
|
||||
}
|
||||
|
||||
|
||||
val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
|
||||
index => getIndexedKey(name, index)
|
||||
}.reverse //read backwards
|
||||
index => getIndexedKey(name, (size - 1) - index)
|
||||
} //read backwards
|
||||
|
||||
val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq))
|
||||
|
||||
|
|
@ -200,18 +206,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
||||
val size = getVectorStorageSizeFor(name)
|
||||
if (size > 0) {
|
||||
if (size > 0 && index < size) {
|
||||
vectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
|
||||
} else {
|
||||
Array.empty[Byte] //is this what to return?
|
||||
throw new StorageException("In Vector:" + name + " No such Index:" + index)
|
||||
}
|
||||
}
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
|
||||
val size = getVectorStorageSizeFor(name)
|
||||
vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem)
|
||||
if (size < index + 1) {
|
||||
vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
|
||||
if (size > 0 && index < size) {
|
||||
elem match {
|
||||
case null => vectorClient.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
|
||||
case _ => vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem)
|
||||
}
|
||||
} else {
|
||||
throw new StorageException("In Vector:" + name + " No such Index:" + index)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -219,7 +229,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
var size = getVectorStorageSizeFor(name)
|
||||
elements.foreach {
|
||||
element =>
|
||||
vectorClient.put(getIndexedKey(name, size), element)
|
||||
if (element != null) {
|
||||
vectorClient.put(getIndexedKey(name, size), element)
|
||||
}
|
||||
size += 1
|
||||
}
|
||||
vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
|
||||
|
|
@ -281,7 +293,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
val mdata = getQueueMetadata(name)
|
||||
if (mdata.canEnqueue) {
|
||||
val key = getIndexedKey(name, mdata.tail)
|
||||
queueClient.put(key, item)
|
||||
item match {
|
||||
case null => queueClient.delete(key)
|
||||
case _ => queueClient.put(key, item)
|
||||
}
|
||||
queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue))
|
||||
Some(mdata.size + 1)
|
||||
} else {
|
||||
|
|
@ -344,6 +359,32 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
mapkey
|
||||
}
|
||||
|
||||
//wrapper for null
|
||||
def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
|
||||
value match {
|
||||
case null => nullMapValue
|
||||
case value => {
|
||||
val stored = new Array[Byte](value.length + 1)
|
||||
stored(0) = notNullMapValueHeader
|
||||
System.arraycopy(value, 0, stored, 1, value.length)
|
||||
stored
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def getMapValueFromStored(value: Array[Byte]): Array[Byte] = {
|
||||
|
||||
if (value(0) == nullMapValueHeader) {
|
||||
null
|
||||
} else if (value(0) == notNullMapValueHeader) {
|
||||
val returned = new Array[Byte](value.length - 1)
|
||||
System.arraycopy(value, 1, returned, 0, value.length - 1)
|
||||
returned
|
||||
} else {
|
||||
throw new StorageException("unknown header byte on map value:" + value(0))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def getClientConfig(configMap: Map[String, String]): Properties = {
|
||||
val properites = new Properties
|
||||
|
|
|
|||
|
|
@ -1,101 +0,0 @@
|
|||
package se.scalablesolutions.akka.persistence.voldemort
|
||||
|
||||
import org.scalatest.FunSuite
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
|
||||
import se.scalablesolutions.akka.actor.{newUuid, Uuid}
|
||||
import collection.immutable.TreeSet
|
||||
import VoldemortStorageBackendSuite._
|
||||
|
||||
import se.scalablesolutions.akka.stm._
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.persistence.common._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging {
|
||||
test("persistentRefs work as expected") {
|
||||
val name = newUuid.toString
|
||||
val one = "one".getBytes
|
||||
atomic {
|
||||
val ref = VoldemortStorage.getRef(name)
|
||||
ref.isDefined should be(false)
|
||||
ref.swap(one)
|
||||
ref.get match {
|
||||
case Some(bytes) => bytes should be(one)
|
||||
case None => true should be(false)
|
||||
}
|
||||
}
|
||||
val two = "two".getBytes
|
||||
atomic {
|
||||
val ref = VoldemortStorage.getRef(name)
|
||||
ref.isDefined should be(true)
|
||||
ref.swap(two)
|
||||
ref.get match {
|
||||
case Some(bytes) => bytes should be(two)
|
||||
case None => true should be(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
test("Persistent Vectors function as expected") {
|
||||
val name = newUuid.toString
|
||||
val one = "one".getBytes
|
||||
val two = "two".getBytes
|
||||
atomic {
|
||||
val vec = VoldemortStorage.getVector(name)
|
||||
vec.add(one)
|
||||
}
|
||||
atomic {
|
||||
val vec = VoldemortStorage.getVector(name)
|
||||
vec.size should be(1)
|
||||
vec.add(two)
|
||||
}
|
||||
atomic {
|
||||
val vec = VoldemortStorage.getVector(name)
|
||||
|
||||
vec.get(0) should be(one)
|
||||
vec.get(1) should be(two)
|
||||
vec.size should be(2)
|
||||
vec.update(0, two)
|
||||
}
|
||||
|
||||
atomic {
|
||||
val vec = VoldemortStorage.getVector(name)
|
||||
vec.get(0) should be(two)
|
||||
vec.get(1) should be(two)
|
||||
vec.size should be(2)
|
||||
vec.update(0, Array.empty[Byte])
|
||||
vec.update(1, Array.empty[Byte])
|
||||
}
|
||||
|
||||
atomic {
|
||||
val vec = VoldemortStorage.getVector(name)
|
||||
vec.get(0) should be(Array.empty[Byte])
|
||||
vec.get(1) should be(Array.empty[Byte])
|
||||
vec.size should be(2)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
test("Persistent Maps work as expected") {
|
||||
atomic {
|
||||
val map = VoldemortStorage.getMap("map")
|
||||
map.put("mapTest".getBytes, null)
|
||||
}
|
||||
|
||||
atomic {
|
||||
val map = VoldemortStorage.getMap("map")
|
||||
map.get("mapTest".getBytes).get should be(null)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -103,10 +103,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
|
|||
vectorClient.delete(getKey(key, vectorSizeIndex))
|
||||
vectorClient.delete(getIndexedKey(key, 0))
|
||||
vectorClient.delete(getIndexedKey(key, 1))
|
||||
getVectorStorageEntryFor(key, 0) should be(empty)
|
||||
getVectorStorageEntryFor(key, 1) should be(empty)
|
||||
getVectorStorageRangeFor(key, None, None, 1).head should be(empty)
|
||||
|
||||
|
||||
insertVectorStorageEntryFor(key, value)
|
||||
//again
|
||||
insertVectorStorageEntryFor(key, value)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue