From 49efebc942e9426cf1b387ce90c60b7c4d3e0bdb Mon Sep 17 00:00:00 2001 From: ticktock Date: Fri, 24 Sep 2010 19:38:28 -0400 Subject: [PATCH] Refactor to utilize only one voldemort store per datastructure type --- .../main/scala/VoldemortStorageBackend.scala | 54 ++++++++++--------- .../src/test/resources/config/stores.xml | 19 +------ .../scala/VoldemortStorageBackendSuite.scala | 10 ++-- config/akka-reference.conf | 7 ++- 4 files changed, 37 insertions(+), 53 deletions(-) diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index b5c1023970..e08c45d159 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -33,23 +33,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666")) } val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs") - val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys") - val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues") + val mapStore = config.getString("akka.storage.voldemort.store.map", "Maps") val vectorStore = config.getString("akka.storage.voldemort.store.vector", "Vectors") val queueStore = config.getString("akka.storage.voldemort.store.queue", "Queues") var storeClientFactory: StoreClientFactory = null var refClient: StoreClient[String, Array[Byte]] = null - var mapKeyClient: StoreClient[String, Array[Byte]] = null - var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null + var mapClient: StoreClient[Array[Byte], Array[Byte]] = null var vectorClient: StoreClient[Array[Byte], Array[Byte]] = null var queueClient: StoreClient[Array[Byte], Array[Byte]] = null initStoreClients val underscoreBytesUTF8 = "_".getBytes("UTF-8") - val vectorSizeIndex = -1 - val queueHeadIndex = -1 - val queueTailIndex = -2 + val mapKeysIndex = getIndexedBytes(-1) + val vectorSizeIndex = getIndexedBytes(-1) + val queueHeadIndex = getIndexedBytes(-1) + val queueTailIndex = getIndexedBytes(-2) case class QueueMetadata(head: Int, tail: Int) { def size = tail - head //worry about wrapping etc @@ -85,7 +84,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = { val all: JMap[Array[Byte], Versioned[Array[Byte]]] = - mapValueClient.getAll(JavaConversions.asIterable(keys.map { + mapClient.getAll(JavaConversions.asIterable(keys.map { mapKey => getKey(name, mapKey) })) @@ -108,7 +107,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { - val result: Array[Byte] = mapValueClient.getValue(getKey(name, key)) + val result: Array[Byte] = mapClient.getValue(getKey(name, key)) result match { case null => None case _ => Some(result) @@ -119,7 +118,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with var keys = getMapKeys(name) keys -= key putMapKeys(name, keys) - mapValueClient.delete(getKey(name, key)) + mapClient.delete(getKey(name, key)) } @@ -127,13 +126,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val keys = getMapKeys(name) keys.foreach { key => - mapValueClient.delete(getKey(name, key)) + mapClient.delete(getKey(name, key)) } - mapKeyClient.delete(name) + mapClient.delete(getKey(name, mapKeysIndex)) } def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { - mapValueClient.put(getKey(name, key), value) + mapClient.put(getKey(name, key), value) var keys = getMapKeys(name) keys += key putMapKeys(name, keys) @@ -142,7 +141,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { val newKeys = entries.map { case (key, value) => { - mapValueClient.put(getKey(name, key), value) + mapClient.put(getKey(name, key), value) key } } @@ -152,16 +151,16 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = { - mapKeyClient.put(name, SortedSetSerializer.toBytes(keys)) + mapClient.put(getKey(name, mapKeysIndex), SortedSetSerializer.toBytes(keys)) } def getMapKeys(name: String): SortedSet[Array[Byte]] = { - SortedSetSerializer.fromBytes(mapKeyClient.getValue(name, Array.empty[Byte])) + SortedSetSerializer.fromBytes(mapClient.getValue(getKey(name, mapKeysIndex), Array.empty[Byte])) } def getVectorStorageSizeFor(name: String): Int = { - IntSerializer.fromBytes(vectorClient.getValue(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(0))) + IntSerializer.fromBytes(vectorClient.getValue(getKey(name, vectorSizeIndex), IntSerializer.toBytes(0))) } @@ -205,7 +204,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val size = getVectorStorageSizeFor(name) vectorClient.put(getIndexedKey(name, index), elem) if (size < index + 1) { - vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1)) + vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1)) } } @@ -216,7 +215,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with vectorClient.put(getIndexedKey(name, size), element) size += 1 } - vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) + vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) } def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { @@ -244,13 +243,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val mdata = getQueueMetadata(name) val key = getIndexedKey(name, mdata.tail) queueClient.put(key, item) - queueClient.put(getIndexedKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1)) - Some (mdata.size + 1) + queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1)) + Some(mdata.size + 1) } def getQueueMetadata(name: String): QueueMetadata = { - val keys = List(getIndexedKey(name, queueHeadIndex), getIndexedKey(name, queueTailIndex)) + val keys = List(getKey(name, queueHeadIndex), getKey(name, queueTailIndex)) val qdata = JavaConversions.asMap(queueClient.getAll(JavaConversions.asIterable(keys))) val values = keys.map { qdata.get(_) match { @@ -278,12 +277,16 @@ MapStorageBackend[Array[Byte], Array[Byte]] with theKey } - def getIndexedKey(owner: String, index: Int): Array[Byte] = { + def getIndexedBytes(index: Int): Array[Byte] = { val indexbytes = IntSerializer.toBytes(index) val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length) System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length) System.arraycopy(indexbytes, 0, theIndexKey, underscoreBytesUTF8.length, indexbytes.length) - getKey(owner, theIndexKey) + theIndexKey + } + + def getIndexedKey(owner: String, index: Int): Array[Byte] = { + getKey(owner, getIndexedBytes(index)) } def getIndexFromVectorValueKey(owner: String, key: Array[Byte]): Int = { @@ -318,8 +321,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } } refClient = storeClientFactory.getStoreClient(refStore) - mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore) - mapValueClient = storeClientFactory.getStoreClient(mapValueStore) + mapClient = storeClientFactory.getStoreClient(mapStore) vectorClient = storeClientFactory.getStoreClient(vectorStore) queueClient = storeClientFactory.getStoreClient(queueStore) } diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml index de666a219f..203ac20479 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml +++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml @@ -17,7 +17,7 @@ - MapValues + Maps 1 1 1 @@ -32,23 +32,6 @@ identity - - MapKeys - 1 - 1 - 1 - 1 - 1 - memory - client - - string - utf8 - - - identity - - Vectors 1 diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 5f27771bae..613181cbd2 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -34,8 +34,8 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb test("that map key storage and retrieval works") { val key = "testmapKey" val mapKeys = new TreeSet[Array[Byte]] + bytes("key1") - mapKeyClient.delete(key) - mapKeyClient.getValue(key, SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet)) + mapClient.delete(getKey(key, mapKeysIndex)) + mapClient.getValue(getKey(key, mapKeysIndex), SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet)) putMapKeys(key, mapKeys) getMapKeys(key) should equal(mapKeys) } @@ -43,8 +43,8 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb test("that map value storage and retrieval works") { val key = bytes("keyForTestingMapValueClient") val value = bytes("value for testing map value client") - mapValueClient.put(key, value) - mapValueClient.getValue(key, empty) should equal(value) + mapClient.put(key, value) + mapClient.getValue(key, empty) should equal(value) } @@ -99,7 +99,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb val key = "vectorApiKey" val value = bytes("Some bytes we want to store in a vector") val updatedValue = bytes("Some updated bytes we want to store in a vector") - vectorClient.delete(getIndexedKey(key, vectorSizeIndex)) + vectorClient.delete(getKey(key, vectorSizeIndex)) vectorClient.delete(getIndexedKey(key, 0)) vectorClient.delete(getIndexedKey(key, 1)) getVectorStorageEntryFor(key, 0) should be(empty) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index a8bfcb332d..4b510a960e 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -173,13 +173,12 @@ akka { voldemort { store { ref = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values - map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values - map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values - vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values + maps = "Maps" # Voldemort Store Used to Persist Map Keys. Use identity serializer for keys, identity serializer for values + vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use identity serializer for keys, identity serializer for values queue = "Queues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values } - client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig + client { # The KeyValue pairs under client are converted to java Properties and used to construct the Voldemort ClientConfig bootstrap_urls = "tcp://localhost:6666" # All Valid Voldemort Client properties are valid here, in string form } }