added support for Redis based SortedSet persistence in Akka transactors

This commit is contained in:
Debasish Ghosh 2010-03-18 10:51:22 +05:30
parent 3010cd1aa2
commit 02e6f5895a
10 changed files with 404 additions and 21 deletions

View file

@ -51,18 +51,24 @@ trait Storage {
def newRef: PersistentRef[ElementType]
def newQueue: PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def getMap(id: String): PersistentMap[ElementType, ElementType]
def getVector(id: String): PersistentVector[ElementType]
def getRef(id: String): PersistentRef[ElementType]
def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newMap(id: String): PersistentMap[ElementType, ElementType]
def newVector(id: String): PersistentVector[ElementType]
def newRef(id: String): PersistentRef[ElementType]
def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis
throw new UnsupportedOperationException
}
/**
@ -398,3 +404,119 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
transaction.get.get.register(uuid, this)
}
}
/**
* Implements a template for a concrete persistent transactional sorted set based storage.
* <p/>
* Sorting is done based on a <i>zscore</i>. But the computation of zscore has been kept
* outside the abstraction.
* <p/>
* zscore can be implemented in a variety of ways by the calling class:
* <pre>
* trait ZScorable {
* def toZScore: Float
* }
*
* class Foo extends ZScorable {
* //.. implemnetation
* }
* </pre>
* Or we can also use views:
* <pre>
* class Foo {
* //..
* }
*
* implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
* def toZScore = {
* //..
* }
* }
* </pre>
*
* and use <tt>foo.toZScore</tt> to compute the zscore and pass to the APIs.
*
* @author <a href="http://debasishg.blogspot.com"</a>
*/
trait PersistentSortedSet[A]
extends Transactional
with Committable {
protected val newElems = TransactionalState.newMap[A, Float]
protected val removedElems = TransactionalState.newVector[A]
val storage: SortedSetStorageBackend[A]
def commit = {
for ((element, score) <- newElems) storage.zadd(uuid, String.valueOf(score), element)
for (element <- removedElems) storage.zrem(uuid, element)
newElems.clear
removedElems.clear
}
def +(elem: A, score: Float) = add(elem, score)
def add(elem: A, score: Float) = {
register
newElems.put(elem, score)
}
def -(elem: A) = remove(elem)
def remove(elem: A) = {
register
removedElems.add(elem)
}
private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match {
case Some(s) => Some(s.toFloat)
case None => None
}
def contains(elem: A): Boolean = {
if (newElems contains elem) true
else {
inStorage(elem) match {
case Some(f) => true
case None => false
}
}
}
def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size
def zscore(elem: A): Float = {
if (newElems contains elem) newElems.get(elem).get
inStorage(elem) match {
case Some(f) => f
case None =>
throw new Predef.NoSuchElementException(elem + " not present")
}
}
implicit def order(x: (A, Float)) = new Ordered[(A, Float)] {
def compare(that: (A, Float)) = x._2 compare that._2
}
def zrange(start: Int, end: Int): List[(A, Float)] = {
// need to operate on the whole range
// get all from the underlying storage
val fromStore = storage.zrangeWithScore(uuid, 0, -1)
val ts = scala.collection.immutable.TreeSet(fromStore: _*) ++ newElems.toList
val l = ts.size
// -1 means the last element, -2 means the second last
val s = if (start < 0) start + l else start
val e =
if (end < 0) end + l
else if (end >= l) (l - 1)
else end
// slice is open at the end, we need a closed end range
ts.elements.slice(s, e + 1).toList
}
private def register = {
if (transaction.get.isEmpty) throw new NoTransactionInScopeException
transaction.get.get.register(uuid, this)
}
}

View file

@ -69,11 +69,15 @@ trait SortedSetStorageBackend[T] extends StorageBackend {
// remove item from sorted set identified by name
def zrem(name: String, item: T): Boolean
// cardinality of the set idnetified by name
// cardinality of the set identified by name
def zcard(name: String): Int
def zscore(name: String, item: T): String
// zscore of the item from sorted set identified by name
def zscore(name: String, item: T): Option[Float]
// zrange from the sorted set identified by name
def zrange(name: String, start: Int, end: Int): List[T]
}
// zrange with score from the sorted set identified by name
def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)]
}

View file

