diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index f21490a1be..0f0eeac912 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -51,18 +51,24 @@ trait Storage { def newRef: PersistentRef[ElementType] def newQueue: PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis + throw new UnsupportedOperationException def getMap(id: String): PersistentMap[ElementType, ElementType] def getVector(id: String): PersistentVector[ElementType] def getRef(id: String): PersistentRef[ElementType] def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis + throw new UnsupportedOperationException def newMap(id: String): PersistentMap[ElementType, ElementType] def newVector(id: String): PersistentVector[ElementType] def newRef(id: String): PersistentRef[ElementType] def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis + throw new UnsupportedOperationException } /** @@ -398,3 +404,119 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] transaction.get.get.register(uuid, this) } } + +/** + * Implements a template for a concrete persistent transactional sorted set based storage. + *

+ * Sorting is done based on a zscore. But the computation of zscore has been kept + * outside the abstraction. + *

+ * zscore can be implemented in a variety of ways by the calling class: + *

+ * trait ZScorable {
+ *   def toZScore: Float
+ * }
+ *
+ * class Foo extends ZScorable {
+ *   //.. implemnetation
+ * }
+ * 
+ * Or we can also use views: + *
+ * class Foo {
+ *   //..
+ * }
+ * 
+ * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
+ *   def toZScore = {
+ *     //..
+ *   }
+ * }
+ * 
+ * + * and use foo.toZScore to compute the zscore and pass to the APIs. + * + * @author + */ +trait PersistentSortedSet[A] + extends Transactional + with Committable { + + protected val newElems = TransactionalState.newMap[A, Float] + protected val removedElems = TransactionalState.newVector[A] + + val storage: SortedSetStorageBackend[A] + + def commit = { + for ((element, score) <- newElems) storage.zadd(uuid, String.valueOf(score), element) + for (element <- removedElems) storage.zrem(uuid, element) + newElems.clear + removedElems.clear + } + + def +(elem: A, score: Float) = add(elem, score) + + def add(elem: A, score: Float) = { + register + newElems.put(elem, score) + } + + def -(elem: A) = remove(elem) + + def remove(elem: A) = { + register + removedElems.add(elem) + } + + private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match { + case Some(s) => Some(s.toFloat) + case None => None + } + + def contains(elem: A): Boolean = { + if (newElems contains elem) true + else { + inStorage(elem) match { + case Some(f) => true + case None => false + } + } + } + + def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size + + def zscore(elem: A): Float = { + if (newElems contains elem) newElems.get(elem).get + inStorage(elem) match { + case Some(f) => f + case None => + throw new Predef.NoSuchElementException(elem + " not present") + } + } + + implicit def order(x: (A, Float)) = new Ordered[(A, Float)] { + def compare(that: (A, Float)) = x._2 compare that._2 + } + + def zrange(start: Int, end: Int): List[(A, Float)] = { + // need to operate on the whole range + // get all from the underlying storage + val fromStore = storage.zrangeWithScore(uuid, 0, -1) + val ts = scala.collection.immutable.TreeSet(fromStore: _*) ++ newElems.toList + val l = ts.size + + // -1 means the last element, -2 means the second last + val s = if (start < 0) start + l else start + val e = + if (end < 0) end + l + else if (end >= l) (l - 1) + else end + // slice is open at the end, we need a closed end range + ts.elements.slice(s, e + 1).toList + } + + private def register = { + if (transaction.get.isEmpty) throw new NoTransactionInScopeException + transaction.get.get.register(uuid, this) + } +} diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index d909f0e4a4..ab0cfaf4d3 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -69,11 +69,15 @@ trait SortedSetStorageBackend[T] extends StorageBackend { // remove item from sorted set identified by name def zrem(name: String, item: T): Boolean - // cardinality of the set idnetified by name + // cardinality of the set identified by name def zcard(name: String): Int - def zscore(name: String, item: T): String + // zscore of the item from sorted set identified by name + def zscore(name: String, item: T): Option[Float] + // zrange from the sorted set identified by name def zrange(name: String, start: Int, end: Int): List[T] -} + // zrange with score from the sorted set identified by name + def zrangeWithScore(name: String, start: Int, end: Int): List[(T, Float)] +} diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala index 1338a9f8d4..b8aada0572 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala @@ -15,16 +15,20 @@ object RedisStorage extends Storage { def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString) def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString) override def newQueue: PersistentQueue[ElementType] = newQueue(UUID.newUuid.toString) + override def newSortedSet: PersistentSortedSet[ElementType] = newSortedSet(UUID.newUuid.toString) def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) def getVector(id: String): PersistentVector[ElementType] = newVector(id) def getRef(id: String): PersistentRef[ElementType] = newRef(id) override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id) + override def getSortedSet(id: String): PersistentSortedSet[ElementType] = newSortedSet(id) def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id) def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id) def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id) override def newQueue(id: String): PersistentQueue[ElementType] = new RedisPersistentQueue(id) + override def newSortedSet(id: String): PersistentSortedSet[ElementType] = + new RedisPersistentSortedSet(id) } /** @@ -63,3 +67,14 @@ class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] { val uuid = id val storage = RedisStorageBackend } + +/** + * Implements a persistent transactional sorted set based on the Redis + * storage. + * + * @author Debasish Ghosh + */ +class RedisPersistentSortedSet(id: String) extends PersistentSortedSet[Array[Byte]] { + val uuid = id + val storage = RedisStorageBackend +} diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 8bca9c6af6..c48c84fa39 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -364,11 +364,10 @@ private [akka] object RedisStorageBackend extends } } - def zscore(name: String, item: Array[Byte]): String = withErrorHandling { + def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling { db.zscore(new String(encode(name.getBytes)), new String(item)) match { - case None => - throw new Predef.NoSuchElementException(new String(item) + " not present") - case Some(s) => s + case Some(s) => Some(s.toFloat) + case None => None } } @@ -380,6 +379,16 @@ private [akka] object RedisStorageBackend extends s.map(_.get.getBytes) } } + + def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling { + db.zrangeWithScore( + new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(l) => + l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) } + } + } def flushDB = withErrorHandling(db.flushDb) diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala index 9405789bfd..41ee6fb909 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala @@ -87,7 +87,7 @@ class AccountActor extends Transactor { } @serializable class PersistentFailerActor extends Transactor { - //timeout = 5000 + // timeout = 5000 def receive = { case "Failure" => throw new RuntimeException("expected") diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala new file mode 100644 index 0000000000..a2abb2cd40 --- /dev/null +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala @@ -0,0 +1,237 @@ +package se.scalablesolutions.akka.persistence.redis + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.actor.{Actor, Transactor} + +/** + * A persistent actor based on Redis sortedset storage. + *

+ * Needs a running Redis server. + * @author Debasish Ghosh + */ + +trait ZScorable { + def zscore: Float +} + +case class Hacker(name: String, birth: String) extends ZScorable { + def zscore = birth.toFloat +} + +class SetThresholdViolationException extends RuntimeException + +// add hacker to the set +case class ADD(h: Hacker) + +// remove hacker from set +case class REMOVE(h: Hacker) + +// size of the set +case object SIZE + +// zscore of the hacker +case class SCORE(h: Hacker) + +// zrange +case class RANGE(start: Int, end: Int) + +// add and remove subject to the condition that there will be at least 3 hackers +case class MULTI(add: List[Hacker], rem: List[Hacker], failer: Actor) + +class SortedSetActor extends Transactor { + timeout = 100000 + private lazy val hackers = RedisStorage.newSortedSet + + def receive = { + case ADD(h) => + hackers.+(h.name.getBytes, h.zscore) + reply(true) + + case REMOVE(h) => + hackers.-(h.name.getBytes) + reply(true) + + case SIZE => + reply(hackers.size) + + case SCORE(h) => + reply(hackers.zscore(h.name.getBytes)) + + case RANGE(s, e) => + reply(hackers.zrange(s, e)) + + case MULTI(a, r, failer) => + a.foreach{ h: Hacker => + hackers.+(h.name.getBytes, h.zscore) + } + try { + r.foreach{ h => + if (hackers.size <= 3) + throw new SetThresholdViolationException + hackers.-(h.name.getBytes) + } + } catch { + case e: Exception => + failer !! "Failure" + } + reply((a.size, r.size)) + } +} + +import RedisStorageBackend._ + +@RunWith(classOf[JUnitRunner]) +class RedisPersistentSortedSetSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + override def beforeAll { + flushDB + println("** destroyed database") + } + + override def afterAll { + flushDB + println("** destroyed database") + } + + val h1 = Hacker("Alan kay", "1940") + val h2 = Hacker("Richard Stallman", "1953") + val h3 = Hacker("Yukihiro Matsumoto", "1965") + val h4 = Hacker("Claude Shannon", "1916") + val h5 = Hacker("Linus Torvalds", "1969") + val h6 = Hacker("Alan Turing", "1912") + + describe("Add and report cardinality of the set") { + val qa = new SortedSetActor + qa.start + + it("should enter 6 hackers") { + qa !! ADD(h1) + qa !! ADD(h2) + qa !! ADD(h3) + qa !! ADD(h4) + qa !! ADD(h5) + qa !! ADD(h6) + (qa !! SIZE).get.asInstanceOf[Int] should equal(6) + } + + it("should fetch correct scores for hackers") { + (qa !! SCORE(h1)).get.asInstanceOf[Float] should equal(1940.0f) + (qa !! SCORE(h5)).get.asInstanceOf[Float] should equal(1969.0f) + (qa !! SCORE(h6)).get.asInstanceOf[Float] should equal(1912.0f) + } + + it("should fetch proper range") { + (qa !! RANGE(0, 4)).get.asInstanceOf[List[_]].size should equal(5) + (qa !! RANGE(0, 6)).get.asInstanceOf[List[_]].size should equal(6) + } + + it("should remove and throw exception for removing non-existent hackers") { + qa !! REMOVE(h2) + (qa !! SIZE).get.asInstanceOf[Int] should equal(5) + qa !! REMOVE(h3) + (qa !! SIZE).get.asInstanceOf[Int] should equal(4) + val h7 = Hacker("Paul Snively", "1952") + try { + qa !! REMOVE(h7) + } + catch { + case e: Predef.NoSuchElementException => + e.getMessage should endWith("not present") + } + } + + it("should change score for entering the same hacker name with diff score") { + (qa !! SIZE).get.asInstanceOf[Int] should equal(4) + + // same name as h6 + val h7 = Hacker("Alan Turing", "1992") + qa !! ADD(h7) + + // size remains same + (qa !! SIZE).get.asInstanceOf[Int] should equal(4) + + // score updated + (qa !! SCORE(h7)).get.asInstanceOf[Float] should equal(1992.0f) + } + } + + describe("Transaction semantics") { + it("should rollback on exception") { + val qa = new SortedSetActor + qa.start + + val failer = new PersistentFailerActor + failer.start + + (qa !! SIZE).get.asInstanceOf[Int] should equal(0) + val add = List(h1, h2, h3, h4) + val rem = List(h2) + (qa !! MULTI(add, rem, failer)).get.asInstanceOf[Tuple2[Int, Int]] should equal((4,1)) + (qa !! SIZE).get.asInstanceOf[Int] should equal(3) + // size == 3 + + // add 2 more + val add1 = List(h5, h6) + + // remove 3 + val rem1 = List(h1, h3, h4) + try { + qa !! MULTI(add1, rem1, failer) + } catch { case e: Exception => {} + } + (qa !! SIZE).get.asInstanceOf[Int] should equal(3) + } + } + + describe("zrange") { + it ("should report proper range") { + val qa = new SortedSetActor + qa.start + qa !! ADD(h1) + qa !! ADD(h2) + qa !! ADD(h3) + qa !! ADD(h4) + qa !! ADD(h5) + qa !! ADD(h6) + (qa !! SIZE).get.asInstanceOf[Int] should equal(6) + val l = (qa !! RANGE(0, 6)).get.asInstanceOf[List[(Array[Byte], Float)]] + l.map { case (e, s) => (new String(e), s) }.head should equal(("Alan Turing", 1912.0f)) + val h7 = Hacker("Alan Turing", "1992") + qa !! ADD(h7) + (qa !! SIZE).get.asInstanceOf[Int] should equal(6) + val m = (qa !! RANGE(0, 6)).get.asInstanceOf[List[(Array[Byte], Float)]] + m.map { case (e, s) => (new String(e), s) }.head should equal(("Claude Shannon", 1916.0f)) + } + + it ("should report proper rge") { + val qa = new SortedSetActor + qa.start + qa !! ADD(h1) + qa !! ADD(h2) + qa !! ADD(h3) + qa !! ADD(h4) + qa !! ADD(h5) + qa !! ADD(h6) + (qa !! SIZE).get.asInstanceOf[Int] should equal(6) + (qa !! RANGE(0, 5)).get.asInstanceOf[List[_]].size should equal(6) + (qa !! RANGE(0, 6)).get.asInstanceOf[List[_]].size should equal(6) + (qa !! RANGE(0, 3)).get.asInstanceOf[List[_]].size should equal(4) + (qa !! RANGE(0, 1)).get.asInstanceOf[List[_]].size should equal(2) + (qa !! RANGE(0, 0)).get.asInstanceOf[List[_]].size should equal(1) + (qa !! RANGE(3, 1)).get.asInstanceOf[List[_]].size should equal(0) + (qa !! RANGE(0, -1)).get.asInstanceOf[List[_]].size should equal(6) + (qa !! RANGE(0, -2)).get.asInstanceOf[List[_]].size should equal(5) + (qa !! RANGE(0, -4)).get.asInstanceOf[List[_]].size should equal(3) + (qa !! RANGE(-4, -1)).get.asInstanceOf[List[_]].size should equal(4) + } + } +} diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index cfe704c6ba..44081a43c6 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -191,10 +191,10 @@ class RedisStorageBackendSpec extends zcard("hackers") should equal(6) - zscore("hackers", "alan turing".getBytes) should equal("1912") - zscore("hackers", "richard stallman".getBytes) should equal("1953") - zscore("hackers", "claude shannon".getBytes) should equal("1916") - zscore("hackers", "linus torvalds".getBytes) should equal("1969") + zscore("hackers", "alan turing".getBytes).get should equal(1912.0f) + zscore("hackers", "richard stallman".getBytes).get should equal(1953.0f) + zscore("hackers", "claude shannon".getBytes).get should equal(1916.0f) + zscore("hackers", "linus torvalds".getBytes).get should equal(1969.0f) val s: List[Array[Byte]] = zrange("hackers", 0, 2) s.size should equal(3) @@ -206,6 +206,10 @@ class RedisStorageBackendSpec extends val t: List[Array[Byte]] = zrange("hackers", 0, -1) t.size should equal(6) t.map(new String(_)) should equal(sorted) + + val u: List[(Array[Byte], Float)] = zrangeWithScore("hackers", 0, -1) + u.size should equal(6) + u.map{ case (e, s) => new String(e) } should equal(sorted) } } } diff --git a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar b/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar deleted file mode 100644 index a269f15f7a..0000000000 Binary files a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom b/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom deleted file mode 100755 index 16dd81402a..0000000000 --- a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom +++ /dev/null @@ -1,8 +0,0 @@ - - - 4.0.0 - com.redis - redisclient - 1.1 - jar - diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index cd46dbea5c..783a8d010c 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -240,7 +240,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { } class AkkaRedisProject(info: ProjectInfo) extends DefaultProject(info) { - val redis = "com.redis" % "redisclient" % "1.1" % "compile" + val redis = "com.redis" % "redisclient" % "1.2-SNAPSHOT" % "compile" override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil lazy val dist = deployTask(info, distPath) dependsOn(`package`) describedAs("Deploying") }