From 6c200163cfe2f4485a54ebff15ae9acf93d3e161 Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Thu, 11 Nov 2010 16:18:16 +0530 Subject: [PATCH] Fix for Ticket 513 : Implement snapshot based persistence control in SortedSet --- .../src/main/scala/Storage.scala | 105 +++++++++++------- .../src/main/scala/RedisStorage.scala | 2 +- .../scala/RedisPersistentSortedSetSpec.scala | 7 +- .../src/test/scala/RedisTicket513Spec.scala | 66 +++++++++++ 4 files changed, 135 insertions(+), 45 deletions(-) create mode 100644 akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index 7d74496936..4e7f1d1cfe 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -700,6 +700,13 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] } } +private[akka] object PersistentSortedSet { + // operations on the SortedSet + sealed trait Op + case object ADD extends Op + case object REM extends Op +} + /** * Implements a template for a concrete persistent transactional sorted set based storage. *

@@ -734,61 +741,45 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] * @author */ trait PersistentSortedSet[A] extends Transactional with Committable with Abortable { - protected val newElems = TransactionalMap[A, Float]() - protected val removedElems = TransactionalVector[A]() + //Import Ops + import PersistentSortedSet._ + + // append only log: records all mutating operations + protected val appendOnlyTxLog = TransactionalVector[LogEntry]() + + // need to override in subclasses e.g. "sameElements" for Array[Byte] + def equal(v1: A, v2: A): Boolean = v1 == v2 + + case class LogEntry(value: A, score: Option[Float], op: Op) val storage: SortedSetStorageBackend[A] def commit = { - for ((element, score) <- newElems) storage.zadd(uuid, String.valueOf(score), element) - for (element <- removedElems) storage.zrem(uuid, element) - newElems.clear - removedElems.clear + for (entry <- appendOnlyTxLog) { + (entry: @unchecked) match { + case LogEntry(e, Some(s), ADD) => storage.zadd(uuid, String.valueOf(s), e) + case LogEntry(e, _, REM) => storage.zrem(uuid, e) + } + } + appendOnlyTxLog.clear } def abort = { - newElems.clear - removedElems.clear + appendOnlyTxLog.clear } def +(elem: A, score: Float) = add(elem, score) def add(elem: A, score: Float) = { register - newElems.put(elem, score) + appendOnlyTxLog.add(LogEntry(elem, Some(score), ADD)) } def -(elem: A) = remove(elem) def remove(elem: A) = { register - removedElems.add(elem) - } - - private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match { - case Some(s) => Some(s.toFloat) - case None => None - } - - def contains(elem: A): Boolean = { - if (newElems contains elem) true - else { - inStorage(elem) match { - case Some(f) => true - case None => false - } - } - } - - def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size - - def zscore(elem: A): Float = { - if (newElems contains elem) newElems.get(elem).get - inStorage(elem) match { - case Some(f) => f - case None => - throw new NoSuchElementException(elem + " not present") - } + appendOnlyTxLog.add(LogEntry(elem, None, REM)) } implicit def order(x: (A, Float)) = new Ordered[(A, Float)] { @@ -799,11 +790,27 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2 } + protected def replay: List[(A, Float)] = { + val es = collection.mutable.Map() ++ storage.zrangeWithScore(uuid, 0, -1) + + for (entry <- appendOnlyTxLog) { + (entry: @unchecked) match { + case LogEntry(v, Some(s), ADD) => es += ((v, s)) + case LogEntry(v, _, REM) => es -= v + } + } + es.toList + } + + def contains(elem: A): Boolean = replay.map(_._1).contains(elem) + + def size: Int = replay size + + def zscore(elem: A): Float = replay.filter { case (e, s) => equal(e, elem) }.map(_._2).head + def zrange(start: Int, end: Int): List[(A, Float)] = { - // need to operate on the whole range - // get all from the underlying storage - val fromStore = storage.zrangeWithScore(uuid, 0, -1) - val ts = scala.collection.immutable.TreeSet(fromStore: _*) ++ newElems.toList + import PersistentSortedSet._ + val ts = collection.immutable.TreeSet(replay: _*) val l = ts.size // -1 means the last element, -2 means the second last @@ -821,3 +828,21 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab transaction.get.get.register(uuid, this) } } + +trait PersistentSortedSetBinary extends PersistentSortedSet[Array[Byte]] { + import PersistentSortedSet._ + + override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2 + + override protected def replay: List[(Array[Byte], Float)] = { + val es = collection.mutable.Map() ++ storage.zrangeWithScore(uuid, 0, -1).map { case (k, v) => (ArraySeq(k: _*), v) } + + for (entry <- appendOnlyTxLog) { + (entry: @unchecked) match { + case LogEntry(v, Some(s), ADD) => es += ((ArraySeq(v: _*), s)) + case LogEntry(v, _, REM) => es -= ArraySeq(v: _*) + } + } + es.toList.map { case (k, v) => (k.toArray, v) } + } +} diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala index 9dfc37770a..591d337af9 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala @@ -74,7 +74,7 @@ class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] { * * @author Debasish Ghosh */ -class RedisPersistentSortedSet(id: String) extends PersistentSortedSet[Array[Byte]] { +class RedisPersistentSortedSet(id: String) extends PersistentSortedSetBinary { val uuid = id val storage = RedisStorageBackend } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala index 8e25dbf4d6..02e3c03bab 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala @@ -72,7 +72,7 @@ class SortedSetActor extends Transactor { hackers.+(h.name.getBytes, h.zscore) } try { - r.foreach{ h => + r.foreach { h => if (hackers.size <= 3) throw new SetThresholdViolationException hackers.-(h.name.getBytes) @@ -184,11 +184,10 @@ class RedisPersistentSortedSetSpec extends val add1 = List(h5, h6) // remove 3 - val rem1 = List(h1, h3, h4) + val rem1 = List(h1, h3, h4, h5) try { qa !! MULTI(add1, rem1, failer) - } catch { case e: Exception => {} - } + } catch { case e: RuntimeException => {} } (qa !! SIZE).get.asInstanceOf[Int] should equal(3) } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala new file mode 100644 index 0000000000..66e08f2c98 --- /dev/null +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala @@ -0,0 +1,66 @@ +package akka.persistence.redis + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import akka.actor.{Actor, ActorRef, Transactor} +import Actor._ + +/** + * A persistent actor based on Redis sortedset storage. + *

+ * Needs a running Redis server. + * @author Debasish Ghosh + */ + +case class Add(email: String, value: String) +case class GetAll(email: String) + +class MySortedSet extends Transactor { + def receive = { + case Add(userEmail, value) => { + val registryId = "userValues:%s".format(userEmail) + val storageSet = RedisStorage.getSortedSet(registryId) + storageSet.add(value.getBytes, System.nanoTime.toFloat) + self.reply(storageSet.size) + } + case GetAll(userEmail) => { + val registryId = "userValues:%s".format(userEmail) + val storageSet = RedisStorage.getSortedSet(registryId) + self.reply(storageSet.zrange(0, -1)) + } + } +} + +import RedisStorageBackend._ + +@RunWith(classOf[JUnitRunner]) +class RedisTicket513Spec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + override def beforeAll { + flushDB + println("** destroyed database") + } + + override def afterAll { + flushDB + println("** destroyed database") + } + + describe("insert into user specific set") { + val a = actorOf[MySortedSet] + a.start + it("should work with transactors") { + (a !! Add("test.user@gmail.com", "foo")).get should equal(1) + (a !! Add("test.user@gmail.com", "bar")).get should equal(2) + (a !! GetAll("test.user@gmail.com")).get.asInstanceOf[List[_]].size should equal(2) + } + } +}