@ -15,16 +15,20 @@ object RedisStorage extends Storage {
def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString)
def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString)
override def newQueue: PersistentQueue[ElementType] = newQueue(UUID.newUuid.toString)
override def newSortedSet: PersistentSortedSet[ElementType] = newSortedSet(UUID.newUuid.toString)
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
override def getSortedSet(id: String): PersistentSortedSet[ElementType] = newSortedSet(id)
def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id)
def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id)
def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id)
override def newQueue(id: String): PersistentQueue[ElementType] = new RedisPersistentQueue(id)
override def newSortedSet(id: String): PersistentSortedSet[ElementType] =
new RedisPersistentSortedSet(id)
}
/**
@ -63,3 +67,14 @@ class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
val uuid = id
val storage = RedisStorageBackend
}
/**
* Implements a persistent transactional sorted set based on the Redis
* storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class RedisPersistentSortedSet(id: String) extends PersistentSortedSet[Array[Byte]] {
val uuid = id
val storage = RedisStorageBackend
}

View file

@ -364,11 +364,10 @@ private [akka] object RedisStorageBackend extends
}
}
def zscore(name: String, item: Array[Byte]): String = withErrorHandling {
def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling {
db.zscore(new String(encode(name.getBytes)), new String(item)) match {
case None =>
throw new Predef.NoSuchElementException(new String(item) + " not present")
case Some(s) => s
case Some(s) => Some(s.toFloat)
case None => None
}
}
@ -380,6 +379,16 @@ private [akka] object RedisStorageBackend extends
s.map(_.get.getBytes)
}
}
def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling {
db.zrangeWithScore(
new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(l) =>
l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) }
}
}
def flushDB = withErrorHandling(db.flushDb)

View file

@ -87,7 +87,7 @@ class AccountActor extends Transactor {
}
@serializable class PersistentFailerActor extends Transactor {
//timeout = 5000
// timeout = 5000
def receive = {
case "Failure" =>
throw new RuntimeException("expected")

View file

@ -0,0 +1,237 @@
package se.scalablesolutions.akka.persistence.redis
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.{Actor, Transactor}
/**
* A persistent actor based on Redis sortedset storage.
* <p/>
* Needs a running Redis server.
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
trait ZScorable {
def zscore: Float
}
case class Hacker(name: String, birth: String) extends ZScorable {
def zscore = birth.toFloat
}
class SetThresholdViolationException extends RuntimeException
// add hacker to the set
case class ADD(h: Hacker)
// remove hacker from set
case class REMOVE(h: Hacker)
// size of the set
case object SIZE
// zscore of the hacker
case class SCORE(h: Hacker)
// zrange
case class RANGE(start: Int, end: Int)
// add and remove subject to the condition that there will be at least 3 hackers
case class MULTI(add: List[Hacker], rem: List[Hacker], failer: Actor)
class SortedSetActor extends Transactor {
timeout = 100000
private lazy val hackers = RedisStorage.newSortedSet
def receive = {
case ADD(h) =>
hackers.+(h.name.getBytes, h.zscore)
reply(true)
case REMOVE(h) =>
hackers.-(h.name.getBytes)
reply(true)
case SIZE =>
reply(hackers.size)
case SCORE(h) =>
reply(hackers.zscore(h.name.getBytes))
case RANGE(s, e) =>
reply(hackers.zrange(s, e))
case MULTI(a, r, failer) =>
a.foreach{ h: Hacker =>
hackers.+(h.name.getBytes, h.zscore)
}
try {
r.foreach{ h =>
if (hackers.size <= 3)
throw new SetThresholdViolationException
hackers.-(h.name.getBytes)
}
} catch {
case e: Exception =>
failer !! "Failure"
}
reply((a.size, r.size))
}
}
import RedisStorageBackend._
@RunWith(classOf[JUnitRunner])
class RedisPersistentSortedSetSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
override def beforeAll {
flushDB
println("** destroyed database")
}
override def afterAll {
flushDB
println("** destroyed database")
}
val h1 = Hacker("Alan kay", "1940")
val h2 = Hacker("Richard Stallman", "1953")
val h3 = Hacker("Yukihiro Matsumoto", "1965")
val h4 = Hacker("Claude Shannon", "1916")
val h5 = Hacker("Linus Torvalds", "1969")
val h6 = Hacker("Alan Turing", "1912")
describe("Add and report cardinality of the set") {
val qa = new SortedSetActor
qa.start
it("should enter 6 hackers") {
qa !! ADD(h1)
qa !! ADD(h2)
qa !! ADD(h3)
qa !! ADD(h4)
qa !! ADD(h5)
qa !! ADD(h6)
(qa !! SIZE).get.asInstanceOf[Int] should equal(6)
}
it("should fetch correct scores for hackers") {
(qa !! SCORE(h1)).get.asInstanceOf[Float] should equal(1940.0f)
(qa !! SCORE(h5)).get.asInstanceOf[Float] should equal(1969.0f)
(qa !! SCORE(h6)).get.asInstanceOf[Float] should equal(1912.0f)
}
it("should fetch proper range") {
(qa !! RANGE(0, 4)).get.asInstanceOf[List[_]].size should equal(5)
(qa !! RANGE(0, 6)).get.asInstanceOf[List[_]].size should equal(6)
}
it("should remove and throw exception for removing non-existent hackers") {
qa !! REMOVE(h2)
(qa !! SIZE).get.asInstanceOf[Int] should equal(5)
qa !! REMOVE(h3)
(qa !! SIZE).get.asInstanceOf[Int] should equal(4)
val h7 = Hacker("Paul Snively", "1952")
try {
qa !! REMOVE(h7)
}
catch {
case e: Predef.NoSuchElementException =>
e.getMessage should endWith("not present")
}
}
it("should change score for entering the same hacker name with diff score") {
(qa !! SIZE).get.asInstanceOf[Int] should equal(4)
// same name as h6
val h7 = Hacker("Alan Turing", "1992")
qa !! ADD(h7)
// size remains same
(qa !! SIZE).get.asInstanceOf[Int] should equal(4)
// score updated
(qa !! SCORE(h7)).get.asInstanceOf[Float] should equal(1992.0f)
}
}
describe("Transaction semantics") {
it("should rollback on exception") {
val qa = new SortedSetActor
qa.start
val failer = new PersistentFailerActor
failer.start
(qa !! SIZE).get.asInstanceOf[Int] should equal(0)
val add = List(h1, h2, h3, h4)
val rem = List(h2)
(qa !! MULTI(add, rem, failer)).get.asInstanceOf[Tuple2[Int, Int]] should equal((4,1))
(qa !! SIZE).get.asInstanceOf[Int] should equal(3)
// size == 3
// add 2 more
val add1 = List(h5, h6)
// remove 3
val rem1 = List(h1, h3, h4)
try {
qa !! MULTI(add1, rem1, failer)
} catch { case e: Exception => {}
}
(qa !! SIZE).get.asInstanceOf[Int] should equal(3)
}
}
describe("zrange") {
it ("should report proper range") {
val qa = new SortedSetActor
qa.start
qa !! ADD(h1)
qa !! ADD(h2)
qa !! ADD(h3)
qa !! ADD(h4)
qa !! ADD(h5)
qa !! ADD(h6)
(qa !! SIZE).get.asInstanceOf[Int] should equal(6)
val l = (qa !! RANGE(0, 6)).get.asInstanceOf[List[(Array[Byte], Float)]]
l.map { case (e, s) => (new String(e), s) }.head should equal(("Alan Turing", 1912.0f))
val h7 = Hacker("Alan Turing", "1992")
qa !! ADD(h7)
(qa !! SIZE).get.asInstanceOf[Int] should equal(6)
val m = (qa !! RANGE(0, 6)).get.asInstanceOf[List[(Array[Byte], Float)]]
m.map { case (e, s) => (new String(e), s) }.head should equal(("Claude Shannon", 1916.0f))
}
it ("should report proper rge") {
val qa = new SortedSetActor
qa.start
qa !! ADD(h1)
qa !! ADD(h2)
qa !! ADD(h3)
qa !! ADD(h4)
qa !! ADD(h5)
qa !! ADD(h6)
(qa !! SIZE).get.asInstanceOf[Int] should equal(6)
(qa !! RANGE(0, 5)).get.asInstanceOf[List[_]].size should equal(6)
(qa !! RANGE(0, 6)).get.asInstanceOf[List[_]].size should equal(6)
(qa !! RANGE(0, 3)).get.asInstanceOf[List[_]].size should equal(4)
(qa !! RANGE(0, 1)).get.asInstanceOf[List[_]].size should equal(2)
(qa !! RANGE(0, 0)).get.asInstanceOf[List[_]].size should equal(1)
(qa !! RANGE(3, 1)).get.asInstanceOf[List[_]].size should equal(0)
(qa !! RANGE(0, -1)).get.asInstanceOf[List[_]].size should equal(6)
(qa !! RANGE(0, -2)).get.asInstanceOf[List[_]].size should equal(5)
(qa !! RANGE(0, -4)).get.asInstanceOf[List[_]].size should equal(3)
(qa !! RANGE(-4, -1)).get.asInstanceOf[List[_]].size should equal(4)
}
}
}

View file

@ -191,10 +191,10 @@ class RedisStorageBackendSpec extends
zcard("hackers") should equal(6)
zscore("hackers", "alan turing".getBytes) should equal("1912")
zscore("hackers", "richard stallman".getBytes) should equal("1953")
zscore("hackers", "claude shannon".getBytes) should equal("1916")
zscore("hackers", "linus torvalds".getBytes) should equal("1969")
zscore("hackers", "alan turing".getBytes).get should equal(1912.0f)
zscore("hackers", "richard stallman".getBytes).get should equal(1953.0f)
zscore("hackers", "claude shannon".getBytes).get should equal(1916.0f)
zscore("hackers", "linus torvalds".getBytes).get should equal(1969.0f)
val s: List[Array[Byte]] = zrange("hackers", 0, 2)
s.size should equal(3)
@ -206,6 +206,10 @@ class RedisStorageBackendSpec extends
val t: List[Array[Byte]] = zrange("hackers", 0, -1)
t.size should equal(6)
t.map(new String(_)) should equal(sorted)
val u: List[(Array[Byte], Float)] = zrangeWithScore("hackers", 0, -1)
u.size should equal(6)
u.map{ case (e, s) => new String(e) } should equal(sorted)
}
}
}

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.redis</groupId>
<artifactId>redisclient</artifactId>
<version>1.1</version>
<packaging>jar</packaging>
</project>

View file

@ -240,7 +240,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
}
class AkkaRedisProject(info: ProjectInfo) extends DefaultProject(info) {
val redis = "com.redis" % "redisclient" % "1.1" % "compile"
val redis = "com.redis" % "redisclient" % "1.2-SNAPSHOT" % "compile"
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
lazy val dist = deployTask(info, distPath) dependsOn(`package`) describedAs("Deploying")
}