diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 5a5228f754..b5c1023970 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -25,6 +25,7 @@ private[akka] object VoldemortStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with + QueueStorageBackend[Array[Byte]] with Logging { val bootstrapUrlsProp = "bootstrap_urls" val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match { @@ -47,9 +48,12 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val underscoreBytesUTF8 = "_".getBytes("UTF-8") val vectorSizeIndex = -1 - val queueSizeKeyBytes = IntSerializer.toBytes(-1) - val queueHeadKeyBytes = IntSerializer.toBytes(-2) - val queueTailKeyBytes = IntSerializer.toBytes(-3) + val queueHeadIndex = -1 + val queueTailIndex = -2 + case class QueueMetadata(head: Int, tail: Int) { + def size = tail - head + //worry about wrapping etc + } implicit val byteOrder = new Ordering[Array[Byte]] { override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) @@ -220,12 +224,50 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } + def remove(name: String): Boolean = { + false + } + + def peek(name: String, start: Int, count: Int): List[Array[Byte]] = { + List(Array.empty[Byte]) + } + + def size(name: String): Int = { + getQueueMetadata(name).size + } + + def dequeue(name: String): Option[Array[Byte]] = { + None + } + + def enqueue(name: String, item: Array[Byte]): Option[Int] = { + val mdata = getQueueMetadata(name) + val key = getIndexedKey(name, mdata.tail) + queueClient.put(key, item) + queueClient.put(getIndexedKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1)) + Some (mdata.size + 1) + } + + + def getQueueMetadata(name: String): QueueMetadata = { + val keys = List(getIndexedKey(name, queueHeadIndex), getIndexedKey(name, queueTailIndex)) + val qdata = JavaConversions.asMap(queueClient.getAll(JavaConversions.asIterable(keys))) + val values = keys.map { + qdata.get(_) match { + case Some(versioned) => IntSerializer.fromBytes(versioned.getValue) + case None => 0 + } + } + QueueMetadata(values.head, values.tail.head) + } + /** * Concat the ownerlenght+owner+key+ of owner so owned data will be colocated * Store the length of owner as first byte to work around the rare case * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 */ + def getKey(owner: String, key: Array[Byte]): Array[Byte] = { val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length) @@ -251,7 +293,6 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } - def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties configMap.foreach {