first pass at refactor, common access working (cassandra) now to KVAccess

This commit is contained in:
ticktock 2010-11-10 16:26:04 -05:00
parent 5dff29642d
commit 0eea188644
5 changed files with 155 additions and 62 deletions

View file

@ -62,8 +62,6 @@ private[akka] object CassandraStorageBackend extends CommonStorageBackend {
CONSISTENCY_LEVEL)
class CassandraAccess(parent: ColumnParent) extends CommonStorageBackendAccess {
def path(key: Array[Byte]): ColumnPath = {
@ -77,7 +75,8 @@ private[akka] object CassandraStorageBackend extends CommonStorageBackend {
}
}
}
def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = {
override def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = {
sessions.withSession{
session => {
var predicate = new SlicePredicate().setColumn_names(JavaConversions.asList(keys.toList))
@ -91,7 +90,8 @@ private[akka] object CassandraStorageBackend extends CommonStorageBackend {
}
}
def getValue(owner: String, key: Array[Byte], default: Array[Byte]) = {
def get(owner: String, key: Array[Byte], default: Array[Byte]) = {
sessions.withSession{
session => {
try
@ -115,6 +115,7 @@ private[akka] object CassandraStorageBackend extends CommonStorageBackend {
}
}
def drop() = {
sessions.withSession{
session => {

View file

@ -11,6 +11,7 @@ import collection.Map
import java.util.{Map => JMap}
import akka.persistence.common.PersistentMapBinary.COrdering._
import collection.immutable._
import collection.mutable.ArrayBuffer
private[akka] trait CommonStorageBackendAccess {
@ -19,27 +20,74 @@ private[akka] trait CommonStorageBackendAccess {
/*abstract*/
def getValue(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte]
def get(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte]
def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = {
keys.foldLeft(new HashMap[Array[Byte], Array[Byte]]) {
(map, key) => {
Option(get(owner, key)) match {
case Some(value) => map + (key -> value)
case None => map
}
}
}
}
def put(owner: String, key: Array[Byte], value: Array[Byte]): Unit
def putAll(owner: String, keyValues: Iterable[(Array[Byte], Array[Byte])]): Unit = {
keyValues.foreach{
kv => kv match {
case (key, value) => put(owner, key, value)
}
}
}
def delete(owner: String, key: Array[Byte]): Unit
def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]]
def deleteAll(owner: String, keys: Iterable[Array[Byte]]): Unit = {
keys.foreach(delete(owner, _))
}
def drop(): Unit
/*concrete*/
def decodeKey(owner: String, key: Array[Byte]) = key
def decodeMapKey(owner: String, key: Array[Byte]) = key
def delete(owner: String, index: Int): Unit = delete(owner, IntSerializer.toBytes(index))
def decodeIndexedKey(owner: String, key: Array[Byte]) = key
def getValue(owner: String, index: Int): Array[Byte] = getValue(owner, IntSerializer.toBytes(index))
def deleteIndexed(owner: String, index: Int): Unit = delete(owner, IntSerializer.toBytes(index))
def getValue(owner: String, key: Array[Byte]): Array[Byte] = getValue(owner, key, null)
def getIndexed(owner: String, index: Int): Array[Byte] = get(owner, IntSerializer.toBytes(index))
def put(owner: String, index: Int, value: Array[Byte]): Unit = put(owner, IntSerializer.toBytes(index), value)
def get(owner: String, key: Array[Byte]): Array[Byte] = get(owner, key, null)
def putIndexed(owner: String, index: Int, value: Array[Byte]): Unit = put(owner, IntSerializer.toBytes(index), value)
def putAllIndexed(owner: String, values: Iterable[(Int, Array[Byte])]): Unit = {
putAll(owner, values.map{
iv => {
iv match {
case (i, value) => (IntSerializer.toBytes(i) -> value)
}
}
})
}
def getAllIndexed(owner: String, keys: Iterable[Int]): Map[Int, Array[Byte]] = {
val byteKeys = keys.map(IntSerializer.toBytes(_))
getAll(owner, byteKeys).map{
kv => kv match {
case (key, value) => (IntSerializer.fromBytes(key) -> value)
}
}
}
def deleteAllIndexed(owner: String, keys: Iterable[Int]): Unit = {
val byteKeys = keys.map(IntSerializer.toBytes(_))
deleteAll(owner, byteKeys)
}
}
private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess {
@ -57,7 +105,7 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess {
def delete(key: Array[Byte]): Unit
override def decodeKey(owner: String, key: Array[Byte]): Array[Byte] = {
override def decodeMapKey(owner: String, key: Array[Byte]): Array[Byte] = {
val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length
val mapkey = new Array[Byte](mapKeyLength)
System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength)
@ -69,21 +117,21 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess {
put(getKey(owner, key), value)
}
override def put(owner: String, index: Int, value: Array[Byte]): Unit = {
override def putIndexed(owner: String, index: Int, value: Array[Byte]): Unit = {
put(getIndexedKey(owner, index), value)
}
override def getValue(owner: String, key: Array[Byte]): Array[Byte] = {
override def get(owner: String, key: Array[Byte]): Array[Byte] = {
getValue(getKey(owner, key))
}
override def getValue(owner: String, index: Int): Array[Byte] = {
override def getIndexed(owner: String, index: Int): Array[Byte] = {
getValue(getIndexedKey(owner, index))
}
override def getValue(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte] = {
override def get(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte] = {
getValue(getKey(owner, key), default)
}
@ -94,7 +142,7 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess {
})
}
override def delete(owner: String, index: Int): Unit = {
override def deleteIndexed(owner: String, index: Int): Unit = {
delete(getIndexedKey(owner, index))
}
@ -118,6 +166,13 @@ private[akka] object CommonStorageBackend {
val mapKeysIndex: Array[Byte] = new Array[Byte](1).padTo(1, mapKeySetKeyHeader)
val mapKeysWrapperPad: Array[Byte] = new Array[Byte](1).padTo(1, mapKeyHeader)
/**
* Wrap map key prepends mapKeysWrapperPad (1-byte) to map keys so that we can
* use a seperate 1 byte key to store the map keyset.
*
* This basically creates the map key used in underlying storage
*/
def wrapMapKey(key: Array[Byte]): Array[Byte] = {
val wrapped = new Array[Byte](key.length + mapKeysWrapperPad.length)
System.arraycopy(mapKeysWrapperPad, 0, wrapped, 0, mapKeysWrapperPad.length)
@ -125,6 +180,11 @@ private[akka] object CommonStorageBackend {
wrapped
}
/**
* unwrapMapKey removes the mapKeysWrapperPad, this translates the map key used
* in underlying storage back to a key that is understandable by the frontend
*/
def unwrapMapKey(key: Array[Byte]): Array[Byte] = {
val unwrapped = new Array[Byte](key.length - mapKeysWrapperPad.length)
System.arraycopy(key, mapKeysWrapperPad.length, unwrapped, 0, unwrapped.length)
@ -260,7 +320,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
def getRefStorageFor(name: String): Option[Array[Byte]] = {
val result: Array[Byte] = refAccess.getValue(name, refItem)
val result: Array[Byte] = refAccess.get(name, refItem)
Option(result)
}
@ -293,7 +353,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
entry match {
case (namePlusKey: Array[Byte], value: Array[Byte]) => {
//need to fix here
returned += mapAccess.decodeKey(name, unwrapMapKey(namePlusKey)) -> getMapValueFromStored(value)
returned += mapAccess.decodeMapKey(name, unwrapMapKey(namePlusKey)) -> getMapValueFromStored(value)
}
}
}
@ -307,7 +367,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
val result: Array[Byte] = mapAccess.getValue(name, wrapMapKey(key))
val result: Array[Byte] = mapAccess.get(name, wrapMapKey(key))
result match {
case null => None
case _ => Some(getMapValueFromStored(result))
@ -341,11 +401,15 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
val newKeys = entries.map{
val toInsert = entries.map{
kv => kv match {
case (key, value) => (wrapMapKey(key) -> getStoredMapValue(value))
}
}
mapAccess.putAll(name, toInsert)
val newKeys = toInsert.map{
case (key, value) => {
val wrapped = wrapMapKey(key)
mapAccess.put(name, wrapped, getStoredMapValue(value))
wrapped
key
}
}
var keys = getMapKeys(name)
@ -358,10 +422,9 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def getMapKeys(name: String): SortedSet[Array[Byte]] = {
SortedSetSerializer.fromBytes(mapAccess.getValue(name, mapKeysIndex, Array.empty[Byte]))
SortedSetSerializer.fromBytes(mapAccess.get(name, mapKeysIndex, Array.empty[Byte]))
}
def getVectorStorageSizeFor(name: String): Int = {
getVectorMetadata(name).size
}
@ -381,19 +444,16 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
cnt = mdata.size - st
}
val ret = mdata.getRangeIndexes(st, count).toList map {
index: Int => {
log.debug("getting:" + index)
vectorAccess.getValue(name, index)
}
}
ret
val indexes = mdata.getRangeIndexes(st, count)
val result = vectorAccess.getAllIndexed(name, indexes)
indexes.map(result.get(_).get).toList
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
val mdata = getVectorMetadata(name)
if (mdata.size > 0 && index < mdata.size) {
vectorAccess.getValue(name, mdata.getRangeIndexes(index, 1)(0))
vectorAccess.getIndexed(name, mdata.getRangeIndexes(index, 1)(0))
} else {
throw new StorageException("In Vector:" + name + " No such Index:" + index)
}
@ -403,8 +463,8 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
val mdata = getVectorMetadata(name)
if (mdata.size > 0 && index < mdata.size) {
elem match {
case null => vectorAccess.delete(name, mdata.getRangeIndexes(index, 1)(0))
case _ => vectorAccess.put(name, mdata.getRangeIndexes(index, 1)(0), elem)
case null => vectorAccess.deleteIndexed(name, mdata.getRangeIndexes(index, 1)(0))
case _ => vectorAccess.putIndexed(name, mdata.getRangeIndexes(index, 1)(0), elem)
}
} else {
throw new StorageException("In Vector:" + name + " No such Index:" + index)
@ -412,18 +472,35 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
var mdata = getVectorMetadata(name)
var deletes: List[Int] = Nil
var puts: List[(Int, Array[Byte])] = Nil
elements.foreach{
insertVectorStorageEntryFor(name, _)
element => {
if (mdata.canInsert) {
element match {
case null => deletes = mdata.head :: deletes
case _ => puts = (mdata.head -> element) :: puts
}
mdata = mdata.copy(head = mdata.nextInsert)
} else {
throw new IllegalStateException("The vector dosent have enough capacity to insert these entries")
}
}
}
vectorAccess.deleteAllIndexed(name, deletes)
vectorAccess.putAllIndexed(name, puts)
vectorAccess.put(name, vectorHeadIndex, IntSerializer.toBytes(mdata.head))
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
val mdata = getVectorMetadata(name)
if (mdata.canInsert) {
element match {
case null => vectorAccess.delete(name, mdata.head)
case _ => vectorAccess.put(name, mdata.head, element)
case null => vectorAccess.deleteIndexed(name, mdata.head)
case _ => vectorAccess.putIndexed(name, mdata.head, element)
}
vectorAccess.put(name, vectorHeadIndex, IntSerializer.toBytes(mdata.nextInsert))
} else {
@ -439,7 +516,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
vectorAccess.put(name, vectorTailIndex, IntSerializer.toBytes(mdata.nextRemove))
try
{
vectorAccess.delete(name, mdata.tail)
vectorAccess.deleteIndexed(name, mdata.tail)
} catch {
case e: Exception => log.warn("Exception while trying to clean up a popped element from the vector, this is acceptable")
}
@ -450,9 +527,12 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def getVectorMetadata(name: String): VectorMetadata = {
val head = vectorAccess.getValue(name, vectorHeadIndex, zero)
val tail = vectorAccess.getValue(name, vectorTailIndex, zero)
VectorMetadata(IntSerializer.fromBytes(head), IntSerializer.fromBytes(tail))
val result = vectorAccess.getAll(name, List(vectorHeadIndex, vectorTailIndex))
val head = result.getOrElse(vectorHeadIndex, zero)
val tail = result.getOrElse(vectorTailIndex, zero)
val mdata = VectorMetadata(IntSerializer.fromBytes(head), IntSerializer.fromBytes(tail))
log.debug(mdata.toString)
mdata
}
def getOrDefaultToZero(map: Map[Array[Byte], Array[Byte]], key: Array[Byte]): Int = {
@ -467,7 +547,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
val mdata = getQueueMetadata(name)
mdata.getActiveIndexes foreach {
index =>
queueAccess.delete(name, index)
queueAccess.deleteIndexed(name, index)
}
queueAccess.delete(name, queueHeadIndex)
queueAccess.delete(name, queueTailIndex)
@ -476,13 +556,9 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = {
val mdata = getQueueMetadata(name)
val ret = mdata.getPeekIndexes(start, count).toList map {
index: Int => {
log.debug("peeking:" + index)
queueAccess.getValue(name, index)
}
}
ret
val indexes = mdata.getPeekIndexes(start, count)
val result = queueAccess.getAllIndexed(name, indexes)
indexes.map(result.get(_).get).toList
}
def size(name: String): Int = {
@ -494,13 +570,13 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
if (mdata.canDequeue) {
try
{
val dequeued = queueAccess.getValue(name, mdata.head)
val dequeued = queueAccess.getIndexed(name, mdata.head)
queueAccess.put(name, queueHeadIndex, IntSerializer.toBytes(mdata.nextDequeue))
Some(dequeued)
} finally {
try
{
queueAccess.delete(name, mdata.head)
queueAccess.deleteIndexed(name, mdata.head)
} catch {
//a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around
case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue")
@ -515,8 +591,8 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
val mdata = getQueueMetadata(name)
if (mdata.canEnqueue) {
item match {
case null => queueAccess.delete(name, mdata.tail)
case _ => queueAccess.put(name, mdata.tail, item)
case null => queueAccess.deleteIndexed(name, mdata.tail)
case _ => queueAccess.putIndexed(name, mdata.tail, item)
}
queueAccess.put(name, queueTailIndex, IntSerializer.toBytes(mdata.nextEnqueue))
Some(mdata.size + 1)
@ -526,8 +602,9 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def getQueueMetadata(name: String): QueueMetadata = {
val head = queueAccess.getValue(name, vectorHeadIndex, zero)
val tail = queueAccess.getValue(name, vectorTailIndex, zero)
val result = queueAccess.getAll(name, List(vectorHeadIndex, vectorTailIndex))
val head = result.get(vectorHeadIndex).getOrElse(zero)
val tail = result.get(vectorTailIndex).getOrElse(zero)
QueueMetadata(IntSerializer.fromBytes(head), IntSerializer.fromBytes(tail))
}

View file

@ -305,10 +305,25 @@ object PersistentMapBinary {
ArrayOrdering.compare(o1.toArray, o2.toArray)
}
//backend
implicit object ArrayOrdering extends Ordering[Array[Byte]] {
def compare(o1: Array[Byte], o2: Array[Byte]) =
new String(o1) compare new String(o2)
def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
if (o1.size == o2.size) {
for (i <- 0 until o1.size) {
var a = o1(i)
var b = o2(i)
if (a != b) {
return (a - b) / (Math.abs(a - b))
}
}
0
} else {
(o1.length - o2.length) / (Math.max(1, Math.abs(o1.length - o2.length)))
}
}
}
}
}

View file

@ -48,10 +48,10 @@ private[akka] object MemcachedStorageBackend extends CommonStorageBackend {
base64.encodeToString(key)
}
override def decodeKey(owner: String, key: Array[Byte]) = {
override def decodeMapKey(owner: String, key: Array[Byte]) = {
val newkey = new Array[Byte](key.length - typeBytes.length)
System.arraycopy(key, 0, newkey, 0, newkey.length)
super.decodeKey(owner, newkey)
super.decodeMapKey(owner, newkey)
}
def drop() = client.flush()

View file

@ -22,7 +22,7 @@ class SimpledbTestIntegration extends Spec with ShouldMatchers with BeforeAndAft
val value = new Array[Byte](valsize)
mapAccess.put(name, key, value)
val result = mapAccess.getValue(name, key, Array.empty[Byte])
val result = mapAccess.get(name, key, Array.empty[Byte])
result.size should be(value.size)
result should be(value)
}