More Queue impl

This commit is contained in:
ticktock 2010-09-23 22:58:24 -04:00
parent 60bd020150
commit 97ff092e00

View file

@ -25,6 +25,7 @@ private[akka] object VoldemortStorageBackend extends
MapStorageBackend[Array[Byte], Array[Byte]] with
VectorStorageBackend[Array[Byte]] with
RefStorageBackend[Array[Byte]] with
QueueStorageBackend[Array[Byte]] with
Logging {
val bootstrapUrlsProp = "bootstrap_urls"
val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match {
@ -47,9 +48,12 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
val vectorSizeIndex = -1
val queueSizeKeyBytes = IntSerializer.toBytes(-1)
val queueHeadKeyBytes = IntSerializer.toBytes(-2)
val queueTailKeyBytes = IntSerializer.toBytes(-3)
val queueHeadIndex = -1
val queueTailIndex = -2
case class QueueMetadata(head: Int, tail: Int) {
def size = tail - head
//worry about wrapping etc
}
implicit val byteOrder = new Ordering[Array[Byte]] {
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
@ -220,12 +224,50 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def remove(name: String): Boolean = {
false
}
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = {
List(Array.empty[Byte])
}
def size(name: String): Int = {
getQueueMetadata(name).size
}
def dequeue(name: String): Option[Array[Byte]] = {
None
}
def enqueue(name: String, item: Array[Byte]): Option[Int] = {
val mdata = getQueueMetadata(name)
val key = getIndexedKey(name, mdata.tail)
queueClient.put(key, item)
queueClient.put(getIndexedKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1))
Some (mdata.size + 1)
}
def getQueueMetadata(name: String): QueueMetadata = {
val keys = List(getIndexedKey(name, queueHeadIndex), getIndexedKey(name, queueTailIndex))
val qdata = JavaConversions.asMap(queueClient.getAll(JavaConversions.asIterable(keys)))
val values = keys.map {
qdata.get(_) match {
case Some(versioned) => IntSerializer.fromBytes(versioned.getValue)
case None => 0
}
}
QueueMetadata(values.head, values.tail.head)
}
/**
* Concat the ownerlenght+owner+key+ of owner so owned data will be colocated
* Store the length of owner as first byte to work around the rare case
* where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2
*/
def getKey(owner: String, key: Array[Byte]): Array[Byte] = {
val ownerBytes: Array[Byte] = owner.getBytes("UTF-8")
val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length)
@ -251,7 +293,6 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def getClientConfig(configMap: Map[String, String]): Properties = {
val properites = new Properties
configMap.foreach {