diff --git a/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala b/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala index 1d641e5468..778adbc99e 100644 --- a/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala +++ b/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala @@ -5,10 +5,220 @@ import se.scalablesolutions.akka.persistence.common._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.Config.config + +import org.apache.commons.httpclient.methods.{GetMethod, PostMethod, PutMethod, DeleteMethod} +import org.apache.commons.httpclient.params.HttpMethodParams +import org.apache.commons.httpclient.methods._ +import org.apache.commons.httpclient.{DefaultHttpMethodRetryHandler, HttpClient} + +import scala.util.parsing.json._; +import sjson.json._ +import DefaultProtocol._ + + + private [akka] object CouchDBStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with Logging { -// need couch db implementation! -} \ No newline at end of file + + + import dispatch.json._ + + implicit object widgetWrites extends Writes[Map[String,Any]] { + def writes(o: Map[String,Any]): JsValue = JsValue(o) + } + + val ENCODING = "UTF-8" + val URL = config.getString("akka.storage.couchdb.url", "http://localhost:5984/testakka/") + + def drop() = { + val client: HttpClient = new HttpClient() + val delete: DeleteMethod = new DeleteMethod(URL) + client.executeMethod(delete) + } + + def create() = { + val client: HttpClient = new HttpClient() + val put: PutMethod = new PutMethod(URL) + put.setRequestEntity(new StringRequestEntity("", null, "utf-8")) + put.setRequestHeader("Content-Type", "application/json") + client.executeMethod(put) + put.getResponseBodyAsString + } + + + // private def storeMap(name: String, entries: List[(Array[Byte], Array[Byte])]) ={ + // var m = entries.map(e=>(new String(e._1, ENCODING) -> new String(e._2, ENCODING))).toMap + // val dataJson = JsonSerialization.tojson(m) + // postData(URL + name, dataJson.toString) + // } + + private def storeMap(name: String, entries: Map[String, Any]) ={ + postData(URL + name, JsonSerialization.tojson(entries).toString) + } + + private def getResponseForNameAsMap(name: String): Option[Map[String, Any]] = { + getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] + } + + + def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={ + val newDoc = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) ++ + entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + storeMap(name, newDoc) + } + + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte])={ + insertMapStorageEntriesFor(name, List((key, value))) + } + + + def removeMapStorageFor(name: String): Unit = { + findDocRev(name).flatMap(deleteData(URL + name, _)) + } + + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + getResponseForNameAsMap(name).flatMap(doc=>{ + removeMapStorageFor(name) + storeMap(name, doc - new String(key, ENCODING))}) + } + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { + getResponseForNameAsMap(name).flatMap(_.get(new String(key, ENCODING))).asInstanceOf[Option[String]] + .map(_.getBytes) + } + + def getMapStorageSizeFor(name: String): Int = { + getMapStorageFor(name).size + } + + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + val m = getResponseForNameAsMap(name).map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) + m.toList.map(e => (e._1.getBytes -> e._2.asInstanceOf[String].getBytes)) + } + + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { + val m = getResponseForNameAsMap(name).map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) + val keys = m.keys.toList.sortWith(_ < _) + + // if the supplied start is not defined, get the head of keys + val s = start.map(new String(_, ENCODING)).getOrElse(keys.head) + + // if the supplied finish is not defined, get the last element of keys + val f = finish.map(new String(_, ENCODING)).getOrElse(keys.last) + + // slice from keys: both ends inclusive + val ks = keys.slice(keys.indexOf(s), scala.math.min(count, keys.indexOf(f) + 1)) + ks.map(k => (k.getBytes, m(k).asInstanceOf[String].getBytes)) + } + + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { + insertVectorStorageEntriesFor(name, List(element)) + } + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { + val m = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) + val v = elements.map(x=>new String(x)) ::: m.getOrElse("vector", List[String]()).asInstanceOf[List[String]] + storeMap(name, m + ("vector" -> v)) + + // val m = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) + // val v = m.getOrElse("vector", List[String]()).asInstanceOf[List[String]] ::: elements.map(x=>new String(x, ENCODING)) + // storeMap(name, m + ("vector" -> v)) + } + + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) ={ + val m = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) + val v: List[String] = m.getOrElse("vector", List[String]()).asInstanceOf[List[String]] + if (v.indices.contains(index)) { + storeMap(name, m + ("vector" -> v.updated(index, new String(elem, ENCODING)))) + } + } + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] ={ + val v = getResponseForNameAsMap(name).flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]] + if (v.indices.contains(index)) + v(index).getBytes(ENCODING) + else + Array[Byte]() + } + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]]={ + val v = getResponseForNameAsMap(name).flatMap(_.get("vector")).asInstanceOf[Option[List[String]]].getOrElse(List[String]()) + val s = start.getOrElse(0) + val f = finish.getOrElse(v.length) + val c = if (count == 0) v.length else count + v.slice(s, scala.math.min(s + c, f)).map(_.getBytes(ENCODING)) + } + + def getVectorStorageSizeFor(name: String): Int ={ + getResponseForNameAsMap(name).flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0) + } + + + + def insertRefStorageFor(name: String, element: Array[Byte]) ={ + val newDoc = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) ++ Map("ref" -> new String(element)) + removeMapStorageFor(name) + storeMap(name, newDoc) + } + + def getRefStorageFor(name: String): Option[Array[Byte]] ={ + getResponseForNameAsMap(name).flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes) + } + + + + private def findDocRev(name: String) = { + getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] + .flatMap(_.get("_rev")).asInstanceOf[Option[String]] + } + + private def deleteData(url:String, rev:String): Option[String] = { + val client: HttpClient = new HttpClient() + val delete: DeleteMethod = new DeleteMethod(url) + delete.setRequestHeader("If-Match", rev) + client.executeMethod(delete) + + val response = delete.getResponseBodyAsString() + if (response != null) + Some(response) + else + None + } + + private def postData(url: String, data: String): Option[String] = { + val client: HttpClient = new HttpClient() + val post: PostMethod = new PostMethod(url) + post.setRequestEntity(new StringRequestEntity(data, null, "utf-8")) + post.setRequestHeader("Content-Type", "application/json") + client.executeMethod(post) + val response = post.getResponseBodyAsString + if (response != null) + Some(response) + else + None + } + + + + + private def getResponse(url: String): Option[String] = { + val client = new HttpClient() + val method = new GetMethod(url) + + method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, + new DefaultHttpMethodRetryHandler(3, false)) + + client.executeMethod(method) + val response = method.getResponseBodyAsString + if (method.getStatusCode == 200 && response != null) + Some(response) + else + None + } + + +} + diff --git a/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBStorageSpec.scala b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBStorageSpec.scala new file mode 100644 index 0000000000..e418b5dc50 --- /dev/null +++ b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBStorageSpec.scala @@ -0,0 +1,13 @@ +package se.scalablesolutions.akka.persistence.couchdb + +import org.specs._ + +object CouchDBStorageSpec extends Specification { + val ENCODING = "UTF-8" + + println("\n\n\n\n") + CouchDBStorageBackend.insertMapStorageEntryFor("weather", "abc".getBytes , "henbf".getBytes) + //CouchDBStorageBackend.removeMapStorageFor("weather") + CouchDBStorageBackend.insertMapStorageEntryFor("weather", "def".getBytes , "werg".getBytes) + //CouchDBStorageBackend.removeMapStorageFor("weather" , "def".getBytes) +} \ No newline at end of file diff --git a/config/akka-reference.conf b/config/akka-reference.conf index b2202fb669..896af22c83 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -166,6 +166,10 @@ akka { port = 6379 # Port to Redis } + couchdb { + url = "http://localhost:5984/testakka/" + } + hbase { zookeeper-quorum = "localhost" # A comma separated list of hostnames or IPs of the zookeeper quorum instances } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 8fc209c716..3a77af369a 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -114,6 +114,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // Compile + val httpclient = "maven2" at "http://repo1.maven.org/maven2/" + val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" + + val scalatoolsSnapshot = "Scala Tools Snapshot" at "http://scala-tools.org/repo-snapshots/" + val specs = "org.scala-tools.testing" %% "specs" % "1.6.5" % "test" + + lazy val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile" lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" @@ -499,7 +506,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaHbaseProject(_), akka_persistence_common) lazy val akka_persistence_voldemort = project("akka-persistence-voldemort", "akka-persistence-voldemort", new AkkaVoldemortProject(_), akka_persistence_common) - lazy val akka_persistence_couchdb = project("akka_persistence_couchdb", "akka_persistence_couchdb", + lazy val akka_persistence_couchdb = project("akka-persistence-couchdb", "akka-persistence-couchdb", new AkkaCouchDBProject(_), akka_persistence_common) } @@ -534,6 +541,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testOptions = createTestFilter( _.endsWith("Test")) } + // ------------------------------------------------------------------------------------------------------------------- // akka-persistence-cassandra subproject // ------------------------------------------------------------------------------------------------------------------- @@ -601,7 +609,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { } class AkkaCouchDBProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { - + val couch = Dependencies.commonsHttpClient + val spec = Dependencies.specs } // -------------------------------------------------------------------------------------------------------------------