Initial PersistentMap backend

This commit is contained in:
ticktock 2010-09-14 21:02:22 -04:00
parent 34da28ff03
commit c86497a770
3 changed files with 57 additions and 57 deletions

View file

@ -1,24 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.voldemort
import se.scalablesolutions.akka.util.UUID
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.persistence.common._
import voldemort.client.StoreClient
class VoldemortSession {
val voldemort: StoreClient
def getOptionalBytes(name: String): Option[Array[Byte]] = {
}
def put(name:)
}

View file

@ -34,10 +34,10 @@ class VoldemortPersistentMap(id: String) extends PersistentMapBinary {
class VoldemortPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
val uuid = id
val storage = VoldemortStoragebackend
val storage = VoldemortStorageBackend
}
class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
val storage = VoldemortStoragebackend
val storage = VoldemortStorageBackend
}

View file

@ -13,6 +13,11 @@ import se.scalablesolutions.akka.config.Config.config
import voldemort.client._
import collection.mutable.{Set, HashSet, ArrayBuffer}
import java.lang.String
import voldemort.utils.ByteUtils
import collection.immutable.{SortedSet, TreeSet}
import voldemort.versioning.Versioned
import java.util.Map
import collection.JavaConversions
private[akka] object VoldemortStorageBackend extends
@ -23,25 +28,30 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
/**
* Concat the owner+key+lenght of owner so owned data will be colocated
* Store the length of owner as last byte to work aroune the rarest case
* Store the length of owner as last byte to work around the rare case
* where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2
*/
private def mapKey(owner: String, key: Array[Byte]): Array[Byte] = {
val ownerBytes: Array[Byte] = owner.getBytes("UTF-8")
val ownerLenghtByte = ownerBytes.length.byteValue
val mapKey = new Array[Byte](ownerBytes.length + key.length + 1)
System.arraycopy(ownerBytes, 0, mapKey, 0, ownerBytes.length)
System.arraycopy(key, 0, mapKey, ownerBytes.length, key.length)
mapKey.update(mapKey.length - 1) = ownerLenghtByte
val theMapKey = new Array[Byte](ownerBytes.length + key.length + 1)
System.arraycopy(ownerBytes, 0, theMapKey, 0, ownerBytes.length)
System.arraycopy(key, 0, theMapKey, ownerBytes.length, key.length)
theMapKey.update(theMapKey.length - 1, ownerLenghtByte)
theMapKey
}
var refClient: StoreClient
var mapKeyClient: StoreClient
var mapValueClient: StoreClient
var refClient: StoreClient[String, Array[Byte]] = null
var mapKeyClient: StoreClient[String, SortedSet[Array[Byte]]] = null
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null
implicit val byteOrder = new Ordering[Array[Byte]] {
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
}
def getRefStorageFor(name: String): Option[Array[Byte]] = {
val result: Array[Byte] = refClient.get(RefKey(name).key)
val result: Array[Byte] = refClient.getValue(name)
result match {
case null => None
case _ => Some(result)
@ -49,29 +59,41 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def insertRefStorageFor(name: String, element: Array[Byte]) = {
refClient.put(RefKey(name).key, element)
refClient.put(name, element)
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
val allkeys: SortedSet[Array[Byte]] = mapKeyClient.getValue(name, new TreeSet[Array[Byte]])
val range = allkeys.rangeImpl(start, finish).take(count)
getKeyValues(range)
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0))
val entries: ArrayBuffer[(Array[Byte], Array[Byte])] = new ArrayBuffer
keys.foreach {
entries += (_, mapValueClient.getValue(mapKey(name, _)))
}
entries.toList
val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
getKeyValues(keys)
}
private def getKeyValues(keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = {
val all: Map[Array[Byte], Versioned[Array[Byte]]] = mapValueClient.getAll(JavaConversions.asIterable(keys))
JavaConversions.asMap(all).foldLeft(new ArrayBuffer[(Array[Byte], Array[Byte])]) {
(buf, keyVal) => {
keyVal match {
case (key, versioned) => {
buf += key -> versioned.getValue
}
}
buf
}
}.toList
}
def getMapStorageSizeFor(name: String): Int = {
val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0))
val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
keys.size
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
val result: Array[Byte] = mapValueClient.get(mapKey(name, key))
val result: Array[Byte] = mapValueClient.getValue(mapKey(name, key))
result match {
case null => None
case _ => Some(result)
@ -79,7 +101,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def removeMapStorageFor(name: String, key: Array[Byte]) = {
val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0))
var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
keys -= key
mapKeyClient.put(name, keys)
mapValueClient.delete(mapKey(name, key))
@ -87,33 +109,35 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def removeMapStorageFor(name: String) = {
val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0))
val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
keys.foreach {
mapValueClient.delete(mapKey(name, _))
key =>
mapValueClient.delete(mapKey(name, key))
}
mapKeyClient.delete(name)
}
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
mapValueClient.put(mapKey(name, key))
val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0))
mapValueClient.put(mapKey(name, key), value)
var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
keys += key
mapKeyClient.put(name, keys)
}
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
val newKeys = new HashSet[Array[Byte]]
entries.foreach {
(key, value) => mapValueClient.put(mapKey(name, key), value)
newKeys += key
val newKeys = entries.map {
case (key, value) => {
mapValueClient.put(mapKey(name, key), value)
key
}
}
val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0))
keys += key
var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
keys ++= newKeys
mapKeyClient.put(name, keys)
}
def getVectorStorageSizeFor(name: String): Int = null
def getVectorStorageSizeFor(name: String): Int = 0
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = null