Implemented persistent transactional queue with Redis backend

This commit is contained in:
debasishg 2010-01-11 15:30:38 +05:30
parent 15d38ed154
commit 9f12d05def
7 changed files with 453 additions and 2 deletions

View file

@ -40,6 +40,7 @@ class NoTransactionInScopeException extends RuntimeException
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
trait Storage {
type ElementType
@ -47,14 +48,17 @@ trait Storage {
def newMap: PersistentMap[ElementType, ElementType]
def newVector: PersistentVector[ElementType]
def newRef: PersistentRef[ElementType]
def newQueue: PersistentQueue[ElementType]
def getMap(id: String): PersistentMap[ElementType, ElementType]
def getVector(id: String): PersistentVector[ElementType]
def getRef(id: String): PersistentRef[ElementType]
def getQueue(id: String): PersistentQueue[ElementType]
def newMap(id: String): PersistentMap[ElementType, ElementType]
def newVector(id: String): PersistentVector[ElementType]
def newRef(id: String): PersistentRef[ElementType]
def newQueue(id: String): PersistentQueue[ElementType]
}
@ -269,3 +273,128 @@ trait PersistentRef[T] extends Transactional with Committable {
currentTransaction.get.get.register(uuid, this)
}
}
/**
* Implementation of <tt>PersistentQueue</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.
* <p/>
* Enqueue is simpler, we just have to record the operation in a local
* transactional store for playback during commit. This store
* <tt>enqueueNDequeuedEntries</tt> stores the entire history of enqueue
* and dequeue that will be played at commit on the underlying store.
* </p>
* The main challenge with dequeue is that we need to return the element
* that has been dequeued. Hence in addition to the above store, we need to
* have another local queue that actually does the enqueue dequeue operations
* that take place <em>only during this transaction</em>. This gives us the
* element that will be dequeued next from the set of elements enqueued
* <em>during this transaction</em>.
* </p>
* The third item that we need is an index to the underlying storage element
* that may also have to be dequeued as part of the current transaction. This
* is modeled using a ref to an Int that points to elements in the underlyinng store.
* </p>
* Subclasses just need to provide the actual concrete instance for the
* abstract val <tt>storage</tt>.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
with Transactional with Committable with Logging {
abstract case class QueueOp
case object ENQ extends QueueOp
case object DEQ extends QueueOp
import scala.collection.immutable.Queue
// current trail that will be played on commit to the underlying store
protected val enqueuedNDequeuedEntries = TransactionalState.newVector[(Option[A], QueueOp)]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
// local queue that will record all enqueues and dequeues in the current txn
protected val localQ = TransactionalRef[Queue[A]]()
// keeps a pointer to the underlying storage for the enxt candidate to be dequeued
protected val pickMeForDQ = TransactionalRef[Int]()
localQ.swap(Queue.Empty)
pickMeForDQ.swap(0)
// to be concretized in subclasses
val storage: QueueStorageBackend[A]
def commit = {
enqueuedNDequeuedEntries.toList.foreach { e =>
e._2 match {
case ENQ => storage.enqueue(uuid, e._1.get)
case DEQ => storage.dequeue(uuid)
}
}
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) {
storage.remove(uuid)
}
enqueuedNDequeuedEntries.clear
localQ.swap(Queue.Empty)
pickMeForDQ.swap(0)
}
override def enqueue(elems: A*) {
register
elems.foreach(e => {
enqueuedNDequeuedEntries.add((Some(e), ENQ))
localQ.get.get.enqueue(e)
})
}
override def dequeue: A = {
register
// record for later playback
enqueuedNDequeuedEntries.add((None, DEQ))
val i = pickMeForDQ.get.get
if (i < storage.size(uuid)) {
// still we can DQ from storage
pickMeForDQ.swap(i + 1)
storage.peek(uuid, i, 1)(0)
} else {
// check we have transient candidates in localQ for DQ
if (localQ.get.get.isEmpty == false) {
val (a, q) = localQ.get.get.dequeue
localQ.swap(q)
a
}
else
throw new NoSuchElementException("trying to dequeue from empty queue")
}
}
override def clear = {
register
shouldClearOnCommit.swap(true)
localQ.swap(Queue.Empty)
pickMeForDQ.swap(0)
}
override def size: Int = try {
storage.size(uuid) + localQ.get.get.length
} catch { case e: Exception => 0 }
override def isEmpty: Boolean =
size == 0
override def +=(elem: A): Unit = enqueue(elem)
override def ++=(elems: Iterator[A]): Unit = enqueue(elems.toList: _*)
override def ++=(elems: Iterable[A]): Unit = this ++= elems.elements
override def dequeueFirst(p: A => Boolean): Option[A] =
throw new UnsupportedOperationException("dequeueFirst not supported")
override def dequeueAll(p: A => Boolean): Seq[A] =
throw new UnsupportedOperationException("dequeueAll not supported")
private def register = {
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
currentTransaction.get.get.register(uuid, this)
}
}

View file

@ -34,3 +34,38 @@ trait RefStorageBackend[T] extends StorageBackend {
def insertRefStorageFor(name: String, element: T)
def getRefStorageFor(name: String): Option[T]
}
// for Queue
trait QueueStorageBackend[T] extends StorageBackend {
// add to the end of the queue
def enqueue(name: String, item: T): Boolean
// pop from the front of the queue
def dequeue(name: String): Option[T]
// get the size of the queue
def size(name: String): Int
// return an array of items currently stored in the queue
// start is the item to begin, count is how many items to return
def peek(name: String, start: Int, count: Int): List[T]
// completely delete the queue
def remove(name: String): Boolean
}
trait SortedSetStorageBackend[T] extends StorageBackend {
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: T): Boolean
// remove item from sorted set identified by name
def zrem(name: String, item: T): Boolean
// cardinality of the set idnetified by name
def zcard(name: String): Int
def zscore(name: String, item: T): String
def zrange(name: String, start: Int, end: Int): List[T]
}

View file

@ -12,18 +12,21 @@ object RedisStorage extends Storage {
def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)
def newQueue: PersistentQueue[ElementType] = newQueue(Uuid.newUuid.toString)
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id)
def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id)
def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id)
def newQueue(id: String): PersistentQueue[ElementType] = new RedisPersistentQueue(id)
}
/**
* Implements a persistent transactional map based on the MongoDB document storage.
* Implements a persistent transactional map based on the Redis storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
@ -34,7 +37,7 @@ class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[By
/**
* Implements a persistent transactional vector based on the Redis
* document storage.
* storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
@ -47,3 +50,14 @@ class RedisPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
val storage = RedisStorageBackend
}
/**
* Implements a persistent transactional queue based on the Redis
* storage.
*
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
val uuid = id
val storage = RedisStorageBackend
}

View file

@ -38,6 +38,8 @@ private [akka] object RedisStorageBackend extends
MapStorageBackend[Array[Byte], Array[Byte]] with
VectorStorageBackend[Array[Byte]] with
RefStorageBackend[Array[Byte]] with
QueueStorageBackend[Array[Byte]] with
SortedSetStorageBackend[Array[Byte]] with
Logging {
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
@ -247,5 +249,90 @@ private [akka] object RedisStorageBackend extends
}
}
// add to the end of the queue
def enqueue(name: String, item: Array[Byte]): Boolean = {
db.pushTail(new String(encode(name.getBytes)), new String(item))
}
// pop from the front of the queue
def dequeue(name: String): Option[Array[Byte]] = {
db.popHead(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(s) =>
Some(s.getBytes)
}
}
// get the size of the queue
def size(name: String): Int = {
db.listLength(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(l) => l
}
}
// return an array of items currently stored in the queue
// start is the item to begin, count is how many items to return
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match {
case 1 =>
db.listIndex(new String(encode(name.getBytes)), start) match {
case None =>
throw new Predef.NoSuchElementException("No element at " + start)
case Some(s) =>
List(s.getBytes)
}
case n =>
db.listRange(new String(encode(name.getBytes)), start, start + count - 1) match {
case None =>
throw new Predef.NoSuchElementException(
"No element found between " + start + " and " + (start + count - 1))
case Some(es) =>
es.map(_.getBytes)
}
}
// completely delete the queue
def remove(name: String): Boolean = {
db.delete(new String(encode(name.getBytes)))
}
// add item to sorted set identified by name
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = {
db.zAdd(new String(encode(name.getBytes)), zscore, new String(item))
}
// remove item from sorted set identified by name
def zrem(name: String, item: Array[Byte]): Boolean = {
db.zRem(new String(encode(name.getBytes)), new String(item))
}
// cardinality of the set identified by name
def zcard(name: String): Int = {
db.zCard(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(l) => l
}
}
def zscore(name: String, item: Array[Byte]): String = {
db.zScore(new String(encode(name.getBytes)), new String(item)) match {
case None =>
throw new Predef.NoSuchElementException(new String(item) + " not present")
case Some(s) => s
}
}
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
db.zRange(new String(encode(name.getBytes)), start.toString, end.toString, SocketOperations.ASC, false) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")
case Some(s) =>
s.map(_.getBytes)
}
}
def flushDB = db.flushDb
}

View file

@ -0,0 +1,133 @@
package se.scalablesolutions.akka.state
import junit.framework.TestCase
import org.junit.{Test, Before}
import org.junit.Assert._
import se.scalablesolutions.akka.actor.{Actor, Transactor}
/**
* A persistent actor based on Redis queue storage.
* <p/>
* Needs a running Redis server.
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
*/
case class NQ(accountNo: String)
case class DQ
case class MNDQ(accountNos: List[String], noOfDQs: Int, failer: Actor)
case class SZ
class QueueActor extends Transactor {
private val accounts = RedisStorage.newQueue
def receive = {
// enqueue
case NQ(accountNo) =>
accounts.enqueue(accountNo.getBytes)
reply(true)
// dequeue
case DQ =>
val d = new String(accounts.dequeue)
reply(d)
// multiple NQ and DQ
case MNDQ(enqs, no, failer) =>
accounts.enqueue(enqs.map(_.getBytes): _*)
try {
(1 to no).foreach(e => accounts.dequeue)
} catch {
case e: Exception =>
failer !! "Failure"
}
reply(true)
// size
case SZ =>
reply(accounts.size)
}
}
class RedisPersistentQSpec extends TestCase {
@Test
def testSuccessfulNQ = {
val qa = new QueueActor
qa.start
qa !! NQ("a-123")
qa !! NQ("a-124")
qa !! NQ("a-125")
assertEquals(3, (qa !! SZ).get)
}
@Test
def testSuccessfulDQ = {
val qa = new QueueActor
qa.start
qa !! NQ("a-123")
qa !! NQ("a-124")
qa !! NQ("a-125")
assertEquals(3, (qa !! SZ).get)
assertEquals("a-123", (qa !! DQ).get)
assertEquals("a-124", (qa !! DQ).get)
assertEquals("a-125", (qa !! DQ).get)
assertEquals(0, (qa !! SZ).get)
}
@Test
def testSuccessfulMNDQ = {
val qa = new QueueActor
qa.start
val failer = new PersistentFailerActor
failer.start
qa !! NQ("a-123")
qa !! NQ("a-124")
qa !! NQ("a-125")
assertEquals(3, (qa !! SZ).get)
assertEquals("a-123", (qa !! DQ).get)
assertEquals(2, (qa !! SZ).get)
qa !! MNDQ(List("a-126", "a-127"), 2, failer)
assertEquals(2, (qa !! SZ).get)
}
@Test
def testMixedMNDQ = {
val qa = new QueueActor
qa.start
val failer = new PersistentFailerActor
failer.start
// 3 enqueues
qa !! NQ("a-123")
qa !! NQ("a-124")
qa !! NQ("a-125")
assertEquals(3, (qa !! SZ).get)
// dequeue 1
assertEquals("a-123", (qa !! DQ).get)
// size == 2
assertEquals(2, (qa !! SZ).get)
// enqueue 2, dequeue 2 => size == 2
qa !! MNDQ(List("a-126", "a-127"), 2, failer)
assertEquals(2, (qa !! SZ).get)
// enqueue 2 => size == 4
qa !! NQ("a-128")
qa !! NQ("a-129")
assertEquals(4, (qa !! SZ).get)
// enqueue 1 => size 5
// dequeue 6 => fail transaction
// size should remain 4
try {
qa !! MNDQ(List("a-130"), 6, failer)
} catch { case e: Exception => {} }
assertEquals(4, (qa !! SZ).get)
}
}

