diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index e08c45d159..7cfc24c092 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -49,10 +49,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val vectorSizeIndex = getIndexedBytes(-1) val queueHeadIndex = getIndexedBytes(-1) val queueTailIndex = getIndexedBytes(-2) - case class QueueMetadata(head: Int, tail: Int) { - def size = tail - head - //worry about wrapping etc - } + implicit val byteOrder = new Ordering[Array[Byte]] { override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) @@ -224,7 +221,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def remove(name: String): Boolean = { - false + val mdata = getQueueMetadata(name) + mdata.getActiveIndexes foreach { + index => + queueClient.delete(getIndexedKey(name, index)) + } + queueClient.delete(getKey(name, queueHeadIndex)) + queueClient.delete(getKey(name, queueTailIndex)) } def peek(name: String, start: Int, count: Int): List[Array[Byte]] = { @@ -236,15 +239,28 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def dequeue(name: String): Option[Array[Byte]] = { - None + val mdata = getQueueMetadata(name) + if (mdata.canDequeue) { + val key = getIndexedKey(name, mdata.head) + val dequeued = queueClient.getValue(key) + queueClient.delete(key) + queueClient.put(getKey(name, queueHeadIndex), IntSerializer.toBytes(mdata.nextDequeue)) + Some(dequeued) + } else { + None + } } def enqueue(name: String, item: Array[Byte]): Option[Int] = { val mdata = getQueueMetadata(name) - val key = getIndexedKey(name, mdata.tail) - queueClient.put(key, item) - queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.tail + 1)) - Some(mdata.size + 1) + if (mdata.canEnqueue) { + val key = getIndexedKey(name, mdata.tail) + queueClient.put(key, item) + queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue)) + Some(mdata.size + 1) + } else { + None + } } @@ -326,6 +342,50 @@ MapStorageBackend[Array[Byte], Array[Byte]] with queueClient = storeClientFactory.getStoreClient(queueStore) } + + case class QueueMetadata(head: Int, tail: Int) { + //queue is an sequence with indexes from 0 to Int.MAX_VALUE + //wraps around when one pointer gets to max value + def size = { + if (tail >= head) { + tail - head + } else { + //queue has wrapped + Integer.MAX_VALUE - head + tail + 1 + } + } + + def canEnqueue = { + //the -1 stops the tail from catching the head on a wrap around + size < Integer.MAX_VALUE - 1 + } + + def canDequeue = {size > 0} + + def getActiveIndexes(): Seq[Int] = { + if (tail >= head) { + head to tail + } else { + //queue has wrapped + (0 to tail) ++ (head to Integer.MAX_VALUE) + } + } + + def nextEnqueue = { + tail match { + case Integer.MAX_VALUE => 0 + case _ => tail + 1 + } + } + + def nextDequeue = { + head match { + case Integer.MAX_VALUE => 0 + case _ => head + 1 + } + } + } + object IntSerializer { val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 613181cbd2..d96e8520d2 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -8,6 +8,7 @@ import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ import se.scalablesolutions.akka.util.{Logging} import collection.immutable.TreeSet import VoldemortStorageBackendSuite._ +import scala.None @RunWith(classOf[JUnitRunner]) class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging { @@ -126,6 +127,37 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb } + test("Persistent Queue apis function as expected") { + val key = "queueApiKey" + val value = bytes("some bytes even") + val valueOdd = bytes("some bytes odd") + remove(key) + VoldemortStorageBackend.size(key) should be(0) + enqueue(key, value) should be(Some(1)) + VoldemortStorageBackend.size(key) should be(1) + enqueue(key, valueOdd) should be(Some(2)) + VoldemortStorageBackend.size(key) should be(2) + dequeue(key).get should be(value) + VoldemortStorageBackend.size(key) should be(1) + dequeue(key).get should be(valueOdd) + VoldemortStorageBackend.size(key) should be(0) + dequeue(key) should be(None) + queueClient.put(getKey(key, queueHeadIndex), IntSerializer.toBytes(Integer.MAX_VALUE)) + queueClient.put(getKey(key, queueTailIndex), IntSerializer.toBytes(Integer.MAX_VALUE)) + VoldemortStorageBackend.size(key) should be(0) + enqueue(key, value) should be(Some(1)) + VoldemortStorageBackend.size(key) should be(1) + enqueue(key, valueOdd) should be(Some(2)) + VoldemortStorageBackend.size(key) should be(2) + dequeue(key).get should be(value) + VoldemortStorageBackend.size(key) should be(1) + dequeue(key).get should be(valueOdd) + VoldemortStorageBackend.size(key) should be(0) + dequeue(key) should be(None) + + + } + } object VoldemortStorageBackendSuite {