diff --git a/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala b/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala index 1d641e5468..61beaebb39 100644 --- a/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala +++ b/akka-persistence/akka-persistence-couchdb/src/main/scala/CouchDBStorageBackend.scala @@ -5,10 +5,209 @@ import se.scalablesolutions.akka.persistence.common._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.Config.config + +import org.apache.commons.httpclient.methods.{GetMethod, PostMethod, PutMethod, DeleteMethod} +import org.apache.commons.httpclient.params.HttpMethodParams +import org.apache.commons.httpclient.methods._ +import org.apache.commons.httpclient.{DefaultHttpMethodRetryHandler, HttpClient} + +import scala.util.parsing.json._; +import sjson.json._ +import DefaultProtocol._ + + + private [akka] object CouchDBStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with Logging { -// need couch db implementation! -} \ No newline at end of file + + + import dispatch.json._ + + implicit object widgetWrites extends Writes[Map[String,Any]] { + def writes(o: Map[String,Any]): JsValue = JsValue(o) + } + + val URL = config.getString("akka.storage.couchdb.url", "http://localhost:5984/testakka/") + + def drop() = { + val client: HttpClient = new HttpClient() + val delete: DeleteMethod = new DeleteMethod(URL) + client.executeMethod(delete) + } + + def create() = { + val client: HttpClient = new HttpClient() + val put: PutMethod = new PutMethod(URL) + put.setRequestEntity(new StringRequestEntity("", null, "utf-8")) + put.setRequestHeader("Content-Type", "application/json") + client.executeMethod(put) + put.getResponseBodyAsString + } + + private def storeMap(name: String, entries: List[(Array[Byte], Array[Byte])]) ={ + var m = entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + ("_id" -> name) + val dataJson = JsonSerialization.tojson(m) + postData(URL, dataJson.toString) + } + + private def storeMap(name: String, entries: Map[String, Any]) ={ + postData(URL, JsonSerialization.tojson(entries + ("_id" -> name)).toString) + } + + def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={ + val newDoc = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) ++ + entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + storeMap(name, newDoc) + } + + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte])={ + insertMapStorageEntriesFor(name, List((key, value))) + } + + + def removeMapStorageFor(name: String): Unit = { + findDocRev(name).flatMap(deleteData(URL + name, _)) + } + + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + getResponseForNameAsMap(name).flatMap(doc=>{ // if we can't find the map for name, then we don't need to delete it. + removeMapStorageFor(name) + storeMap(name, doc - new String(key)) + }) + } + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { + getResponseForNameAsMap(name).flatMap(_.get(new String(key))).asInstanceOf[Option[String]] + .map(_.getBytes) + } + + def getMapStorageSizeFor(name: String): Int = { + getMapStorageFor(name).size + } + + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + val m = getResponseForNameAsMap(name).map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) + m.toList.map(e => (e._1.getBytes, e._2.asInstanceOf[String].getBytes)) + } + + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { + val m = getResponseForNameAsMap(name).map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]()) + val keys = m.keys.toList.sortWith(_ < _) + + // if the supplied start is not defined, get the head of keys + val s = start.map(new String(_)).getOrElse(keys.head) + + // if the supplied finish is not defined, get the last element of keys + val f = finish.map(new String(_)).getOrElse(keys.last) + + val c = if (count == 0) Int.MaxValue else count + // slice from keys: both ends inclusive + val ks = keys.slice(keys.indexOf(s), scala.math.min(keys.indexOf(s) + c, keys.indexOf(f) + 1)) + ks.map(k => (k.getBytes, m(k).asInstanceOf[String].getBytes)) + } + + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { + insertVectorStorageEntriesFor(name, List(element)) + } + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { + val m = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) + val v = elements.map(x=>new String(x)) ::: m.getOrElse("vector", List[String]()).asInstanceOf[List[String]] + storeMap(name, m + ("vector" -> v)) + } + + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { + val m = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) + val v: List[String] = m.getOrElse("vector", List[String]()).asInstanceOf[List[String]] + if (v.indices.contains(index)) { + storeMap(name, m + ("vector" -> v.updated(index, new String(elem)))) + } + } + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] ={ + val v = getResponseForNameAsMap(name).flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]] + if (v.indices.contains(index)) + v(index).getBytes + else + Array[Byte]() + } + + def getVectorStorageSizeFor(name: String): Int ={ + getResponseForNameAsMap(name).flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0) + } + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + val v = getResponseForNameAsMap(name).flatMap(_.get("vector")).asInstanceOf[Option[List[String]]].getOrElse(List[String]()) + val s = start.getOrElse(0) + val f = finish.getOrElse(v.length) + val c = if (count == 0) v.length else count + v.slice(s, scala.math.min(s + c, f)).map(_.getBytes) + } + + def insertRefStorageFor(name: String, element: Array[Byte]) ={ + val newDoc = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) ++ Map("ref" -> new String(element)) + storeMap(name, newDoc) + } + + def getRefStorageFor(name: String): Option[Array[Byte]] ={ + getResponseForNameAsMap(name).flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes) + } + + + private def findDocRev(name: String) = { + getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] + .flatMap(_.get("_rev")).asInstanceOf[Option[String]] + } + + private def deleteData(url:String, rev:String): Option[String] = { + val client: HttpClient = new HttpClient() + val delete: DeleteMethod = new DeleteMethod(url) + delete.setRequestHeader("If-Match", rev) + client.executeMethod(delete) + + val response = delete.getResponseBodyAsString() + if (response != null) + Some(response) + else + None + } + + private def postData(url: String, data: String): Option[String] = { + val client: HttpClient = new HttpClient() + val post: PostMethod = new PostMethod(url) + post.setRequestEntity(new StringRequestEntity(data, null, "utf-8")) + post.setRequestHeader("Content-Type", "application/json") + client.executeMethod(post) + val response = post.getResponseBodyAsString + if (response != null) + Some(response) + else + None + } + + private def getResponseForNameAsMap(name: String): Option[Map[String, Any]] = { + getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]] + } + + + private def getResponse(url: String): Option[String] = { + val client = new HttpClient() + val method = new GetMethod(url) + + method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, + new DefaultHttpMethodRetryHandler(3, false)) + + client.executeMethod(method) + val response = method.getResponseBodyAsString + if (method.getStatusCode == 200 && response != null) + Some(response) + else + None + } + + +} + diff --git a/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala new file mode 100644 index 0000000000..7dc69ba6f2 --- /dev/null +++ b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala @@ -0,0 +1,9 @@ +package se.scalablesolutions.akka.persistence.couchdb + +import org.scalatest.junit.JUnitSuite + +import org.junit.{Test, Before} +import org.junit.Assert._ + +import se.scalablesolutions.akka.actor.{ActorRef, Transactor} +import se.scalablesolutions.akka.actor.Actor._ \ No newline at end of file diff --git a/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBStorageBackendSpec.scala b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBStorageBackendSpec.scala new file mode 100644 index 0000000000..9844e4baf0 --- /dev/null +++ b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBStorageBackendSpec.scala @@ -0,0 +1,175 @@ +package se.scalablesolutions.akka.persistence.couchdb + +import org.specs._ +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.serialization.Serializable +import se.scalablesolutions.akka.serialization.Serializer._ + +import CouchDBStorageBackend._ +import sbinary._ +import sbinary.Operations._ +import sbinary.DefaultProtocol._ +import java.util.{Calendar, Date} + +@RunWith(classOf[JUnitRunner]) +class CouchDBStorageBackendSpec extends Specification { + doBeforeSpec { + CouchDBStorageBackend.create() + } + + doAfterSpec { + CouchDBStorageBackend.drop() + } + "CouchDBStorageBackend store and query in map" should { + "enter 4 entries for transaction T-1" in { + insertMapStorageEntryFor("T-1", "debasish.company".getBytes, "anshinsoft".getBytes) + insertMapStorageEntryFor("T-1", "debasish.language".getBytes, "java".getBytes) + insertMapStorageEntryFor("T-1", "debasish.age".getBytes, "44".getBytes) + insertMapStorageEntryFor("T-1", "debasish.spouse".getBytes, "paramita".getBytes) + + getMapStorageSizeFor("T-1") mustEqual(4) + new String(getMapStorageEntryFor( + "T-1", "debasish.language".getBytes).get) mustEqual("java") + + } + + "enter key/values for another transaction T-2" in { + insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes) + insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes) + getMapStorageSizeFor("T-1") mustEqual(4) + getMapStorageSizeFor("T-2") mustEqual(2) + } + + // "remove map storage for T-1 and T2" in { + // removeMapStorageFor("T-1") + // removeMapStorageFor("T-2") + // } + } + + "CouchDBStorageBackend store and query long value in map" should { + "enter 4 entries for transaction T-11" in { + val d = Calendar.getInstance.getTime.getTime + insertMapStorageEntryFor("T-11", "debasish".getBytes, toByteArray[Long](d)) + + getMapStorageSizeFor("T-11") mustEqual(1) + fromByteArray[Long](getMapStorageEntryFor("T-11", "debasish".getBytes).get) mustEqual(d) + } + + // "should remove map storage for T-11" in { + // removeMapStorageFor("T-11") + // } + } + + + "Range query in maps" should { + "enter 7 entries in redis for transaction T-5" in { + insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes) + insertMapStorageEntryFor("T-5", "trade.instrument".getBytes, "IBM".getBytes) + insertMapStorageEntryFor("T-5", "trade.type".getBytes, "BUY".getBytes) + insertMapStorageEntryFor("T-5", "trade.account".getBytes, "A-123".getBytes) + insertMapStorageEntryFor("T-5", "trade.amount".getBytes, "1000000".getBytes) + insertMapStorageEntryFor("T-5", "trade.quantity".getBytes, "1000".getBytes) + insertMapStorageEntryFor("T-5", "trade.broker".getBytes, "Nomura".getBytes) + getMapStorageSizeFor("T-5") mustEqual(7) + + getMapStorageRangeFor("T-5", + Some("trade.account".getBytes), + None, 3).map(e => (new String(e._1), new String(e._2))).size mustEqual(3) + + getMapStorageRangeFor("T-5", + Some("trade.account".getBytes), + Some("trade.type".getBytes), 3).map(e => (new String(e._1), new String(e._2))).size mustEqual(3) + + getMapStorageRangeFor("T-5", + Some("trade.amount".getBytes), + Some("trade.type".getBytes), 0).map(e => (new String(e._1), new String(e._2))).size mustEqual(6) + + getMapStorageRangeFor("T-5", + Some("trade.account".getBytes), + None, 0).map(e => (new String(e._1), new String(e._2))).size mustEqual(7) + } + + "remove map storage for T5" in { + removeMapStorageFor("T-5") + } + } + + "Store and query objects in maps" should { + import NameSerialization._ + "write a Name object and fetch it properly" in { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + insertMapStorageEntryFor("T-31", "debasish".getBytes, toByteArray[Name](n)) + getMapStorageSizeFor("T-31") mustEqual(1) + fromByteArray[Name](getMapStorageEntryFor("T-31", "debasish".getBytes).getOrElse(Array[Byte]())) mustEqual(n) + } + + "should remove map storage for T31" in { + removeMapStorageFor("T-31") + } + } + + "Store and query in vectors" should { + "write 4 entries in a vector for transaction T-3" in { + insertVectorStorageEntryFor("T-3", "debasish".getBytes) + insertVectorStorageEntryFor("T-3", "maulindu".getBytes) + insertVectorStorageEntryFor("T-3", "1200".getBytes) + + val dt = Calendar.getInstance.getTime.getTime + insertVectorStorageEntryFor("T-3", toByteArray[Long](dt)) + getVectorStorageSizeFor("T-3") mustEqual(4) + fromByteArray[Long](getVectorStorageEntryFor("T-3", 0)) mustEqual(dt) + getVectorStorageSizeFor("T-3") mustEqual(4) + // removeVectorStorageFor("T-3") + // getVectorStorageSizeFor("T-3") mustEqual(0) + } + } + + "Store and query objects in vectors" should { + import NameSerialization._ + "write a Name object and fetch it properly" in { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + insertVectorStorageEntryFor("T-31", toByteArray[Name](n)) + getVectorStorageSizeFor("T-31") mustEqual(1) + fromByteArray[Name](getVectorStorageEntryFor("T-31", 0)) mustEqual(n) + } + } + + "Store and query in ref" should { + import NameSerialization._ + "write 4 entries in 4 refs for transaction T-4" in { + insertRefStorageFor("T-4", "debasish".getBytes) + insertRefStorageFor("T-4", "maulindu".getBytes) + + insertRefStorageFor("T-4", "1200".getBytes) + new String(getRefStorageFor("T-4").get) mustEqual("1200") + } + "should write a Name object and fetch it properly" in { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + insertRefStorageFor("T-4", toByteArray[Name](n)) + fromByteArray[Name](getRefStorageFor("T-4").get) mustEqual(n) + } + } +} + +object NameSerialization { + implicit object DateFormat extends Format[Date] { + def reads(in : Input) = + new Date(read[Long](in)) + + def writes(out: Output, value: Date) = + write[Long](out, value.getTime) + } + + case class Name(id: Int, name: String, + address: String, dateOfBirth: Date, dateDied: Option[Date]) + + implicit val NameFormat: Format[Name] = + asProduct5(Name)(Name.unapply(_).get) +} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 8fc209c716..056e9c83ef 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -193,6 +193,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0.1" % "compile" lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" + lazy val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test" lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile" lazy val sjson_test = "sjson.json" % "sjson" % "0.8-2.8.0" % "test" @@ -499,7 +500,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaHbaseProject(_), akka_persistence_common) lazy val akka_persistence_voldemort = project("akka-persistence-voldemort", "akka-persistence-voldemort", new AkkaVoldemortProject(_), akka_persistence_common) - lazy val akka_persistence_couchdb = project("akka_persistence_couchdb", "akka_persistence_couchdb", + lazy val akka_persistence_couchdb = project("akka-persistence-couchdb", "akka-persistence-couchdb", new AkkaCouchDBProject(_), akka_persistence_common) } @@ -520,7 +521,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val commons_codec = Dependencies.commons_codec val redis = Dependencies.redis - override def testOptions = createTestFilter( _.endsWith("Test")) + // override def testOptions = createTestFilter( _.endsWith("Test")) } // ------------------------------------------------------------------------------------------------------------------- @@ -601,7 +602,11 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { } class AkkaCouchDBProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val specs = Dependencies.specs + val httpclient = "maven2" at "http://repo1.maven.org/maven2/" + val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" + // override def testOptions = createTestFilter( _.endsWith("Test")) } // ------------------------------------------------------------------------------------------------------------------- @@ -707,7 +712,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val commons_fileupload = "commons-fileupload" % "commons-fileupload" % "1.2.1" % "compile" intransitive val jms_1_1 = "org.apache.geronimo.specs" % "geronimo-jms_1.1_spec" % "1.1.1" % "compile" intransitive val joda = "joda-time" % "joda-time" % "1.6" intransitive - + override def packageAction = task { val libs: Seq[Path] = managedClasspath(config("compile")).get.toSeq