diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortSession.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortSession.scala new file mode 100644 index 0000000000..c0eca74832 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortSession.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.voldemort + +import se.scalablesolutions.akka.util.UUID +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ +import voldemort.client.StoreClient + + +class VoldemortSession { + + val voldemort: StoreClient + + def getOptionalBytes(name: String): Option[Array[Byte]] = { + + } + + def put(name:) + + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala new file mode 100644 index 0000000000..a590de349b --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.voldemort + +import se.scalablesolutions.akka.util.UUID +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ + + +object VoldemortStorage extends Storage { + + type ElementType = Array[Byte] + def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id) +} + + +class VoldemortPersistentMap(id: String) extends PersistentMapBinary { + val uuid = id + val storage = VoldemortStorageBackend +} + + +class VoldemortPersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = VoldemortStoragebackend +} + +class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = VoldemortStoragebackend +} diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala new file mode 100644 index 0000000000..5732fbac8d --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.voldemort + +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.Helpers._ +import se.scalablesolutions.akka.config.Config.config + +import voldemort.client._ +import collection.mutable.{Set, HashSet, ArrayBuffer} +import java.lang.String + + +private[akka] object VoldemortStorageBackend extends +MapStorageBackend[Array[Byte], Array[Byte]] with + VectorStorageBackend[Array[Byte]] with + RefStorageBackend[Array[Byte]] with + Logging { + + /** + * Concat the owner+key+lenght of owner so owned data will be colocated + * Store the length of owner as last byte to work aroune the rarest case + * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 + */ + private def mapKey(owner: String, key: Array[Byte]): Array[Byte] = { + val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") + val ownerLenghtByte = ownerBytes.length.byteValue + val mapKey = new Array[Byte](ownerBytes.length + key.length + 1) + System.arraycopy(ownerBytes, 0, mapKey, 0, ownerBytes.length) + System.arraycopy(key, 0, mapKey, ownerBytes.length, key.length) + mapKey.update(mapKey.length - 1) = ownerLenghtByte + } + + var refClient: StoreClient + var mapKeyClient: StoreClient + var mapValueClient: StoreClient + + + def getRefStorageFor(name: String): Option[Array[Byte]] = { + val result: Array[Byte] = refClient.get(RefKey(name).key) + result match { + case null => None + case _ => Some(result) + } + } + + def insertRefStorageFor(name: String, element: Array[Byte]) = { + refClient.put(RefKey(name).key, element) + } + + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { + + } + + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + val entries: ArrayBuffer[(Array[Byte], Array[Byte])] = new ArrayBuffer + keys.foreach { + entries += (_, mapValueClient.getValue(mapKey(name, _))) + } + entries.toList + } + + def getMapStorageSizeFor(name: String): Int = { + val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + keys.size + } + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { + val result: Array[Byte] = mapValueClient.get(mapKey(name, key)) + result match { + case null => None + case _ => Some(result) + } + } + + def removeMapStorageFor(name: String, key: Array[Byte]) = { + val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + keys -= key + mapKeyClient.put(name, keys) + mapValueClient.delete(mapKey(name, key)) + } + + + def removeMapStorageFor(name: String) = { + val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + keys.foreach { + mapValueClient.delete(mapKey(name, _)) + } + mapKeyClient.delete(name) + } + + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { + mapValueClient.put(mapKey(name, key)) + val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + keys += key + mapKeyClient.put(name, keys) + } + + def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { + val newKeys = new HashSet[Array[Byte]] + entries.foreach { + (key, value) => mapValueClient.put(mapKey(name, key), value) + newKeys += key + } + val keys: Set[Array[Byte]] = mapKeyClient.getValue(name, new HashSet[Array[Byte]](0)) + keys += key + mapKeyClient.put(name, keys) + } + + + def getVectorStorageSizeFor(name: String): Int = null + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = null + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = null + + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = null + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = null + + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = null + + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/cluster.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/cluster.xml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/server.properties b/akka-persistence/akka-persistence-voldemort/src/test/resources/server.properties new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-persistence/akka-persistence-voldemort/src/test/resources/stores.xml b/akka-persistence/akka-persistence-voldemort/src/test/resources/stores.xml new file mode 100644 index 0000000000..f031238e59 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/resources/stores.xml @@ -0,0 +1,52 @@ + + + Refs + 1 + 1 + 1 + 1 + 1 + bdb + client + + string + utf8 + + + identity + + + + MapValues + 1 + 1 + 1 + 1 + 1 + bdb + client + + + + + identity + + + + MapKeys + 1 + 1 + 1 + 1 + 1 + bdb + client + + string + utf8 + + + identity + + + \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala new file mode 100644 index 0000000000..b11a4bba35 --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -0,0 +1,17 @@ +package se.scalablesolutions.akka.persistence.voldemort + +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.UUID + + +/** + * + */ + +class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers { + + test("UUID generation looks like"){ + System.out.println(UUID.newUuid.toString) + } +} \ No newline at end of file diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 6a97dbccfd..fe4fb19584 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -50,6 +50,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases") + lazy val ClojarsRepo = MavenRepository("Clojars Repo", "http://clojars.org/repo") } // ------------------------------------------------------------------------------------------------------------------- @@ -77,6 +78,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository) lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository) lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases) + lazy val voldemortModuleConfig = ModuleConfiguration("voldemort", ClojarsRepo) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! // ------------------------------------------------------------------------------------------------------------------- @@ -200,6 +202,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val thrift = "com.facebook" % "thrift" % "r917130" % "compile" + lazy val voldemort = "voldemort" % "voldemort" % "0.81" % "compile" + lazy val voldemort_contrib = "voldemort" % "voldemort-contrib" % "0.81" % "compile" + lazy val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % ASPECTWERKZ_VERSION % "compile" lazy val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % ASPECTWERKZ_VERSION % "compile" @@ -461,6 +466,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaMongoProject(_), akka_persistence_common) lazy val akka_persistence_cassandra = project("akka-persistence-cassandra", "akka-persistence-cassandra", new AkkaCassandraProject(_), akka_persistence_common) + lazy val akka_persistence_voldemort = project("akka-persistence-voldemort", "akka-persistence-voldemort", + new AkkaVoldemortProject(_), akka_persistence_common) } // ------------------------------------------------------------------------------------------------------------------- @@ -510,6 +517,23 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil } + + // ------------------------------------------------------------------------------------------------------------------- + // akka-persistence-voldemort subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaVoldemortProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val voldemort = Dependencies.voldemort + val voldemort_contrib = Dependencies.voldemort_contrib + + //testing + val scalatest = Dependencies.scalatest + override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil + } + + + + // ------------------------------------------------------------------------------------------------------------------- // akka-kernel subproject // -------------------------------------------------------------------------------------------------------------------