Adding Voldemort persistence plugin

This commit is contained in:
Viktor Klang 2010-09-23 15:12:12 +02:00
commit 9a4f2a79a4
11 changed files with 985 additions and 2 deletions

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.voldemort
import se.scalablesolutions.akka.actor.{newUuid}
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.persistence.common._
object VoldemortStorage extends Storage {
type ElementType = Array[Byte]
def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString)
def newVector: PersistentVector[ElementType] = newVector(newUuid.toString)
def newRef: PersistentRef[ElementType] = newRef(newUuid.toString)
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id)
def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id)
def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id)
}
class VoldemortPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = VoldemortStorageBackend
}
class VoldemortPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
val uuid = id
val storage = VoldemortStorageBackend
}
class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
val storage = VoldemortStorageBackend
}

View file

@ -0,0 +1,331 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.voldemort
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.util.Helpers._
import se.scalablesolutions.akka.config.Config.config
import voldemort.client._
import java.lang.String
import voldemort.utils.ByteUtils
import voldemort.versioning.Versioned
import collection.JavaConversions
import java.nio.ByteBuffer
import collection.Map
import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap}
import collection.mutable.{Set, HashSet, ArrayBuffer}
import java.util.{Properties, Map => JMap}
private[akka] object VoldemortStorageBackend extends
MapStorageBackend[Array[Byte], Array[Byte]] with
VectorStorageBackend[Array[Byte]] with
RefStorageBackend[Array[Byte]] with
Logging {
val bootstrapUrlsProp = "bootstrap_urls"
val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match {
case Some(configMap) => getClientConfig(configMap.asMap)
case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666"))
}
val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs")
val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys")
val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues")
val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector-size", "VectorSizes")
val vectorValueStore = config.getString("akka.storage.voldemort.store.vector-value", "VectorValues")
var storeClientFactory: StoreClientFactory = null
var refClient: StoreClient[String, Array[Byte]] = null
var mapKeyClient: StoreClient[String, Array[Byte]] = null
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null
var vectorSizeClient: StoreClient[String, Array[Byte]] = null
var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = null
initStoreClients
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
implicit val byteOrder = new Ordering[Array[Byte]] {
override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y)
}
def getRefStorageFor(name: String): Option[Array[Byte]] = {
val result: Array[Byte] = refClient.getValue(name)
result match {
case null => None
case _ => Some(result)
}
}
def insertRefStorageFor(name: String, element: Array[Byte]) = {
refClient.put(name, element)
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
val allkeys: SortedSet[Array[Byte]] = getMapKeys(name)
val range = allkeys.rangeImpl(start, finish).take(count)
getKeyValues(name, range)
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
val keys = getMapKeys(name)
getKeyValues(name, keys)
}
private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = {
val all: JMap[Array[Byte], Versioned[Array[Byte]]] =
mapValueClient.getAll(JavaConversions.asIterable(keys.map {
mapKey => getKey(name, mapKey)
}))
val buf = new ArrayBuffer[(Array[Byte], Array[Byte])](all.size)
JavaConversions.asMap(all).foreach {
(entry) => {
entry match {
case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => {
buf += key -> versioned.getValue
}
}
}
}
buf.toList
}
def getMapStorageSizeFor(name: String): Int = {
val keys = getMapKeys(name)
keys.size
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
val result: Array[Byte] = mapValueClient.getValue(getKey(name, key))
result match {
case null => None
case _ => Some(result)
}
}
def removeMapStorageFor(name: String, key: Array[Byte]) = {
var keys = getMapKeys(name)
keys -= key
putMapKeys(name, keys)
mapValueClient.delete(getKey(name, key))
}
def removeMapStorageFor(name: String) = {
val keys = getMapKeys(name)
keys.foreach {
key =>
mapValueClient.delete(getKey(name, key))
}
mapKeyClient.delete(name)
}
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
mapValueClient.put(getKey(name, key), value)
var keys = getMapKeys(name)
keys += key
putMapKeys(name, keys)
}
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
val newKeys = entries.map {
case (key, value) => {
mapValueClient.put(getKey(name, key), value)
key
}
}
var keys = getMapKeys(name)
keys ++= newKeys
putMapKeys(name, keys)
}
def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = {
mapKeyClient.put(name, SortedSetSerializer.toBytes(keys))
}
def getMapKeys(name: String): SortedSet[Array[Byte]] = {
SortedSetSerializer.fromBytes(mapKeyClient.getValue(name, Array.empty[Byte]))
}
def getVectorStorageSizeFor(name: String): Int = {
IntSerializer.fromBytes(vectorSizeClient.getValue(name, IntSerializer.toBytes(0)))
}
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
val size = getVectorStorageSizeFor(name)
val st = start.getOrElse(0)
val cnt =
if (finish.isDefined) {
val f = finish.get
if (f >= st) (f - st) else count
} else {
count
}
val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
index => getVectorValueKey(name, index)
}
val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorValueClient.getAll(JavaConversions.asIterable(seq))
var storage = new ArrayBuffer[Array[Byte]](seq.size)
storage = storage.padTo(seq.size, Array.empty[Byte])
var idx = 0;
seq.foreach {
key => {
if (all.containsKey(key)) {
storage.update(idx, all.get(key).getValue)
}
idx += 1
}
}
storage.toList
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
vectorValueClient.getValue(getVectorValueKey(name, index), Array.empty[Byte])
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val size = getVectorStorageSizeFor(name)
vectorValueClient.put(getVectorValueKey(name, index), elem)
if (size < index + 1) {
vectorSizeClient.put(name, IntSerializer.toBytes(index + 1))
}
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
var size = getVectorStorageSizeFor(name)
elements.foreach {
element =>
vectorValueClient.put(getVectorValueKey(name, size), element)
size += 1
}
vectorSizeClient.put(name, IntSerializer.toBytes(size))
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
insertVectorStorageEntriesFor(name, List(element))
}
/**
* Concat the ownerlenght+owner+key+ of owner so owned data will be colocated
* Store the length of owner as first byte to work around the rare case
* where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2
*/
def getKey(owner: String, key: Array[Byte]): Array[Byte] = {
val ownerBytes: Array[Byte] = owner.getBytes("UTF-8")
val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length)
val theKey = new Array[Byte](ownerLenghtBytes.length + ownerBytes.length + key.length)
System.arraycopy(ownerLenghtBytes, 0, theKey, 0, ownerLenghtBytes.length)
System.arraycopy(ownerBytes, 0, theKey, ownerLenghtBytes.length, ownerBytes.length)
System.arraycopy(key, 0, theKey, ownerLenghtBytes.length + ownerBytes.length, key.length)
theKey
}
def getVectorValueKey(owner: String, index: Int): Array[Byte] = {
val indexbytes = IntSerializer.toBytes(index)
val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length)
System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length)
System.arraycopy(indexbytes, 0, theIndexKey, underscoreBytesUTF8.length, indexbytes.length)
getKey(owner, theIndexKey)
}
def getIndexFromVectorValueKey(owner: String, key: Array[Byte]): Int = {
val indexBytes = new Array[Byte](IntSerializer.bytesPerInt)
System.arraycopy(key, key.length - IntSerializer.bytesPerInt, indexBytes, 0, IntSerializer.bytesPerInt)
IntSerializer.fromBytes(indexBytes)
}
def getClientConfig(configMap: Map[String, String]): Properties = {
val properites = new Properties
configMap.foreach {
keyval => keyval match {
case (key, value) => properites.setProperty(key.asInstanceOf[java.lang.String], value.asInstanceOf[java.lang.String])
}
}
properites
}
def initStoreClients() = {
if (storeClientFactory != null) {
storeClientFactory.close
}
storeClientFactory = {
if (clientConfig.getProperty(bootstrapUrlsProp, "none").startsWith("tcp")) {
new SocketStoreClientFactory(new ClientConfig(clientConfig))
} else if (clientConfig.getProperty(bootstrapUrlsProp, "none").startsWith("http")) {
new HttpStoreClientFactory(new ClientConfig(clientConfig))
} else {
throw new IllegalArgumentException("Unknown boostrapUrl syntax" + clientConfig.getProperty(bootstrapUrlsProp, "No Bootstrap URLs defined"))
}
}
refClient = storeClientFactory.getStoreClient(refStore)
mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore)
mapValueClient = storeClientFactory.getStoreClient(mapValueStore)
vectorSizeClient = storeClientFactory.getStoreClient(vectorSizeStore)
vectorValueClient = storeClientFactory.getStoreClient(vectorValueStore)
}
object IntSerializer {
val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE
def toBytes(i: Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array()
def fromBytes(bytes: Array[Byte]) = ByteBuffer.wrap(bytes).getInt()
def toString(obj: Int) = obj.toString
def fromString(str: String) = str.toInt
}
object SortedSetSerializer {
def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = {
val length = set.foldLeft(0) {
(total, bytes) => {
total + bytes.length + IntSerializer.bytesPerInt
}
}
val allBytes = new Array[Byte](length)
val written = set.foldLeft(0) {
(total, bytes) => {
val sizeBytes = IntSerializer.toBytes(bytes.length)
System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length)
System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length)
total + sizeBytes.length + bytes.length
}
}
require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length))
allBytes
}
def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = {
var set = new TreeSet[Array[Byte]]
if (bytes.length > IntSerializer.bytesPerInt) {
var pos = 0
while (pos < bytes.length) {
val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt)
System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt)
pos += IntSerializer.bytesPerInt
val length = IntSerializer.fromBytes(lengthBytes)
val item = new Array[Byte](length)
System.arraycopy(bytes, pos, item, 0, length)
set = set + item
pos += length
}
}
set
}
}
}

