Merge remote branch 'ticktock/master'
This commit is contained in:
commit
a17837e4b4
5 changed files with 82 additions and 43 deletions
|
|
@ -25,6 +25,7 @@ private[akka] object VoldemortStorageBackend extends
|
|||
MapStorageBackend[Array[Byte], Array[Byte]] with
|
||||
VectorStorageBackend[Array[Byte]] with
|
||||
RefStorageBackend[Array[Byte]] with
|
||||
QueueStorageBackend[Array[Byte]] with
|
||||
Logging {
|
||||
val bootstrapUrlsProp = "bootstrap_urls"
|
||||
val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match {
|
||||
|
|
@ -34,18 +35,26 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs")
|
||||
val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys")
|
||||
val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues")
|
||||
val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector-size", "VectorSizes")
|
||||
val vectorValueStore = config.getString("akka.storage.voldemort.store.vector-value", "VectorValues")
|
||||
val vectorStore = config.getString("akka.storage.voldemort.store.vector", "Vectors")
|
||||
val queueStore = config.getString("akka.storage.voldemort.store.queue", "Queues")
|
||||
|
||||
var storeClientFactory: StoreClientFactory = null
|
||||
var refClient: StoreClient[String, Array[Byte]] = null
|
||||
var mapKeyClient: StoreClient[String, Array[Byte]] = null
|
||||
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
var vectorSizeClient: StoreClient[String, Array[Byte]] = null
|
||||
var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
var vectorClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
var queueClient: StoreClient[Array[Byte], Array[Byte]] = null
|
||||
initStoreClients
|
||||
|
||||
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
|
||||
val vectorSizeIndex = -1
|
||||
val queueHeadIndex = -1
|
||||
val queueTailIndex = -2
|
||||
case class QueueMetadata(head: Int, tail: Int) {
|
||||
def size = tail - head
|
||||
//worry about wrapping etc
|
||||
}
|
||||
|
||||
implicit val byteOrder = new Ordering[Array[Byte]] {
|
||||
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
|
||||
}
|
||||
|
|
@ -152,7 +161,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
IntSerializer.fromBytes(vectorSizeClient.getValue(name, IntSerializer.toBytes(0)))
|
||||
IntSerializer.fromBytes(vectorClient.getValue(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(0)))
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -167,10 +176,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
count
|
||||
}
|
||||
val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
|
||||
index => getVectorValueKey(name, index)
|
||||
index => getIndexedKey(name, index)
|
||||
}
|
||||
|
||||
val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorValueClient.getAll(JavaConversions.asIterable(seq))
|
||||
val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq))
|
||||
|
||||
var storage = new ArrayBuffer[Array[Byte]](seq.size)
|
||||
storage = storage.padTo(seq.size, Array.empty[Byte])
|
||||
|
|
@ -189,14 +198,14 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
||||
vectorValueClient.getValue(getVectorValueKey(name, index), Array.empty[Byte])
|
||||
vectorClient.getValue(getIndexedKey(name, index), Array.empty[Byte])
|
||||
}
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
|
||||
val size = getVectorStorageSizeFor(name)
|
||||
vectorValueClient.put(getVectorValueKey(name, index), elem)
|
||||
vectorClient.put(getIndexedKey(name, index), elem)
|
||||
if (size < index + 1) {
|
||||
vectorSizeClient.put(name, IntSerializer.toBytes(index + 1))
|
||||
vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -204,10 +213,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
var size = getVectorStorageSizeFor(name)
|
||||
elements.foreach {
|
||||
element =>
|
||||
vectorValueClient.put(getVectorValueKey(name, size), element)
|
||||
vectorClient.put(getIndexedKey(name, size), element)
|
||||
size += 1
|
||||
}
|
||||
vectorSizeClient.put(name, IntSerializer.toBytes(size))
|
||||
vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
|
||||
}
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
|
||||
|
|
@ -215,11 +224,50 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
}
|
||||
|
||||
|
||||
def remove(name: String): Boolean = {
|
||||
false
|
||||
}
|
||||
|
||||
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = {
|
||||
List(Array.empty[Byte])
|
||||
}
|
||||
|
||||
def size(name: String): Int = {
|
||||
getQueueMetadata(name).size
|
||||
}
|
||||
|
||||
def dequeue(name: String): Option[Array[Byte]] = {
|
||||
None
|
||||
}
|
||||
|
||||
def enqueue(name: String, item: Array[Byte]): Option[Int] = {
|
||||
val mdata = getQueueMetadata(name)
|
||||
val key = getIndexedKey(name, mdata.tail)
|
||||
queueClient.put(key, item)
|
||||
queueClient.put(getIndexedKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1))
|
||||
Some (mdata.size + 1)
|
||||
}
|
||||
|
||||
|
||||
def getQueueMetadata(name: String): QueueMetadata = {
|
||||
val keys = List(getIndexedKey(name, queueHeadIndex), getIndexedKey(name, queueTailIndex))
|
||||
val qdata = JavaConversions.asMap(queueClient.getAll(JavaConversions.asIterable(keys)))
|
||||
val values = keys.map {
|
||||
qdata.get(_) match {
|
||||
case Some(versioned) => IntSerializer.fromBytes(versioned.getValue)
|
||||
case None => 0
|
||||
}
|
||||
}
|
||||
QueueMetadata(values.head, values.tail.head)
|
||||
}
|
||||
|
||||
/**
|
||||
* Concat the ownerlenght+owner+key+ of owner so owned data will be colocated
|
||||
* Store the length of owner as first byte to work around the rare case
|
||||
* where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2
|
||||
*/
|
||||
|
||||
|
||||
def getKey(owner: String, key: Array[Byte]): Array[Byte] = {
|
||||
val ownerBytes: Array[Byte] = owner.getBytes("UTF-8")
|
||||
val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length)
|
||||
|
|
@ -230,7 +278,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
theKey
|
||||
}
|
||||
|
||||
def getVectorValueKey(owner: String, index: Int): Array[Byte] = {
|
||||
def getIndexedKey(owner: String, index: Int): Array[Byte] = {
|
||||
val indexbytes = IntSerializer.toBytes(index)
|
||||
val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length)
|
||||
System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length)
|
||||
|
|
@ -272,8 +320,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
refClient = storeClientFactory.getStoreClient(refStore)
|
||||
mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore)
|
||||
mapValueClient = storeClientFactory.getStoreClient(mapValueStore)
|
||||
vectorSizeClient = storeClientFactory.getStoreClient(vectorSizeStore)
|
||||
vectorValueClient = storeClientFactory.getStoreClient(vectorValueStore)
|
||||
vectorClient = storeClientFactory.getStoreClient(vectorStore)
|
||||
queueClient = storeClientFactory.getStoreClient(queueStore)
|
||||
}
|
||||
|
||||
object IntSerializer {
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@
|
|||
<value-serializer>
|
||||
<type>identity</type>
|
||||
</value-serializer>
|
||||
</store>
|
||||
</store>
|
||||
<store>
|
||||
<name>MapValues</name>
|
||||
<replication-factor>1</replication-factor>
|
||||
|
|
@ -33,7 +33,7 @@
|
|||
</value-serializer>
|
||||
</store>
|
||||
<store>
|
||||
<name>MapKeys</name>
|
||||
<name>MapKeys</name>
|
||||
<replication-factor>1</replication-factor>
|
||||
<preferred-reads>1</preferred-reads>
|
||||
<required-reads>1</required-reads>
|
||||
|
|
@ -50,7 +50,7 @@
|
|||
</value-serializer>
|
||||
</store>
|
||||
<store>
|
||||
<name>VectorValues</name>
|
||||
<name>Vectors</name>
|
||||
<replication-factor>1</replication-factor>
|
||||
<preferred-reads>1</preferred-reads>
|
||||
<required-reads>1</required-reads>
|
||||
|
|
@ -66,7 +66,7 @@
|
|||
</value-serializer>
|
||||
</store>
|
||||
<store>
|
||||
<name>VectorSizes</name>
|
||||
<name>Queues</name>
|
||||
<replication-factor>1</replication-factor>
|
||||
<preferred-reads>1</preferred-reads>
|
||||
<required-reads>1</required-reads>
|
||||
|
|
@ -75,11 +75,11 @@
|
|||
<persistence>memory</persistence>
|
||||
<routing>client</routing>
|
||||
<key-serializer>
|
||||
<type>string</type>
|
||||
<schema-info>utf8</schema-info>
|
||||
<type>identity</type>
|
||||
</key-serializer>
|
||||
<value-serializer>
|
||||
<type>identity</type>
|
||||
</value-serializer>
|
||||
</store>
|
||||
|
||||
</stores>
|
||||
|
|
@ -108,12 +108,11 @@ Spec with
|
|||
override def beforeEach {
|
||||
removeMapStorageFor(state)
|
||||
var size = getVectorStorageSizeFor(tx)
|
||||
(0 to size).foreach {
|
||||
(-1 to size).foreach {
|
||||
index => {
|
||||
vectorValueClient.delete(getVectorValueKey(tx, index))
|
||||
vectorClient.delete(getIndexedKey(tx, index))
|
||||
}
|
||||
}
|
||||
vectorSizeClient.delete(tx)
|
||||
}
|
||||
|
||||
override def afterEach {
|
||||
|
|
|
|||
|
|
@ -82,34 +82,26 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
|
|||
|
||||
}
|
||||
|
||||
test("that vector size storage and retrieval works") {
|
||||
val key = "vectorKey"
|
||||
val size = IntSerializer.toBytes(17)
|
||||
vectorSizeClient.delete(key)
|
||||
vectorSizeClient.getValue(key, empty) should equal(empty)
|
||||
vectorSizeClient.put(key, size)
|
||||
vectorSizeClient.getValue(key) should equal(size)
|
||||
}
|
||||
|
||||
test("that vector value storage and retrieval works") {
|
||||
val key = "vectorValueKey"
|
||||
val index = 3
|
||||
val value = bytes("some bytes")
|
||||
val vecKey = getVectorValueKey(key, index)
|
||||
val vecKey = getIndexedKey(key, index)
|
||||
getIndexFromVectorValueKey(key, vecKey) should be(index)
|
||||
vectorValueClient.delete(vecKey)
|
||||
vectorValueClient.getValue(vecKey, empty) should equal(empty)
|
||||
vectorValueClient.put(vecKey, value)
|
||||
vectorValueClient.getValue(vecKey) should equal(value)
|
||||
vectorClient.delete(vecKey)
|
||||
vectorClient.getValue(vecKey, empty) should equal(empty)
|
||||
vectorClient.put(vecKey, value)
|
||||
vectorClient.getValue(vecKey) should equal(value)
|
||||
}
|
||||
|
||||
test("PersistentVector apis function as expected") {
|
||||
val key = "vectorApiKey"
|
||||
val value = bytes("Some bytes we want to store in a vector")
|
||||
val updatedValue = bytes("Some updated bytes we want to store in a vector")
|
||||
vectorSizeClient.delete(key)
|
||||
vectorValueClient.delete(getVectorValueKey(key, 0))
|
||||
vectorValueClient.delete(getVectorValueKey(key, 1))
|
||||
vectorClient.delete(getIndexedKey(key, vectorSizeIndex))
|
||||
vectorClient.delete(getIndexedKey(key, 0))
|
||||
vectorClient.delete(getIndexedKey(key, 1))
|
||||
getVectorStorageEntryFor(key, 0) should be(empty)
|
||||
getVectorStorageEntryFor(key, 1) should be(empty)
|
||||
getVectorStorageRangeFor(key, None, None, 1).head should be(empty)
|
||||
|
|
|
|||
|
|
@ -171,11 +171,11 @@ akka {
|
|||
|
||||
voldemort {
|
||||
store {
|
||||
refs = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values
|
||||
ref = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values
|
||||
map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values
|
||||
map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values
|
||||
vector-sizes = "VectorSizes" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values
|
||||
vector-values = "VectorValues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values
|
||||
vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values
|
||||
queue = "Queues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values
|
||||
}
|
||||
|
||||
client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue