Add couchdb support

This commit is contained in:
Yung-Luen Lan 2010-10-05 19:55:39 +08:00
parent 32f4a63163
commit 04871d8d30
4 changed files with 393 additions and 5 deletions

View file

@ -5,10 +5,209 @@ import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
import org.apache.commons.httpclient.methods.{GetMethod, PostMethod, PutMethod, DeleteMethod}
import org.apache.commons.httpclient.params.HttpMethodParams
import org.apache.commons.httpclient.methods._
import org.apache.commons.httpclient.{DefaultHttpMethodRetryHandler, HttpClient}
import scala.util.parsing.json._;
import sjson.json._
import DefaultProtocol._
private [akka] object CouchDBStorageBackend extends
MapStorageBackend[Array[Byte], Array[Byte]] with
VectorStorageBackend[Array[Byte]] with
RefStorageBackend[Array[Byte]] with
Logging {
// need couch db implementation!
}
import dispatch.json._
implicit object widgetWrites extends Writes[Map[String,Any]] {
def writes(o: Map[String,Any]): JsValue = JsValue(o)
}
val URL = config.getString("akka.storage.couchdb.url", "http://localhost:5984/testakka/")
def drop() = {
val client: HttpClient = new HttpClient()
val delete: DeleteMethod = new DeleteMethod(URL)
client.executeMethod(delete)
}
def create() = {
val client: HttpClient = new HttpClient()
val put: PutMethod = new PutMethod(URL)
put.setRequestEntity(new StringRequestEntity("", null, "utf-8"))
put.setRequestHeader("Content-Type", "application/json")
client.executeMethod(put)
put.getResponseBodyAsString
}
private def storeMap(name: String, entries: List[(Array[Byte], Array[Byte])]) ={
var m = entries.map(e=>(new String(e._1) -> new String(e._2))).toMap + ("_id" -> name)
val dataJson = JsonSerialization.tojson(m)
postData(URL, dataJson.toString)
}
private def storeMap(name: String, entries: Map[String, Any]) ={
postData(URL, JsonSerialization.tojson(entries + ("_id" -> name)).toString)
}
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) ={
val newDoc = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) ++
entries.map(e=>(new String(e._1) -> new String(e._2))).toMap
storeMap(name, newDoc)
}
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte])={
insertMapStorageEntriesFor(name, List((key, value)))
}
def removeMapStorageFor(name: String): Unit = {
findDocRev(name).flatMap(deleteData(URL + name, _))
}
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
getResponseForNameAsMap(name).flatMap(doc=>{ // if we can't find the map for name, then we don't need to delete it.
removeMapStorageFor(name)
storeMap(name, doc - new String(key))
})
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
getResponseForNameAsMap(name).flatMap(_.get(new String(key))).asInstanceOf[Option[String]]
.map(_.getBytes)
}
def getMapStorageSizeFor(name: String): Int = {
getMapStorageFor(name).size
}
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
val m = getResponseForNameAsMap(name).map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]())
m.toList.map(e => (e._1.getBytes, e._2.asInstanceOf[String].getBytes))
}
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
val m = getResponseForNameAsMap(name).map(_ - ("_id", "_rev")).getOrElse(Map[String, Any]())
val keys = m.keys.toList.sortWith(_ < _)
// if the supplied start is not defined, get the head of keys
val s = start.map(new String(_)).getOrElse(keys.head)
// if the supplied finish is not defined, get the last element of keys
val f = finish.map(new String(_)).getOrElse(keys.last)
val c = if (count == 0) Int.MaxValue else count
// slice from keys: both ends inclusive
val ks = keys.slice(keys.indexOf(s), scala.math.min(keys.indexOf(s) + c, keys.indexOf(f) + 1))
ks.map(k => (k.getBytes, m(k).asInstanceOf[String].getBytes))
}
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = {
insertVectorStorageEntriesFor(name, List(element))
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
val m = getResponseForNameAsMap(name).getOrElse(Map[String, Any]())
val v = elements.map(x=>new String(x)) ::: m.getOrElse("vector", List[String]()).asInstanceOf[List[String]]
storeMap(name, m + ("vector" -> v))
}
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
val m = getResponseForNameAsMap(name).getOrElse(Map[String, Any]())
val v: List[String] = m.getOrElse("vector", List[String]()).asInstanceOf[List[String]]
if (v.indices.contains(index)) {
storeMap(name, m + ("vector" -> v.updated(index, new String(elem))))
}
}
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] ={
val v = getResponseForNameAsMap(name).flatMap(_.get("vector")).getOrElse(List[String]()).asInstanceOf[List[String]]
if (v.indices.contains(index))
v(index).getBytes
else
Array[Byte]()
}
def getVectorStorageSizeFor(name: String): Int ={
getResponseForNameAsMap(name).flatMap(_.get("vector")).map(_.asInstanceOf[List[String]].size).getOrElse(0)
}
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
val v = getResponseForNameAsMap(name).flatMap(_.get("vector")).asInstanceOf[Option[List[String]]].getOrElse(List[String]())
val s = start.getOrElse(0)
val f = finish.getOrElse(v.length)
val c = if (count == 0) v.length else count
v.slice(s, scala.math.min(s + c, f)).map(_.getBytes)
}
def insertRefStorageFor(name: String, element: Array[Byte]) ={
val newDoc = getResponseForNameAsMap(name).getOrElse(Map[String, Any]()) ++ Map("ref" -> new String(element))
storeMap(name, newDoc)
}
def getRefStorageFor(name: String): Option[Array[Byte]] ={
getResponseForNameAsMap(name).flatMap(_.get("ref")).map(_.asInstanceOf[String].getBytes)
}
private def findDocRev(name: String) = {
getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]]
.flatMap(_.get("_rev")).asInstanceOf[Option[String]]
}
private def deleteData(url:String, rev:String): Option[String] = {
val client: HttpClient = new HttpClient()
val delete: DeleteMethod = new DeleteMethod(url)
delete.setRequestHeader("If-Match", rev)
client.executeMethod(delete)
val response = delete.getResponseBodyAsString()
if (response != null)
Some(response)
else
None
}
private def postData(url: String, data: String): Option[String] = {
val client: HttpClient = new HttpClient()
val post: PostMethod = new PostMethod(url)
post.setRequestEntity(new StringRequestEntity(data, null, "utf-8"))
post.setRequestHeader("Content-Type", "application/json")
client.executeMethod(post)
val response = post.getResponseBodyAsString
if (response != null)
Some(response)
else
None
}
private def getResponseForNameAsMap(name: String): Option[Map[String, Any]] = {
getResponse(URL + name).flatMap(JSON.parseFull(_)).asInstanceOf[Option[Map[String, Any]]]
}
private def getResponse(url: String): Option[String] = {
val client = new HttpClient()
val method = new GetMethod(url)
method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
new DefaultHttpMethodRetryHandler(3, false))
client.executeMethod(method)
val response = method.getResponseBodyAsString
if (method.getStatusCode == 200 && response != null)
Some(response)
else
None
}
}

View file

@ -0,0 +1,9 @@
package se.scalablesolutions.akka.persistence.couchdb
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before}
import org.junit.Assert._
import se.scalablesolutions.akka.actor.{ActorRef, Transactor}
import se.scalablesolutions.akka.actor.Actor._

View file

@ -0,0 +1,175 @@
package se.scalablesolutions.akka.persistence.couchdb
import org.specs._
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization.Serializable
import se.scalablesolutions.akka.serialization.Serializer._
import CouchDBStorageBackend._
import sbinary._
import sbinary.Operations._
import sbinary.DefaultProtocol._
import java.util.{Calendar, Date}
@RunWith(classOf[JUnitRunner])
class CouchDBStorageBackendSpec extends Specification {
doBeforeSpec {
CouchDBStorageBackend.create()
}
doAfterSpec {
CouchDBStorageBackend.drop()
}
"CouchDBStorageBackend store and query in map" should {
"enter 4 entries for transaction T-1" in {
insertMapStorageEntryFor("T-1", "debasish.company".getBytes, "anshinsoft".getBytes)
insertMapStorageEntryFor("T-1", "debasish.language".getBytes, "java".getBytes)
insertMapStorageEntryFor("T-1", "debasish.age".getBytes, "44".getBytes)
insertMapStorageEntryFor("T-1", "debasish.spouse".getBytes, "paramita".getBytes)
getMapStorageSizeFor("T-1") mustEqual(4)
new String(getMapStorageEntryFor(
"T-1", "debasish.language".getBytes).get) mustEqual("java")
}
"enter key/values for another transaction T-2" in {
insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes)
insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes)
getMapStorageSizeFor("T-1") mustEqual(4)
getMapStorageSizeFor("T-2") mustEqual(2)
}
// "remove map storage for T-1 and T2" in {
// removeMapStorageFor("T-1")
// removeMapStorageFor("T-2")
// }
}
"CouchDBStorageBackend store and query long value in map" should {
"enter 4 entries for transaction T-11" in {
val d = Calendar.getInstance.getTime.getTime
insertMapStorageEntryFor("T-11", "debasish".getBytes, toByteArray[Long](d))
getMapStorageSizeFor("T-11") mustEqual(1)
fromByteArray[Long](getMapStorageEntryFor("T-11", "debasish".getBytes).get) mustEqual(d)
}
// "should remove map storage for T-11" in {
// removeMapStorageFor("T-11")
// }
}
"Range query in maps" should {
"enter 7 entries in redis for transaction T-5" in {
insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes)
insertMapStorageEntryFor("T-5", "trade.instrument".getBytes, "IBM".getBytes)
insertMapStorageEntryFor("T-5", "trade.type".getBytes, "BUY".getBytes)
insertMapStorageEntryFor("T-5", "trade.account".getBytes, "A-123".getBytes)
insertMapStorageEntryFor("T-5", "trade.amount".getBytes, "1000000".getBytes)
insertMapStorageEntryFor("T-5", "trade.quantity".getBytes, "1000".getBytes)
insertMapStorageEntryFor("T-5", "trade.broker".getBytes, "Nomura".getBytes)
getMapStorageSizeFor("T-5") mustEqual(7)
getMapStorageRangeFor("T-5",
Some("trade.account".getBytes),
None, 3).map(e => (new String(e._1), new String(e._2))).size mustEqual(3)
getMapStorageRangeFor("T-5",
Some("trade.account".getBytes),
Some("trade.type".getBytes), 3).map(e => (new String(e._1), new String(e._2))).size mustEqual(3)
getMapStorageRangeFor("T-5",
Some("trade.amount".getBytes),
Some("trade.type".getBytes), 0).map(e => (new String(e._1), new String(e._2))).size mustEqual(6)
getMapStorageRangeFor("T-5",
Some("trade.account".getBytes),
None, 0).map(e => (new String(e._1), new String(e._2))).size mustEqual(7)
}
"remove map storage for T5" in {
removeMapStorageFor("T-5")
}
}
"Store and query objects in maps" should {
import NameSerialization._
"write a Name object and fetch it properly" in {
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
insertMapStorageEntryFor("T-31", "debasish".getBytes, toByteArray[Name](n))
getMapStorageSizeFor("T-31") mustEqual(1)
fromByteArray[Name](getMapStorageEntryFor("T-31", "debasish".getBytes).getOrElse(Array[Byte]())) mustEqual(n)
}
"should remove map storage for T31" in {
removeMapStorageFor("T-31")
}
}
"Store and query in vectors" should {
"write 4 entries in a vector for transaction T-3" in {
insertVectorStorageEntryFor("T-3", "debasish".getBytes)
insertVectorStorageEntryFor("T-3", "maulindu".getBytes)
insertVectorStorageEntryFor("T-3", "1200".getBytes)
val dt = Calendar.getInstance.getTime.getTime
insertVectorStorageEntryFor("T-3", toByteArray[Long](dt))
getVectorStorageSizeFor("T-3") mustEqual(4)
fromByteArray[Long](getVectorStorageEntryFor("T-3", 0)) mustEqual(dt)
getVectorStorageSizeFor("T-3") mustEqual(4)
// removeVectorStorageFor("T-3")
// getVectorStorageSizeFor("T-3") mustEqual(0)
}
}
"Store and query objects in vectors" should {
import NameSerialization._
"write a Name object and fetch it properly" in {
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
insertVectorStorageEntryFor("T-31", toByteArray[Name](n))
getVectorStorageSizeFor("T-31") mustEqual(1)
fromByteArray[Name](getVectorStorageEntryFor("T-31", 0)) mustEqual(n)
}
}
"Store and query in ref" should {
import NameSerialization._
"write 4 entries in 4 refs for transaction T-4" in {
insertRefStorageFor("T-4", "debasish".getBytes)
insertRefStorageFor("T-4", "maulindu".getBytes)
insertRefStorageFor("T-4", "1200".getBytes)
new String(getRefStorageFor("T-4").get) mustEqual("1200")
}
"should write a Name object and fetch it properly" in {
val dtb = Calendar.getInstance.getTime
val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb))
insertRefStorageFor("T-4", toByteArray[Name](n))
fromByteArray[Name](getRefStorageFor("T-4").get) mustEqual(n)
}
}
}
object NameSerialization {
implicit object DateFormat extends Format[Date] {
def reads(in : Input) =
new Date(read[Long](in))
def writes(out: Output, value: Date) =
write[Long](out, value.getTime)
}
case class Name(id: Int, name: String,
address: String, dateOfBirth: Date, dateDied: Option[Date])
implicit val NameFormat: Format[Name] =
asProduct5(Name)(Name.unapply(_).get)
}

