diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index cac0004879..78cdd5c207 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -40,6 +40,7 @@ class NoTransactionInScopeException extends RuntimeException
  *
  *
  * @author Jonas Bonér
+ * @author Debasish Ghosh
  */
 trait Storage {
   type ElementType
@@ -47,14 +48,17 @@ trait Storage {
   def newMap: PersistentMap[ElementType, ElementType]
   def newVector: PersistentVector[ElementType]
   def newRef: PersistentRef[ElementType]
+  def newQueue: PersistentQueue[ElementType]
 
   def getMap(id: String): PersistentMap[ElementType, ElementType]
   def getVector(id: String): PersistentVector[ElementType]
   def getRef(id: String): PersistentRef[ElementType]
+  def getQueue(id: String): PersistentQueue[ElementType]
 
   def newMap(id: String): PersistentMap[ElementType, ElementType]
   def newVector(id: String): PersistentVector[ElementType]
   def newRef(id: String): PersistentRef[ElementType]
+  def newQueue(id: String): PersistentQueue[ElementType]
 }
@@ -269,3 +273,128 @@ trait PersistentRef[T] extends Transactional with Committable {
     currentTransaction.get.get.register(uuid, this)
   }
 }
+
+/**
+ * Implementation of PersistentQueue. Every concrete storage shares the
+ * same workflow, which this trait abstracts.
+ *
+ * Enqueue is simple: we just record the operation in a local transactional
+ * store for playback during commit. This store, enqueuedNDequeuedEntries,
+ * keeps the entire history of enqueues and dequeues, which is played back
+ * on the underlying store at commit.
+ *
+ * The main challenge with dequeue is that we need to return the element
+ * that has been dequeued. Hence, in addition to the above store, we need
+ * another local queue that performs the enqueue and dequeue operations
+ * that take place only during this transaction. This gives us the
+ * element that will be dequeued next from the set of elements enqueued
+ * during this transaction.
+ *
+ * The third item that we need is an index into the underlying storage for
+ * elements that may also have to be dequeued as part of the current
+ * transaction. This is modeled as a ref to an Int that points into the underlying store.
+ *
+ * Subclasses just need to provide the concrete instance for the
+ * abstract val storage.
+ *
+ * @author Debasish Ghosh
+ */
+trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
+  with Transactional with Committable with Logging {
+
+  sealed trait QueueOp
+  case object ENQ extends QueueOp
+  case object DEQ extends QueueOp
+
+  import scala.collection.immutable.Queue
+
+  // operation trail that will be played back on the underlying store at commit
+  protected val enqueuedNDequeuedEntries = TransactionalState.newVector[(Option[A], QueueOp)]
+  protected val shouldClearOnCommit = TransactionalRef[Boolean]()
+
+  // local queue that records all enqueues and dequeues in the current txn
+  protected val localQ = TransactionalRef[Queue[A]]()
+
+  // keeps a pointer into the underlying storage for the next candidate to be dequeued
+  protected val pickMeForDQ = TransactionalRef[Int]()
+
+  localQ.swap(Queue.Empty)
+  pickMeForDQ.swap(0)
+
+  // to be provided by concrete subclasses
+  val storage: QueueStorageBackend[A]
+
+  def commit = {
+    enqueuedNDequeuedEntries.toList.foreach { e =>
+      e._2 match {
+        case ENQ => storage.enqueue(uuid, e._1.get)
+        case DEQ => storage.dequeue(uuid)
+      }
+    }
+    if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) {
+      storage.remove(uuid)
+    }
+    enqueuedNDequeuedEntries.clear
+    localQ.swap(Queue.Empty)
+    pickMeForDQ.swap(0)
+  }
+
+  override def enqueue(elems: A*) {
+    register
+    elems.foreach(e => {
+      enqueuedNDequeuedEntries.add((Some(e), ENQ))
+      localQ.swap(localQ.get.get.enqueue(e))
+    })
+  }
+
+  override def dequeue: A = {
+    register
+    // record for later playback
+    enqueuedNDequeuedEntries.add((None, DEQ))
+
+    val i = pickMeForDQ.get.get
+    if (i < storage.size(uuid)) {
+      // we can still dequeue from the underlying store
+      pickMeForDQ.swap(i + 1)
+      storage.peek(uuid, i, 1)(0)
+    } else {
+      // fall back to transient candidates enqueued in this transaction
+      if (!localQ.get.get.isEmpty) {
+        val (a, q) = localQ.get.get.dequeue
+        localQ.swap(q)
+        a
+      }
+      else
+        throw new NoSuchElementException("trying to dequeue from empty queue")
+    }
+  }
+
+  override def clear = {
+    register
+    shouldClearOnCommit.swap(true)
+    localQ.swap(Queue.Empty)
+    pickMeForDQ.swap(0)
+  }
+
+  override def size: Int = try {
+    storage.size(uuid) + localQ.get.get.length
+  } catch { case e: Exception => 0 }
+
+  override def isEmpty: Boolean =
+    size == 0
+
+  override def +=(elem: A): Unit = enqueue(elem)
+  override def ++=(elems: Iterator[A]): Unit = enqueue(elems.toList: _*)
+  override def ++=(elems: Iterable[A]): Unit = this ++= elems.elements
+
+  override def dequeueFirst(p: A => Boolean): Option[A] =
+    throw new UnsupportedOperationException("dequeueFirst not supported")
+
+  override def dequeueAll(p: A => Boolean): Seq[A] =
+    throw new UnsupportedOperationException("dequeueAll not supported")
+
+  private def register = {
+    if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
+    currentTransaction.get.get.register(uuid, this)
+  }
+}
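The dequeue path above has to arbitrate between elements already persisted in the backing store and elements enqueued earlier in the same transaction. A minimal standalone sketch of that decision, in plain Scala and outside the patch (storedSize, pointer and peekStored are illustrative names, not part of the API):

    import scala.collection.immutable.Queue

    // Persisted elements are served first, tracked by an index into the backing
    // store; only when those are exhausted do we fall back to the elements
    // enqueued locally in the current transaction.
    def nextToDequeue[A](storedSize: Int, pointer: Int, localQ: Queue[A],
                         peekStored: Int => A): (A, Int, Queue[A]) =
      if (pointer < storedSize)
        (peekStored(pointer), pointer + 1, localQ)   // consume from the backing store
      else if (!localQ.isEmpty) {
        val (a, rest) = localQ.dequeue               // consume a transaction-local element
        (a, pointer, rest)
      } else
        throw new NoSuchElementException("trying to dequeue from empty queue")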
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
index e616b0599a..94233acd0a 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala
@@ -34,3 +34,38 @@ trait RefStorageBackend[T] extends StorageBackend {
   def insertRefStorageFor(name: String, element: T)
   def getRefStorageFor(name: String): Option[T]
 }
+
+// for Queue
+trait QueueStorageBackend[T] extends StorageBackend {
+  // add to the end of the queue
+  def enqueue(name: String, item: T): Boolean
+
+  // pop from the front of the queue
+  def dequeue(name: String): Option[T]
+
+  // get the size of the queue
+  def size(name: String): Int
+
+  // return a list of items currently stored in the queue
+  // start is the index to begin at, count is how many items to return
+  def peek(name: String, start: Int, count: Int): List[T]
+
+  // completely delete the queue
+  def remove(name: String): Boolean
+}
+
+trait SortedSetStorageBackend[T] extends StorageBackend {
+  // add item to the sorted set identified by name
+  def zadd(name: String, zscore: String, item: T): Boolean
+
+  // remove item from the sorted set identified by name
+  def zrem(name: String, item: T): Boolean
+
+  // cardinality of the set identified by name
+  def zcard(name: String): Int
+
+  def zscore(name: String, item: T): String
+
+  def zrange(name: String, start: Int, end: Int): List[T]
+}
+
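For unit tests that should not depend on a running store, the QueueStorageBackend contract above can be satisfied by a trivial in-memory implementation. A rough, hypothetical sketch (not part of this patch), assuming StorageBackend itself adds no abstract members:

    import scala.collection.mutable.{HashMap, Queue => MQueue}

    object InMemoryQueueStorageBackend extends QueueStorageBackend[Array[Byte]] {
      private val queues = new HashMap[String, MQueue[Array[Byte]]]

      private def q(name: String) = queues.getOrElseUpdate(name, new MQueue[Array[Byte]])

      // add to the end of the queue
      def enqueue(name: String, item: Array[Byte]): Boolean = { q(name).enqueue(item); true }

      // pop from the front of the queue, None if empty
      def dequeue(name: String): Option[Array[Byte]] =
        if (q(name).isEmpty) None else Some(q(name).dequeue)

      // current number of elements in the queue
      def size(name: String): Int = q(name).length

      // slice of count elements starting at index start
      def peek(name: String, start: Int, count: Int): List[Array[Byte]] =
        q(name).toList.drop(start).take(count)

      // drop the queue altogether
      def remove(name: String): Boolean = { queues -= name; true }
    }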
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
index 0eba6dbb62..886245d147 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala
@@ -12,18 +12,21 @@ object RedisStorage extends Storage {
   def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
   def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
   def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)
+  def newQueue: PersistentQueue[ElementType] = newQueue(Uuid.newUuid.toString)
 
   def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
   def getVector(id: String): PersistentVector[ElementType] = newVector(id)
   def getRef(id: String): PersistentRef[ElementType] = newRef(id)
+  def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
 
   def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id)
   def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id)
   def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id)
+  def newQueue(id: String): PersistentQueue[ElementType] = new RedisPersistentQueue(id)
 }
 
 /**
- * Implements a persistent transactional map based on the MongoDB document storage.
+ * Implements a persistent transactional map based on the Redis storage.
  *
  * @author Debasish Ghosh
  */
@@ -34,7 +37,7 @@ class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]]
 
 /**
  * Implements a persistent transactional vector based on the Redis
- * document storage.
+ * storage.
  *
  * @author Debasish Ghosh
  */
@@ -47,3 +50,14 @@ class RedisPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
   val uuid = id
   val storage = RedisStorageBackend
 }
+
+/**
+ * Implements a persistent transactional queue based on the Redis
+ * storage.
+ *
+ * @author Debasish Ghosh
+ */
+class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
+  val uuid = id
+  val storage = RedisStorageBackend
+}
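Wiring a PersistentQueue to a backend only requires supplying uuid and storage, exactly as RedisPersistentQueue does above. As a hypothetical illustration, the in-memory backend sketched earlier could back an STM-only test queue:

    // Hypothetical test-only queue, mirroring RedisPersistentQueue but backed by
    // the in-memory sketch above instead of a running Redis server.
    class InMemoryPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
      val uuid = id
      val storage = InMemoryQueueStorageBackend
    }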
diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
index 802319cfad..00a44d0513 100644
--- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
+++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
@@ -38,6 +38,8 @@ private [akka] object RedisStorageBackend extends
   MapStorageBackend[Array[Byte], Array[Byte]] with
   VectorStorageBackend[Array[Byte]] with
   RefStorageBackend[Array[Byte]] with
+  QueueStorageBackend[Array[Byte]] with
+  SortedSetStorageBackend[Array[Byte]] with
   Logging {
 
   val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
@@ -246,6 +248,91 @@ private [akka] object RedisStorageBackend extends
       case Some(s) => Some(s.getBytes)
     }
   }
+
+  // add to the end of the queue
+  def enqueue(name: String, item: Array[Byte]): Boolean = {
+    db.pushTail(new String(encode(name.getBytes)), new String(item))
+  }
+
+  // pop from the front of the queue
+  def dequeue(name: String): Option[Array[Byte]] = {
+    db.popHead(new String(encode(name.getBytes))) match {
+      case None =>
+        throw new Predef.NoSuchElementException(name + " not present")
+      case Some(s) =>
+        Some(s.getBytes)
+    }
+  }
+
+  // get the size of the queue
+  def size(name: String): Int = {
+    db.listLength(new String(encode(name.getBytes))) match {
+      case None =>
+        throw new Predef.NoSuchElementException(name + " not present")
+      case Some(l) => l
+    }
+  }
+
+  // return a list of items currently stored in the queue
+  // start is the index to begin at, count is how many items to return
+  def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match {
+    case 1 =>
+      db.listIndex(new String(encode(name.getBytes)), start) match {
+        case None =>
+          throw new Predef.NoSuchElementException("No element at " + start)
+        case Some(s) =>
+          List(s.getBytes)
+      }
+    case n =>
+      db.listRange(new String(encode(name.getBytes)), start, start + count - 1) match {
+        case None =>
+          throw new Predef.NoSuchElementException(
+            "No element found between " + start + " and " + (start + count - 1))
+        case Some(es) =>
+          es.map(_.getBytes)
+      }
+  }
+
+  // completely delete the queue
+  def remove(name: String): Boolean = {
+    db.delete(new String(encode(name.getBytes)))
+  }
+
+  // add item to the sorted set identified by name
+  def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = {
+    db.zAdd(new String(encode(name.getBytes)), zscore, new String(item))
+  }
+
+  // remove item from the sorted set identified by name
+  def zrem(name: String, item: Array[Byte]): Boolean = {
+    db.zRem(new String(encode(name.getBytes)), new String(item))
+  }
+
+  // cardinality of the set identified by name
+  def zcard(name: String): Int = {
+    db.zCard(new String(encode(name.getBytes))) match {
+      case None =>
+        throw new Predef.NoSuchElementException(name + " not present")
+      case Some(l) => l
+    }
+  }
+
+  def zscore(name: String, item: Array[Byte]): String = {
+    db.zScore(new String(encode(name.getBytes)), new String(item)) match {
+      case None =>
+        throw new Predef.NoSuchElementException(new String(item) + " not present")
+      case Some(s) => s
+    }
+  }
+
+  def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
+    db.zRange(new String(encode(name.getBytes)), start.toString, end.toString, SocketOperations.ASC, false) match {
+      case None =>
+        throw new Predef.NoSuchElementException(name + " not present")
+      case Some(s) =>
+        s.map(_.getBytes)
+    }
+  }
 
   def flushDB = db.flushDb
 }
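A hedged usage sketch of the new sorted-set operations, exercising the backend directly much like RedisStorageBackendSpec further down; it assumes a running Redis server and calling code inside the akka package (the object is private[akka]), and the sample keys and scores are illustrative only:

    import RedisStorageBackend._

    zadd("langs", "1972", "c".getBytes)
    zadd("langs", "1995", "java".getBytes)
    zadd("langs", "2003", "scala".getBytes)

    zcard("langs")                              // 3
    zscore("langs", "scala".getBytes)           // "2003"
    zrange("langs", 0, -1).map(new String(_))   // List("c", "java", "scala"), ascending by score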
NQ("a-129") + assertEquals(4, (qa !! SZ).get) + + // enqueue 1 => size 5 + // dequeue 6 => fail transaction + // size should remain 4 + try { + qa !! MNDQ(List("a-130"), 6, failer) + } catch { case e: Exception => {} } + + assertEquals(4, (qa !! SZ).get) + } +} diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index cd91d4a591..504a0e114d 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -113,6 +113,59 @@ class RedisStorageBackendSpec extends n.fromBytes(getRefStorageFor("T-4").get) should equal(n) } } + + describe("store and query in queue") { + it("should give proper queue semantics") { + enqueue("T-5", "alan kay".getBytes) + enqueue("T-5", "alan turing".getBytes) + enqueue("T-5", "richard stallman".getBytes) + enqueue("T-5", "yukihiro matsumoto".getBytes) + enqueue("T-5", "claude shannon".getBytes) + enqueue("T-5", "linus torvalds".getBytes) + + RedisStorageBackend.size("T-5") should equal(6) + + new String(dequeue("T-5").get) should equal("alan kay") + new String(dequeue("T-5").get) should equal("alan turing") + + RedisStorageBackend.size("T-5") should equal(4) + + val l = peek("T-5", 0, 3) + l.size should equal(3) + new String(l(0)) should equal("richard stallman") + new String(l(1)) should equal("yukihiro matsumoto") + new String(l(2)) should equal("claude shannon") + } + } + + describe("store and query in sorted set") { + it("should give proper sorted set semantics") { + zadd("hackers", "1965", "yukihiro matsumoto".getBytes) + zadd("hackers", "1953", "richard stallman".getBytes) + zadd("hackers", "1916", "claude shannon".getBytes) + zadd("hackers", "1969", "linus torvalds".getBytes) + zadd("hackers", "1940", "alan kay".getBytes) + zadd("hackers", "1912", "alan turing".getBytes) + + zcard("hackers") should equal(6) + + zscore("hackers", "alan turing".getBytes) should equal("1912") + zscore("hackers", "richard stallman".getBytes) should equal("1953") + zscore("hackers", "claude shannon".getBytes) should equal("1916") + zscore("hackers", "linus torvalds".getBytes) should equal("1969") + + val s: List[Array[Byte]] = zrange("hackers", 0, 2) + s.size should equal(3) + s.map(new String(_)) should equal(List("alan turing", "claude shannon", "alan kay")) + + var sorted: List[String] = + List("alan turing", "claude shannon", "alan kay", "richard stallman", "yukihiro matsumoto", "linus torvalds") + + val t: List[Array[Byte]] = zrange("hackers", 0, -1) + t.size should equal(6) + t.map(new String(_)) should equal(sorted) + } + } } case class Name(id: Int, name: String, address: String) diff --git a/embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar b/embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar index d4a293effa..ec9c6c8ab4 100644 Binary files a/embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar and b/embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar differ