Fix for Ticket 513 : Implement snapshot based persistence control in SortedSet

This commit is contained in:
Debasish Ghosh 2010-11-11 16:18:16 +05:30
parent 39783de6f3
commit 6c200163cf
4 changed files with 135 additions and 45 deletions

View file

@ -700,6 +700,13 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
}
}
private[akka] object PersistentSortedSet {
// operations on the SortedSet
sealed trait Op
case object ADD extends Op
case object REM extends Op
}
/**
* Implements a template for a concrete persistent transactional sorted set based storage.
* <p/>
@ -734,61 +741,45 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
* @author <a href="http://debasishg.blogspot.com"</a>
*/
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
protected val newElems = TransactionalMap[A, Float]()
protected val removedElems = TransactionalVector[A]()
//Import Ops
import PersistentSortedSet._
// append only log: records all mutating operations
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
// need to override in subclasses e.g. "sameElements" for Array[Byte]
def equal(v1: A, v2: A): Boolean = v1 == v2
case class LogEntry(value: A, score: Option[Float], op: Op)
val storage: SortedSetStorageBackend[A]
def commit = {
for ((element, score) <- newElems) storage.zadd(uuid, String.valueOf(score), element)
for (element <- removedElems) storage.zrem(uuid, element)
newElems.clear
removedElems.clear
for (entry <- appendOnlyTxLog) {
(entry: @unchecked) match {
case LogEntry(e, Some(s), ADD) => storage.zadd(uuid, String.valueOf(s), e)
case LogEntry(e, _, REM) => storage.zrem(uuid, e)
}
}
appendOnlyTxLog.clear
}
def abort = {
newElems.clear
removedElems.clear
appendOnlyTxLog.clear
}
def +(elem: A, score: Float) = add(elem, score)
def add(elem: A, score: Float) = {
register
newElems.put(elem, score)
appendOnlyTxLog.add(LogEntry(elem, Some(score), ADD))
}
def -(elem: A) = remove(elem)
def remove(elem: A) = {
register
removedElems.add(elem)
}
private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match {
case Some(s) => Some(s.toFloat)
case None => None
}
def contains(elem: A): Boolean = {
if (newElems contains elem) true
else {
inStorage(elem) match {
case Some(f) => true
case None => false
}
}
}
def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size
def zscore(elem: A): Float = {
if (newElems contains elem) newElems.get(elem).get
inStorage(elem) match {
case Some(f) => f
case None =>
throw new NoSuchElementException(elem + " not present")
}
appendOnlyTxLog.add(LogEntry(elem, None, REM))
}
implicit def order(x: (A, Float)) = new Ordered[(A, Float)] {
@ -799,11 +790,27 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2
}
protected def replay: List[(A, Float)] = {
val es = collection.mutable.Map() ++ storage.zrangeWithScore(uuid, 0, -1)
for (entry <- appendOnlyTxLog) {
(entry: @unchecked) match {
case LogEntry(v, Some(s), ADD) => es += ((v, s))
case LogEntry(v, _, REM) => es -= v
}
}
es.toList
}
def contains(elem: A): Boolean = replay.map(_._1).contains(elem)
def size: Int = replay size
def zscore(elem: A): Float = replay.filter { case (e, s) => equal(e, elem) }.map(_._2).head
def zrange(start: Int, end: Int): List[(A, Float)] = {
// need to operate on the whole range
// get all from the underlying storage
val fromStore = storage.zrangeWithScore(uuid, 0, -1)
val ts = scala.collection.immutable.TreeSet(fromStore: _*) ++ newElems.toList
import PersistentSortedSet._
val ts = collection.immutable.TreeSet(replay: _*)
val l = ts.size
// -1 means the last element, -2 means the second last
@ -821,3 +828,21 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
transaction.get.get.register(uuid, this)
}
}
trait PersistentSortedSetBinary extends PersistentSortedSet[Array[Byte]] {
import PersistentSortedSet._
override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
override protected def replay: List[(Array[Byte], Float)] = {
val es = collection.mutable.Map() ++ storage.zrangeWithScore(uuid, 0, -1).map { case (k, v) => (ArraySeq(k: _*), v) }
for (entry <- appendOnlyTxLog) {
(entry: @unchecked) match {
case LogEntry(v, Some(s), ADD) => es += ((ArraySeq(v: _*), s))
case LogEntry(v, _, REM) => es -= ArraySeq(v: _*)
}
}
es.toList.map { case (k, v) => (k.toArray, v) }
}
}

View file

@ -74,7 +74,7 @@ class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class RedisPersistentSortedSet(id: String) extends PersistentSortedSet[Array[Byte]] {
class RedisPersistentSortedSet(id: String) extends PersistentSortedSetBinary {
val uuid = id
val storage = RedisStorageBackend
}

View file

@ -72,7 +72,7 @@ class SortedSetActor extends Transactor {
hackers.+(h.name.getBytes, h.zscore)
}
try {
r.foreach{ h =>
r.foreach { h =>
if (hackers.size <= 3)
throw new SetThresholdViolationException
hackers.-(h.name.getBytes)
@ -184,11 +184,10 @@ class RedisPersistentSortedSetSpec extends
val add1 = List(h5, h6)
// remove 3
val rem1 = List(h1, h3, h4)
val rem1 = List(h1, h3, h4, h5)
try {
qa !! MULTI(add1, rem1, failer)
} catch { case e: Exception => {}
}
} catch { case e: RuntimeException => {} }
(qa !! SIZE).get.asInstanceOf[Int] should equal(3)
}
}

View file

@ -0,0 +1,66 @@
package akka.persistence.redis
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.actor.{Actor, ActorRef, Transactor}
import Actor._
/**
* A persistent actor based on Redis sortedset storage.
* <p/>
* Needs a running Redis server.
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
case class Add(email: String, value: String)
case class GetAll(email: String)
class MySortedSet extends Transactor {
def receive = {
case Add(userEmail, value) => {
val registryId = "userValues:%s".format(userEmail)
val storageSet = RedisStorage.getSortedSet(registryId)
storageSet.add(value.getBytes, System.nanoTime.toFloat)
self.reply(storageSet.size)
}
case GetAll(userEmail) => {
val registryId = "userValues:%s".format(userEmail)
val storageSet = RedisStorage.getSortedSet(registryId)
self.reply(storageSet.zrange(0, -1))
}
}
}
import RedisStorageBackend._
@RunWith(classOf[JUnitRunner])
class RedisTicket513Spec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
override def beforeAll {
flushDB
println("** destroyed database")
}
override def afterAll {
flushDB
println("** destroyed database")
}
describe("insert into user specific set") {
val a = actorOf[MySortedSet]
a.start
it("should work with transactors") {
(a !! Add("test.user@gmail.com", "foo")).get should equal(1)
(a !! Add("test.user@gmail.com", "bar")).get should equal(2)
(a !! GetAll("test.user@gmail.com")).get.asInstanceOf[List[_]].size should equal(2)
}
}
}