View file

@ -193,6 +193,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0.1" % "compile"
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
lazy val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test"
lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile"
lazy val sjson_test = "sjson.json" % "sjson" % "0.8-2.8.0" % "test"
@ -499,7 +500,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
new AkkaHbaseProject(_), akka_persistence_common)
lazy val akka_persistence_voldemort = project("akka-persistence-voldemort", "akka-persistence-voldemort",
new AkkaVoldemortProject(_), akka_persistence_common)
lazy val akka_persistence_couchdb = project("akka_persistence_couchdb", "akka_persistence_couchdb",
lazy val akka_persistence_couchdb = project("akka-persistence-couchdb", "akka-persistence-couchdb",
new AkkaCouchDBProject(_), akka_persistence_common)
}
@ -520,7 +521,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val commons_codec = Dependencies.commons_codec
val redis = Dependencies.redis
override def testOptions = createTestFilter( _.endsWith("Test"))
// override def testOptions = createTestFilter( _.endsWith("Test"))
}
// -------------------------------------------------------------------------------------------------------------------
@ -601,7 +602,11 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaCouchDBProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val specs = Dependencies.specs
val httpclient = "maven2" at "http://repo1.maven.org/maven2/"
val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile"
// override def testOptions = createTestFilter( _.endsWith("Test"))
}
// -------------------------------------------------------------------------------------------------------------------
@ -707,7 +712,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val commons_fileupload = "commons-fileupload" % "commons-fileupload" % "1.2.1" % "compile" intransitive
val jms_1_1 = "org.apache.geronimo.specs" % "geronimo-jms_1.1_spec" % "1.1.1" % "compile" intransitive
val joda = "joda-time" % "joda-time" % "1.6" intransitive
override def packageAction =
task {
val libs: Seq[Path] = managedClasspath(config("compile")).get.toSeq