View file

@ -0,0 +1,14 @@
<cluster>
<!-- The name is just to help users identify this cluster from the gui -->
<name>akka-test</name>
<server>
<!-- The node id is a unique, sequential id beginning with 0 that identifies each server in the cluster-->
<id>0</id>
<host>localhost</host>
<http-port>8081</http-port>
<socket-port>6666</socket-port>
<admin-port>6667</admin-port>
<!-- A list of data partitions assigned to this server -->
<partitions>0,1,2,3</partitions>
</server>
</cluster>

View file

@ -0,0 +1,2 @@
node.id=0
enable.rebalancing=false

View file

@ -0,0 +1,85 @@
<stores>
<store>
<name>Refs</name>
<replication-factor>1</replication-factor>
<preferred-reads>1</preferred-reads>
<required-reads>1</required-reads>
<preferred-writes>1</preferred-writes>
<required-writes>1</required-writes>
<persistence>bdb</persistence>
<routing>client</routing>
<key-serializer>
<type>string</type>
<schema-info>utf8</schema-info>
</key-serializer>
<value-serializer>
<type>identity</type>
</value-serializer>
</store>
<store>
<name>MapValues</name>
<replication-factor>1</replication-factor>
<preferred-reads>1</preferred-reads>
<required-reads>1</required-reads>
<preferred-writes>1</preferred-writes>
<required-writes>1</required-writes>
<persistence>bdb</persistence>
<routing>client</routing>
<key-serializer>
<type>identity</type>
</key-serializer>
<value-serializer>
<type>identity</type>
</value-serializer>
</store>
<store>
<name>MapKeys</name>
<replication-factor>1</replication-factor>
<preferred-reads>1</preferred-reads>
<required-reads>1</required-reads>
<preferred-writes>1</preferred-writes>
<required-writes>1</required-writes>
<persistence>bdb</persistence>
<routing>client</routing>
<key-serializer>
<type>string</type>
<schema-info>utf8</schema-info>
</key-serializer>
<value-serializer>
<type>identity</type>
</value-serializer>
</store>
<store>
<name>VectorValues</name>
<replication-factor>1</replication-factor>
<preferred-reads>1</preferred-reads>
<required-reads>1</required-reads>
<preferred-writes>1</preferred-writes>
<required-writes>1</required-writes>
<persistence>bdb</persistence>
<routing>client</routing>
<key-serializer>
<type>identity</type>
</key-serializer>
<value-serializer>
<type>identity</type>
</value-serializer>
</store>
<store>
<name>VectorSizes</name>
<replication-factor>1</replication-factor>
<preferred-reads>1</preferred-reads>
<required-reads>1</required-reads>
<preferred-writes>1</preferred-writes>
<required-writes>1</required-writes>
<persistence>bdb</persistence>
<routing>client</routing>
<key-serializer>
<type>string</type>
<schema-info>utf8</schema-info>
</key-serializer>
<value-serializer>
<type>identity</type>
</value-serializer>
</store>
</stores>