View file

@ -113,6 +113,59 @@ class RedisStorageBackendSpec extends
n.fromBytes(getRefStorageFor("T-4").get) should equal(n)
}
}
describe("store and query in queue") {
it("should give proper queue semantics") {
enqueue("T-5", "alan kay".getBytes)
enqueue("T-5", "alan turing".getBytes)
enqueue("T-5", "richard stallman".getBytes)
enqueue("T-5", "yukihiro matsumoto".getBytes)
enqueue("T-5", "claude shannon".getBytes)
enqueue("T-5", "linus torvalds".getBytes)
RedisStorageBackend.size("T-5") should equal(6)
new String(dequeue("T-5").get) should equal("alan kay")
new String(dequeue("T-5").get) should equal("alan turing")
RedisStorageBackend.size("T-5") should equal(4)
val l = peek("T-5", 0, 3)
l.size should equal(3)
new String(l(0)) should equal("richard stallman")
new String(l(1)) should equal("yukihiro matsumoto")
new String(l(2)) should equal("claude shannon")
}
}
describe("store and query in sorted set") {
it("should give proper sorted set semantics") {
zadd("hackers", "1965", "yukihiro matsumoto".getBytes)
zadd("hackers", "1953", "richard stallman".getBytes)
zadd("hackers", "1916", "claude shannon".getBytes)
zadd("hackers", "1969", "linus torvalds".getBytes)
zadd("hackers", "1940", "alan kay".getBytes)
zadd("hackers", "1912", "alan turing".getBytes)
zcard("hackers") should equal(6)
zscore("hackers", "alan turing".getBytes) should equal("1912")
zscore("hackers", "richard stallman".getBytes) should equal("1953")
zscore("hackers", "claude shannon".getBytes) should equal("1916")
zscore("hackers", "linus torvalds".getBytes) should equal("1969")
val s: List[Array[Byte]] = zrange("hackers", 0, 2)
s.size should equal(3)
s.map(new String(_)) should equal(List("alan turing", "claude shannon", "alan kay"))
var sorted: List[String] =
List("alan turing", "claude shannon", "alan kay", "richard stallman", "yukihiro matsumoto", "linus torvalds")
val t: List[Array[Byte]] = zrange("hackers", 0, -1)
t.size should equal(6)
t.map(new String(_)) should equal(sorted)
}
}
}
case class Name(id: Int, name: String, address: String)