Implemented persistent transactional queue with Redis backend
This commit is contained in:
parent
fee16e42a4
commit
a9371c229b
7 changed files with 453 additions and 2 deletions
|
|
@ -40,6 +40,7 @@ class NoTransactionInScopeException extends RuntimeException
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
|
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||||
*/
|
*/
|
||||||
trait Storage {
|
trait Storage {
|
||||||
type ElementType
|
type ElementType
|
||||||
|
|
@ -47,14 +48,17 @@ trait Storage {
|
||||||
def newMap: PersistentMap[ElementType, ElementType]
|
def newMap: PersistentMap[ElementType, ElementType]
|
||||||
def newVector: PersistentVector[ElementType]
|
def newVector: PersistentVector[ElementType]
|
||||||
def newRef: PersistentRef[ElementType]
|
def newRef: PersistentRef[ElementType]
|
||||||
|
def newQueue: PersistentQueue[ElementType]
|
||||||
|
|
||||||
def getMap(id: String): PersistentMap[ElementType, ElementType]
|
def getMap(id: String): PersistentMap[ElementType, ElementType]
|
||||||
def getVector(id: String): PersistentVector[ElementType]
|
def getVector(id: String): PersistentVector[ElementType]
|
||||||
def getRef(id: String): PersistentRef[ElementType]
|
def getRef(id: String): PersistentRef[ElementType]
|
||||||
|
def getQueue(id: String): PersistentQueue[ElementType]
|
||||||
|
|
||||||
def newMap(id: String): PersistentMap[ElementType, ElementType]
|
def newMap(id: String): PersistentMap[ElementType, ElementType]
|
||||||
def newVector(id: String): PersistentVector[ElementType]
|
def newVector(id: String): PersistentVector[ElementType]
|
||||||
def newRef(id: String): PersistentRef[ElementType]
|
def newRef(id: String): PersistentRef[ElementType]
|
||||||
|
def newQueue(id: String): PersistentQueue[ElementType]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -269,3 +273,128 @@ trait PersistentRef[T] extends Transactional with Committable {
|
||||||
currentTransaction.get.get.register(uuid, this)
|
currentTransaction.get.get.register(uuid, this)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of <tt>PersistentQueue</tt> for every concrete
|
||||||
|
* storage will have the same workflow. This abstracts the workflow.
|
||||||
|
* <p/>
|
||||||
|
* Enqueue is simpler, we just have to record the operation in a local
|
||||||
|
* transactional store for playback during commit. This store
|
||||||
|
* <tt>enqueueNDequeuedEntries</tt> stores the entire history of enqueue
|
||||||
|
* and dequeue that will be played at commit on the underlying store.
|
||||||
|
* </p>
|
||||||
|
* The main challenge with dequeue is that we need to return the element
|
||||||
|
* that has been dequeued. Hence in addition to the above store, we need to
|
||||||
|
* have another local queue that actually does the enqueue dequeue operations
|
||||||
|
* that take place <em>only during this transaction</em>. This gives us the
|
||||||
|
* element that will be dequeued next from the set of elements enqueued
|
||||||
|
* <em>during this transaction</em>.
|
||||||
|
* </p>
|
||||||
|
* The third item that we need is an index to the underlying storage element
|
||||||
|
* that may also have to be dequeued as part of the current transaction. This
|
||||||
|
* is modeled using a ref to an Int that points to elements in the underlyinng store.
|
||||||
|
* </p>
|
||||||
|
* Subclasses just need to provide the actual concrete instance for the
|
||||||
|
* abstract val <tt>storage</tt>.
|
||||||
|
*
|
||||||
|
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||||
|
*/
|
||||||
|
trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
||||||
|
with Transactional with Committable with Logging {
|
||||||
|
|
||||||
|
abstract case class QueueOp
|
||||||
|
case object ENQ extends QueueOp
|
||||||
|
case object DEQ extends QueueOp
|
||||||
|
|
||||||
|
import scala.collection.immutable.Queue
|
||||||
|
|
||||||
|
// current trail that will be played on commit to the underlying store
|
||||||
|
protected val enqueuedNDequeuedEntries = TransactionalState.newVector[(Option[A], QueueOp)]
|
||||||
|
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
|
||||||
|
|
||||||
|
// local queue that will record all enqueues and dequeues in the current txn
|
||||||
|
protected val localQ = TransactionalRef[Queue[A]]()
|
||||||
|
|
||||||
|
// keeps a pointer to the underlying storage for the enxt candidate to be dequeued
|
||||||
|
protected val pickMeForDQ = TransactionalRef[Int]()
|
||||||
|
|
||||||
|
localQ.swap(Queue.Empty)
|
||||||
|
pickMeForDQ.swap(0)
|
||||||
|
|
||||||
|
// to be concretized in subclasses
|
||||||
|
val storage: QueueStorageBackend[A]
|
||||||
|
|
||||||
|
def commit = {
|
||||||
|
enqueuedNDequeuedEntries.toList.foreach { e =>
|
||||||
|
e._2 match {
|
||||||
|
case ENQ => storage.enqueue(uuid, e._1.get)
|
||||||
|
case DEQ => storage.dequeue(uuid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get.get) {
|
||||||
|
storage.remove(uuid)
|
||||||
|
}
|
||||||
|
enqueuedNDequeuedEntries.clear
|
||||||
|
localQ.swap(Queue.Empty)
|
||||||
|
pickMeForDQ.swap(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def enqueue(elems: A*) {
|
||||||
|
register
|
||||||
|
elems.foreach(e => {
|
||||||
|
enqueuedNDequeuedEntries.add((Some(e), ENQ))
|
||||||
|
localQ.get.get.enqueue(e)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
override def dequeue: A = {
|
||||||
|
register
|
||||||
|
// record for later playback
|
||||||
|
enqueuedNDequeuedEntries.add((None, DEQ))
|
||||||
|
|
||||||
|
val i = pickMeForDQ.get.get
|
||||||
|
if (i < storage.size(uuid)) {
|
||||||
|
// still we can DQ from storage
|
||||||
|
pickMeForDQ.swap(i + 1)
|
||||||
|
storage.peek(uuid, i, 1)(0)
|
||||||
|
} else {
|
||||||
|
// check we have transient candidates in localQ for DQ
|
||||||
|
if (localQ.get.get.isEmpty == false) {
|
||||||
|
val (a, q) = localQ.get.get.dequeue
|
||||||
|
localQ.swap(q)
|
||||||
|
a
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw new NoSuchElementException("trying to dequeue from empty queue")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def clear = {
|
||||||
|
register
|
||||||
|
shouldClearOnCommit.swap(true)
|
||||||
|
localQ.swap(Queue.Empty)
|
||||||
|
pickMeForDQ.swap(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def size: Int = try {
|
||||||
|
storage.size(uuid) + localQ.get.get.length
|
||||||
|
} catch { case e: Exception => 0 }
|
||||||
|
|
||||||
|
override def isEmpty: Boolean =
|
||||||
|
size == 0
|
||||||
|
|
||||||
|
override def +=(elem: A): Unit = enqueue(elem)
|
||||||
|
override def ++=(elems: Iterator[A]): Unit = enqueue(elems.toList: _*)
|
||||||
|
override def ++=(elems: Iterable[A]): Unit = this ++= elems.elements
|
||||||
|
|
||||||
|
override def dequeueFirst(p: A => Boolean): Option[A] =
|
||||||
|
throw new UnsupportedOperationException("dequeueFirst not supported")
|
||||||
|
|
||||||
|
override def dequeueAll(p: A => Boolean): Seq[A] =
|
||||||
|
throw new UnsupportedOperationException("dequeueAll not supported")
|
||||||
|
|
||||||
|
private def register = {
|
||||||
|
if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
|
||||||
|
currentTransaction.get.get.register(uuid, this)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,3 +34,38 @@ trait RefStorageBackend[T] extends StorageBackend {
|
||||||
def insertRefStorageFor(name: String, element: T)
|
def insertRefStorageFor(name: String, element: T)
|
||||||
def getRefStorageFor(name: String): Option[T]
|
def getRefStorageFor(name: String): Option[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for Queue
|
||||||
|
trait QueueStorageBackend[T] extends StorageBackend {
|
||||||
|
// add to the end of the queue
|
||||||
|
def enqueue(name: String, item: T): Boolean
|
||||||
|
|
||||||
|
// pop from the front of the queue
|
||||||
|
def dequeue(name: String): Option[T]
|
||||||
|
|
||||||
|
// get the size of the queue
|
||||||
|
def size(name: String): Int
|
||||||
|
|
||||||
|
// return an array of items currently stored in the queue
|
||||||
|
// start is the item to begin, count is how many items to return
|
||||||
|
def peek(name: String, start: Int, count: Int): List[T]
|
||||||
|
|
||||||
|
// completely delete the queue
|
||||||
|
def remove(name: String): Boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
trait SortedSetStorageBackend[T] extends StorageBackend {
|
||||||
|
// add item to sorted set identified by name
|
||||||
|
def zadd(name: String, zscore: String, item: T): Boolean
|
||||||
|
|
||||||
|
// remove item from sorted set identified by name
|
||||||
|
def zrem(name: String, item: T): Boolean
|
||||||
|
|
||||||
|
// cardinality of the set idnetified by name
|
||||||
|
def zcard(name: String): Int
|
||||||
|
|
||||||
|
def zscore(name: String, item: T): String
|
||||||
|
|
||||||
|
def zrange(name: String, start: Int, end: Int): List[T]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,18 +12,21 @@ object RedisStorage extends Storage {
|
||||||
def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
|
def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
|
||||||
def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
|
def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
|
||||||
def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)
|
def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)
|
||||||
|
def newQueue: PersistentQueue[ElementType] = newQueue(Uuid.newUuid.toString)
|
||||||
|
|
||||||
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
||||||
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
||||||
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
||||||
|
def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
|
||||||
|
|
||||||
def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id)
|
def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id)
|
||||||
def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id)
|
def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id)
|
||||||
def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id)
|
def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id)
|
||||||
|
def newQueue(id: String): PersistentQueue[ElementType] = new RedisPersistentQueue(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements a persistent transactional map based on the MongoDB document storage.
|
* Implements a persistent transactional map based on the Redis storage.
|
||||||
*
|
*
|
||||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||||
*/
|
*/
|
||||||
|
|
@ -34,7 +37,7 @@ class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[By
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements a persistent transactional vector based on the Redis
|
* Implements a persistent transactional vector based on the Redis
|
||||||
* document storage.
|
* storage.
|
||||||
*
|
*
|
||||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||||
*/
|
*/
|
||||||
|
|
@ -47,3 +50,14 @@ class RedisPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
|
||||||
val uuid = id
|
val uuid = id
|
||||||
val storage = RedisStorageBackend
|
val storage = RedisStorageBackend
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements a persistent transactional queue based on the Redis
|
||||||
|
* storage.
|
||||||
|
*
|
||||||
|
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||||
|
*/
|
||||||
|
class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
|
||||||
|
val uuid = id
|
||||||
|
val storage = RedisStorageBackend
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,8 @@ private [akka] object RedisStorageBackend extends
|
||||||
MapStorageBackend[Array[Byte], Array[Byte]] with
|
MapStorageBackend[Array[Byte], Array[Byte]] with
|
||||||
VectorStorageBackend[Array[Byte]] with
|
VectorStorageBackend[Array[Byte]] with
|
||||||
RefStorageBackend[Array[Byte]] with
|
RefStorageBackend[Array[Byte]] with
|
||||||
|
QueueStorageBackend[Array[Byte]] with
|
||||||
|
SortedSetStorageBackend[Array[Byte]] with
|
||||||
Logging {
|
Logging {
|
||||||
|
|
||||||
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
|
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
|
||||||
|
|
@ -246,6 +248,91 @@ private [akka] object RedisStorageBackend extends
|
||||||
case Some(s) => Some(s.getBytes)
|
case Some(s) => Some(s.getBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add to the end of the queue
|
||||||
|
def enqueue(name: String, item: Array[Byte]): Boolean = {
|
||||||
|
db.pushTail(new String(encode(name.getBytes)), new String(item))
|
||||||
|
}
|
||||||
|
|
||||||
|
// pop from the front of the queue
|
||||||
|
def dequeue(name: String): Option[Array[Byte]] = {
|
||||||
|
db.popHead(new String(encode(name.getBytes))) match {
|
||||||
|
case None =>
|
||||||
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
case Some(s) =>
|
||||||
|
Some(s.getBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the size of the queue
|
||||||
|
def size(name: String): Int = {
|
||||||
|
db.listLength(new String(encode(name.getBytes))) match {
|
||||||
|
case None =>
|
||||||
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
case Some(l) => l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// return an array of items currently stored in the queue
|
||||||
|
// start is the item to begin, count is how many items to return
|
||||||
|
def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match {
|
||||||
|
case 1 =>
|
||||||
|
db.listIndex(new String(encode(name.getBytes)), start) match {
|
||||||
|
case None =>
|
||||||
|
throw new Predef.NoSuchElementException("No element at " + start)
|
||||||
|
case Some(s) =>
|
||||||
|
List(s.getBytes)
|
||||||
|
}
|
||||||
|
case n =>
|
||||||
|
db.listRange(new String(encode(name.getBytes)), start, start + count - 1) match {
|
||||||
|
case None =>
|
||||||
|
throw new Predef.NoSuchElementException(
|
||||||
|
"No element found between " + start + " and " + (start + count - 1))
|
||||||
|
case Some(es) =>
|
||||||
|
es.map(_.getBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// completely delete the queue
|
||||||
|
def remove(name: String): Boolean = {
|
||||||
|
db.delete(new String(encode(name.getBytes)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// add item to sorted set identified by name
|
||||||
|
def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = {
|
||||||
|
db.zAdd(new String(encode(name.getBytes)), zscore, new String(item))
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove item from sorted set identified by name
|
||||||
|
def zrem(name: String, item: Array[Byte]): Boolean = {
|
||||||
|
db.zRem(new String(encode(name.getBytes)), new String(item))
|
||||||
|
}
|
||||||
|
|
||||||
|
// cardinality of the set identified by name
|
||||||
|
def zcard(name: String): Int = {
|
||||||
|
db.zCard(new String(encode(name.getBytes))) match {
|
||||||
|
case None =>
|
||||||
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
case Some(l) => l
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def zscore(name: String, item: Array[Byte]): String = {
|
||||||
|
db.zScore(new String(encode(name.getBytes)), new String(item)) match {
|
||||||
|
case None =>
|
||||||
|
throw new Predef.NoSuchElementException(new String(item) + " not present")
|
||||||
|
case Some(s) => s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
|
||||||
|
db.zRange(new String(encode(name.getBytes)), start.toString, end.toString, SocketOperations.ASC, false) match {
|
||||||
|
case None =>
|
||||||
|
throw new Predef.NoSuchElementException(name + " not present")
|
||||||
|
case Some(s) =>
|
||||||
|
s.map(_.getBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def flushDB = db.flushDb
|
def flushDB = db.flushDb
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,133 @@
|
||||||
|
package se.scalablesolutions.akka.state
|
||||||
|
|
||||||
|
import junit.framework.TestCase
|
||||||
|
|
||||||
|
import org.junit.{Test, Before}
|
||||||
|
import org.junit.Assert._
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.actor.{Actor, Transactor}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A persistent actor based on Redis queue storage.
|
||||||
|
* <p/>
|
||||||
|
* Needs a running Redis server.
|
||||||
|
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||||
|
*/
|
||||||
|
|
||||||
|
case class NQ(accountNo: String)
|
||||||
|
case class DQ
|
||||||
|
case class MNDQ(accountNos: List[String], noOfDQs: Int, failer: Actor)
|
||||||
|
case class SZ
|
||||||
|
|
||||||
|
class QueueActor extends Transactor {
|
||||||
|
private val accounts = RedisStorage.newQueue
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
// enqueue
|
||||||
|
case NQ(accountNo) =>
|
||||||
|
accounts.enqueue(accountNo.getBytes)
|
||||||
|
reply(true)
|
||||||
|
|
||||||
|
// dequeue
|
||||||
|
case DQ =>
|
||||||
|
val d = new String(accounts.dequeue)
|
||||||
|
reply(d)
|
||||||
|
|
||||||
|
// multiple NQ and DQ
|
||||||
|
case MNDQ(enqs, no, failer) =>
|
||||||
|
accounts.enqueue(enqs.map(_.getBytes): _*)
|
||||||
|
try {
|
||||||
|
(1 to no).foreach(e => accounts.dequeue)
|
||||||
|
} catch {
|
||||||
|
case e: Exception =>
|
||||||
|
failer !! "Failure"
|
||||||
|
}
|
||||||
|
reply(true)
|
||||||
|
|
||||||
|
// size
|
||||||
|
case SZ =>
|
||||||
|
reply(accounts.size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class RedisPersistentQSpec extends TestCase {
|
||||||
|
@Test
|
||||||
|
def testSuccessfulNQ = {
|
||||||
|
val qa = new QueueActor
|
||||||
|
qa.start
|
||||||
|
qa !! NQ("a-123")
|
||||||
|
qa !! NQ("a-124")
|
||||||
|
qa !! NQ("a-125")
|
||||||
|
assertEquals(3, (qa !! SZ).get)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testSuccessfulDQ = {
|
||||||
|
val qa = new QueueActor
|
||||||
|
qa.start
|
||||||
|
qa !! NQ("a-123")
|
||||||
|
qa !! NQ("a-124")
|
||||||
|
qa !! NQ("a-125")
|
||||||
|
assertEquals(3, (qa !! SZ).get)
|
||||||
|
assertEquals("a-123", (qa !! DQ).get)
|
||||||
|
assertEquals("a-124", (qa !! DQ).get)
|
||||||
|
assertEquals("a-125", (qa !! DQ).get)
|
||||||
|
assertEquals(0, (qa !! SZ).get)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testSuccessfulMNDQ = {
|
||||||
|
val qa = new QueueActor
|
||||||
|
qa.start
|
||||||
|
val failer = new PersistentFailerActor
|
||||||
|
failer.start
|
||||||
|
|
||||||
|
qa !! NQ("a-123")
|
||||||
|
qa !! NQ("a-124")
|
||||||
|
qa !! NQ("a-125")
|
||||||
|
assertEquals(3, (qa !! SZ).get)
|
||||||
|
assertEquals("a-123", (qa !! DQ).get)
|
||||||
|
assertEquals(2, (qa !! SZ).get)
|
||||||
|
qa !! MNDQ(List("a-126", "a-127"), 2, failer)
|
||||||
|
assertEquals(2, (qa !! SZ).get)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testMixedMNDQ = {
|
||||||
|
val qa = new QueueActor
|
||||||
|
qa.start
|
||||||
|
val failer = new PersistentFailerActor
|
||||||
|
failer.start
|
||||||
|
|
||||||
|
// 3 enqueues
|
||||||
|
qa !! NQ("a-123")
|
||||||
|
qa !! NQ("a-124")
|
||||||
|
qa !! NQ("a-125")
|
||||||
|
|
||||||
|
assertEquals(3, (qa !! SZ).get)
|
||||||
|
|
||||||
|
// dequeue 1
|
||||||
|
assertEquals("a-123", (qa !! DQ).get)
|
||||||
|
|
||||||
|
// size == 2
|
||||||
|
assertEquals(2, (qa !! SZ).get)
|
||||||
|
|
||||||
|
// enqueue 2, dequeue 2 => size == 2
|
||||||
|
qa !! MNDQ(List("a-126", "a-127"), 2, failer)
|
||||||
|
assertEquals(2, (qa !! SZ).get)
|
||||||
|
|
||||||
|
// enqueue 2 => size == 4
|
||||||
|
qa !! NQ("a-128")
|
||||||
|
qa !! NQ("a-129")
|
||||||
|
assertEquals(4, (qa !! SZ).get)
|
||||||
|
|
||||||
|
// enqueue 1 => size 5
|
||||||
|
// dequeue 6 => fail transaction
|
||||||
|
// size should remain 4
|
||||||
|
try {
|
||||||
|
qa !! MNDQ(List("a-130"), 6, failer)
|
||||||
|
} catch { case e: Exception => {} }
|
||||||
|
|
||||||
|
assertEquals(4, (qa !! SZ).get)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -113,6 +113,59 @@ class RedisStorageBackendSpec extends
|
||||||
n.fromBytes(getRefStorageFor("T-4").get) should equal(n)
|
n.fromBytes(getRefStorageFor("T-4").get) should equal(n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
describe("store and query in queue") {
|
||||||
|
it("should give proper queue semantics") {
|
||||||
|
enqueue("T-5", "alan kay".getBytes)
|
||||||
|
enqueue("T-5", "alan turing".getBytes)
|
||||||
|
enqueue("T-5", "richard stallman".getBytes)
|
||||||
|
enqueue("T-5", "yukihiro matsumoto".getBytes)
|
||||||
|
enqueue("T-5", "claude shannon".getBytes)
|
||||||
|
enqueue("T-5", "linus torvalds".getBytes)
|
||||||
|
|
||||||
|
RedisStorageBackend.size("T-5") should equal(6)
|
||||||
|
|
||||||
|
new String(dequeue("T-5").get) should equal("alan kay")
|
||||||
|
new String(dequeue("T-5").get) should equal("alan turing")
|
||||||
|
|
||||||
|
RedisStorageBackend.size("T-5") should equal(4)
|
||||||
|
|
||||||
|
val l = peek("T-5", 0, 3)
|
||||||
|
l.size should equal(3)
|
||||||
|
new String(l(0)) should equal("richard stallman")
|
||||||
|
new String(l(1)) should equal("yukihiro matsumoto")
|
||||||
|
new String(l(2)) should equal("claude shannon")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("store and query in sorted set") {
|
||||||
|
it("should give proper sorted set semantics") {
|
||||||
|
zadd("hackers", "1965", "yukihiro matsumoto".getBytes)
|
||||||
|
zadd("hackers", "1953", "richard stallman".getBytes)
|
||||||
|
zadd("hackers", "1916", "claude shannon".getBytes)
|
||||||
|
zadd("hackers", "1969", "linus torvalds".getBytes)
|
||||||
|
zadd("hackers", "1940", "alan kay".getBytes)
|
||||||
|
zadd("hackers", "1912", "alan turing".getBytes)
|
||||||
|
|
||||||
|
zcard("hackers") should equal(6)
|
||||||
|
|
||||||
|
zscore("hackers", "alan turing".getBytes) should equal("1912")
|
||||||
|
zscore("hackers", "richard stallman".getBytes) should equal("1953")
|
||||||
|
zscore("hackers", "claude shannon".getBytes) should equal("1916")
|
||||||
|
zscore("hackers", "linus torvalds".getBytes) should equal("1969")
|
||||||
|
|
||||||
|
val s: List[Array[Byte]] = zrange("hackers", 0, 2)
|
||||||
|
s.size should equal(3)
|
||||||
|
s.map(new String(_)) should equal(List("alan turing", "claude shannon", "alan kay"))
|
||||||
|
|
||||||
|
var sorted: List[String] =
|
||||||
|
List("alan turing", "claude shannon", "alan kay", "richard stallman", "yukihiro matsumoto", "linus torvalds")
|
||||||
|
|
||||||
|
val t: List[Array[Byte]] = zrange("hackers", 0, -1)
|
||||||
|
t.size should equal(6)
|
||||||
|
t.map(new String(_)) should equal(sorted)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case class Name(id: Int, name: String, address: String)
|
case class Name(id: Int, name: String, address: String)
|
||||||
|
|
|
||||||
Binary file not shown.
Loading…
Add table
Add a link
Reference in a new issue