From 9aa70b91391a4ea57dfa2b5be2d5204a2aed57d6 Mon Sep 17 00:00:00 2001 From: ticktock Date: Fri, 22 Oct 2010 20:13:27 -0400 Subject: [PATCH] Initial frontend code to support vector pop, and KVStorageBackend changes to put the scaffolding in place to support this --- .../src/main/scala/KVStorageBackend.scala | 241 ++++++++++++------ .../src/main/scala/Storage.scala | 5 +- .../src/main/scala/StorageBackend.scala | 5 + .../scala/VoldemortStorageBackendSuite.scala | 3 +- 4 files changed, 178 insertions(+), 76 deletions(-) diff --git a/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala index 25c1b0b1f1..1a77bb91fc 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala @@ -15,24 +15,29 @@ import collection.JavaConversions import java.nio.ByteBuffer import collection.Map import collection.mutable.ArrayBuffer -import java.util.{ Properties, Map => JMap } +import java.util.{Properties, Map => JMap} import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ import collection.immutable._ -private [akka] trait KVAccess { - def put(key: Array[Byte], value: Array[Byte]) - def getValue(key: Array[Byte]): Array[Byte] - def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] - def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] - def delete(key: Array[Byte]) - def drop() - } +private[akka] trait KVAccess { + def put(key: Array[Byte], value: Array[Byte]) -private [akka] object KVAccess { - implicit def stringToByteArray(st: String): Array[Byte] = { - st.getBytes("UTF-8") - } + def getValue(key: Array[Byte]): Array[Byte] + + def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] + + def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] + + def delete(key: Array[Byte]) + + def drop() +} + +private[akka] object KVAccess { + implicit def stringToByteArray(st: String): Array[Byte] = { + st.getBytes("UTF-8") + } } private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging { @@ -42,17 +47,22 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra val notNullMapValueHeader: Byte = 0xff.byteValue val underscoreBytesUTF8 = "_".getBytes("UTF-8") val mapKeysIndex = getIndexedBytes(-1) - val vectorSizeIndex = getIndexedBytes(-1) + val vectorHeadIndex = getIndexedBytes(-1) + val vectorTailIndex = getIndexedBytes(-2) val queueHeadIndex = getIndexedBytes(-1) val queueTailIndex = getIndexedBytes(-2) implicit val ordering = ArrayOrdering + import KVAccess._ - + def refAccess: KVAccess + def vectorAccess: KVAccess + def mapAccess: KVAccess + def queueAccess: KVAccess def getRefStorageFor(name: String): Option[Array[Byte]] = { @@ -80,13 +90,14 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = { val all: Map[Array[Byte], Array[Byte]] = - mapAccess.getAll(keys.map { mapKey => - getKey(name, mapKey) + mapAccess.getAll(keys.map{ + mapKey => + getKey(name, mapKey) }) var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering) - all.foreach { (entry) => - { + all.foreach{ + (entry) => { entry match { case (namePlusKey: Array[Byte], value: Array[Byte]) => { returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(value) @@ -119,9 +130,10 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra def removeMapStorageFor(name: String) = { val keys = getMapKeys(name) - keys.foreach { key => - mapAccess.delete(getKey(name, key)) - log.debug("deleted key %s for %s", key, name) + keys.foreach{ + key => + mapAccess.delete(getKey(name, key)) + log.debug("deleted key %s for %s", key, name) } mapAccess.delete(getKey(name, mapKeysIndex)) } @@ -134,7 +146,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { - val newKeys = entries.map { + val newKeys = entries.map{ case (key, value) => { mapAccess.put(getKey(name, key), getStoredMapValue(value)) key @@ -154,11 +166,12 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } def getVectorStorageSizeFor(name: String): Int = { - IntSerializer.fromBytes(vectorAccess.getValue(getKey(name, vectorSizeIndex), IntSerializer.toBytes(0))) + getVectorMetadata(name).size } def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { - val size = getVectorStorageSizeFor(name) + val mdata = getVectorMetadata(name) + val st = start.getOrElse(0) var cnt = if (finish.isDefined) { @@ -167,46 +180,34 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } else { count } - if (cnt > (size - st)) { - cnt = size - st + if (cnt > (mdata.size - st)) { + cnt = mdata.size - st } - val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { index => - getIndexedKey(name, (size - 1) - index) - } //read backwards - - val all: Map[Array[Byte], Array[Byte]] = vectorAccess.getAll(seq) - - var storage = new ArrayBuffer[Array[Byte]](seq.size) - storage = storage.padTo(seq.size, Array.empty[Byte]) - var idx = 0; - seq.foreach { key => - { - if (all.isDefinedAt(key)) { - storage.update(idx, all.get(key).get) - } - idx += 1 + val ret = mdata.getRangeIndexes(st, count).toList map { + index: Int => { + log.debug("getting:" + index) + vectorAccess.getValue(getIndexedKey(name, index)) } } - - storage.toList + ret } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { - val size = getVectorStorageSizeFor(name) - if (size > 0 && index < size) { - vectorAccess.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) + val mdata = getVectorMetadata(name) + if (mdata.size > 0 && index < mdata.size) { + vectorAccess.getValue(getIndexedKey(name, mdata.getRangeIndexes(index, 1)(0))) } else { throw new StorageException("In Vector:" + name + " No such Index:" + index) } } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { - val size = getVectorStorageSizeFor(name) - if (size > 0 && index < size) { + val mdata = getVectorMetadata(name) + if (mdata.size > 0 && index < mdata.size) { elem match { - case null => vectorAccess.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) - case _ => vectorAccess.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem) + case null => vectorAccess.delete(getIndexedKey(name, mdata.getRangeIndexes(index, 1)(0))) + case _ => vectorAccess.put(getIndexedKey(name, mdata.getRangeIndexes(index, 1)(0)), elem) } } else { throw new StorageException("In Vector:" + name + " No such Index:" + index) @@ -214,24 +215,46 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { - var size = getVectorStorageSizeFor(name) - elements.foreach { element => - if (element != null) { - vectorAccess.put(getIndexedKey(name, size), element) - } - size += 1 + elements.foreach{ + insertVectorStorageEntryFor(name, _) } - vectorAccess.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) + } def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { - insertVectorStorageEntriesFor(name, List(element)) + val mdata = getVectorMetadata(name) + if (mdata.canInsert) { + val key = getIndexedKey(name, mdata.head) + element match { + case null => vectorAccess.delete(key) + case _ => vectorAccess.put(key, element) + } + vectorAccess.put(getKey(name, vectorHeadIndex), IntSerializer.toBytes(mdata.nextInsert)) + } else { + throw new IllegalStateException("The vector %s is full".format(name)) + } + } + + def getVectorMetadata(name: String): VectorMetadata = { + val keys = List(getKey(name, vectorHeadIndex), getKey(name, vectorTailIndex)) + val vdata = vectorAccess.getAll(keys) + val values = keys.map{ + vdata.get(_) match { + case Some(value) => IntSerializer.fromBytes(value) + case None => 0 + } + } + VectorMetadata(values.head, values.tail.head) + } + + def remove(name: String): Boolean = { val mdata = getQueueMetadata(name) - mdata.getActiveIndexes foreach { index => - queueAccess.delete(getIndexedKey(name, index)) + mdata.getActiveIndexes foreach { + index => + queueAccess.delete(getIndexedKey(name, index)) } queueAccess.delete(getKey(name, queueHeadIndex)) queueAccess.delete(getKey(name, queueTailIndex)) @@ -240,8 +263,8 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra def peek(name: String, start: Int, count: Int): List[Array[Byte]] = { val mdata = getQueueMetadata(name) - val ret = mdata.getPeekIndexes(start, count).toList map { index: Int => - { + val ret = mdata.getPeekIndexes(start, count).toList map { + index: Int => { log.debug("peeking:" + index) queueAccess.getValue(getIndexedKey(name, index)) } @@ -257,12 +280,14 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra val mdata = getQueueMetadata(name) if (mdata.canDequeue) { val key = getIndexedKey(name, mdata.head) - try { + try + { val dequeued = queueAccess.getValue(key) queueAccess.put(getKey(name, queueHeadIndex), IntSerializer.toBytes(mdata.nextDequeue)) Some(dequeued) } finally { - try { + try + { queueAccess.delete(key) } catch { //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around @@ -292,7 +317,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra def getQueueMetadata(name: String): QueueMetadata = { val keys = List(getKey(name, queueHeadIndex), getKey(name, queueTailIndex)) val qdata = queueAccess.getAll(keys) - val values = keys.map { + val values = keys.map{ qdata.get(_) match { case Some(value) => IntSerializer.fromBytes(value) case None => 0 @@ -343,6 +368,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } //wrapper for null + def getStoredMapValue(value: Array[Byte]): Array[Byte] = { value match { case null => nullMapValue @@ -373,6 +399,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra //wraps around when one pointer gets to max value //head has an element in it. //tail is the next slot to write to. + def size = { if (tail >= head) { tail - head @@ -387,7 +414,9 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra size < Integer.MAX_VALUE - 1 } - def canDequeue = { size > 0 } + def canDequeue = { + size > 0 + } def getActiveIndexes(): IndexedSeq[Int] = { if (tail >= head) { @@ -395,13 +424,21 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } else { //queue has wrapped val headRange = Range.inclusive(head, Integer.MAX_VALUE) - (if (tail > 0) { headRange ++ Range(0, tail) } else { headRange }) + (if (tail > 0) { + headRange ++ Range(0, tail) + } else { + headRange + }) } } def getPeekIndexes(start: Int, count: Int): IndexedSeq[Int] = { val indexes = getActiveIndexes - if (indexes.size < start) { IndexedSeq.empty[Int] } else { indexes.drop(start).take(count) } + if (indexes.size < start) { + IndexedSeq.empty[Int] + } else { + indexes.drop(start).take(count) + } } def nextEnqueue = { @@ -419,6 +456,64 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } } + case class VectorMetadata(head: Int, tail: Int) { + + def size = { + if (head >= tail) { + head - tail + } else { + //queue has wrapped + (Integer.MAX_VALUE - tail) + (head + 1) + } + } + + def canInsert = { + //the -1 stops the tail from catching the head on a wrap around + size < Integer.MAX_VALUE - 1 + } + + def canRemove = { + size > 0 + } + + def getActiveIndexes(): IndexedSeq[Int] = { + if (head >= tail) { + Range(tail, head) + } else { + //queue has wrapped + val headRange = Range.inclusive(tail, Integer.MAX_VALUE) + (if (head > 0) { + headRange ++ Range(0, head) + } else { + headRange + }) + } + } + + def getRangeIndexes(start: Int, count: Int): IndexedSeq[Int] = { + val indexes = getActiveIndexes.reverse + if (indexes.size < start) { + IndexedSeq.empty[Int] + } else { + indexes.drop(start).take(count) + } + } + + def nextInsert = { + head match { + case Integer.MAX_VALUE => 0 + case _ => head + 1 + } + } + + def nextRemove = { + tail match { + case Integer.MAX_VALUE => 0 + case _ => tail + 1 + } + } + } + object IntSerializer { val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE @@ -433,14 +528,14 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra object SortedSetSerializer { def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = { - val length = set.foldLeft(0) { (total, bytes) => - { + val length = set.foldLeft(0) { + (total, bytes) => { total + bytes.length + IntSerializer.bytesPerInt } } val allBytes = new Array[Byte](length) - val written = set.foldLeft(0) { (total, bytes) => - { + val written = set.foldLeft(0) { + (total, bytes) => { val sizeBytes = IntSerializer.toBytes(bytes.length) System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length) System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length) diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index fa4ae1c358..e4eeca5aba 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -455,7 +455,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa (entry: @unchecked) match { case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v) case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v) - case LogEntry(_, _, POP) => //.. + case LogEntry(_, _, POP) => storage.removeVectorStorageEntryFor(uuid) } } appendOnlyTxLog.clear @@ -517,8 +517,9 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa */ def pop: T = { register + val curr = replay appendOnlyTxLog + LogEntry(None, None, POP) - throw new UnsupportedOperationException("PersistentVector::pop is not implemented") + curr.last } def update(index: Int, newElem: T) = { diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index 7e6a95f9a1..6e56e0b792 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -27,6 +27,11 @@ trait VectorStorageBackend[T] extends StorageBackend { def getVectorStorageEntryFor(name: String, index: Int): T def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T] def getVectorStorageSizeFor(name: String): Int + def removeVectorStorageEntryFor(name:String) = { + //Unfortunately this is thrown on commit, not at the time of the call to VectorStorage.pop + //Should we add a supportsRemove method that allows an early throw of the exception? + throw new UnsupportedOperationException("VectorStorageBackend.removeVectorStorageEntry is not supported") + } } // for Ref diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 5b03e013ff..0e57a135e0 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -100,7 +100,8 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb val key = "vectorApiKey" val value = bytes("Some bytes we want to store in a vector") val updatedValue = bytes("Some updated bytes we want to store in a vector") - vectorAccess.delete(getKey(key, vectorSizeIndex)) + vectorAccess.delete(getKey(key, vectorHeadIndex)) + vectorAccess.delete(getKey(key, vectorTailIndex)) vectorAccess.delete(getIndexedKey(key, 0)) vectorAccess.delete(getIndexedKey(key, 1))