diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 83b74a4a05..5a5228f754 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -34,18 +34,23 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs") val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys") val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues") - val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector-size", "VectorSizes") - val vectorValueStore = config.getString("akka.storage.voldemort.store.vector-value", "VectorValues") + val vectorStore = config.getString("akka.storage.voldemort.store.vector", "Vectors") + val queueStore = config.getString("akka.storage.voldemort.store.queue", "Queues") var storeClientFactory: StoreClientFactory = null var refClient: StoreClient[String, Array[Byte]] = null var mapKeyClient: StoreClient[String, Array[Byte]] = null var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null - var vectorSizeClient: StoreClient[String, Array[Byte]] = null - var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = null + var vectorClient: StoreClient[Array[Byte], Array[Byte]] = null + var queueClient: StoreClient[Array[Byte], Array[Byte]] = null initStoreClients val underscoreBytesUTF8 = "_".getBytes("UTF-8") + val vectorSizeIndex = -1 + val queueSizeKeyBytes = IntSerializer.toBytes(-1) + val queueHeadKeyBytes = IntSerializer.toBytes(-2) + val queueTailKeyBytes = IntSerializer.toBytes(-3) + implicit val byteOrder = new Ordering[Array[Byte]] { override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) } @@ -152,7 +157,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getVectorStorageSizeFor(name: String): Int = { - IntSerializer.fromBytes(vectorSizeClient.getValue(name, IntSerializer.toBytes(0))) + IntSerializer.fromBytes(vectorClient.getValue(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(0))) } @@ -167,10 +172,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with count } val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { - index => getVectorValueKey(name, index) + index => getIndexedKey(name, index) } - val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorValueClient.getAll(JavaConversions.asIterable(seq)) + val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq)) var storage = new ArrayBuffer[Array[Byte]](seq.size) storage = storage.padTo(seq.size, Array.empty[Byte]) @@ -189,14 +194,14 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { - vectorValueClient.getValue(getVectorValueKey(name, index), Array.empty[Byte]) + vectorClient.getValue(getIndexedKey(name, index), Array.empty[Byte]) } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { val size = getVectorStorageSizeFor(name) - vectorValueClient.put(getVectorValueKey(name, index), elem) + vectorClient.put(getIndexedKey(name, index), elem) if (size < index + 1) { - vectorSizeClient.put(name, IntSerializer.toBytes(index + 1)) + vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1)) } } @@ -204,10 +209,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with var size = getVectorStorageSizeFor(name) elements.foreach { element => - vectorValueClient.put(getVectorValueKey(name, size), element) + vectorClient.put(getIndexedKey(name, size), element) size += 1 } - vectorSizeClient.put(name, IntSerializer.toBytes(size)) + vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) } def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { @@ -220,6 +225,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with * Store the length of owner as first byte to work around the rare case * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 */ + def getKey(owner: String, key: Array[Byte]): Array[Byte] = { val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length) @@ -230,7 +236,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with theKey } - def getVectorValueKey(owner: String, index: Int): Array[Byte] = { + def getIndexedKey(owner: String, index: Int): Array[Byte] = { val indexbytes = IntSerializer.toBytes(index) val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length) System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length) @@ -245,6 +251,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } + def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties configMap.foreach { @@ -272,8 +279,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with refClient = storeClientFactory.getStoreClient(refStore) mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore) mapValueClient = storeClientFactory.getStoreClient(mapValueStore) - vectorSizeClient = storeClientFactory.getStoreClient(vectorSizeStore) - vectorValueClient = storeClientFactory.getStoreClient(vectorValueStore) + vectorClient = storeClientFactory.getStoreClient(vectorStore) + queueClient = storeClientFactory.getStoreClient(queueStore) } object IntSerializer { diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml index 26832d93fe..de666a219f 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml +++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml @@ -15,7 +15,7 @@ identity - + MapValues 1 @@ -33,7 +33,7 @@ - MapKeys + MapKeys 1 1 1 @@ -50,7 +50,7 @@ - VectorValues + Vectors 1 1 1 @@ -66,7 +66,7 @@ - VectorSizes + Queues 1 1 1 @@ -75,11 +75,11 @@ memory client - string - utf8 + identity identity + \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala index f76c370667..e39732dabf 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala @@ -108,12 +108,11 @@ Spec with override def beforeEach { removeMapStorageFor(state) var size = getVectorStorageSizeFor(tx) - (0 to size).foreach { + (-1 to size).foreach { index => { - vectorValueClient.delete(getVectorValueKey(tx, index)) + vectorClient.delete(getIndexedKey(tx, index)) } } - vectorSizeClient.delete(tx) } override def afterEach { diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index aa5f88f020..5f27771bae 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -82,34 +82,26 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb } - test("that vector size storage and retrieval works") { - val key = "vectorKey" - val size = IntSerializer.toBytes(17) - vectorSizeClient.delete(key) - vectorSizeClient.getValue(key, empty) should equal(empty) - vectorSizeClient.put(key, size) - vectorSizeClient.getValue(key) should equal(size) - } test("that vector value storage and retrieval works") { val key = "vectorValueKey" val index = 3 val value = bytes("some bytes") - val vecKey = getVectorValueKey(key, index) + val vecKey = getIndexedKey(key, index) getIndexFromVectorValueKey(key, vecKey) should be(index) - vectorValueClient.delete(vecKey) - vectorValueClient.getValue(vecKey, empty) should equal(empty) - vectorValueClient.put(vecKey, value) - vectorValueClient.getValue(vecKey) should equal(value) + vectorClient.delete(vecKey) + vectorClient.getValue(vecKey, empty) should equal(empty) + vectorClient.put(vecKey, value) + vectorClient.getValue(vecKey) should equal(value) } test("PersistentVector apis function as expected") { val key = "vectorApiKey" val value = bytes("Some bytes we want to store in a vector") val updatedValue = bytes("Some updated bytes we want to store in a vector") - vectorSizeClient.delete(key) - vectorValueClient.delete(getVectorValueKey(key, 0)) - vectorValueClient.delete(getVectorValueKey(key, 1)) + vectorClient.delete(getIndexedKey(key, vectorSizeIndex)) + vectorClient.delete(getIndexedKey(key, 0)) + vectorClient.delete(getIndexedKey(key, 1)) getVectorStorageEntryFor(key, 0) should be(empty) getVectorStorageEntryFor(key, 1) should be(empty) getVectorStorageRangeFor(key, None, None, 1).head should be(empty) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index eec56c7f06..fd0d658ab4 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -171,11 +171,11 @@ akka { voldemort { store { - refs = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values + ref = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values - vector-sizes = "VectorSizes" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values - vector-values = "VectorValues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values + vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values + queue = "Queues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values } client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig