Refactor to utilize only one voldemort store per datastructure type
This commit is contained in:
parent
1a5466e075
commit
96c9fecf0d
4 changed files with 37 additions and 53 deletions
|
|
@ -33,23 +33,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666"))
|
||||
}
|
||||
val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs")
|
||||
val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys")
|
||||
val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues")
|
||||
val mapStore = config.getString("akka.storage.voldemort.store.map", "Maps")
|
||||
val vectorStore = config.getString("akka.storage.voldemort.store.vector", "Vectors")
|
||||
val queueStore = config.getString("akka.storage.voldemort.store.queue", "Queues")
|
||||
|
||||
var storeClientFactory: StoreClientFactory = null
|
||||
var refClient: StoreClient[String, Array[Byte]] = null
|
||||
var mapKeyClient: StoreClient[String, Array[Byte]] = null
|
||||
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
var mapClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
var vectorClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
var queueClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
initStoreClients
|
||||
|
||||
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
|
||||
val vectorSizeIndex = -1
|
||||
val queueHeadIndex = -1
|
||||
val queueTailIndex = -2
|
||||
val mapKeysIndex = getIndexedBytes(-1)
|
||||
val vectorSizeIndex = getIndexedBytes(-1)
|
||||
val queueHeadIndex = getIndexedBytes(-1)
|
||||
val queueTailIndex = getIndexedBytes(-2)
|
||||
case class QueueMetadata(head: Int, tail: Int) {
|
||||
def size = tail - head
|
||||
//worry about wrapping etc
|
||||
|
|
@ -85,7 +84,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
|
||||
private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = {
|
||||
val all: JMap[Array[Byte], Versioned[Array[Byte]]] =
|
||||
mapValueClient.getAll(JavaConversions.asIterable(keys.map {
|
||||
mapClient.getAll(JavaConversions.asIterable(keys.map {
|
||||
mapKey => getKey(name, mapKey)
|
||||
}))
|
||||
|
||||
|
|
@ -108,7 +107,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
|
||||
val result: Array[Byte] = mapValueClient.getValue(getKey(name, key))
|
||||
val result: Array[Byte] = mapClient.getValue(getKey(name, key))
|
||||
result match {
|
||||
case null => None
|
||||
case _ => Some(result)
|
||||
|
|
@ -119,7 +118,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
var keys = getMapKeys(name)
|
||||
keys -= key
|
||||
putMapKeys(name, keys)
|
||||
mapValueClient.delete(getKey(name, key))
|
||||
mapClient.delete(getKey(name, key))
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -127,13 +126,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
val keys = getMapKeys(name)
|
||||
keys.foreach {
|
||||
key =>
|
||||
mapValueClient.delete(getKey(name, key))
|
||||
mapClient.delete(getKey(name, key))
|
||||
}
|
||||
mapKeyClient.delete(name)
|
||||
mapClient.delete(getKey(name, mapKeysIndex))
|
||||
}
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
|
||||
mapValueClient.put(getKey(name, key), value)
|
||||
mapClient.put(getKey(name, key), value)
|
||||
var keys = getMapKeys(name)
|
||||
keys += key
|
||||
putMapKeys(name, keys)
|
||||
|
|
@ -142,7 +141,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
|
||||
val newKeys = entries.map {
|
||||
case (key, value) => {
|
||||
mapValueClient.put(getKey(name, key), value)
|
||||
mapClient.put(getKey(name, key), value)
|
||||
key
|
||||
}
|
||||
}
|
||||
|
|
@ -152,16 +151,16 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
}
|
||||
|
||||
def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = {
|
||||
mapKeyClient.put(name, SortedSetSerializer.toBytes(keys))
|
||||
mapClient.put(getKey(name, mapKeysIndex), SortedSetSerializer.toBytes(keys))
|
||||
}
|
||||
|
||||
def getMapKeys(name: String): SortedSet[Array[Byte]] = {
|
||||
SortedSetSerializer.fromBytes(mapKeyClient.getValue(name, Array.empty[Byte]))
|
||||
SortedSetSerializer.fromBytes(mapClient.getValue(getKey(name, mapKeysIndex), Array.empty[Byte]))
|
||||
}
|
||||
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
IntSerializer.fromBytes(vectorClient.getValue(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(0)))
|
||||
IntSerializer.fromBytes(vectorClient.getValue(getKey(name, vectorSizeIndex), IntSerializer.toBytes(0)))
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -205,7 +204,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
val size = getVectorStorageSizeFor(name)
|
||||
vectorClient.put(getIndexedKey(name, index), elem)
|
||||
if (size < index + 1) {
|
||||
vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
|
||||
vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -216,7 +215,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
vectorClient.put(getIndexedKey(name, size), element)
|
||||
size += 1
|
||||
}
|
||||
vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
|
||||
vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
|
||||
}
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
|
||||
|
|
@ -244,13 +243,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
val mdata = getQueueMetadata(name)
|
||||
val key = getIndexedKey(name, mdata.tail)
|
||||
queueClient.put(key, item)
|
||||
queueClient.put(getIndexedKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1))
|
||||
Some (mdata.size + 1)
|
||||
queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1))
|
||||
Some(mdata.size + 1)
|
||||
}
|
||||
|
||||
|
||||
def getQueueMetadata(name: String): QueueMetadata = {
|
||||
val keys = List(getIndexedKey(name, queueHeadIndex), getIndexedKey(name, queueTailIndex))
|
||||
val keys = List(getKey(name, queueHeadIndex), getKey(name, queueTailIndex))
|
||||
val qdata = JavaConversions.asMap(queueClient.getAll(JavaConversions.asIterable(keys)))
|
||||
val values = keys.map {
|
||||
qdata.get(_) match {
|
||||
|
|
@ -278,12 +277,16 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
theKey
|
||||
}
|
||||
|
||||
def getIndexedKey(owner: String, index: Int): Array[Byte] = {
|
||||
def getIndexedBytes(index: Int): Array[Byte] = {
|
||||
val indexbytes = IntSerializer.toBytes(index)
|
||||
val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length)
|
||||
System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length)
|
||||
System.arraycopy(indexbytes, 0, theIndexKey, underscoreBytesUTF8.length, indexbytes.length)
|
||||
getKey(owner, theIndexKey)
|
||||
theIndexKey
|
||||
}
|
||||
|
||||
def getIndexedKey(owner: String, index: Int): Array[Byte] = {
|
||||
getKey(owner, getIndexedBytes(index))
|
||||
}
|
||||
|
||||
def getIndexFromVectorValueKey(owner: String, key: Array[Byte]): Int = {
|
||||
|
|
@ -318,8 +321,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
}
|
||||
}
|
||||
refClient = storeClientFactory.getStoreClient(refStore)
|
||||
mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore)
|
||||
mapValueClient = storeClientFactory.getStoreClient(mapValueStore)
|
||||
mapClient = storeClientFactory.getStoreClient(mapStore)
|
||||
vectorClient = storeClientFactory.getStoreClient(vectorStore)
|
||||
queueClient = storeClientFactory.getStoreClient(queueStore)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
</value-serializer>
|
||||
</store>
|
||||
<store>
|
||||
<name>MapValues</name>
|
||||
<name>Maps</name>
|
||||
<replication-factor>1</replication-factor>
|
||||
<preferred-reads>1</preferred-reads>
|
||||
<required-reads>1</required-reads>
|
||||
|
|
@ -32,23 +32,6 @@
|
|||
<type>identity</type>
|
||||
</value-serializer>
|
||||
</store>
|
||||
<store>
|
||||
<name>MapKeys</name>
|
||||
<replication-factor>1</replication-factor>
|
||||
<preferred-reads>1</preferred-reads>
|
||||
<required-reads>1</required-reads>
|
||||
<preferred-writes>1</preferred-writes>
|
||||
<required-writes>1</required-writes>
|
||||
<persistence>memory</persistence>
|
||||
<routing>client</routing>
|
||||
<key-serializer>
|
||||
<type>string</type>
|
||||
<schema-info>utf8</schema-info>
|
||||
</key-serializer>
|
||||
<value-serializer>
|
||||
<type>identity</type>
|
||||
</value-serializer>
|
||||
</store>
|
||||
<store>
|
||||
<name>Vectors</name>
|
||||
<replication-factor>1</replication-factor>
|
||||
|
|
|
|||
|
|
@ -34,8 +34,8 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
|
|||
test("that map key storage and retrieval works") {
|
||||
val key = "testmapKey"
|
||||
val mapKeys = new TreeSet[Array[Byte]] + bytes("key1")
|
||||
mapKeyClient.delete(key)
|
||||
mapKeyClient.getValue(key, SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet))
|
||||
mapClient.delete(getKey(key, mapKeysIndex))
|
||||
mapClient.getValue(getKey(key, mapKeysIndex), SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet))
|
||||
putMapKeys(key, mapKeys)
|
||||
getMapKeys(key) should equal(mapKeys)
|
||||
}
|
||||
|
|
@ -43,8 +43,8 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
|
|||
test("that map value storage and retrieval works") {
|
||||
val key = bytes("keyForTestingMapValueClient")
|
||||
val value = bytes("value for testing map value client")
|
||||
mapValueClient.put(key, value)
|
||||
mapValueClient.getValue(key, empty) should equal(value)
|
||||
mapClient.put(key, value)
|
||||
mapClient.getValue(key, empty) should equal(value)
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -99,7 +99,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
|
|||
val key = "vectorApiKey"
|
||||
val value = bytes("Some bytes we want to store in a vector")
|
||||
val updatedValue = bytes("Some updated bytes we want to store in a vector")
|
||||
vectorClient.delete(getIndexedKey(key, vectorSizeIndex))
|
||||
vectorClient.delete(getKey(key, vectorSizeIndex))
|
||||
vectorClient.delete(getIndexedKey(key, 0))
|
||||
vectorClient.delete(getIndexedKey(key, 1))
|
||||
getVectorStorageEntryFor(key, 0) should be(empty)
|
||||
|
|
|
|||
|
|
@ -173,13 +173,12 @@ akka {
|
|||
voldemort {
|
||||
store {
|
||||
ref = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values
|
||||
map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values
|
||||
map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values
|
||||
vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values
|
||||
maps = "Maps" # Voldemort Store Used to Persist Map Keys. Use identity serializer for keys, identity serializer for values
|
||||
vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use identity serializer for keys, identity serializer for values
|
||||
queue = "Queues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values
|
||||
}
|
||||
|
||||
client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig
|
||||
client { # The KeyValue pairs under client are converted to java Properties and used to construct the Voldemort ClientConfig
|
||||
bootstrap_urls = "tcp://localhost:6666" # All Valid Voldemort Client properties are valid here, in string form
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue