diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
index 83b74a4a05..b5c1023970 100644
--- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
@@ -25,6 +25,7 @@ private[akka] object VoldemortStorageBackend extends
MapStorageBackend[Array[Byte], Array[Byte]] with
VectorStorageBackend[Array[Byte]] with
RefStorageBackend[Array[Byte]] with
+ QueueStorageBackend[Array[Byte]] with
Logging {
val bootstrapUrlsProp = "bootstrap_urls"
val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match {
@@ -34,18 +35,26 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs")
val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys")
val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues")
- val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector-size", "VectorSizes")
- val vectorValueStore = config.getString("akka.storage.voldemort.store.vector-value", "VectorValues")
+ val vectorStore = config.getString("akka.storage.voldemort.store.vector", "Vectors")
+ val queueStore = config.getString("akka.storage.voldemort.store.queue", "Queues")
var storeClientFactory: StoreClientFactory = null
var refClient: StoreClient[String, Array[Byte]] = null
var mapKeyClient: StoreClient[String, Array[Byte]] = null
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null
- var vectorSizeClient: StoreClient[String, Array[Byte]] = null
- var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = null
+ var vectorClient: StoreClient[Array[Byte], Array[Byte]] = null
+ var queueClient: StoreClient[Array[Byte], Array[Byte]] = null
initStoreClients
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
+ val vectorSizeIndex = -1
+ val queueHeadIndex = -1
+ val queueTailIndex = -2
+ case class QueueMetadata(head: Int, tail: Int) {
+ def size = tail - head
+ //worry about wrapping etc
+ }
+
implicit val byteOrder = new Ordering[Array[Byte]] {
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
}
@@ -152,7 +161,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def getVectorStorageSizeFor(name: String): Int = {
- IntSerializer.fromBytes(vectorSizeClient.getValue(name, IntSerializer.toBytes(0)))
+ IntSerializer.fromBytes(vectorClient.getValue(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(0)))
}
@@ -167,10 +176,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
count
}
val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
- index => getVectorValueKey(name, index)
+ index => getIndexedKey(name, index)
}
- val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorValueClient.getAll(JavaConversions.asIterable(seq))
+ val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq))
var storage = new ArrayBuffer[Array[Byte]](seq.size)
storage = storage.padTo(seq.size, Array.empty[Byte])
@@ -189,14 +198,14 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
- vectorValueClient.getValue(getVectorValueKey(name, index), Array.empty[Byte])
+ vectorClient.getValue(getIndexedKey(name, index), Array.empty[Byte])
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val size = getVectorStorageSizeFor(name)
- vectorValueClient.put(getVectorValueKey(name, index), elem)
+ vectorClient.put(getIndexedKey(name, index), elem)
if (size < index + 1) {
- vectorSizeClient.put(name, IntSerializer.toBytes(index + 1))
+ vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
}
}
@@ -204,10 +213,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
var size = getVectorStorageSizeFor(name)
elements.foreach {
element =>
- vectorValueClient.put(getVectorValueKey(name, size), element)
+ vectorClient.put(getIndexedKey(name, size), element)
size += 1
}
- vectorSizeClient.put(name, IntSerializer.toBytes(size))
+ vectorClient.put(getIndexedKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
@@ -215,11 +224,50 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
+ def remove(name: String): Boolean = {
+ false
+ }
+
+ def peek(name: String, start: Int, count: Int): List[Array[Byte]] = {
+ List(Array.empty[Byte])
+ }
+
+ def size(name: String): Int = {
+ getQueueMetadata(name).size
+ }
+
+ def dequeue(name: String): Option[Array[Byte]] = {
+ None
+ }
+
+ def enqueue(name: String, item: Array[Byte]): Option[Int] = {
+ val mdata = getQueueMetadata(name)
+ val key = getIndexedKey(name, mdata.tail)
+ queueClient.put(key, item)
+ queueClient.put(getIndexedKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1))
+ Some (mdata.size + 1)
+ }
+
+
+ def getQueueMetadata(name: String): QueueMetadata = {
+ val keys = List(getIndexedKey(name, queueHeadIndex), getIndexedKey(name, queueTailIndex))
+ val qdata = JavaConversions.asMap(queueClient.getAll(JavaConversions.asIterable(keys)))
+ val values = keys.map {
+ qdata.get(_) match {
+ case Some(versioned) => IntSerializer.fromBytes(versioned.getValue)
+ case None => 0
+ }
+ }
+ QueueMetadata(values.head, values.tail.head)
+ }
+
/**
* Concat the ownerlenght+owner+key+ of owner so owned data will be colocated
* Store the length of owner as first byte to work around the rare case
* where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2
*/
+
+
def getKey(owner: String, key: Array[Byte]): Array[Byte] = {
val ownerBytes: Array[Byte] = owner.getBytes("UTF-8")
val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length)
@@ -230,7 +278,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
theKey
}
- def getVectorValueKey(owner: String, index: Int): Array[Byte] = {
+ def getIndexedKey(owner: String, index: Int): Array[Byte] = {
val indexbytes = IntSerializer.toBytes(index)
val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length)
System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length)
@@ -272,8 +320,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
refClient = storeClientFactory.getStoreClient(refStore)
mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore)
mapValueClient = storeClientFactory.getStoreClient(mapValueStore)
- vectorSizeClient = storeClientFactory.getStoreClient(vectorSizeStore)
- vectorValueClient = storeClientFactory.getStoreClient(vectorValueStore)
+ vectorClient = storeClientFactory.getStoreClient(vectorStore)
+ queueClient = storeClientFactory.getStoreClient(queueStore)
}
object IntSerializer {
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
index 26832d93fe..de666a219f 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
+++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
@@ -15,7 +15,7 @@
identity
-
+
MapValues
1
@@ -33,7 +33,7 @@
- MapKeys
+ MapKeys
1
1
1
@@ -50,7 +50,7 @@
- VectorValues
+ Vectors
1
1
1
@@ -66,7 +66,7 @@
- VectorSizes
+ Queues
1
1
1
@@ -75,11 +75,11 @@
memory
client
- string
- utf8
+ identity
identity
+
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala
index f76c370667..e39732dabf 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala
@@ -108,12 +108,11 @@ Spec with
override def beforeEach {
removeMapStorageFor(state)
var size = getVectorStorageSizeFor(tx)
- (0 to size).foreach {
+ (-1 to size).foreach {
index => {
- vectorValueClient.delete(getVectorValueKey(tx, index))
+ vectorClient.delete(getIndexedKey(tx, index))
}
}
- vectorSizeClient.delete(tx)
}
override def afterEach {
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
index aa5f88f020..5f27771bae 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
@@ -82,34 +82,26 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
}
- test("that vector size storage and retrieval works") {
- val key = "vectorKey"
- val size = IntSerializer.toBytes(17)
- vectorSizeClient.delete(key)
- vectorSizeClient.getValue(key, empty) should equal(empty)
- vectorSizeClient.put(key, size)
- vectorSizeClient.getValue(key) should equal(size)
- }
test("that vector value storage and retrieval works") {
val key = "vectorValueKey"
val index = 3
val value = bytes("some bytes")
- val vecKey = getVectorValueKey(key, index)
+ val vecKey = getIndexedKey(key, index)
getIndexFromVectorValueKey(key, vecKey) should be(index)
- vectorValueClient.delete(vecKey)
- vectorValueClient.getValue(vecKey, empty) should equal(empty)
- vectorValueClient.put(vecKey, value)
- vectorValueClient.getValue(vecKey) should equal(value)
+ vectorClient.delete(vecKey)
+ vectorClient.getValue(vecKey, empty) should equal(empty)
+ vectorClient.put(vecKey, value)
+ vectorClient.getValue(vecKey) should equal(value)
}
test("PersistentVector apis function as expected") {
val key = "vectorApiKey"
val value = bytes("Some bytes we want to store in a vector")
val updatedValue = bytes("Some updated bytes we want to store in a vector")
- vectorSizeClient.delete(key)
- vectorValueClient.delete(getVectorValueKey(key, 0))
- vectorValueClient.delete(getVectorValueKey(key, 1))
+ vectorClient.delete(getIndexedKey(key, vectorSizeIndex))
+ vectorClient.delete(getIndexedKey(key, 0))
+ vectorClient.delete(getIndexedKey(key, 1))
getVectorStorageEntryFor(key, 0) should be(empty)
getVectorStorageEntryFor(key, 1) should be(empty)
getVectorStorageRangeFor(key, None, None, 1).head should be(empty)
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index b44fdf867f..e1f3cdb46b 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -171,11 +171,11 @@ akka {
voldemort {
store {
- refs = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values
+ ref = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values
map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values
map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values
- vector-sizes = "VectorSizes" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values
- vector-values = "VectorValues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values
+ vector = "Vectors" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values
+ queue = "Queues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values
}
client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig