diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
index d0efd7347e..77fd7acedb 100644
--- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
@@ -42,7 +42,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
}
var refClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(refStore)
- var mapKeyClient: StoreClient[String, SortedSet[Array[Byte]]] = storeClientFactory.getStoreClient(mapKeyStore)
+ var mapKeyClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(mapKeyStore)
var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(mapValueStore)
var vectorSizeClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(vectorSizeStore)
var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(vectorValueStore)
@@ -65,13 +65,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
- val allkeys: SortedSet[Array[Byte]] = mapKeyClient.getValue(name, new TreeSet[Array[Byte]])
+ val allkeys: SortedSet[Array[Byte]] = getMapKeys(name)
val range = allkeys.rangeImpl(start, finish).take(count)
getKeyValues(name, range)
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
- val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
+ val keys = getMapKeys(name)
getKeyValues(name, keys)
}
@@ -95,7 +95,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def getMapStorageSizeFor(name: String): Int = {
- val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
+ val keys = getMapKeys(name)
keys.size
}
@@ -108,15 +108,15 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
}
def removeMapStorageFor(name: String, key: Array[Byte]) = {
- var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
+ var keys = getMapKeys(name)
keys -= key
- mapKeyClient.put(name, keys)
+ putMapKeys(name, keys)
mapValueClient.delete(getKey(name, key))
}
def removeMapStorageFor(name: String) = {
- val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
+ val keys = getMapKeys(name)
keys.foreach {
key =>
mapValueClient.delete(getKey(name, key))
@@ -126,9 +126,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
mapValueClient.put(getKey(name, key), value)
- var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
+ var keys = getMapKeys(name)
keys += key
- mapKeyClient.put(name, keys)
+ putMapKeys(name, keys)
}
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
@@ -138,9 +138,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
key
}
}
- var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]())
+ var keys = getMapKeys(name)
keys ++= newKeys
- mapKeyClient.put(name, keys)
+ putMapKeys(name, keys)
+ }
+
+ def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = {
+ mapKeyClient.put(name, SortedSetSerializer.toBytes(keys))
+ }
+
+ def getMapKeys(name: String): SortedSet[Array[Byte]] = {
+ SortedSetSerializer.fromBytes(mapKeyClient.getValue(name, Array.empty[Byte]))
}
@@ -176,8 +184,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
idx += 1
}
}
- log.info("StorageSize:" + storage.size)
- log.info("SeqSize:" + seq.size)
+
storage.toList
}
@@ -250,4 +257,44 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
def fromString(str: String) = str.toInt
}
+ object SortedSetSerializer {
+ def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = {
+ val length = set.foldLeft(0) {
+ (total, bytes) => {
+ total + bytes.length + IntSerializer.bytesPerInt
+ }
+ }
+ val allBytes = new Array[Byte](length)
+ val written = set.foldLeft(0) {
+ (total, bytes) => {
+ val sizeBytes = IntSerializer.toBytes(bytes.length)
+ System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length)
+ System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length)
+ total + sizeBytes.length + bytes.length
+ }
+ }
+ require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length))
+ allBytes
+ }
+
+ def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = {
+ var set = new TreeSet[Array[Byte]]
+ if (bytes.length > IntSerializer.bytesPerInt) {
+ var pos = 0
+ while (pos < bytes.length) {
+ val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt)
+ System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt)
+ pos += IntSerializer.bytesPerInt
+ val length = IntSerializer.fromBytes(lengthBytes)
+ val item = new Array[Byte](length)
+ System.arraycopy(bytes, pos, item, 0, length)
+ set = set + item
+ pos += length
+ }
+ }
+ set
+ }
+
+ }
+
}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
index 78f1b1385a..f2dd6ac099 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
+++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml
@@ -46,7 +46,7 @@
utf8
- java-serialization
+ identity
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala
new file mode 100644
index 0000000000..ae575e1e96
--- /dev/null
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala
@@ -0,0 +1,176 @@
+package se.scalablesolutions.akka.persistence.voldemort
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef}
+import Actor._
+import BankAccountActor._
+
+
+case class Balance(accountNo: String)
+case class Debit(accountNo: String, amount: Int, failer: ActorRef)
+case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
+case class Credit(accountNo: String, amount: Int)
+case class Log(start: Int, finish: Int)
+case object LogSize
+
+object BankAccountActor {
+ val state = "accountState"
+ val tx = "txnLog"
+}
+
+class BankAccountActor extends Transactor {
+ private lazy val accountState = VoldemortStorage.newMap(state)
+ private lazy val txnLog = VoldemortStorage.newVector(tx)
+
+ import sjson.json.DefaultProtocol._
+ import sjson.json.JsonSerialization._
+
+ def receive: Receive = {
+ // check balance
+ case Balance(accountNo) =>
+ txnLog.add(("Balance:" + accountNo).getBytes)
+ self.reply(
+ accountState.get(accountNo.getBytes)
+ .map(frombinary[Int](_))
+ .getOrElse(0))
+
+ // debit amount: can fail
+ case Debit(accountNo, amount, failer) =>
+ txnLog.add(("Debit:" + accountNo + " " + amount).getBytes)
+ val m = accountState.get(accountNo.getBytes)
+ .map(frombinary[Int](_))
+ .getOrElse(0)
+
+ accountState.put(accountNo.getBytes, tobinary(m - amount))
+ if (amount > m) failer !! "Failure"
+
+ self.reply(m - amount)
+
+ // many debits: can fail
+ // demonstrates true rollback even if multiple puts have been done
+ case MultiDebit(accountNo, amounts, failer) =>
+ val sum = amounts.foldRight(0)(_ + _)
+ txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes)
+
+ val m = accountState.get(accountNo.getBytes)
+ .map(frombinary[Int](_))
+ .getOrElse(0)
+
+ var cbal = m
+ amounts.foreach {
+ amount =>
+ accountState.put(accountNo.getBytes, tobinary(m - amount))
+ cbal = cbal - amount
+ if (cbal < 0) failer !! "Failure"
+ }
+
+ self.reply(m - sum)
+
+ // credit amount
+ case Credit(accountNo, amount) =>
+ txnLog.add(("Credit:" + accountNo + " " + amount).getBytes)
+ val m = accountState.get(accountNo.getBytes)
+ .map(frombinary[Int](_))
+ .getOrElse(0)
+
+ accountState.put(accountNo.getBytes, tobinary(m + amount))
+
+ self.reply(m + amount)
+
+ case LogSize =>
+ self.reply(txnLog.length)
+
+ case Log(start, finish) =>
+ self.reply(txnLog.slice(start, finish).map(new String(_)))
+ }
+}
+
+@serializable class PersistentFailerActor extends Transactor {
+ def receive = {
+ case "Failure" =>
+ throw new RuntimeException("Expected exception; to test fault-tolerance")
+ }
+}
+
+@RunWith(classOf[JUnitRunner])
+class VoldemortPersistentActorSuite extends
+Spec with
+ ShouldMatchers with
+ BeforeAndAfterEach with EmbeddedVoldemort {
+ import VoldemortStorageBackend._
+
+
+ override def beforeEach {
+ removeMapStorageFor(state)
+ var size = getVectorStorageSizeFor(tx)
+ (0 to size).foreach {
+ index => {
+ vectorValueClient.delete(getVectorValueKey(tx, index))
+ }
+ }
+ vectorSizeClient.delete(tx)
+ }
+
+ override def afterEach {
+ beforeEach
+ }
+
+ describe("successful debit") {
+ it("should debit successfully") {
+ val bactor = actorOf[BankAccountActor]
+ bactor.start
+ val failer = actorOf[PersistentFailerActor]
+ failer.start
+ bactor !! Credit("a-123", 5000)
+ bactor !! Debit("a-123", 3000, failer)
+
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000)
+
+ bactor !! Credit("a-123", 7000)
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000)
+
+ bactor !! Debit("a-123", 8000, failer)
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000)
+
+ (bactor !! LogSize).get.asInstanceOf[Int] should equal(7)
+ (bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7)
+ }
+ }
+
+ describe("unsuccessful debit") {
+ it("debit should fail") {
+ val bactor = actorOf[BankAccountActor]
+ bactor.start
+ val failer = actorOf[PersistentFailerActor]
+ failer.start
+ bactor !! Credit("a-123", 5000)
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
+ evaluating {
+ bactor !! Debit("a-123", 7000, failer)
+ } should produce[Exception]
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
+ (bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
+ }
+ }
+
+ describe("unsuccessful multidebit") {
+ it("multidebit should fail") {
+ val bactor = actorOf[BankAccountActor]
+ bactor.start
+ val failer = actorOf[PersistentFailerActor]
+ failer.start
+ bactor !! Credit("a-123", 5000)
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
+ evaluating {
+ bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer)
+ } should produce[Exception]
+ (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000)
+ (bactor !! LogSize).get.asInstanceOf[Int] should equal(3)
+ }
+ }
+}
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
index 8906daa5fb..419bd05555 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
@@ -35,10 +35,9 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
val key = "testmapKey"
val mapKeys = new TreeSet[Array[Byte]] + bytes("key1")
mapKeyClient.delete(key)
- mapKeyClient.getValue(key, emptySet) should equal(emptySet)
- mapKeyClient.put(key, mapKeys)
- mapKeyClient.getValue(key, emptySet) should equal(mapKeys)
-
+ mapKeyClient.getValue(key, SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet))
+ putMapKeys(key, mapKeys)
+ getMapKeys(key) should equal(mapKeys)
}
test("that map value storage and retrieval works") {