View file

@ -0,0 +1,38 @@
package se.scalablesolutions.akka.persistence.voldemort
import org.scalatest.matchers.ShouldMatchers
import voldemort.server.{VoldemortServer, VoldemortConfig}
import org.scalatest.{Suite, BeforeAndAfterAll, FunSuite}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import voldemort.utils.Utils
import java.io.File
import se.scalablesolutions.akka.util.{Logging}
@RunWith(classOf[JUnitRunner])
trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging {
this: Suite =>
var server: VoldemortServer = null
override protected def beforeAll(): Unit = {
try {
val dir = "./akka-persistence/akka-persistence-voldemort/target/scala_2.8.0/test-resources"
val home = new File(dir)
log.info("Creating Voldemort Config")
val config = VoldemortConfig.loadFromVoldemortHome(home.getCanonicalPath)
log.info("Starting Voldemort")
server = new VoldemortServer(config)
server.start
VoldemortStorageBackend.initStoreClients
log.info("Started")
} catch {
case e => log.error(e, "Error Starting Voldemort")
throw e
}
}
override protected def afterAll(): Unit = {
server.stop
}
}

View file

@ -0,0 +1,180 @@
package se.scalablesolutions.akka.persistence.voldemort
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef}
import Actor._
import BankAccountActor._
case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: Int, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
case class Credit(accountNo: String, amount: Int)
case class Log(start: Int, finish: Int)
case object LogSize
object BankAccountActor {
val state = "accountState"
val tx = "txnLog"
}
class BankAccountActor extends Transactor {
private val accountState = VoldemortStorage.newMap(state)
private val txnLog = VoldemortStorage.newVector(tx)
import sjson.json.DefaultProtocol._
import sjson.json.JsonSerialization._
def receive: Receive = {
// check balance
case Balance(accountNo) =>
txnLog.add(("Balance:" + accountNo).getBytes)
self.reply(
accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0))
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m - amount))
if (amount > m) failer !! "Failure"
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
case MultiDebit(accountNo, amounts, failer) =>
val sum = amounts.foldRight(0)(_ + _)
txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
var cbal = m
amounts.foreach {
amount =>
accountState.put(accountNo.getBytes, tobinary(m - amount))
cbal = cbal - amount
if (cbal < 0) failer !! "Failure"
}
self.reply(m - sum)
// credit amount
case Credit(accountNo, amount) =>
txnLog.add(("Credit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m + amount))
self.reply(m + amount)
case LogSize =>
self.reply(txnLog.length)
case Log(start, finish) =>
self.reply(txnLog.slice(start, finish).map(new String(_)))
}
}
@serializable class PersistentFailerActor extends Transactor {
def receive = {
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
@RunWith(classOf[JUnitRunner])
class VoldemortPersistentActorSuite extends
Spec with
ShouldMatchers with
BeforeAndAfterEach with EmbeddedVoldemort {
import VoldemortStorageBackend._
override def beforeEach {
removeMapStorageFor(state)
var size = getVectorStorageSizeFor(tx)
(0 to size).foreach {
index => {
vectorValueClient.delete(getVectorValueKey(tx, index))
}
}
vectorSizeClient.delete(tx)
}
override def afterEach {
beforeEach
}
describe("successful debit") {
it("should debit successfully") {
log.info("Succesful Debit starting")
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
log.info("credited")
bactor !! Debit("a-123", 3000, failer)
log.info("debited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
log.info("balane matched")
bactor !! Credit("a-123", 7000)
log.info("Credited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
log.info("Balance matched")
bactor !! Debit("a-123", 8000, failer)
log.info("Debited")
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
log.info("Balance matched")
(bactor !! LogSize).get.asInstanceOf[Int] should equal(7)
(bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7)
}
}
describe("unsuccessful debit") {
it("debit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
bactor !! Debit("a-123", 7000, failer)
} should produce[Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
}
}
describe("unsuccessful multidebit") {
it("multidebit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
} should produce[Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
}
}
}

View file

@ -0,0 +1,87 @@
package se.scalablesolutions.akka.persistence.voldemort
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
import se.scalablesolutions.akka.actor.{newUuid,Uuid}
import collection.immutable.TreeSet
import VoldemortStorageBackendSuite._
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
@RunWith(classOf[JUnitRunner])
class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging {
test("persistentRefs work as expected") {
val name = newUuid.toString
val one = "one".getBytes
atomic {
val ref = VoldemortStorage.getRef(name)
ref.isDefined should be(false)
ref.swap(one)
ref.get match {
case Some(bytes) => bytes should be(one)
case None => true should be(false)
}
}
val two = "two".getBytes
atomic {
val ref = VoldemortStorage.getRef(name)
ref.isDefined should be(true)
ref.swap(two)
ref.get match {
case Some(bytes) => bytes should be(two)
case None => true should be(false)
}
}
}
test("Persistent Vectors function as expected") {
val name = newUuid.toString
val one = "one".getBytes
val two = "two".getBytes
atomic {
val vec = VoldemortStorage.getVector(name)
vec.add(one)
}
atomic {
val vec = VoldemortStorage.getVector(name)
vec.size should be(1)
vec.add(two)
}
atomic {
val vec = VoldemortStorage.getVector(name)
vec.get(0) should be(one)
vec.get(1) should be(two)
vec.size should be(2)
vec.update(0, two)
}
atomic {
val vec = VoldemortStorage.getVector(name)
vec.get(0) should be(two)
vec.get(1) should be(two)
vec.size should be(2)
vec.update(0, Array.empty[Byte])
vec.update(1, Array.empty[Byte])
}
atomic {
val vec = VoldemortStorage.getVector(name)
vec.get(0) should be(Array.empty[Byte])
vec.get(1) should be(Array.empty[Byte])
vec.size should be(2)
}
}
}

View file

@ -0,0 +1,147 @@
package se.scalablesolutions.akka.persistence.voldemort
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
import se.scalablesolutions.akka.util.{Logging}
import collection.immutable.TreeSet
import VoldemortStorageBackendSuite._
@RunWith(classOf[JUnitRunner])
class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging {
test("that ref storage and retrieval works") {
val key = "testRef"
val value = "testRefValue"
val valueBytes = bytes(value)
refClient.delete(key)
refClient.getValue(key, empty) should be(empty)
refClient.put(key, valueBytes)
refClient.getValue(key) should be(valueBytes)
}
test("PersistentRef apis function as expected") {
val key = "apiTestRef"
val value = "apiTestRefValue"
val valueBytes = bytes(value)
refClient.delete(key)
getRefStorageFor(key) should be(None)
insertRefStorageFor(key, valueBytes)
getRefStorageFor(key).get should equal(valueBytes)
}
test("that map key storage and retrieval works") {
val key = "testmapKey"
val mapKeys = new TreeSet[Array[Byte]] + bytes("key1")
mapKeyClient.delete(key)
mapKeyClient.getValue(key, SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet))
putMapKeys(key, mapKeys)
getMapKeys(key) should equal(mapKeys)
}
test("that map value storage and retrieval works") {
val key = bytes("keyForTestingMapValueClient")
val value = bytes("value for testing map value client")
mapValueClient.put(key, value)
mapValueClient.getValue(key, empty) should equal(value)
}
test("PersistentMap apis function as expected") {
val name = "theMap"
val key = bytes("mapkey")
val value = bytes("mapValue")
removeMapStorageFor(name, key)
removeMapStorageFor(name)
getMapStorageEntryFor(name, key) should be(None)
getMapStorageSizeFor(name) should be(0)
getMapStorageFor(name).length should be(0)
getMapStorageRangeFor(name, None, None, 100).length should be(0)
insertMapStorageEntryFor(name, key, value)
getMapStorageEntryFor(name, key).get should equal(value)
getMapStorageSizeFor(name) should be(1)
getMapStorageFor(name).length should be(1)
getMapStorageRangeFor(name, None, None, 100).length should be(1)
removeMapStorageFor(name, key)
removeMapStorageFor(name)
getMapStorageEntryFor(name, key) should be(None)
getMapStorageSizeFor(name) should be(0)
getMapStorageFor(name).length should be(0)
getMapStorageRangeFor(name, None, None, 100).length should be(0)
insertMapStorageEntriesFor(name, List(key -> value))
getMapStorageEntryFor(name, key).get should equal(value)
getMapStorageSizeFor(name) should be(1)
getMapStorageFor(name).length should be(1)
getMapStorageRangeFor(name, None, None, 100).length should be(1)
}
test("that vector size storage and retrieval works") {
val key = "vectorKey"
val size = IntSerializer.toBytes(17)
vectorSizeClient.delete(key)
vectorSizeClient.getValue(key, empty) should equal(empty)
vectorSizeClient.put(key, size)
vectorSizeClient.getValue(key) should equal(size)
}
test("that vector value storage and retrieval works") {
val key = "vectorValueKey"
val index = 3
val value = bytes("some bytes")
val vecKey = getVectorValueKey(key, index)
getIndexFromVectorValueKey(key, vecKey) should be(index)
vectorValueClient.delete(vecKey)
vectorValueClient.getValue(vecKey, empty) should equal(empty)
vectorValueClient.put(vecKey, value)
vectorValueClient.getValue(vecKey) should equal(value)
}
test("PersistentVector apis function as expected") {
val key = "vectorApiKey"
val value = bytes("Some bytes we want to store in a vector")
val updatedValue = bytes("Some updated bytes we want to store in a vector")
vectorSizeClient.delete(key)
vectorValueClient.delete(getVectorValueKey(key, 0))
vectorValueClient.delete(getVectorValueKey(key, 1))
getVectorStorageEntryFor(key, 0) should be(empty)
getVectorStorageEntryFor(key, 1) should be(empty)
getVectorStorageRangeFor(key, None, None, 1).head should be(empty)
insertVectorStorageEntryFor(key, value)
//again
insertVectorStorageEntryFor(key, value)
getVectorStorageEntryFor(key, 0) should be(value)
getVectorStorageEntryFor(key, 1) should be(value)
getVectorStorageRangeFor(key, None, None, 1).head should be(value)
getVectorStorageRangeFor(key, Some(1), None, 1).head should be(value)
getVectorStorageSizeFor(key) should be(2)
updateVectorStorageEntryFor(key, 1, updatedValue)
getVectorStorageEntryFor(key, 0) should be(value)
getVectorStorageEntryFor(key, 1) should be(updatedValue)
getVectorStorageRangeFor(key, None, None, 1).head should be(value)
getVectorStorageRangeFor(key, Some(1), None, 1).head should be(updatedValue)
getVectorStorageSizeFor(key) should be(2)
}
}
object VoldemortStorageBackendSuite {
val empty = Array.empty[Byte]
val emptySet = new TreeSet[Array[Byte]]
def bytes(value: String): Array[Byte] = {
value.getBytes("UTF-8")
}
}

View file

@ -168,5 +168,19 @@ akka {
hbase {
zookeeper-quorum = "localhost"
}
voldemort {
store {
refs = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values
map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values
map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values
vector-sizes = "VectorSizes" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values
vector-values = "VectorValues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values
}
client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig
bootstrap_urls = "tcp://localhost:6666" # All Valid Voldemort Client properties are valid here, in string form
}
}
}
}

View file

@ -53,6 +53,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases")
lazy val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/")
lazy val ClojarsRepo = MavenRepository("Clojars Repo", "http://clojars.org/repo")
lazy val OracleRepo = MavenRepository("Oracle Repo", "http://download.oracle.com/maven")
}
// -------------------------------------------------------------------------------------------------------------------
@ -83,6 +85,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val zookeeperRelease = ModuleConfiguration("org.apache.hadoop.zookeeper",ZookeeperRepo)
lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo)
lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo)
lazy val voldemortModuleConfig = ModuleConfiguration("voldemort", ClojarsRepo)
lazy val sleepycatModuleConfig = ModuleConfiguration("com.sleepycat", OracleRepo)
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
// -------------------------------------------------------------------------------------------------------------------
@ -193,6 +197,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile"
lazy val sjson_test = "sjson.json" % "sjson" % "0.8-2.8.0" % "test"
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile"
@ -206,6 +211,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val thrift = "com.facebook" % "thrift" % "r917130" % "compile"
lazy val voldemort = "voldemort" % "voldemort" % "0.81" % "compile"
lazy val voldemort_contrib = "voldemort" % "voldemort-contrib" % "0.81" % "compile"
lazy val voldemort_needs_log4j = "org.slf4j" % "log4j-over-slf4j" % SLF4J_VERSION % "compile"
lazy val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % ASPECTWERKZ_VERSION % "compile"
lazy val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % ASPECTWERKZ_VERSION % "compile"
@ -228,10 +237,19 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val junit = "junit" % "junit" % "4.5" % "test"
lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test"
lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
//HBase testing
lazy val hadoop_test = "org.apache.hadoop" % "hadoop-test" % "0.20.2" % "test"
lazy val hbase_test = "org.apache.hbase" % "hbase-test" % "0.20.6" % "test"
lazy val log4j = "log4j" % "log4j" % "1.2.15" % "test"
lazy val jetty_mortbay = "org.mortbay.jetty" % "jetty" % "6.1.14" % "test"
//voldemort testing
lazy val jdom = "org.jdom" % "jdom" % "1.1" % "test"
lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test"
lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test"
lazy val bdb = "com.sleepycat" % "je" % "4.0.103" % "test"
lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test"
}
// -------------------------------------------------------------------------------------------------------------------
@ -480,6 +498,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
new AkkaCassandraProject(_), akka_persistence_common)
lazy val akka_persistence_hbase = project("akka-persistence-hbase", "akka-persistence-hbase",
new AkkaHbaseProject(_), akka_persistence_common)
lazy val akka_persistence_voldemort = project("akka-persistence-voldemort", "akka-persistence-voldemort",
new AkkaVoldemortProject(_), akka_persistence_common)
}
// -------------------------------------------------------------------------------------------------------------------
@ -544,7 +564,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
</dependency>
<dependency org="org.apache.hadoop" name="hadoop-test" rev="0.20.2" conf="test">
<exclude module="slf4j-api"/>
<exclude module="slf4j-api"/>
</dependency>
<dependency org="org.apache.hbase" name="hbase-test" rev="0.20.6" conf="test">
</dependency>
@ -557,6 +577,28 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
}
// akka-persistence-voldemort subproject
// -------------------------------------------------------------------------------------------------------------------
class AkkaVoldemortProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val voldemort = Dependencies.voldemort
val voldemort_contrib = Dependencies.voldemort_contrib
val voldemort_needs_log4j = Dependencies.voldemort_needs_log4j
//testing
val scalatest = Dependencies.scalatest
val google_coll = Dependencies.google_coll
val jdom = Dependencies.jdom
val jetty = Dependencies.vold_jetty
val velocity = Dependencies.velocity
val bdb = Dependencies.bdb
val dbcp = Dependencies.dbcp
val sjson = Dependencies.sjson_test
override def testOptions = TestFilter((name: String) => name.endsWith("Suite")) :: Nil
}
// -------------------------------------------------------------------------------------------------------------------
// akka-kernel subproject
// -------------------------------------------------------------------------------------------------------------------
@ -600,7 +642,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
new AkkaOSGiAssemblyProject(_), akka_osgi_dependencies_bundle, akka_remote, akka_amqp, akka_http,
akka_camel, akka_spring, akka_jta, akka_persistence.akka_persistence_common,
akka_persistence.akka_persistence_redis, akka_persistence.akka_persistence_mongo,
akka_persistence.akka_persistence_cassandra)
akka_persistence.akka_persistence_cassandra,akka_persistence.akka_persistence_voldemort)
}
class AkkaOSGiDependenciesBundleProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with BNDPlugin {