diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
index b5c1023970..e08c45d159 100644
--- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
@@ -33,23 +33,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666"))
}
val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs")
- val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys")
- val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues")
+ val mapStore = config.getString("akka.storage.voldemort.store.map", "Maps")
val vectorStore = config.getString("akka.storage.voldemort.store.vector", "Vectors")
val queueStore = config.getString("akka.storage.voldemort.store.queue", "Queues")
var storeClientFactory: StoreClientFactory = null
var refClient: StoreClient[String, Array[Byte]] = null
- var mapKeyClient: StoreClient[String, Array[Byte]] = null
- var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null
+ var mapClient: StoreClient[Array[Byte], Array[Byte]] = null
var vectorClient: StoreClient[Array[Byte], Array[Byte]] = null
var queueClient: StoreClient[Array[Byte], Array[Byte]] = null
initStoreClients
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
- val vectorSizeIndex = -1
- val queueHeadIndex = -1
- val queueTailIndex = -2
+ val mapKeysIndex = getIndexedBytes(-1)
+ val vectorSizeIndex = getIndexedBytes(-1)
+ val queueHeadIndex = getIndexedBytes(-1)
+ val queueTailIndex = getIndexedBytes(-2)
case class QueueMetadata(head: Int, tail: Int) {
def size = tail - head
//worry about wrapping etc
@@ -85,7 +84,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = {
val all: JMap[Array[Byte], Versioned[Array[Byte]]] =
- mapValueClient.getAll(JavaConversions.asIterable(keys.map {
+ mapClient.getAll(JavaConversions.asIterable(keys.map {
mapKey => getKey(name, mapKey)
}))
@@ -108,7 +107,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
- val result: Array[Byte] = mapValueClient.getValue(getKey(name, key))
+ val result: Array[Byte] = mapClient.getValue(getKey(name, key))
result match {
case null => None
case _ => Some(result)
@@ -119,7 +118,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
var keys = getMapKeys(name)
keys -= key
putMapKeys(name, keys)
- mapValueClient.delete(getKey(name, key))
+ mapClient.delete(getKey(name, key))
}
@@ -127,13 +126,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val keys = getMapKeys(name)
keys.foreach {
key =>
- mapValueClient.delete(getKey(name, key))
+ mapClient.delete(getKey(name, key))
}
- mapKeyClient.delete(name)
+ mapClient.delete(getKey(name, mapKeysIndex))
}
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
- mapValueClient.put(getKey(name, key), value)
+ mapClient.put(getKey(name, key), value)
var keys = getMapKeys(name)
keys += key
putMapKeys(name, keys)
@@ -142,7 +141,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
val newKeys = entries.map {
case (key, value) => {
- mapValueClient.put(getKey(name, key), value)
+ mapClient.put(getKey(name, key), value)
key
}
}
@@ -152,16 +151,16 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = {
- mapKeyClient.put(name, SortedSetSerializer.toBytes(keys))
+ mapClient.put(getKey(name, mapKeysIndex), SortedSetSerializer.toBytes(keys))
}
def getMapKeys(name: String): SortedSet[Array[Byte]] = {
- SortedSetSerializer.fromBytes(mapKeyClient.getValue(name, Array.empty[Byte]))
+ SortedSetSerializer.fromBytes(mapClient.getValue(getKey(name, mapKeysIndex), Array.empty[Byte]))
}
def getVectorStorageSizeFor(name: String): Int = {
- IntSerializer.fromBytes(vectorClient.getValue(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(0)))
+ IntSerializer.fromBytes(vectorClient.getValue(getKey(name, vectorSizeIndex), IntSerializer.toBytes(0)))
}
@@ -205,7 +204,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val size = getVectorStorageSizeFor(name)
vectorClient.put(getIndexedKey(name, index), elem)
if (size < index + 1) {
- vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
+ vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
}
}
@@ -216,7 +215,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
vectorClient.put(getIndexedKey(name, size), element)
size += 1
}
- vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
+ vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
@@ -244,13 +243,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val mdata = getQueueMetadata(name)
val key = getIndexedKey(name, mdata.tail)
queueClient.put(key, item)
- queueClient.put(getIndexedKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1))
- Some (mdata.size + 1)
+ queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1))
+ Some(mdata.size + 1)
}
def getQueueMetadata(name: String): QueueMetadata = {
- val keys = List(getIndexedKey(name, queueHeadIndex), getIndexedKey(name, queueTailIndex))
+ val keys = List(getKey(name, queueHeadIndex), getKey(name, queueTailIndex))
val qdata = JavaConversions.asMap(queueClient.getAll(JavaConversions.asIterable(keys)))
val values = keys.map {
qdata.get(_) match {
@@ -278,12 +277,16 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
theKey
}
- def getIndexedKey(owner: String, index: Int): Array[Byte] = {
+ def getIndexedBytes(index: Int): Array[Byte] = {
val indexbytes = IntSerializer.toBytes(index)
val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length)
System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length)
System.arraycopy(indexbytes, 0, theIndexKey, underscoreBytesUTF8.length, indexbytes.length)
- getKey(owner, theIndexKey)
+ theIndexKey
+ }
+
+ def getIndexedKey(owner: String, index: Int): Array[Byte] = {
+ getKey(owner, getIndexedBytes(index))
}
def getIndexFromVectorValueKey(owner: String, key: Array[Byte]): Int = {
@@ -318,8 +321,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
}
refClient = storeClientFactory.getStoreClient(refStore)
- mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore)
- mapValueClient = storeClientFactory.getStoreClient(mapValueStore)
+ mapClient = storeClientFactory.getStoreClient(mapStore)
vectorClient = storeClientFactory.getStoreClient(vectorStore)
queueClient = storeClientFactory.getStoreClient(queueStore)
}
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
index de666a219f..203ac20479 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
+++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
@@ -17,7 +17,7 @@
- MapValues
+ Maps
1
1
1
@@ -32,23 +32,6 @@
identity
-
- MapKeys
- 1
- 1
- 1
- 1
- 1
- memory
- client
-
- string
- utf8
-
-
- identity
-
-
Vectors
1
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
index 5f27771bae..613181cbd2 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
@@ -34,8 +34,8 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
test("that map key storage and retrieval works") {
val key = "testmapKey"
val mapKeys = new TreeSet[Array[Byte]] + bytes("key1")
- mapKeyClient.delete(key)
- mapKeyClient.getValue(key, SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet))
+ mapClient.delete(getKey(key, mapKeysIndex))
+ mapClient.getValue(getKey(key, mapKeysIndex), SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet))
putMapKeys(key, mapKeys)
getMapKeys(key) should equal(mapKeys)
}
@@ -43,8 +43,8 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
test("that map value storage and retrieval works") {
val key = bytes("keyForTestingMapValueClient")
val value = bytes("value for testing map value client")
- mapValueClient.put(key, value)
- mapValueClient.getValue(key, empty) should equal(value)
+ mapClient.put(key, value)
+ mapClient.getValue(key, empty) should equal(value)
}
@@ -99,7 +99,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
val key = "vectorApiKey"
val value = bytes("Some bytes we want to store in a vector")
val updatedValue = bytes("Some updated bytes we want to store in a vector")
- vectorClient.delete(getIndexedKey(key, vectorSizeIndex))
+ vectorClient.delete(getKey(key, vectorSizeIndex))
vectorClient.delete(getIndexedKey(key, 0))
vectorClient.delete(getIndexedKey(key, 1))
getVectorStorageEntryFor(key, 0) should be(empty)
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index a8bfcb332d..4b510a960e 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -173,13 +173,12 @@ akka {
voldemort {
store {
ref = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values
- map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values
- map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values
- vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values
+ maps = "Maps" # Voldemort Store Used to Persist Map Keys. Use identity serializer for keys, identity serializer for values
+ vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use identity serializer for keys, identity serializer for values
queue = "Queues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values
}
- client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig
+ client { # The KeyValue pairs under client are converted to java Properties and used to construct the Voldemort ClientConfig
bootstrap_urls = "tcp://localhost:6666" # All Valid Voldemort Client properties are valid here, in string form
}
}