Initial frontend code to support vector pop, and KVStorageBackend changes to put the scaffolding in place to support this

This commit is contained in:
ticktock 2010-10-22 20:13:27 -04:00
parent 7b56928bca
commit 9aa70b9139
4 changed files with 178 additions and 76 deletions

View file

@ -15,24 +15,29 @@ import collection.JavaConversions
import java.nio.ByteBuffer
import collection.Map
import collection.mutable.ArrayBuffer
import java.util.{ Properties, Map => JMap }
import java.util.{Properties, Map => JMap}
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
import collection.immutable._
private [akka] trait KVAccess {
def put(key: Array[Byte], value: Array[Byte])
def getValue(key: Array[Byte]): Array[Byte]
def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte]
def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]]
def delete(key: Array[Byte])
def drop()
}
private[akka] trait KVAccess {
def put(key: Array[Byte], value: Array[Byte])
private [akka] object KVAccess {
implicit def stringToByteArray(st: String): Array[Byte] = {
st.getBytes("UTF-8")
}
def getValue(key: Array[Byte]): Array[Byte]
def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte]
def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]]
def delete(key: Array[Byte])
def drop()
}
private[akka] object KVAccess {
implicit def stringToByteArray(st: String): Array[Byte] = {
st.getBytes("UTF-8")
}
}
private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging {
@ -42,17 +47,22 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
val notNullMapValueHeader: Byte = 0xff.byteValue
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
val mapKeysIndex = getIndexedBytes(-1)
val vectorSizeIndex = getIndexedBytes(-1)
val vectorHeadIndex = getIndexedBytes(-1)
val vectorTailIndex = getIndexedBytes(-2)
val queueHeadIndex = getIndexedBytes(-1)
val queueTailIndex = getIndexedBytes(-2)
implicit val ordering = ArrayOrdering
import KVAccess._
def refAccess: KVAccess
def vectorAccess: KVAccess
def mapAccess: KVAccess
def queueAccess: KVAccess
def getRefStorageFor(name: String): Option[Array[Byte]] = {
@ -80,13 +90,14 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = {
val all: Map[Array[Byte], Array[Byte]] =
mapAccess.getAll(keys.map { mapKey =>
getKey(name, mapKey)
mapAccess.getAll(keys.map{
mapKey =>
getKey(name, mapKey)
})
var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering)
all.foreach { (entry) =>
{
all.foreach{
(entry) => {
entry match {
case (namePlusKey: Array[Byte], value: Array[Byte]) => {
returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(value)
@ -119,9 +130,10 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
def removeMapStorageFor(name: String) = {
val keys = getMapKeys(name)
keys.foreach { key =>
mapAccess.delete(getKey(name, key))
log.debug("deleted key %s for %s", key, name)
keys.foreach{
key =>
mapAccess.delete(getKey(name, key))
log.debug("deleted key %s for %s", key, name)
}
mapAccess.delete(getKey(name, mapKeysIndex))
}
@ -134,7 +146,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
}
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
val newKeys = entries.map {
val newKeys = entries.map{
case (key, value) => {
mapAccess.put(getKey(name, key), getStoredMapValue(value))
key
@ -154,11 +166,12 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
}
def getVectorStorageSizeFor(name: String): Int = {
IntSerializer.fromBytes(vectorAccess.getValue(getKey(name, vectorSizeIndex), IntSerializer.toBytes(0)))
getVectorMetadata(name).size
}
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
val size = getVectorStorageSizeFor(name)
val mdata = getVectorMetadata(name)
val st = start.getOrElse(0)
var cnt =
if (finish.isDefined) {
@ -167,46 +180,34 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
} else {
count
}
if (cnt > (size - st)) {
cnt = size - st
if (cnt > (mdata.size - st)) {
cnt = mdata.size - st
}
val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { index =>
getIndexedKey(name, (size - 1) - index)
} //read backwards
val all: Map[Array[Byte], Array[Byte]] = vectorAccess.getAll(seq)
var storage = new ArrayBuffer[Array[Byte]](seq.size)
storage = storage.padTo(seq.size, Array.empty[Byte])
var idx = 0;
seq.foreach { key =>
{
if (all.isDefinedAt(key)) {
storage.update(idx, all.get(key).get)
}
idx += 1
val ret = mdata.getRangeIndexes(st, count).toList map {
index: Int => {
log.debug("getting:" + index)
vectorAccess.getValue(getIndexedKey(name, index))
}
}
storage.toList
ret
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
val size = getVectorStorageSizeFor(name)
if (size > 0 && index < size) {
vectorAccess.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
val mdata = getVectorMetadata(name)
if (mdata.size > 0 && index < mdata.size) {
vectorAccess.getValue(getIndexedKey(name, mdata.getRangeIndexes(index, 1)(0)))
} else {
throw new StorageException("In Vector:" + name + " No such Index:" + index)
}
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val size = getVectorStorageSizeFor(name)
if (size > 0 && index < size) {
val mdata = getVectorMetadata(name)
if (mdata.size > 0 && index < mdata.size) {
elem match {
case null => vectorAccess.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
case _ => vectorAccess.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem)
case null => vectorAccess.delete(getIndexedKey(name, mdata.getRangeIndexes(index, 1)(0)))
case _ => vectorAccess.put(getIndexedKey(name, mdata.getRangeIndexes(index, 1)(0)), elem)
}
} else {
throw new StorageException("In Vector:" + name + " No such Index:" + index)
@ -214,24 +215,46 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
var size = getVectorStorageSizeFor(name)
elements.foreach { element =>
if (element != null) {
vectorAccess.put(getIndexedKey(name, size), element)
}
size += 1
elements.foreach{
insertVectorStorageEntryFor(name, _)
}
vectorAccess.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
insertVectorStorageEntriesFor(name, List(element))
val mdata = getVectorMetadata(name)
if (mdata.canInsert) {
val key = getIndexedKey(name, mdata.head)
element match {
case null => vectorAccess.delete(key)
case _ => vectorAccess.put(key, element)
}
vectorAccess.put(getKey(name, vectorHeadIndex), IntSerializer.toBytes(mdata.nextInsert))
} else {
throw new IllegalStateException("The vector %s is full".format(name))
}
}
def getVectorMetadata(name: String): VectorMetadata = {
val keys = List(getKey(name, vectorHeadIndex), getKey(name, vectorTailIndex))
val vdata = vectorAccess.getAll(keys)
val values = keys.map{
vdata.get(_) match {
case Some(value) => IntSerializer.fromBytes(value)
case None => 0
}
}
VectorMetadata(values.head, values.tail.head)
}
def remove(name: String): Boolean = {
val mdata = getQueueMetadata(name)
mdata.getActiveIndexes foreach { index =>
queueAccess.delete(getIndexedKey(name, index))
mdata.getActiveIndexes foreach {
index =>
queueAccess.delete(getIndexedKey(name, index))
}
queueAccess.delete(getKey(name, queueHeadIndex))
queueAccess.delete(getKey(name, queueTailIndex))
@ -240,8 +263,8 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = {
val mdata = getQueueMetadata(name)
val ret = mdata.getPeekIndexes(start, count).toList map { index: Int =>
{
val ret = mdata.getPeekIndexes(start, count).toList map {
index: Int => {
log.debug("peeking:" + index)
queueAccess.getValue(getIndexedKey(name, index))
}
@ -257,12 +280,14 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
val mdata = getQueueMetadata(name)
if (mdata.canDequeue) {
val key = getIndexedKey(name, mdata.head)
try {
try
{
val dequeued = queueAccess.getValue(key)
queueAccess.put(getKey(name, queueHeadIndex), IntSerializer.toBytes(mdata.nextDequeue))
Some(dequeued)
} finally {
try {
try
{
queueAccess.delete(key)
} catch {
//a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around
@ -292,7 +317,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
def getQueueMetadata(name: String): QueueMetadata = {
val keys = List(getKey(name, queueHeadIndex), getKey(name, queueTailIndex))
val qdata = queueAccess.getAll(keys)
val values = keys.map {
val values = keys.map{
qdata.get(_) match {
case Some(value) => IntSerializer.fromBytes(value)
case None => 0
@ -343,6 +368,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
}
//wrapper for null
def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
value match {
case null => nullMapValue
@ -373,6 +399,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
//wraps around when one pointer gets to max value
//head has an element in it.
//tail is the next slot to write to.
def size = {
if (tail >= head) {
tail - head
@ -387,7 +414,9 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
size < Integer.MAX_VALUE - 1
}
def canDequeue = { size > 0 }
def canDequeue = {
size > 0
}
def getActiveIndexes(): IndexedSeq[Int] = {
if (tail >= head) {
@ -395,13 +424,21 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
} else {
//queue has wrapped
val headRange = Range.inclusive(head, Integer.MAX_VALUE)
(if (tail > 0) { headRange ++ Range(0, tail) } else { headRange })
(if (tail > 0) {
headRange ++ Range(0, tail)
} else {
headRange
})
}
}
def getPeekIndexes(start: Int, count: Int): IndexedSeq[Int] = {
val indexes = getActiveIndexes
if (indexes.size < start) { IndexedSeq.empty[Int] } else { indexes.drop(start).take(count) }
if (indexes.size < start) {
IndexedSeq.empty[Int]
} else {
indexes.drop(start).take(count)
}
}
def nextEnqueue = {
@ -419,6 +456,64 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
}
}
case class VectorMetadata(head: Int, tail: Int) {
def size = {
if (head >= tail) {
head - tail
} else {
//queue has wrapped
(Integer.MAX_VALUE - tail) + (head + 1)
}
}
def canInsert = {
//the -1 stops the tail from catching the head on a wrap around
size < Integer.MAX_VALUE - 1
}
def canRemove = {
size > 0
}
def getActiveIndexes(): IndexedSeq[Int] = {
if (head >= tail) {
Range(tail, head)
} else {
//queue has wrapped
val headRange = Range.inclusive(tail, Integer.MAX_VALUE)
(if (head > 0) {
headRange ++ Range(0, head)
} else {
headRange
})
}
}
def getRangeIndexes(start: Int, count: Int): IndexedSeq[Int] = {
val indexes = getActiveIndexes.reverse
if (indexes.size < start) {
IndexedSeq.empty[Int]
} else {
indexes.drop(start).take(count)
}
}
def nextInsert = {
head match {
case Integer.MAX_VALUE => 0
case _ => head + 1
}
}
def nextRemove = {
tail match {
case Integer.MAX_VALUE => 0
case _ => tail + 1
}
}
}
object IntSerializer {
val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE
@ -433,14 +528,14 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
object SortedSetSerializer {
def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = {
val length = set.foldLeft(0) { (total, bytes) =>
{
val length = set.foldLeft(0) {
(total, bytes) => {
total + bytes.length + IntSerializer.bytesPerInt
}
}
val allBytes = new Array[Byte](length)
val written = set.foldLeft(0) { (total, bytes) =>
{
val written = set.foldLeft(0) {
(total, bytes) => {
val sizeBytes = IntSerializer.toBytes(bytes.length)
System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length)
System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length)

View file

@ -455,7 +455,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
(entry: @unchecked) match {
case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v)
case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v)
case LogEntry(_, _, POP) => //..
case LogEntry(_, _, POP) => storage.removeVectorStorageEntryFor(uuid)
}
}
appendOnlyTxLog.clear
@ -517,8 +517,9 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
*/
def pop: T = {
register
val curr = replay
appendOnlyTxLog + LogEntry(None, None, POP)
throw new UnsupportedOperationException("PersistentVector::pop is not implemented")
curr.last
}
def update(index: Int, newElem: T) = {

View file

@ -27,6 +27,11 @@ trait VectorStorageBackend[T] extends StorageBackend {
def getVectorStorageEntryFor(name: String, index: Int): T
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T]
def getVectorStorageSizeFor(name: String): Int
def removeVectorStorageEntryFor(name:String) = {
//Unfortunately this is thrown on commit, not at the time of the call to VectorStorage.pop
//Should we add a supportsRemove method that allows an early throw of the exception?
throw new UnsupportedOperationException("VectorStorageBackend.removeVectorStorageEntry is not supported")
}
}
// for Ref

View file

@ -100,7 +100,8 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
val key = "vectorApiKey"
val value = bytes("Some bytes we want to store in a vector")
val updatedValue = bytes("Some updated bytes we want to store in a vector")
vectorAccess.delete(getKey(key, vectorSizeIndex))
vectorAccess.delete(getKey(key, vectorHeadIndex))
vectorAccess.delete(getKey(key, vectorTailIndex))
vectorAccess.delete(getIndexedKey(key, 0))
vectorAccess.delete(getIndexedKey(key, 1))