diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala new file mode 100644 index 0000000000..4e237267a5 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.voldemort + +import se.scalablesolutions.akka.actor.{newUuid} +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ + + +object VoldemortStorage extends Storage { + + type ElementType = Array[Byte] + def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id) +} + + +class VoldemortPersistentMap(id: String) extends PersistentMapBinary { + val uuid = id + val storage = VoldemortStorageBackend +} + + +class VoldemortPersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = VoldemortStorageBackend +} + +class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = VoldemortStorageBackend +} diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala new file mode 100644 index 0000000000..83b74a4a05 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -0,0 +1,331 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.voldemort + +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.Helpers._ +import se.scalablesolutions.akka.config.Config.config + +import voldemort.client._ +import java.lang.String +import voldemort.utils.ByteUtils +import voldemort.versioning.Versioned +import collection.JavaConversions +import java.nio.ByteBuffer +import collection.Map +import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap} +import collection.mutable.{Set, HashSet, ArrayBuffer} +import java.util.{Properties, Map => JMap} + +private[akka] object VoldemortStorageBackend extends +MapStorageBackend[Array[Byte], Array[Byte]] with + VectorStorageBackend[Array[Byte]] with + RefStorageBackend[Array[Byte]] with + Logging { + val bootstrapUrlsProp = "bootstrap_urls" + val clientConfig = config.getConfigMap("akka.storage.voldemort.client") match { + case Some(configMap) => getClientConfig(configMap.asMap) + case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666")) + } + val refStore = config.getString("akka.storage.voldemort.store.ref", "Refs") + val mapKeyStore = config.getString("akka.storage.voldemort.store.map-key", "MapKeys") + val mapValueStore = config.getString("akka.storage.voldemort.store.map-value", "MapValues") + val vectorSizeStore = config.getString("akka.storage.voldemort.store.vector-size", "VectorSizes") + val vectorValueStore = config.getString("akka.storage.voldemort.store.vector-value", "VectorValues") + + var storeClientFactory: StoreClientFactory = null + var refClient: StoreClient[String, Array[Byte]] = null + var mapKeyClient: StoreClient[String, Array[Byte]] = null + var mapValueClient: StoreClient[Array[Byte], Array[Byte]] = null + var vectorSizeClient: StoreClient[String, Array[Byte]] = null + var vectorValueClient: StoreClient[Array[Byte], Array[Byte]] = null + initStoreClients + + val underscoreBytesUTF8 = "_".getBytes("UTF-8") + implicit val byteOrder = new Ordering[Array[Byte]] { + override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) + } + + + def getRefStorageFor(name: String): Option[Array[Byte]] = { + val result: Array[Byte] = refClient.getValue(name) + result match { + case null => None + case _ => Some(result) + } + } + + def insertRefStorageFor(name: String, element: Array[Byte]) = { + refClient.put(name, element) + } + + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { + val allkeys: SortedSet[Array[Byte]] = getMapKeys(name) + val range = allkeys.rangeImpl(start, finish).take(count) + getKeyValues(name, range) + } + + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + val keys = getMapKeys(name) + getKeyValues(name, keys) + } + + private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = { + val all: JMap[Array[Byte], Versioned[Array[Byte]]] = + mapValueClient.getAll(JavaConversions.asIterable(keys.map { + mapKey => getKey(name, mapKey) + })) + + val buf = new ArrayBuffer[(Array[Byte], Array[Byte])](all.size) + JavaConversions.asMap(all).foreach { + (entry) => { + entry match { + case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => { + buf += key -> versioned.getValue + } + } + } + } + buf.toList + } + + def getMapStorageSizeFor(name: String): Int = { + val keys = getMapKeys(name) + keys.size + } + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { + val result: Array[Byte] = mapValueClient.getValue(getKey(name, key)) + result match { + case null => None + case _ => Some(result) + } + } + + def removeMapStorageFor(name: String, key: Array[Byte]) = { + var keys = getMapKeys(name) + keys -= key + putMapKeys(name, keys) + mapValueClient.delete(getKey(name, key)) + } + + + def removeMapStorageFor(name: String) = { + val keys = getMapKeys(name) + keys.foreach { + key => + mapValueClient.delete(getKey(name, key)) + } + mapKeyClient.delete(name) + } + + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { + mapValueClient.put(getKey(name, key), value) + var keys = getMapKeys(name) + keys += key + putMapKeys(name, keys) + } + + def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { + val newKeys = entries.map { + case (key, value) => { + mapValueClient.put(getKey(name, key), value) + key + } + } + var keys = getMapKeys(name) + keys ++= newKeys + putMapKeys(name, keys) + } + + def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = { + mapKeyClient.put(name, SortedSetSerializer.toBytes(keys)) + } + + def getMapKeys(name: String): SortedSet[Array[Byte]] = { + SortedSetSerializer.fromBytes(mapKeyClient.getValue(name, Array.empty[Byte])) + } + + + def getVectorStorageSizeFor(name: String): Int = { + IntSerializer.fromBytes(vectorSizeClient.getValue(name, IntSerializer.toBytes(0))) + } + + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + val size = getVectorStorageSizeFor(name) + val st = start.getOrElse(0) + val cnt = + if (finish.isDefined) { + val f = finish.get + if (f >= st) (f - st) else count + } else { + count + } + val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { + index => getVectorValueKey(name, index) + } + + val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorValueClient.getAll(JavaConversions.asIterable(seq)) + + var storage = new ArrayBuffer[Array[Byte]](seq.size) + storage = storage.padTo(seq.size, Array.empty[Byte]) + var idx = 0; + seq.foreach { + key => { + if (all.containsKey(key)) { + storage.update(idx, all.get(key).getValue) + } + idx += 1 + } + } + + storage.toList + } + + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { + vectorValueClient.getValue(getVectorValueKey(name, index), Array.empty[Byte]) + } + + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { + val size = getVectorStorageSizeFor(name) + vectorValueClient.put(getVectorValueKey(name, index), elem) + if (size < index + 1) { + vectorSizeClient.put(name, IntSerializer.toBytes(index + 1)) + } + } + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { + var size = getVectorStorageSizeFor(name) + elements.foreach { + element => + vectorValueClient.put(getVectorValueKey(name, size), element) + size += 1 + } + vectorSizeClient.put(name, IntSerializer.toBytes(size)) + } + + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { + insertVectorStorageEntriesFor(name, List(element)) + } + + + /** + * Concat the ownerlenght+owner+key+ of owner so owned data will be colocated + * Store the length of owner as first byte to work around the rare case + * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 + */ + def getKey(owner: String, key: Array[Byte]): Array[Byte] = { + val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") + val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length) + val theKey = new Array[Byte](ownerLenghtBytes.length + ownerBytes.length + key.length) + System.arraycopy(ownerLenghtBytes, 0, theKey, 0, ownerLenghtBytes.length) + System.arraycopy(ownerBytes, 0, theKey, ownerLenghtBytes.length, ownerBytes.length) + System.arraycopy(key, 0, theKey, ownerLenghtBytes.length + ownerBytes.length, key.length) + theKey + } + + def getVectorValueKey(owner: String, index: Int): Array[Byte] = { + val indexbytes = IntSerializer.toBytes(index) + val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length) + System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length) + System.arraycopy(indexbytes, 0, theIndexKey, underscoreBytesUTF8.length, indexbytes.length) + getKey(owner, theIndexKey) + } + + def getIndexFromVectorValueKey(owner: String, key: Array[Byte]): Int = { + val indexBytes = new Array[Byte](IntSerializer.bytesPerInt) + System.arraycopy(key, key.length - IntSerializer.bytesPerInt, indexBytes, 0, IntSerializer.bytesPerInt) + IntSerializer.fromBytes(indexBytes) + } + + + def getClientConfig(configMap: Map[String, String]): Properties = { + val properites = new Properties + configMap.foreach { + keyval => keyval match { + case (key, value) => properites.setProperty(key.asInstanceOf[java.lang.String], value.asInstanceOf[java.lang.String]) + } + } + properites + } + + def initStoreClients() = { + if (storeClientFactory != null) { + storeClientFactory.close + } + + storeClientFactory = { + if (clientConfig.getProperty(bootstrapUrlsProp, "none").startsWith("tcp")) { + new SocketStoreClientFactory(new ClientConfig(clientConfig)) + } else if (clientConfig.getProperty(bootstrapUrlsProp, "none").startsWith("http")) { + new HttpStoreClientFactory(new ClientConfig(clientConfig)) + } else { + throw new IllegalArgumentException("Unknown boostrapUrl syntax" + clientConfig.getProperty(bootstrapUrlsProp, "No Bootstrap URLs defined")) + } + } + refClient = storeClientFactory.getStoreClient(refStore) + mapKeyClient = storeClientFactory.getStoreClient(mapKeyStore) + mapValueClient = storeClientFactory.getStoreClient(mapValueStore) + vectorSizeClient = storeClientFactory.getStoreClient(vectorSizeStore) + vectorValueClient = storeClientFactory.getStoreClient(vectorValueStore) + } + + object IntSerializer { + val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE + + def toBytes(i: Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array() + + def fromBytes(bytes: Array[Byte]) = ByteBuffer.wrap(bytes).getInt() + + def toString(obj: Int) = obj.toString + + def fromString(str: String) = str.toInt + } + + object SortedSetSerializer { + def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = { + val length = set.foldLeft(0) { + (total, bytes) => { + total + bytes.length + IntSerializer.bytesPerInt + } + } + val allBytes = new Array[Byte](length) + val written = set.foldLeft(0) { + (total, bytes) => { + val sizeBytes = IntSerializer.toBytes(bytes.length) + System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length) + System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length) + total + sizeBytes.length + bytes.length + } + } + require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length)) + allBytes + } + + def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { + var set = new TreeSet[Array[Byte]] + if (bytes.length > IntSerializer.bytesPerInt) { + var pos = 0 + while (pos < bytes.length) { + val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt) + System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt) + pos += IntSerializer.bytesPerInt + val length = IntSerializer.fromBytes(lengthBytes) + val item = new Array[Byte](length) + System.arraycopy(bytes, pos, item, 0, length) + set = set + item + pos += length + } + } + set + } + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/cluster.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/cluster.xml new file mode 100644 index 0000000000..dcf806b0ca --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/cluster.xml @@ -0,0 +1,14 @@ + + + akka-test + + + 0 + localhost + 8081 + 6666 + 6667 + + 0,1,2,3 + + diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/server.properties b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/server.properties new file mode 100644 index 0000000000..8f5a8ff884 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/server.properties @@ -0,0 +1,2 @@ +node.id=0 +enable.rebalancing=false diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml new file mode 100644 index 0000000000..f2dd6ac099 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/config/stores.xml @@ -0,0 +1,85 @@ + + + Refs + 1 + 1 + 1 + 1 + 1 + bdb + client + + string + utf8 + + + identity + + + + MapValues + 1 + 1 + 1 + 1 + 1 + bdb + client + + identity + + + identity + + + + MapKeys + 1 + 1 + 1 + 1 + 1 + bdb + client + + string + utf8 + + + identity + + + + VectorValues + 1 + 1 + 1 + 1 + 1 + bdb + client + + identity + + + identity + + + + VectorSizes + 1 + 1 + 1 + 1 + 1 + bdb + client + + string + utf8 + + + identity + + + \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala new file mode 100644 index 0000000000..9522d4a33a --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala @@ -0,0 +1,38 @@ +package se.scalablesolutions.akka.persistence.voldemort + +import org.scalatest.matchers.ShouldMatchers +import voldemort.server.{VoldemortServer, VoldemortConfig} +import org.scalatest.{Suite, BeforeAndAfterAll, FunSuite} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import voldemort.utils.Utils +import java.io.File +import se.scalablesolutions.akka.util.{Logging} + +@RunWith(classOf[JUnitRunner]) +trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { + this: Suite => + var server: VoldemortServer = null + + override protected def beforeAll(): Unit = { + + try { + val dir = "./akka-persistence/akka-persistence-voldemort/target/scala_2.8.0/test-resources" + val home = new File(dir) + log.info("Creating Voldemort Config") + val config = VoldemortConfig.loadFromVoldemortHome(home.getCanonicalPath) + log.info("Starting Voldemort") + server = new VoldemortServer(config) + server.start + VoldemortStorageBackend.initStoreClients + log.info("Started") + } catch { + case e => log.error(e, "Error Starting Voldemort") + throw e + } + } + + override protected def afterAll(): Unit = { + server.stop + } +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala new file mode 100644 index 0000000000..f76c370667 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala @@ -0,0 +1,180 @@ +package se.scalablesolutions.akka.persistence.voldemort + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterEach +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.actor.{Transactor, Actor, ActorRef} +import Actor._ +import BankAccountActor._ + + +case class Balance(accountNo: String) +case class Debit(accountNo: String, amount: Int, failer: ActorRef) +case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef) +case class Credit(accountNo: String, amount: Int) +case class Log(start: Int, finish: Int) +case object LogSize + +object BankAccountActor { + val state = "accountState" + val tx = "txnLog" +} + +class BankAccountActor extends Transactor { + private val accountState = VoldemortStorage.newMap(state) + private val txnLog = VoldemortStorage.newVector(tx) + + import sjson.json.DefaultProtocol._ + import sjson.json.JsonSerialization._ + + def receive: Receive = { + // check balance + case Balance(accountNo) => + txnLog.add(("Balance:" + accountNo).getBytes) + self.reply( + accountState.get(accountNo.getBytes) + .map(frombinary[Int](_)) + .getOrElse(0)) + + // debit amount: can fail + case Debit(accountNo, amount, failer) => + txnLog.add(("Debit:" + accountNo + " " + amount).getBytes) + val m = accountState.get(accountNo.getBytes) + .map(frombinary[Int](_)) + .getOrElse(0) + + accountState.put(accountNo.getBytes, tobinary(m - amount)) + if (amount > m) failer !! "Failure" + + self.reply(m - amount) + + // many debits: can fail + // demonstrates true rollback even if multiple puts have been done + case MultiDebit(accountNo, amounts, failer) => + val sum = amounts.foldRight(0)(_ + _) + txnLog.add(("MultiDebit:" + accountNo + " " + sum).getBytes) + + val m = accountState.get(accountNo.getBytes) + .map(frombinary[Int](_)) + .getOrElse(0) + + var cbal = m + amounts.foreach { + amount => + accountState.put(accountNo.getBytes, tobinary(m - amount)) + cbal = cbal - amount + if (cbal < 0) failer !! "Failure" + } + + self.reply(m - sum) + + // credit amount + case Credit(accountNo, amount) => + txnLog.add(("Credit:" + accountNo + " " + amount).getBytes) + val m = accountState.get(accountNo.getBytes) + .map(frombinary[Int](_)) + .getOrElse(0) + + accountState.put(accountNo.getBytes, tobinary(m + amount)) + + self.reply(m + amount) + + case LogSize => + self.reply(txnLog.length) + + case Log(start, finish) => + self.reply(txnLog.slice(start, finish).map(new String(_))) + } +} + +@serializable class PersistentFailerActor extends Transactor { + def receive = { + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } +} + +@RunWith(classOf[JUnitRunner]) +class VoldemortPersistentActorSuite extends +Spec with + ShouldMatchers with + BeforeAndAfterEach with EmbeddedVoldemort { + import VoldemortStorageBackend._ + + + override def beforeEach { + removeMapStorageFor(state) + var size = getVectorStorageSizeFor(tx) + (0 to size).foreach { + index => { + vectorValueClient.delete(getVectorValueKey(tx, index)) + } + } + vectorSizeClient.delete(tx) + } + + override def afterEach { + beforeEach + } + + describe("successful debit") { + it("should debit successfully") { + log.info("Succesful Debit starting") + val bactor = actorOf[BankAccountActor] + bactor.start + val failer = actorOf[PersistentFailerActor] + failer.start + bactor !! Credit("a-123", 5000) + log.info("credited") + bactor !! Debit("a-123", 3000, failer) + log.info("debited") + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(2000) + log.info("balane matched") + bactor !! Credit("a-123", 7000) + log.info("Credited") + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(9000) + log.info("Balance matched") + bactor !! Debit("a-123", 8000, failer) + log.info("Debited") + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(1000) + log.info("Balance matched") + (bactor !! LogSize).get.asInstanceOf[Int] should equal(7) + (bactor !! Log(0, 7)).get.asInstanceOf[Iterable[String]].size should equal(7) + } + } + + describe("unsuccessful debit") { + it("debit should fail") { + val bactor = actorOf[BankAccountActor] + bactor.start + val failer = actorOf[PersistentFailerActor] + failer.start + bactor !! Credit("a-123", 5000) + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000) + evaluating { + bactor !! Debit("a-123", 7000, failer) + } should produce[Exception] + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000) + (bactor !! LogSize).get.asInstanceOf[Int] should equal(3) + } + } + + describe("unsuccessful multidebit") { + it("multidebit should fail") { + val bactor = actorOf[BankAccountActor] + bactor.start + val failer = actorOf[PersistentFailerActor] + failer.start + bactor !! Credit("a-123", 5000) + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000) + evaluating { + bactor !! MultiDebit("a-123", List(1000, 2000, 4000), failer) + } should produce[Exception] + (bactor !! Balance("a-123")).get.asInstanceOf[Int] should equal(5000) + (bactor !! LogSize).get.asInstanceOf[Int] should equal(3) + } + } +} diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala new file mode 100644 index 0000000000..76bb989ac9 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala @@ -0,0 +1,87 @@ +package se.scalablesolutions.akka.persistence.voldemort + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ +import se.scalablesolutions.akka.actor.{newUuid,Uuid} +import collection.immutable.TreeSet +import VoldemortStorageBackendSuite._ + +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.stm.global._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.persistence.common._ +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.config.Config.config + +@RunWith(classOf[JUnitRunner]) +class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging { + test("persistentRefs work as expected") { + val name = newUuid.toString + val one = "one".getBytes + atomic { + val ref = VoldemortStorage.getRef(name) + ref.isDefined should be(false) + ref.swap(one) + ref.get match { + case Some(bytes) => bytes should be(one) + case None => true should be(false) + } + } + val two = "two".getBytes + atomic { + val ref = VoldemortStorage.getRef(name) + ref.isDefined should be(true) + ref.swap(two) + ref.get match { + case Some(bytes) => bytes should be(two) + case None => true should be(false) + } + } + } + + + test("Persistent Vectors function as expected") { + val name = newUuid.toString + val one = "one".getBytes + val two = "two".getBytes + atomic { + val vec = VoldemortStorage.getVector(name) + vec.add(one) + } + atomic { + val vec = VoldemortStorage.getVector(name) + vec.size should be(1) + vec.add(two) + } + atomic { + val vec = VoldemortStorage.getVector(name) + + vec.get(0) should be(one) + vec.get(1) should be(two) + vec.size should be(2) + vec.update(0, two) + } + + atomic { + val vec = VoldemortStorage.getVector(name) + vec.get(0) should be(two) + vec.get(1) should be(two) + vec.size should be(2) + vec.update(0, Array.empty[Byte]) + vec.update(1, Array.empty[Byte]) + } + + atomic { + val vec = VoldemortStorage.getVector(name) + vec.get(0) should be(Array.empty[Byte]) + vec.get(1) should be(Array.empty[Byte]) + vec.size should be(2) + } + + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala new file mode 100644 index 0000000000..aa5f88f020 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -0,0 +1,147 @@ +package se.scalablesolutions.akka.persistence.voldemort + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ +import se.scalablesolutions.akka.util.{Logging} +import collection.immutable.TreeSet +import VoldemortStorageBackendSuite._ + +@RunWith(classOf[JUnitRunner]) +class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging { + test("that ref storage and retrieval works") { + val key = "testRef" + val value = "testRefValue" + val valueBytes = bytes(value) + refClient.delete(key) + refClient.getValue(key, empty) should be(empty) + refClient.put(key, valueBytes) + refClient.getValue(key) should be(valueBytes) + } + + test("PersistentRef apis function as expected") { + val key = "apiTestRef" + val value = "apiTestRefValue" + val valueBytes = bytes(value) + refClient.delete(key) + getRefStorageFor(key) should be(None) + insertRefStorageFor(key, valueBytes) + getRefStorageFor(key).get should equal(valueBytes) + } + + test("that map key storage and retrieval works") { + val key = "testmapKey" + val mapKeys = new TreeSet[Array[Byte]] + bytes("key1") + mapKeyClient.delete(key) + mapKeyClient.getValue(key, SortedSetSerializer.toBytes(emptySet)) should equal(SortedSetSerializer.toBytes(emptySet)) + putMapKeys(key, mapKeys) + getMapKeys(key) should equal(mapKeys) + } + + test("that map value storage and retrieval works") { + val key = bytes("keyForTestingMapValueClient") + val value = bytes("value for testing map value client") + mapValueClient.put(key, value) + mapValueClient.getValue(key, empty) should equal(value) + } + + + test("PersistentMap apis function as expected") { + val name = "theMap" + val key = bytes("mapkey") + val value = bytes("mapValue") + removeMapStorageFor(name, key) + removeMapStorageFor(name) + getMapStorageEntryFor(name, key) should be(None) + getMapStorageSizeFor(name) should be(0) + getMapStorageFor(name).length should be(0) + getMapStorageRangeFor(name, None, None, 100).length should be(0) + + insertMapStorageEntryFor(name, key, value) + + getMapStorageEntryFor(name, key).get should equal(value) + getMapStorageSizeFor(name) should be(1) + getMapStorageFor(name).length should be(1) + getMapStorageRangeFor(name, None, None, 100).length should be(1) + + removeMapStorageFor(name, key) + removeMapStorageFor(name) + getMapStorageEntryFor(name, key) should be(None) + getMapStorageSizeFor(name) should be(0) + getMapStorageFor(name).length should be(0) + getMapStorageRangeFor(name, None, None, 100).length should be(0) + + insertMapStorageEntriesFor(name, List(key -> value)) + + getMapStorageEntryFor(name, key).get should equal(value) + getMapStorageSizeFor(name) should be(1) + getMapStorageFor(name).length should be(1) + getMapStorageRangeFor(name, None, None, 100).length should be(1) + + } + + test("that vector size storage and retrieval works") { + val key = "vectorKey" + val size = IntSerializer.toBytes(17) + vectorSizeClient.delete(key) + vectorSizeClient.getValue(key, empty) should equal(empty) + vectorSizeClient.put(key, size) + vectorSizeClient.getValue(key) should equal(size) + } + + test("that vector value storage and retrieval works") { + val key = "vectorValueKey" + val index = 3 + val value = bytes("some bytes") + val vecKey = getVectorValueKey(key, index) + getIndexFromVectorValueKey(key, vecKey) should be(index) + vectorValueClient.delete(vecKey) + vectorValueClient.getValue(vecKey, empty) should equal(empty) + vectorValueClient.put(vecKey, value) + vectorValueClient.getValue(vecKey) should equal(value) + } + + test("PersistentVector apis function as expected") { + val key = "vectorApiKey" + val value = bytes("Some bytes we want to store in a vector") + val updatedValue = bytes("Some updated bytes we want to store in a vector") + vectorSizeClient.delete(key) + vectorValueClient.delete(getVectorValueKey(key, 0)) + vectorValueClient.delete(getVectorValueKey(key, 1)) + getVectorStorageEntryFor(key, 0) should be(empty) + getVectorStorageEntryFor(key, 1) should be(empty) + getVectorStorageRangeFor(key, None, None, 1).head should be(empty) + + insertVectorStorageEntryFor(key, value) + //again + insertVectorStorageEntryFor(key, value) + + getVectorStorageEntryFor(key, 0) should be(value) + getVectorStorageEntryFor(key, 1) should be(value) + getVectorStorageRangeFor(key, None, None, 1).head should be(value) + getVectorStorageRangeFor(key, Some(1), None, 1).head should be(value) + getVectorStorageSizeFor(key) should be(2) + + updateVectorStorageEntryFor(key, 1, updatedValue) + + getVectorStorageEntryFor(key, 0) should be(value) + getVectorStorageEntryFor(key, 1) should be(updatedValue) + getVectorStorageRangeFor(key, None, None, 1).head should be(value) + getVectorStorageRangeFor(key, Some(1), None, 1).head should be(updatedValue) + getVectorStorageSizeFor(key) should be(2) + + } + +} + +object VoldemortStorageBackendSuite { + val empty = Array.empty[Byte] + val emptySet = new TreeSet[Array[Byte]] + + def bytes(value: String): Array[Byte] = { + value.getBytes("UTF-8") + } + +} \ No newline at end of file diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 728f269170..eec56c7f06 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -168,5 +168,19 @@ akka { hbase { zookeeper-quorum = "localhost" } + + voldemort { + store { + refs = "Refs" # Voldemort Store Used to Persist Refs. Use string serializer for keys, identity serializer for values + map-keys = "MapKeys" # Voldemort Store Used to Persist Map Keys. Use string serializer for keys, identity serializer for values + map-values = "MapValues" # Voldemort Store Used to Persist Map Values. Use identity serializer for keys, identity serializer for values + vector-sizes = "VectorSizes" # Voldemort Store Used to Persist Vector Sizes. Use string serializer for keys, identity serializer for values + vector-values = "VectorValues" # Voldemort Store Used to Persist Vector Values. Use identity serializer for keys, identity serializer for values + } + + client { # The KeyValue pairs under client are converted to java Properties and used to construct the ClientConfig + bootstrap_urls = "tcp://localhost:6666" # All Valid Voldemort Client properties are valid here, in string form + } + } } } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 71f39887a0..362dabfb1c 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -53,6 +53,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases") lazy val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/") + lazy val ClojarsRepo = MavenRepository("Clojars Repo", "http://clojars.org/repo") + lazy val OracleRepo = MavenRepository("Oracle Repo", "http://download.oracle.com/maven") } // ------------------------------------------------------------------------------------------------------------------- @@ -83,6 +85,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val zookeeperRelease = ModuleConfiguration("org.apache.hadoop.zookeeper",ZookeeperRepo) lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo) lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo) + lazy val voldemortModuleConfig = ModuleConfiguration("voldemort", ClojarsRepo) + lazy val sleepycatModuleConfig = ModuleConfiguration("com.sleepycat", OracleRepo) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! // ------------------------------------------------------------------------------------------------------------------- @@ -193,6 +197,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile" + lazy val sjson_test = "sjson.json" % "sjson" % "0.8-2.8.0" % "test" lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile" @@ -206,6 +211,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val thrift = "com.facebook" % "thrift" % "r917130" % "compile" + lazy val voldemort = "voldemort" % "voldemort" % "0.81" % "compile" + lazy val voldemort_contrib = "voldemort" % "voldemort-contrib" % "0.81" % "compile" + lazy val voldemort_needs_log4j = "org.slf4j" % "log4j-over-slf4j" % SLF4J_VERSION % "compile" + lazy val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % ASPECTWERKZ_VERSION % "compile" lazy val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % ASPECTWERKZ_VERSION % "compile" @@ -228,10 +237,19 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val junit = "junit" % "junit" % "4.5" % "test" lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test" + + //HBase testing lazy val hadoop_test = "org.apache.hadoop" % "hadoop-test" % "0.20.2" % "test" lazy val hbase_test = "org.apache.hbase" % "hbase-test" % "0.20.6" % "test" lazy val log4j = "log4j" % "log4j" % "1.2.15" % "test" lazy val jetty_mortbay = "org.mortbay.jetty" % "jetty" % "6.1.14" % "test" + + //voldemort testing + lazy val jdom = "org.jdom" % "jdom" % "1.1" % "test" + lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test" + lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test" + lazy val bdb = "com.sleepycat" % "je" % "4.0.103" % "test" + lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test" } // ------------------------------------------------------------------------------------------------------------------- @@ -480,6 +498,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaCassandraProject(_), akka_persistence_common) lazy val akka_persistence_hbase = project("akka-persistence-hbase", "akka-persistence-hbase", new AkkaHbaseProject(_), akka_persistence_common) + lazy val akka_persistence_voldemort = project("akka-persistence-voldemort", "akka-persistence-voldemort", + new AkkaVoldemortProject(_), akka_persistence_common) } // ------------------------------------------------------------------------------------------------------------------- @@ -544,7 +564,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { - + @@ -557,6 +577,28 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil } + // akka-persistence-voldemort subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaVoldemortProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val voldemort = Dependencies.voldemort + val voldemort_contrib = Dependencies.voldemort_contrib + val voldemort_needs_log4j = Dependencies.voldemort_needs_log4j + + //testing + val scalatest = Dependencies.scalatest + val google_coll = Dependencies.google_coll + val jdom = Dependencies.jdom + val jetty = Dependencies.vold_jetty + val velocity = Dependencies.velocity + val bdb = Dependencies.bdb + val dbcp = Dependencies.dbcp + val sjson = Dependencies.sjson_test + + override def testOptions = TestFilter((name: String) => name.endsWith("Suite")) :: Nil + } + + // ------------------------------------------------------------------------------------------------------------------- // akka-kernel subproject // ------------------------------------------------------------------------------------------------------------------- @@ -600,7 +642,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaOSGiAssemblyProject(_), akka_osgi_dependencies_bundle, akka_remote, akka_amqp, akka_http, akka_camel, akka_spring, akka_jta, akka_persistence.akka_persistence_common, akka_persistence.akka_persistence_redis, akka_persistence.akka_persistence_mongo, - akka_persistence.akka_persistence_cassandra) + akka_persistence.akka_persistence_cassandra,akka_persistence.akka_persistence_voldemort) } class AkkaOSGiDependenciesBundleProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with BNDPlugin {