From beee516b3dcad8a63d14d0ca6ee8a5d9ff7fa3f4 Mon Sep 17 00:00:00 2001 From: ticktock Date: Thu, 16 Sep 2010 11:47:35 -0400 Subject: [PATCH] sorted set hand serialization and working actor test --- .../main/scala/VoldemortStorageBackend.scala | 73 ++++++-- .../src/test/resources/config/stores.xml | 2 +- .../scala/VoldemortPersistentActorSuite.scala | 176 ++++++++++++++++++ .../scala/VoldemortStorageBackendSuite.scala | 7 +- 4 files changed, 240 insertions(+), 18 deletions(-) create mode 100644 akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index d0efd7347e..77fd7acedb 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -42,7 +42,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } } var refClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(refStore) - var mapKeyClient: StoreClient[String, SortedSet[Array[Byte]]] = storeClientFactory.getStoreClient(mapKeyStore) + var mapKeyClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(mapKeyStore) var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(mapValueStore) var vectorSizeClient: StoreClient[String, Array[Byte]] = storeClientFactory.getStoreClient(vectorSizeStore) var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = storeClientFactory.getStoreClient(vectorValueStore) @@ -65,13 +65,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { - val allkeys: SortedSet[Array[Byte]] = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]) + val allkeys: SortedSet[Array[Byte]] = getMapKeys(name) val range = allkeys.rangeImpl(start, finish).take(count) getKeyValues(name, range) } def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { - val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) + val keys = getMapKeys(name) getKeyValues(name, keys) } @@ -95,7 +95,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def getMapStorageSizeFor(name: String): Int = { - val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) + val keys = getMapKeys(name) keys.size } @@ -108,15 +108,15 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def removeMapStorageFor(name: String, key: Array[Byte]) = { - var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) + var keys = getMapKeys(name) keys -= key - mapKeyClient.put(name, keys) + putMapKeys(name, keys) mapValueClient.delete(getKey(name, key)) } def removeMapStorageFor(name: String) = { - val keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) + val keys = getMapKeys(name) keys.foreach { key => mapValueClient.delete(getKey(name, key)) @@ -126,9 +126,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { mapValueClient.put(getKey(name, key), value) - var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) + var keys = getMapKeys(name) keys += key - mapKeyClient.put(name, keys) + putMapKeys(name, keys) } def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { @@ -138,9 +138,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with key } } - var keys = mapKeyClient.getValue(name, new TreeSet[Array[Byte]]()) + var keys = getMapKeys(name) keys ++= newKeys - mapKeyClient.put(name, keys) + putMapKeys(name, keys) + } + + def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = { + mapKeyClient.put(name, SortedSetSerializer.toBytes(keys)) + } + + def getMapKeys(name: String): SortedSet[Array[Byte]] = { + SortedSetSerializer.fromBytes(mapKeyClient.getValue(name, Array.empty[Byte])) } @@ -176,8 +184,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with idx += 1 } } - log.info("StorageSize:" + storage.size) - log.info("SeqSize:" + seq.size) + storage.toList } @@ -250,4 +257,44 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def fromString(str: String) = str.toInt } + object SortedSetSerializer { + def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = { + val length = set.foldLeft(0) { + (total, bytes) => { + total + bytes.length + IntSerializer.bytesPerInt + } + } + val allBytes = new Array[Byte](length) + val written = set.foldLeft(0) { + (total, bytes) => { + val sizeBytes = IntSerializer.toBytes(bytes.length) + System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length) + System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length) + total + sizeBytes.length + bytes.length + } + } + require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length)) + allBytes + } + + def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { + var set = new TreeSet[Array[Byte]] + if (bytes.length > IntSerializer.bytesPerInt) { + var pos = 0 + while (pos < bytes.length) { + val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt) + System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt) + pos += IntSerializer.bytesPerInt + val length = IntSerializer.fromBytes(lengthBytes) + val item = new Array[Byte](length) + System.arraycopy(bytes, pos, item, 0, length) + set = set + item + pos += length + } + } + set + } + + } + } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml index 78f1b1385a..f2dd6ac099 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml +++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml @@ -46,7 +46,7 @@ utf8 - java-serialization + identity diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala new file mode 100644 index 0000000000..ae575e1e96 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala @@ -0,0 +1,176 @@ +package se.scalablesolutions.akka.persistence.voldemort + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterEach +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef} +import Actor._ +import BankAccountActor._ + + +case class Balance(accountNo: String) +case class Debit(accountNo: String, amount: Int, failer: ActorRef) +case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef) +case class Credit(accountNo: String, amount: Int) +case class Log(start: Int, finish: Int) +case object LogSize + +object BankAccountActor { + val state = "accountState" + val tx = "txnLog" +} + +class BankAccountActor extends Transactor { + private lazy val accountState = VoldemortStorage.newMap(state) + private lazy val txnLog = VoldemortStorage.newVector(tx) + + import sjson.json.DefaultProtocol._ + import sjson.json.JsonSerialization._ + + def receive: Receive = { + // check balance + case Balance(accountNo) => + txnLog.add(("Balance:" + accountNo).getBytes) + self.reply( + accountState.get(accountNo.getBytes) + .map(frombinary[Int](_)) + .getOrElse(0)) + + // debit amount: can fail + case Debit(accountNo, amount, failer) => + txnLog.add(("Debit:" + accountNo + " " + amount).getBytes) + val m = accountState.get(accountNo.getBytes) + .map(frombinary[Int](_)) + .getOrElse(0) + + accountState.put(accountNo.getBytes, tobinary(m - amount)) + if (amount > m) failer !! "Failure" + + self.reply(m - amount) + + // many debits: can fail + // demonstrates true rollback even if multiple puts have been done + case MultiDebit(accountNo, amounts, failer) => + val sum = amounts.foldRight(0)(_ + _) + txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes) + + val m = accountState.get(accountNo.getBytes) + .map(frombinary[Int](_)) + .getOrElse(0) + + var cbal = m + amounts.foreach { + amount => + accountState.put(accountNo.getBytes, tobinary(m - amount)) + cbal = cbal - amount + if (cbal < 0) failer !! "Failure" + } + + self.reply(m - sum) + + // credit amount + case Credit(accountNo, amount) => + txnLog.add(("Credit:" + accountNo + " " + amount).getBytes) + val m = accountState.get(accountNo.getBytes) + .map(frombinary[Int](_)) + .getOrElse(0) + + accountState.put(accountNo.getBytes, tobinary(m + amount)) + + self.reply(m + amount) + + case LogSize => + self.reply(txnLog.length) + + case Log(start, finish) => + self.reply(txnLog.slice(start, finish).map(new String(_))) + } +} + +@serializable class PersistentFailerActor extends Transactor { + def receive = { + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } +} + +@RunWith(classOf[JUnitRunner]) +class VoldemortPersistentActorSuite extends +Spec with + ShouldMatchers with + BeforeAndAfterEach with EmbeddedVoldemort { + import VoldemortStorageBackend._ + + + override def beforeEach { + removeMapStorageFor(state) + var size = getVectorStorageSizeFor(tx) + (0 to size).foreach { + index => { + vectorValueClient.delete(getVectorValueKey(tx, index)) + } + } + vectorSizeClient.delete(tx) + } + + override def afterEach { + beforeEach + } + + describe("successful debit") { + it("should debit successfully") { + val bactor = actorOf[BankAccountActor] + bactor.start + val failer = actorOf[PersistentFailerActor] + failer.start + bactor !! Credit("a-123", 5000) + bactor !! Debit("a-123", 3000, failer) + + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000) + + bactor !! Credit("a-123", 7000) + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000) + + bactor !! Debit("a-123", 8000, failer) + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000) + + (bactor !! LogSize).get.asInstanceOf[Int] should equal(7) + (bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7) + } + } + + describe("unsuccessful debit") { + it("debit should fail") { + val bactor = actorOf[BankAccountActor] + bactor.start + val failer = actorOf[PersistentFailerActor] + failer.start + bactor !! Credit("a-123", 5000) + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000) + evaluating { + bactor !! Debit("a-123", 7000, failer) + } should produce[Exception] + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000) + (bactor !! LogSize).get.asInstanceOf[Int] should equal(3) + } + } + + describe("unsuccessful multidebit") { + it("multidebit should fail") { + val bactor = actorOf[BankAccountActor] + bactor.start + val failer = actorOf[PersistentFailerActor] + failer.start + bactor !! Credit("a-123", 5000) + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000) + evaluating { + bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer) + } should produce[Exception] + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000) + (bactor !! LogSize).get.asInstanceOf[Int] should equal(3) + } + } +} diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 8906daa5fb..419bd05555 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -35,10 +35,9 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb val key = "testmapKey" val mapKeys = new TreeSet[Array[Byte]] + bytes("key1") mapKeyClient.delete(key) - mapKeyClient.getValue(key, emptySet) should equal(emptySet) - mapKeyClient.put(key, mapKeys) - mapKeyClient.getValue(key, emptySet) should equal(mapKeys) - + mapKeyClient.getValue(key, SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet)) + putMapKeys(key, mapKeys) + getMapKeys(key) should equal(mapKeys) } test("that map value storage and retrieval works") {