Merge branch 'master' of git@github.com:jboner/akka
This commit is contained in:
commit
67adfd8a86
8 changed files with 648 additions and 1 deletions
46
akka-persistence-redis/pom.xml
Normal file
46
akka-persistence-redis/pom.xml
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>akka-persistence-redis</artifactId>
|
||||
<name>Akka Persistence Redis Module</name>
|
||||
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<parent>
|
||||
<artifactId>akka</artifactId>
|
||||
<groupId>se.scalablesolutions.akka</groupId>
|
||||
<version>0.6</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<artifactId>akka-persistence-common</artifactId>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For Mongo -->
|
||||
<dependency>
|
||||
<groupId>com.redis</groupId>
|
||||
<artifactId>redisclient</artifactId>
|
||||
<version>1.0.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For Testing -->
|
||||
<dependency>
|
||||
<groupId>org.scalatest</groupId>
|
||||
<artifactId>scalatest</artifactId>
|
||||
<version>1.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
49
akka-persistence-redis/src/main/scala/RedisStorage.scala
Normal file
49
akka-persistence-redis/src/main/scala/RedisStorage.scala
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.state
|
||||
|
||||
import org.codehaus.aspectwerkz.proxy.Uuid
|
||||
|
||||
object RedisStorage extends Storage {
|
||||
type ElementType = Array[Byte]
|
||||
|
||||
def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString)
|
||||
def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString)
|
||||
def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString)
|
||||
|
||||
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
||||
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
||||
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
||||
|
||||
def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id)
|
||||
def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id)
|
||||
def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id)
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional map based on the MongoDB document storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = RedisStorageBackend
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a persistent transactional vector based on the Redis
|
||||
* document storage.
|
||||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class RedisPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = RedisStorageBackend
|
||||
}
|
||||
|
||||
class RedisPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = RedisStorageBackend
|
||||
}
|
||||
251
akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
Normal file
251
akka-persistence-redis/src/main/scala/RedisStorageBackend.scala
Normal file
|
|
@ -0,0 +1,251 @@
|
|||
/**
|
||||
* Copyright (C) 2009 Scalable Solutions.
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.state
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import se.scalablesolutions.akka.Config.config
|
||||
|
||||
import com.redis._
|
||||
|
||||
trait Encoder {
|
||||
def encode(bytes: Array[Byte]): Array[Byte]
|
||||
def decode(bytes: Array[Byte]): Array[Byte]
|
||||
}
|
||||
|
||||
trait CommonsCodecBase64 {
|
||||
val base64 = new org.apache.commons.codec.binary.Base64
|
||||
|
||||
def encode(bytes: Array[Byte]): Array[Byte] = base64.encode(bytes)
|
||||
def decode(bytes: Array[Byte]): Array[Byte] = base64.decode(bytes)
|
||||
}
|
||||
|
||||
object Base64Encoder extends Encoder with CommonsCodecBase64
|
||||
import Base64Encoder._
|
||||
|
||||
/**
|
||||
* A module for supporting Redis based persistence.
|
||||
* <p/>
|
||||
* The module offers functionality for:
|
||||
* <li>Persistent Maps</li>
|
||||
* <li>Persistent Vectors</li>
|
||||
* <li>Persistent Refs</li>
|
||||
* <p/>
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
private [akka] object RedisStorageBackend extends
|
||||
MapStorageBackend[Array[Byte], Array[Byte]] with
|
||||
VectorStorageBackend[Array[Byte]] with
|
||||
RefStorageBackend[Array[Byte]] with
|
||||
Logging {
|
||||
|
||||
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
|
||||
val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379)
|
||||
|
||||
val db = new Redis(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT)
|
||||
|
||||
/**
|
||||
* Map storage in Redis.
|
||||
* <p/>
|
||||
* Maps are stored as key/value pairs in redis. <i>Redis keys cannot contain spaces</i>. But with
|
||||
* our use case, the keys will be specified by the user. Hence we need to encode the key
|
||||
* ourselves before sending to Redis. We use base64 encoding.
|
||||
* <p/>
|
||||
* Also since we are storing the key/value in the global namespace, we need to construct the
|
||||
* key suitably so as to avoid namespace clash. The following strategy is used:
|
||||
*
|
||||
* Unique identifier for the map = T1 (say)
|
||||
* <pre>
|
||||
* Map(
|
||||
* "debasish.address" -> "kolkata, India",
|
||||
* "debasish.company" -> "anshinsoft",
|
||||
* "debasish.programming_language" -> "scala",
|
||||
* )</pre>
|
||||
* will be stored as the following key-value pair in Redis:
|
||||
*
|
||||
* <i>
|
||||
* base64(T1):base64("debasish.address") -> "kolkata, India"
|
||||
* base64(T1):base64("debasish.company") -> "anshinsoft"
|
||||
* base64(T1):base64("debasish.programming_language") -> "scala"
|
||||
* </i>
|
||||
*/
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) {
|
||||
insertMapStorageEntriesFor(name, List((key, value)))
|
||||
}
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) {
|
||||
mset(entries.map(e =>
|
||||
(makeRedisKey(name, e._1), new String(e._2))))
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a redis key from an Akka Map key.
|
||||
* <p/>
|
||||
* The key is made as follows:
|
||||
* <li>redis key is composed of 2 parts: the transaction id and the map key separated by :</li>
|
||||
* <li>: is chosen since it cannot appear in base64 encoding charset</li>
|
||||
* <li>both parts of the key need to be based64 encoded since there can be spaces within each of them</li>
|
||||
*/
|
||||
private [this] def makeRedisKey(name: String, key: Array[Byte]): String = {
|
||||
"%s:%s".format(new String(encode(name.getBytes)), new String(encode(key)))
|
||||
}
|
||||
|
||||
private [this] def makeKeyFromRedisKey(redisKey: String) = {
|
||||
val nk = redisKey.split(':').map{e: String => decode(e.getBytes)}
|
||||
(nk(0), nk(1))
|
||||
}
|
||||
|
||||
private [this] def mset(entries: List[(String, String)]) {
|
||||
entries.foreach {e: (String, String) =>
|
||||
db.set(e._1, e._2)
|
||||
}
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String): Unit = {
|
||||
db.keys("%s:*".format(encode(name.getBytes))) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
case Some(keys) =>
|
||||
keys.foreach(db.delete(_))
|
||||
}
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
|
||||
db.delete(makeRedisKey(name, key))
|
||||
}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] =
|
||||
db.get(makeRedisKey(name, key)) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(new String(key) + " not present")
|
||||
case Some(s) => Some(s.getBytes)
|
||||
}
|
||||
|
||||
def getMapStorageSizeFor(name: String): Int = {
|
||||
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
|
||||
case None => 0
|
||||
case Some(keys) =>
|
||||
keys.length
|
||||
}
|
||||
}
|
||||
|
||||
def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
|
||||
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
case Some(keys) =>
|
||||
keys.map(key => (makeKeyFromRedisKey(key)._2, db.get(key).get.getBytes)).toList
|
||||
}
|
||||
}
|
||||
|
||||
def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
|
||||
finish: Option[Array[Byte]],
|
||||
count: Int): List[(Array[Byte], Array[Byte])] = {
|
||||
|
||||
import scala.collection.immutable.TreeMap
|
||||
val wholeSorted =
|
||||
TreeMap(getMapStorageFor(name).map(e => (new String(e._1), e._2)): _*)
|
||||
|
||||
if (wholeSorted isEmpty) List()
|
||||
|
||||
val startKey =
|
||||
start match {
|
||||
case Some(bytes) => Some(new String(bytes))
|
||||
case None => None
|
||||
}
|
||||
|
||||
val endKey =
|
||||
finish match {
|
||||
case Some(bytes) => Some(new String(bytes))
|
||||
case None => None
|
||||
}
|
||||
|
||||
((startKey, endKey, count): @unchecked) match {
|
||||
case ((Some(s), Some(e), _)) =>
|
||||
wholeSorted.range(s, e)
|
||||
.toList
|
||||
.map(e => (e._1.getBytes, e._2))
|
||||
.toList
|
||||
case ((Some(s), None, c)) if c > 0 =>
|
||||
wholeSorted.from(s)
|
||||
.elements
|
||||
.take(count)
|
||||
.map(e => (e._1.getBytes, e._2))
|
||||
.toList
|
||||
case ((Some(s), None, _)) =>
|
||||
wholeSorted.from(s)
|
||||
.toList
|
||||
.map(e => (e._1.getBytes, e._2))
|
||||
.toList
|
||||
case ((None, Some(e), _)) =>
|
||||
wholeSorted.until(e)
|
||||
.toList
|
||||
.map(e => (e._1.getBytes, e._2))
|
||||
.toList
|
||||
}
|
||||
}
|
||||
|
||||
def insertVectorStorageEntryFor(name: String, element: Array[Byte]) {
|
||||
db.pushHead(new String(encode(name.getBytes)), new String(element))
|
||||
}
|
||||
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) {
|
||||
elements.foreach(insertVectorStorageEntryFor(name, _))
|
||||
}
|
||||
|
||||
def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) {
|
||||
db.listSet(new String(encode(name.getBytes)), index, new String(elem))
|
||||
}
|
||||
|
||||
def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
|
||||
db.listIndex(new String(encode(name.getBytes)), index) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " does not have element at " + index)
|
||||
case Some(e) => e.getBytes
|
||||
}
|
||||
}
|
||||
|
||||
def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
|
||||
/**
|
||||
* <tt>count</tt> is the max number of results to return. Start with
|
||||
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until
|
||||
* you hit <tt>finish</tt> or <tt>count</tt>.
|
||||
*/
|
||||
val s = if (start.isDefined) start.get else 0
|
||||
val cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get
|
||||
if (f >= s) Math.min(count, (f - s)) else count
|
||||
}
|
||||
else count
|
||||
db.listRange(new String(encode(name.getBytes)), s, s + cnt - 1) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " does not have elements in the range specified")
|
||||
case Some(l) =>
|
||||
l map (_.getBytes)
|
||||
}
|
||||
}
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
db.listLength(new String(encode(name.getBytes))) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
case Some(l) => l
|
||||
}
|
||||
}
|
||||
|
||||
def insertRefStorageFor(name: String, element: Array[Byte]) {
|
||||
db.set(new String(encode(name.getBytes)), new String(element))
|
||||
}
|
||||
|
||||
def getRefStorageFor(name: String): Option[Array[Byte]] = {
|
||||
db.get(new String(encode(name.getBytes))) match {
|
||||
case None =>
|
||||
throw new Predef.NoSuchElementException(name + " not present")
|
||||
case Some(s) => Some(s.getBytes)
|
||||
}
|
||||
}
|
||||
|
||||
def flushDB = db.flushDb
|
||||
}
|
||||
|
|
@ -0,0 +1,157 @@
|
|||
package se.scalablesolutions.akka.state
|
||||
|
||||
import junit.framework.TestCase
|
||||
|
||||
import org.junit.{Test, Before}
|
||||
import org.junit.Assert._
|
||||
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
|
||||
/**
|
||||
* A persistent actor based on Redis storage.
|
||||
* <p/>
|
||||
* Demonstrates a bank account operation consisting of messages that:
|
||||
* <li>checks balance <tt>Balance</tt></li>
|
||||
* <li>debits amount<tt>Debit</tt></li>
|
||||
* <li>debits multiple amounts<tt>MultiDebit</tt></li>
|
||||
* <li>credits amount<tt>Credit</tt></li>
|
||||
* <p/>
|
||||
* Needs a running Redis server.
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
|
||||
case class Balance(accountNo: String)
|
||||
case class Debit(accountNo: String, amount: BigInt, failer: Actor)
|
||||
case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
|
||||
case class Credit(accountNo: String, amount: BigInt)
|
||||
case object LogSize
|
||||
|
||||
class AccountActor extends Actor {
|
||||
makeTransactionRequired
|
||||
private val accountState = RedisStorage.newMap
|
||||
private val txnLog = RedisStorage.newVector
|
||||
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
// check balance
|
||||
case Balance(accountNo) =>
|
||||
txnLog.add("Balance:%s".format(accountNo).getBytes)
|
||||
reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
|
||||
|
||||
// debit amount: can fail
|
||||
case Debit(accountNo, amount, failer) =>
|
||||
txnLog.add("Debit:%s %s".format(accountNo, amount.toString).getBytes)
|
||||
|
||||
val m: BigInt =
|
||||
accountState.get(accountNo.getBytes) match {
|
||||
case Some(bytes) => BigInt(new String(bytes))
|
||||
case None => 0
|
||||
}
|
||||
accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
|
||||
if (amount > m)
|
||||
failer !! "Failure"
|
||||
reply(m - amount)
|
||||
|
||||
// many debits: can fail
|
||||
// demonstrates true rollback even if multiple puts have been done
|
||||
case MultiDebit(accountNo, amounts, failer) =>
|
||||
txnLog.add("MultiDebit:%s %s".format(accountNo, amounts.map(_.intValue).foldLeft(0)(_ + _).toString).getBytes)
|
||||
|
||||
val m: BigInt =
|
||||
accountState.get(accountNo.getBytes) match {
|
||||
case Some(bytes) => BigInt(new String(bytes))
|
||||
case None => 0
|
||||
}
|
||||
var bal: BigInt = 0
|
||||
amounts.foreach {amount =>
|
||||
bal = bal + amount
|
||||
accountState.put(accountNo.getBytes, (m - bal).toString.getBytes)
|
||||
}
|
||||
if (bal > m) failer !! "Failure"
|
||||
reply(m - bal)
|
||||
|
||||
// credit amount
|
||||
case Credit(accountNo, amount) =>
|
||||
txnLog.add("Credit:%s %s".format(accountNo, amount.toString).getBytes)
|
||||
|
||||
val m: BigInt =
|
||||
accountState.get(accountNo.getBytes) match {
|
||||
case Some(bytes) => BigInt(new String(bytes))
|
||||
case None => 0
|
||||
}
|
||||
accountState.put(accountNo.getBytes, (m + amount).toString.getBytes)
|
||||
reply(m + amount)
|
||||
|
||||
case LogSize =>
|
||||
reply(txnLog.length.asInstanceOf[AnyRef])
|
||||
}
|
||||
}
|
||||
|
||||
@serializable class PersistentFailerActor extends Actor {
|
||||
makeTransactionRequired
|
||||
def receive = {
|
||||
case "Failure" =>
|
||||
throw new RuntimeException("expected")
|
||||
}
|
||||
}
|
||||
|
||||
class RedisPersistentActorTest extends TestCase {
|
||||
@Test
|
||||
def testSuccessfulDebit = {
|
||||
val bactor = new AccountActor
|
||||
bactor.start
|
||||
val failer = new PersistentFailerActor
|
||||
failer.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
bactor !! Debit("a-123", 3000, failer)
|
||||
assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
|
||||
|
||||
bactor !! Credit("a-123", 7000)
|
||||
assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
|
||||
|
||||
bactor !! Debit("a-123", 8000, failer)
|
||||
assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
|
||||
|
||||
assertEquals(7, (bactor !! LogSize).get)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUnsuccessfulDebit = {
|
||||
val bactor = new AccountActor
|
||||
bactor.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
|
||||
|
||||
val failer = new PersistentFailerActor
|
||||
failer.start
|
||||
try {
|
||||
bactor !! Debit("a-123", 7000, failer)
|
||||
fail("should throw exception")
|
||||
} catch { case e: RuntimeException => {}}
|
||||
|
||||
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
|
||||
|
||||
// should not count the failed one
|
||||
assertEquals(3, (bactor !! LogSize).get)
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUnsuccessfulMultiDebit = {
|
||||
val bactor = new AccountActor
|
||||
bactor.start
|
||||
bactor !! Credit("a-123", 5000)
|
||||
|
||||
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
|
||||
|
||||
val failer = new PersistentFailerActor
|
||||
failer.start
|
||||
try {
|
||||
bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer)
|
||||
fail("should throw exception")
|
||||
} catch { case e: RuntimeException => {}}
|
||||
|
||||
assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
|
||||
|
||||
// should not count the failed one
|
||||
assertEquals(3, (bactor !! LogSize).get)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,139 @@
|
|||
package se.scalablesolutions.akka.state
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import se.scalablesolutions.akka.serialization.Serializable
|
||||
|
||||
import RedisStorageBackend._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class RedisStorageBackendTest extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
override def beforeAll {
|
||||
flushDB
|
||||
println("** destroyed database")
|
||||
}
|
||||
|
||||
override def afterAll {
|
||||
flushDB
|
||||
println("** destroyed database")
|
||||
}
|
||||
|
||||
describe("Store and query in maps") {
|
||||
it("should enter 4 entries in redis for transaction T-1") {
|
||||
insertMapStorageEntryFor("T-1", "debasish.company".getBytes, "anshinsoft".getBytes)
|
||||
insertMapStorageEntryFor("T-1", "debasish.language".getBytes, "java".getBytes)
|
||||
insertMapStorageEntryFor("T-1", "debasish.age".getBytes, "44".getBytes)
|
||||
insertMapStorageEntryFor("T-1", "debasish.spouse".getBytes, "paramita".getBytes)
|
||||
|
||||
getMapStorageSizeFor("T-1") should equal(4)
|
||||
new String(getMapStorageEntryFor(
|
||||
"T-1", "debasish.language".getBytes).get) should equal("java")
|
||||
}
|
||||
|
||||
it("should enter a custom object for transaction T-1") {
|
||||
val n = Name(100, "debasish", "kolkata")
|
||||
insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, n.toBytes)
|
||||
getMapStorageSizeFor("T-1") should equal(5)
|
||||
}
|
||||
|
||||
it("should enter key/values for another transaction T-2") {
|
||||
insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes)
|
||||
insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes)
|
||||
getMapStorageSizeFor("T-1") should equal(5)
|
||||
getMapStorageSizeFor("T-2") should equal(2)
|
||||
}
|
||||
|
||||
it("should remove map storage for T-1 and T2") {
|
||||
removeMapStorageFor("T-1")
|
||||
removeMapStorageFor("T-2")
|
||||
}
|
||||
}
|
||||
|
||||
describe("Range query in maps") {
|
||||
it("should enter 7 entries in redis for transaction T-5") {
|
||||
insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes)
|
||||
insertMapStorageEntryFor("T-5", "trade.instrument".getBytes, "IBM".getBytes)
|
||||
insertMapStorageEntryFor("T-5", "trade.type".getBytes, "BUY".getBytes)
|
||||
insertMapStorageEntryFor("T-5", "trade.account".getBytes, "A-123".getBytes)
|
||||
insertMapStorageEntryFor("T-5", "trade.amount".getBytes, "1000000".getBytes)
|
||||
insertMapStorageEntryFor("T-5", "trade.quantity".getBytes, "1000".getBytes)
|
||||
insertMapStorageEntryFor("T-5", "trade.broker".getBytes, "Nomura".getBytes)
|
||||
getMapStorageSizeFor("T-5") should equal(7)
|
||||
|
||||
getMapStorageRangeFor("T-5",
|
||||
Some("trade.account".getBytes),
|
||||
None, 3).map(e => (new String(e._1), new String(e._2))).size should equal(3)
|
||||
|
||||
getMapStorageRangeFor("T-5",
|
||||
Some("trade.account".getBytes),
|
||||
Some("trade.type".getBytes), 3).map(e => (new String(e._1), new String(e._2))).size should equal(6)
|
||||
|
||||
getMapStorageRangeFor("T-5",
|
||||
Some("trade.account".getBytes),
|
||||
Some("trade.type".getBytes), 0).map(e => (new String(e._1), new String(e._2))).size should equal(6)
|
||||
|
||||
getMapStorageRangeFor("T-5",
|
||||
Some("trade.account".getBytes),
|
||||
None, 0).map(e => (new String(e._1), new String(e._2))).size should equal(7)
|
||||
}
|
||||
it("should remove map storage for T5") {
|
||||
removeMapStorageFor("T-5")
|
||||
}
|
||||
}
|
||||
|
||||
describe("Store and query in vectors") {
|
||||
it("should write 4 entries in a vector for transaction T-3") {
|
||||
insertVectorStorageEntryFor("T-3", "debasish".getBytes)
|
||||
insertVectorStorageEntryFor("T-3", "maulindu".getBytes)
|
||||
val n = Name(100, "debasish", "kolkata")
|
||||
insertVectorStorageEntryFor("T-3", n.toBytes)
|
||||
insertVectorStorageEntryFor("T-3", "1200".getBytes)
|
||||
getVectorStorageSizeFor("T-3") should equal(4)
|
||||
}
|
||||
}
|
||||
|
||||
describe("Store and query in ref") {
|
||||
it("should write 4 entries in 4 refs for transaction T-4") {
|
||||
insertRefStorageFor("T-4", "debasish".getBytes)
|
||||
insertRefStorageFor("T-4", "maulindu".getBytes)
|
||||
|
||||
insertRefStorageFor("T-4", "1200".getBytes)
|
||||
new String(getRefStorageFor("T-4").get) should equal("1200")
|
||||
|
||||
val n = Name(100, "debasish", "kolkata")
|
||||
insertRefStorageFor("T-4", n.toBytes)
|
||||
n.fromBytes(getRefStorageFor("T-4").get) should equal(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class Name(id: Int, name: String, address: String)
|
||||
extends Serializable.SBinary[Name] {
|
||||
import sbinary.DefaultProtocol._
|
||||
|
||||
def this() = this(0, null, null)
|
||||
|
||||
implicit object NameFormat extends Format[Name] {
|
||||
def reads(in : Input) = Name(
|
||||
read[Int](in),
|
||||
read[String](in),
|
||||
read[String](in))
|
||||
def writes(out: Output, value: Name) = {
|
||||
write[Int](out, value.id)
|
||||
write[String](out, value.name)
|
||||
write[String](out, value.address)
|
||||
}
|
||||
}
|
||||
|
||||
def fromBytes(bytes: Array[Byte]) = fromByteArray[Name](bytes)
|
||||
|
||||
def toBytes: Array[Byte] = toByteArray(this)
|
||||
}
|
||||
|
|
@ -77,5 +77,10 @@
|
|||
port = 27017
|
||||
dbname = "mydb"
|
||||
</mongodb>
|
||||
|
||||
<redis>
|
||||
hostname = "127.0.0.1" # IP address or hostname of the Redis instance
|
||||
port = 6379
|
||||
</redis>
|
||||
</storage>
|
||||
</akka>
|
||||
|
|
|
|||
BIN
embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar
Normal file
BIN
embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar
Normal file
Binary file not shown.
2
pom.xml
2
pom.xml
|
|
@ -18,7 +18,7 @@
|
|||
* Simple and high-level abstractions for concurrency and parallelism.
|
||||
* Asynchronous, non-blocking and highly performant event-driven programming model.
|
||||
* Very lightweight event-driven processes (create ~6.5 million actors on 4 G RAM).
|
||||
* Supervision hierarchies with let-it-crash semantics. For writing highly fault-tolerant systems that never stops, systems that self-heals.
|
||||
* Supervision hierarchies with let-it-crash semantics. For writing highly fault-tolerant systems that never stop, systems that self-heal.
|
||||
* Software Transactional Memory (STM). (Distributed transactions coming soon).
|
||||
* Transactors: combine actors and STM into transactional actors. Allows you to compose atomic message flows with automatic rollback and retry.
|
||||
* Remoting: highly performant distributed actors with remote supervision and error management.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue