From ffcd6b3fcd9d1a9a0fb4770c6584cb1098245915 Mon Sep 17 00:00:00 2001 From: ticktock Date: Thu, 23 Sep 2010 21:14:36 -0400 Subject: [PATCH 1/2] Refactoring Vector to only use 1 voldemort store, and setting up for implementing Queue --- .../main/scala/VoldemortStorageBackend.scala | 37 +++++++++++-------- .../src/test/resources/config/stores.xml | 12 +++--- .../scala/VoldemortPersistentActorSuite.scala | 5 +-- .../scala/VoldemortStorageBackendSuite.scala | 24 ++++-------- config/akka-reference.conf | 6 +-- 5 files changed, 41 insertions(+), 43 deletions(-) diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 83b74a4a05..5a5228f754 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -34,18 +34,23 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs") val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys") val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues") - val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector-size", "VectorSizes") - val vectorValueStore = config.getString("akka.storage.voldemort.store.vector-value", "VectorValues") + val vectorStore = config.getString("akka.storage.voldemort.store.vector", "Vectors") + val queueStore = config.getString("akka.storage.voldemort.store.queue", "Queues") var storeClientFactory: StoreClientFactory = null var refClient: StoreClient[String, Array[Byte]] = null var mapKeyClient: StoreClient[String, Array[Byte]] = null var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null - var vectorSizeClient: StoreClient[String, Array[Byte]] = null - var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = null + var vectorClient: StoreClient[Array[Byte], Array[Byte]] = null + var queueClient: StoreClient[Array[Byte], Array[Byte]] = null initStoreClients val underscoreBytesUTF8 = "_".getBytes("UTF-8") + val vectorSizeIndex = -1 + val queueSizeKeyBytes = IntSerializer.toBytes(-1) + val queueHeadKeyBytes = IntSerializer.toBytes(-2) + val queueTailKeyBytes = IntSerializer.toBytes(-3) + implicit val byteOrder = new Ordering[Array[Byte]] { override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) } @@ -152,7 +157,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getVectorStorageSizeFor(name: String): Int = { - IntSerializer.fromBytes(vectorSizeClient.getValue(name, IntSerializer.toBytes(0))) + IntSerializer.fromBytes(vectorClient.getValue(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(0))) } @@ -167,10 +172,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with count } val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { - index => getVectorValueKey(name, index) + index => getIndexedKey(name, index) } - val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorValueClient.getAll(JavaConversions.asIterable(seq)) + val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq)) var storage = new ArrayBuffer[Array[Byte]](seq.size) storage = storage.padTo(seq.size, Array.empty[Byte]) @@ -189,14 +194,14 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { - vectorValueClient.getValue(getVectorValueKey(name, index), Array.empty[Byte]) + vectorClient.getValue(getIndexedKey(name, index), Array.empty[Byte]) } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { val size = getVectorStorageSizeFor(name) - vectorValueClient.put(getVectorValueKey(name, index), elem) + vectorClient.put(getIndexedKey(name, index), elem) if (size < index + 1) { - vectorSizeClient.put(name, IntSerializer.toBytes(index + 1)) + vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1)) } } @@ -204,10 +209,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with var size = getVectorStorageSizeFor(name) elements.foreach { element => - vectorValueClient.put(getVectorValueKey(name, size), element) + vectorClient.put(getIndexedKey(name, size), element) size += 1 } - vectorSizeClient.put(name, IntSerializer.toBytes(size)) + vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) } def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { @@ -220,6 +225,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with * Store the length of owner as first byte to work around the rare case * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 */ + def getKey(owner: String, key: Array[Byte]): Array[Byte] = { val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length) @@ -230,7 +236,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with theKey } - def getVectorValueKey(owner: String, index: Int): Array[Byte] = { + def getIndexedKey(owner: String, index: Int): Array[Byte] = { val indexbytes = IntSerializer.toBytes(index) val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length) System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length) @@ -245,6 +251,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } + def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties configMap.foreach { @@ -272,8 +279,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with refClient = storeClientFactory.getStoreClient(refStore) mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore) mapValueClient = storeClientFactory.getStoreClient(mapValueStore) - vectorSizeClient = storeClientFactory.getStoreClient(vectorSizeStore) - vectorValueClient = storeClientFactory.getStoreClient(vectorValueStore) + vectorClient = storeClientFactory.getStoreClient(vectorStore) + queueClient = storeClientFactory.getStoreClient(queueStore) } object IntSerializer { diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml index 26832d93fe..de666a219f 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml +++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml @@ -15,7 +15,7 @@ identity - + MapValues 1 @@ -33,7 +33,7 @@ - MapKeys + MapKeys 1 1 1 @@ -50,7 +50,7 @@ - VectorValues + Vectors 1 1 1 @@ -66,7 +66,7 @@ - VectorSizes + Queues 1 1 1 @@ -75,11 +75,11 @@ memory client - string - utf8 + identity identity + \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala index f76c370667..e39732dabf 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala @@ -108,12 +108,11 @@ Spec with override def beforeEach { removeMapStorageFor(state) var size = getVectorStorageSizeFor(tx) - (0 to size).foreach { + (-1 to size).foreach { index => { - vectorValueClient.delete(getVectorValueKey(tx, index)) + vectorClient.delete(getIndexedKey(tx, index)) } } - vectorSizeClient.delete(tx) } override def afterEach { diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index aa5f88f020..5f27771bae 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -82,34 +82,26 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb } - test("that vector size storage and retrieval works") { - val key = "vectorKey" - val size = IntSerializer.toBytes(17) - vectorSizeClient.delete(key) - vectorSizeClient.getValue(key, empty) should equal(empty) - vectorSizeClient.put(key, size) - vectorSizeClient.getValue(key) should equal(size) - } test("that vector value storage and retrieval works") { val key = "vectorValueKey" val index = 3 val value = bytes("some bytes") - val vecKey = getVectorValueKey(key, index) + val vecKey = getIndexedKey(key, index) getIndexFromVectorValueKey(key, vecKey) should be(index) - vectorValueClient.delete(vecKey) - vectorValueClient.getValue(vecKey, empty) should equal(empty) - vectorValueClient.put(vecKey, value) - vectorValueClient.getValue(vecKey) should equal(value) + vectorClient.delete(vecKey) + vectorClient.getValue(vecKey, empty) should equal(empty) + vectorClient.put(vecKey, value) + vectorClient.getValue(vecKey) should equal(value) } test("PersistentVector apis function as expected") { val key = "vectorApiKey" val value = bytes("Some bytes we want to store in a vector") val updatedValue = bytes("Some updated bytes we want to store in a vector") - vectorSizeClient.delete(key) - vectorValueClient.delete(getVectorValueKey(key, 0)) - vectorValueClient.delete(getVectorValueKey(key, 1)) + vectorClient.delete(getIndexedKey(key, vectorSizeIndex)) + vectorClient.delete(getIndexedKey(key, 0)) + vectorClient.delete(getIndexedKey(key, 1)) getVectorStorageEntryFor(key, 0) should be(empty) getVectorStorageEntryFor(key, 1) should be(empty) getVectorStorageRangeFor(key, None, None, 1).head should be(empty) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index eec56c7f06..fd0d658ab4 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -171,11 +171,11 @@ akka { voldemort { store { - refs = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values + ref = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values - vector-sizes = "VectorSizes" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values - vector-values = "VectorValues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values + vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values + queue = "Queues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values } client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig From 0a6210645d50c2ee99c5a5d741f889ebe98c8947 Mon Sep 17 00:00:00 2001 From: ticktock Date: Thu, 23 Sep 2010 22:58:24 -0400 Subject: [PATCH 2/2] More Queue impl --- .../main/scala/VoldemortStorageBackend.scala | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 5a5228f754..b5c1023970 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -25,6 +25,7 @@ private[akka] object VoldemortStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with + QueueStorageBackend[Array[Byte]] with Logging { val bootstrapUrlsProp = "bootstrap_urls" val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match { @@ -47,9 +48,12 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val underscoreBytesUTF8 = "_".getBytes("UTF-8") val vectorSizeIndex = -1 - val queueSizeKeyBytes = IntSerializer.toBytes(-1) - val queueHeadKeyBytes = IntSerializer.toBytes(-2) - val queueTailKeyBytes = IntSerializer.toBytes(-3) + val queueHeadIndex = -1 + val queueTailIndex = -2 + case class QueueMetadata(head: Int, tail: Int) { + def size = tail - head + //worry about wrapping etc + } implicit val byteOrder = new Ordering[Array[Byte]] { override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) @@ -220,12 +224,50 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } + def remove(name: String): Boolean = { + false + } + + def peek(name: String, start: Int, count: Int): List[Array[Byte]] = { + List(Array.empty[Byte]) + } + + def size(name: String): Int = { + getQueueMetadata(name).size + } + + def dequeue(name: String): Option[Array[Byte]] = { + None + } + + def enqueue(name: String, item: Array[Byte]): Option[Int] = { + val mdata = getQueueMetadata(name) + val key = getIndexedKey(name, mdata.tail) + queueClient.put(key, item) + queueClient.put(getIndexedKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1)) + Some (mdata.size + 1) + } + + + def getQueueMetadata(name: String): QueueMetadata = { + val keys = List(getIndexedKey(name, queueHeadIndex), getIndexedKey(name, queueTailIndex)) + val qdata = JavaConversions.asMap(queueClient.getAll(JavaConversions.asIterable(keys))) + val values = keys.map { + qdata.get(_) match { + case Some(versioned) => IntSerializer.fromBytes(versioned.getValue) + case None => 0 + } + } + QueueMetadata(values.head, values.tail.head) + } + /** * Concat the ownerlenght+owner+key+ of owner so owned data will be colocated * Store the length of owner as first byte to work around the rare case * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 */ + def getKey(owner: String, key: Array[Byte]): Array[Byte] = { val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length) @@ -251,7 +293,6 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } - def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties configMap.foreach {