sorted set hand serialization and working actor test

This commit is contained in:
ticktock 2010-09-16 11:47:35 -04:00
parent cb0bc2d639
commit beee516b3d
4 changed files with 240 additions and 18 deletions

View file

@ -42,7 +42,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
}
var refClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(refStore)
var mapKeyClient: StoreClient[String, SortedSet[Array[Byte]]] = storeClientFactory.getStoreClient(mapKeyStore)
var mapKeyClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(mapKeyStore)
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(mapValueStore)
var vectorSizeClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(vectorSizeStore)
var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(vectorValueStore)
@ -65,13 +65,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
val allkeys: SortedSet[Array[Byte]] = mapKeyClient.getValue(name, new TreeSet[Array[Byte]])
val allkeys: SortedSet[Array[Byte]] = getMapKeys(name)
val range = allkeys.rangeImpl(start, finish).take(count)
getKeyValues(name, range)
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
val keys = getMapKeys(name)
getKeyValues(name, keys)
}
@ -95,7 +95,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def getMapStorageSizeFor(name: String): Int = {
val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
val keys = getMapKeys(name)
keys.size
}
@ -108,15 +108,15 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def removeMapStorageFor(name: String, key: Array[Byte]) = {
var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
var keys = getMapKeys(name)
keys -= key
mapKeyClient.put(name, keys)
putMapKeys(name, keys)
mapValueClient.delete(getKey(name, key))
}
def removeMapStorageFor(name: String) = {
val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
val keys = getMapKeys(name)
keys.foreach {
key =>
mapValueClient.delete(getKey(name, key))
@ -126,9 +126,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
mapValueClient.put(getKey(name, key), value)
var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
var keys = getMapKeys(name)
keys += key
mapKeyClient.put(name, keys)
putMapKeys(name, keys)
}
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
@ -138,9 +138,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
key
}
}
var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
var keys = getMapKeys(name)
keys ++= newKeys
mapKeyClient.put(name, keys)
putMapKeys(name, keys)
}
def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = {
mapKeyClient.put(name, SortedSetSerializer.toBytes(keys))
}
def getMapKeys(name: String): SortedSet[Array[Byte]] = {
SortedSetSerializer.fromBytes(mapKeyClient.getValue(name, Array.empty[Byte]))
}
@ -176,8 +184,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
idx += 1
}
}
log.info("StorageSize:" + storage.size)
log.info("SeqSize:" + seq.size)
storage.toList
}
@ -250,4 +257,44 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def fromString(str: String) = str.toInt
}
object SortedSetSerializer {
def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = {
val length = set.foldLeft(0) {
(total, bytes) => {
total + bytes.length + IntSerializer.bytesPerInt
}
}
val allBytes = new Array[Byte](length)
val written = set.foldLeft(0) {
(total, bytes) => {
val sizeBytes = IntSerializer.toBytes(bytes.length)
System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length)
System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length)
total + sizeBytes.length + bytes.length
}
}
require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length))
allBytes
}
def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = {
var set = new TreeSet[Array[Byte]]
if (bytes.length > IntSerializer.bytesPerInt) {
var pos = 0
while (pos < bytes.length) {
val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt)
System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt)
pos += IntSerializer.bytesPerInt
val length = IntSerializer.fromBytes(lengthBytes)
val item = new Array[Byte](length)
System.arraycopy(bytes, pos, item, 0, length)
set = set + item
pos += length
}
}
set
}
}
}

View file

@ -46,7 +46,7 @@
<schema-info>utf8</schema-info>
</key-serializer>
<value-serializer>
<type>java-serialization</type>
<type>identity</type>
</value-serializer>
</store>
<store>

View file

@ -0,0 +1,176 @@
package se.scalablesolutions.akka.persistence.voldemort
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef}
import Actor._
import BankAccountActor._
case class Balance(accountNo: String)
case class Debit(accountNo: String, amount: Int, failer: ActorRef)
case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
case class Credit(accountNo: String, amount: Int)
case class Log(start: Int, finish: Int)
case object LogSize
object BankAccountActor {
val state = "accountState"
val tx = "txnLog"
}
class BankAccountActor extends Transactor {
private lazy val accountState = VoldemortStorage.newMap(state)
private lazy val txnLog = VoldemortStorage.newVector(tx)
import sjson.json.DefaultProtocol._
import sjson.json.JsonSerialization._
def receive: Receive = {
// check balance
case Balance(accountNo) =>
txnLog.add(("Balance:" + accountNo).getBytes)
self.reply(
accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0))
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m - amount))
if (amount > m) failer !! "Failure"
self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
case MultiDebit(accountNo, amounts, failer) =>
val sum = amounts.foldRight(0)(_ + _)
txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
var cbal = m
amounts.foreach {
amount =>
accountState.put(accountNo.getBytes, tobinary(m - amount))
cbal = cbal - amount
if (cbal < 0) failer !! "Failure"
}
self.reply(m - sum)
// credit amount
case Credit(accountNo, amount) =>
txnLog.add(("Credit:" + accountNo + " " + amount).getBytes)
val m = accountState.get(accountNo.getBytes)
.map(frombinary[Int](_))
.getOrElse(0)
accountState.put(accountNo.getBytes, tobinary(m + amount))
self.reply(m + amount)
case LogSize =>
self.reply(txnLog.length)
case Log(start, finish) =>
self.reply(txnLog.slice(start, finish).map(new String(_)))
}
}
@serializable class PersistentFailerActor extends Transactor {
def receive = {
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
}
}
@RunWith(classOf[JUnitRunner])
class VoldemortPersistentActorSuite extends
Spec with
ShouldMatchers with
BeforeAndAfterEach with EmbeddedVoldemort {
import VoldemortStorageBackend._
override def beforeEach {
removeMapStorageFor(state)
var size = getVectorStorageSizeFor(tx)
(0 to size).foreach {
index => {
vectorValueClient.delete(getVectorValueKey(tx, index))
}
}
vectorSizeClient.delete(tx)
}
override def afterEach {
beforeEach
}
describe("successful debit") {
it("should debit successfully") {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
bactor !! Debit("a-123", 3000, failer)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
bactor !! Credit("a-123", 7000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
bactor !! Debit("a-123", 8000, failer)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(7)
(bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7)
}
}
describe("unsuccessful debit") {
it("debit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
bactor !! Debit("a-123", 7000, failer)
} should produce[Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
}
}
describe("unsuccessful multidebit") {
it("multidebit should fail") {
val bactor = actorOf[BankAccountActor]
bactor.start
val failer = actorOf[PersistentFailerActor]
failer.start
bactor !! Credit("a-123", 5000)
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
evaluating {
bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
} should produce[Exception]
(bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
(bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
}
}
}

View file

@ -35,10 +35,9 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
val key = "testmapKey"
val mapKeys = new TreeSet[Array[Byte]] + bytes("key1")
mapKeyClient.delete(key)
mapKeyClient.getValue(key, emptySet) should equal(emptySet)
mapKeyClient.put(key, mapKeys)
mapKeyClient.getValue(key, emptySet) should equal(mapKeys)
mapKeyClient.getValue(key, SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet))
putMapKeys(key, mapKeys)
getMapKeys(key) should equal(mapKeys)
}
test("that map value storage and retrieval works") {