diff --git a/akka-persistence-redis/pom.xml b/akka-persistence-redis/pom.xml new file mode 100644 index 0000000000..ba2f295aaf --- /dev/null +++ b/akka-persistence-redis/pom.xml @@ -0,0 +1,46 @@ + + 4.0.0 + + akka-persistence-redis + Akka Persistence Redis Module + + jar + + + akka + se.scalablesolutions.akka + 0.6 + ../pom.xml + + + + + akka-persistence-common + ${project.groupId} + ${project.version} + + + + + com.redis + redisclient + 1.0.1 + + + + + org.scalatest + scalatest + 1.0 + test + + + junit + junit + 4.5 + test + + + + diff --git a/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence-redis/src/main/scala/RedisStorage.scala new file mode 100644 index 0000000000..f836cfa293 --- /dev/null +++ b/akka-persistence-redis/src/main/scala/RedisStorage.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.state + +import org.codehaus.aspectwerkz.proxy.Uuid + +object RedisStorage extends Storage { + type ElementType = Array[Byte] + + def newMap: PersistentMap[ElementType, ElementType] = newMap(Uuid.newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(Uuid.newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(Uuid.newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new RedisPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new RedisPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new RedisPersistentRef(id) +} + +/** + * Implements a persistent transactional map based on the MongoDB document storage. + * + * @author Debasish Ghosh + */ +class RedisPersistentMap(id: String) extends PersistentMap[Array[Byte], Array[Byte]] { + val uuid = id + val storage = RedisStorageBackend +} + +/** + * Implements a persistent transactional vector based on the Redis + * document storage. + * + * @author Debasish Ghosh + */ +class RedisPersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = RedisStorageBackend +} + +class RedisPersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = RedisStorageBackend +} diff --git a/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala new file mode 100644 index 0000000000..2a9f5f1c61 --- /dev/null +++ b/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -0,0 +1,251 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.state + +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.Config.config + +import com.redis._ + +trait Encoder { + def encode(bytes: Array[Byte]): Array[Byte] + def decode(bytes: Array[Byte]): Array[Byte] +} + +trait CommonsCodecBase64 { + val base64 = new org.apache.commons.codec.binary.Base64 + + def encode(bytes: Array[Byte]): Array[Byte] = base64.encode(bytes) + def decode(bytes: Array[Byte]): Array[Byte] = base64.decode(bytes) +} + +object Base64Encoder extends Encoder with CommonsCodecBase64 +import Base64Encoder._ + +/** + * A module for supporting Redis based persistence. + *

+ * The module offers functionality for: + *

  • Persistent Maps
  • + *
  • Persistent Vectors
  • + *
  • Persistent Refs
  • + *

    + * @author Debasish Ghosh + */ +private [akka] object RedisStorageBackend extends + MapStorageBackend[Array[Byte], Array[Byte]] with + VectorStorageBackend[Array[Byte]] with + RefStorageBackend[Array[Byte]] with + Logging { + + val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1") + val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379) + + val db = new Redis(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT) + + /** + * Map storage in Redis. + *

    + * Maps are stored as key/value pairs in redis. Redis keys cannot contain spaces. But with + * our use case, the keys will be specified by the user. Hence we need to encode the key + * ourselves before sending to Redis. We use base64 encoding. + *

    + * Also since we are storing the key/value in the global namespace, we need to construct the + * key suitably so as to avoid namespace clash. The following strategy is used: + * + * Unique identifier for the map = T1 (say) + *

    +   * Map(
    +   *   "debasish.address" -> "kolkata, India",
    +   *   "debasish.company" -> "anshinsoft",
    +   *   "debasish.programming_language" -> "scala",
    +   * )
    + * will be stored as the following key-value pair in Redis: + * + * + * base64(T1):base64("debasish.address") -> "kolkata, India" + * base64(T1):base64("debasish.company") -> "anshinsoft" + * base64(T1):base64("debasish.programming_language") -> "scala" + * + */ + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) { + insertMapStorageEntriesFor(name, List((key, value))) + } + + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) { + mset(entries.map(e => + (makeRedisKey(name, e._1), new String(e._2)))) + } + + /** + * Make a redis key from an Akka Map key. + *

    + * The key is made as follows: + *

  • redis key is composed of 2 parts: the transaction id and the map key separated by :
  • + *
  • : is chosen since it cannot appear in base64 encoding charset
  • + *
  • both parts of the key need to be based64 encoded since there can be spaces within each of them
  • + */ + private [this] def makeRedisKey(name: String, key: Array[Byte]): String = { + "%s:%s".format(new String(encode(name.getBytes)), new String(encode(key))) + } + + private [this] def makeKeyFromRedisKey(redisKey: String) = { + val nk = redisKey.split(':').map{e: String => decode(e.getBytes)} + (nk(0), nk(1)) + } + + private [this] def mset(entries: List[(String, String)]) { + entries.foreach {e: (String, String) => + db.set(e._1, e._2) + } + } + + def removeMapStorageFor(name: String): Unit = { + db.keys("%s:*".format(encode(name.getBytes))) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(keys) => + keys.foreach(db.delete(_)) + } + } + + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + db.delete(makeRedisKey(name, key)) + } + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = + db.get(makeRedisKey(name, key)) match { + case None => + throw new Predef.NoSuchElementException(new String(key) + " not present") + case Some(s) => Some(s.getBytes) + } + + def getMapStorageSizeFor(name: String): Int = { + db.keys("%s:*".format(new String(encode(name.getBytes)))) match { + case None => 0 + case Some(keys) => + keys.length + } + } + + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + db.keys("%s:*".format(new String(encode(name.getBytes)))) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(keys) => + keys.map(key => (makeKeyFromRedisKey(key)._2, db.get(key).get.getBytes)).toList + } + } + + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], + finish: Option[Array[Byte]], + count: Int): List[(Array[Byte], Array[Byte])] = { + + import scala.collection.immutable.TreeMap + val wholeSorted = + TreeMap(getMapStorageFor(name).map(e => (new String(e._1), e._2)): _*) + + if (wholeSorted isEmpty) List() + + val startKey = + start match { + case Some(bytes) => Some(new String(bytes)) + case None => None + } + + val endKey = + finish match { + case Some(bytes) => Some(new String(bytes)) + case None => None + } + + ((startKey, endKey, count): @unchecked) match { + case ((Some(s), Some(e), _)) => + wholeSorted.range(s, e) + .toList + .map(e => (e._1.getBytes, e._2)) + .toList + case ((Some(s), None, c)) if c > 0 => + wholeSorted.from(s) + .elements + .take(count) + .map(e => (e._1.getBytes, e._2)) + .toList + case ((Some(s), None, _)) => + wholeSorted.from(s) + .toList + .map(e => (e._1.getBytes, e._2)) + .toList + case ((None, Some(e), _)) => + wholeSorted.until(e) + .toList + .map(e => (e._1.getBytes, e._2)) + .toList + } + } + + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) { + db.pushHead(new String(encode(name.getBytes)), new String(element)) + } + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) { + elements.foreach(insertVectorStorageEntryFor(name, _)) + } + + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) { + db.listSet(new String(encode(name.getBytes)), index, new String(elem)) + } + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { + db.listIndex(new String(encode(name.getBytes)), index) match { + case None => + throw new Predef.NoSuchElementException(name + " does not have element at " + index) + case Some(e) => e.getBytes + } + } + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + /** + * count is the max number of results to return. Start with + * start or 0 (if start is not defined) and go until + * you hit finish or count. + */ + val s = if (start.isDefined) start.get else 0 + val cnt = + if (finish.isDefined) { + val f = finish.get + if (f >= s) Math.min(count, (f - s)) else count + } + else count + db.listRange(new String(encode(name.getBytes)), s, s + cnt - 1) match { + case None => + throw new Predef.NoSuchElementException(name + " does not have elements in the range specified") + case Some(l) => + l map (_.getBytes) + } + } + + def getVectorStorageSizeFor(name: String): Int = { + db.listLength(new String(encode(name.getBytes))) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(l) => l + } + } + + def insertRefStorageFor(name: String, element: Array[Byte]) { + db.set(new String(encode(name.getBytes)), new String(element)) + } + + def getRefStorageFor(name: String): Option[Array[Byte]] = { + db.get(new String(encode(name.getBytes))) match { + case None => + throw new Predef.NoSuchElementException(name + " not present") + case Some(s) => Some(s.getBytes) + } + } + + def flushDB = db.flushDb +} diff --git a/akka-persistence-redis/src/test/scala/RedisPersistentActorTest.scala b/akka-persistence-redis/src/test/scala/RedisPersistentActorTest.scala new file mode 100644 index 0000000000..e4b8bfd22b --- /dev/null +++ b/akka-persistence-redis/src/test/scala/RedisPersistentActorTest.scala @@ -0,0 +1,157 @@ +package se.scalablesolutions.akka.state + +import junit.framework.TestCase + +import org.junit.{Test, Before} +import org.junit.Assert._ + +import se.scalablesolutions.akka.actor.Actor + +/** + * A persistent actor based on Redis storage. + *

    + * Demonstrates a bank account operation consisting of messages that: + *

  • checks balance Balance
  • + *
  • debits amountDebit
  • + *
  • debits multiple amountsMultiDebit
  • + *
  • credits amountCredit
  • + *

    + * Needs a running Redis server. + * @author Debasish Ghosh + */ + +case class Balance(accountNo: String) +case class Debit(accountNo: String, amount: BigInt, failer: Actor) +case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor) +case class Credit(accountNo: String, amount: BigInt) +case object LogSize + +class AccountActor extends Actor { + makeTransactionRequired + private val accountState = RedisStorage.newMap + private val txnLog = RedisStorage.newVector + + def receive: PartialFunction[Any, Unit] = { + // check balance + case Balance(accountNo) => + txnLog.add("Balance:%s".format(accountNo).getBytes) + reply(BigInt(new String(accountState.get(accountNo.getBytes).get))) + + // debit amount: can fail + case Debit(accountNo, amount, failer) => + txnLog.add("Debit:%s %s".format(accountNo, amount.toString).getBytes) + + val m: BigInt = + accountState.get(accountNo.getBytes) match { + case Some(bytes) => BigInt(new String(bytes)) + case None => 0 + } + accountState.put(accountNo.getBytes, (m - amount).toString.getBytes) + if (amount > m) + failer !! "Failure" + reply(m - amount) + + // many debits: can fail + // demonstrates true rollback even if multiple puts have been done + case MultiDebit(accountNo, amounts, failer) => + txnLog.add("MultiDebit:%s %s".format(accountNo, amounts.map(_.intValue).foldLeft(0)(_ + _).toString).getBytes) + + val m: BigInt = + accountState.get(accountNo.getBytes) match { + case Some(bytes) => BigInt(new String(bytes)) + case None => 0 + } + var bal: BigInt = 0 + amounts.foreach {amount => + bal = bal + amount + accountState.put(accountNo.getBytes, (m - bal).toString.getBytes) + } + if (bal > m) failer !! "Failure" + reply(m - bal) + + // credit amount + case Credit(accountNo, amount) => + txnLog.add("Credit:%s %s".format(accountNo, amount.toString).getBytes) + + val m: BigInt = + accountState.get(accountNo.getBytes) match { + case Some(bytes) => BigInt(new String(bytes)) + case None => 0 + } + accountState.put(accountNo.getBytes, (m + amount).toString.getBytes) + reply(m + amount) + + case LogSize => + reply(txnLog.length.asInstanceOf[AnyRef]) + } +} + +@serializable class PersistentFailerActor extends Actor { + makeTransactionRequired + def receive = { + case "Failure" => + throw new RuntimeException("expected") + } +} + +class RedisPersistentActorTest extends TestCase { + @Test + def testSuccessfulDebit = { + val bactor = new AccountActor + bactor.start + val failer = new PersistentFailerActor + failer.start + bactor !! Credit("a-123", 5000) + bactor !! Debit("a-123", 3000, failer) + assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get) + + bactor !! Credit("a-123", 7000) + assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get) + + bactor !! Debit("a-123", 8000, failer) + assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get) + + assertEquals(7, (bactor !! LogSize).get) + } + + @Test + def testUnsuccessfulDebit = { + val bactor = new AccountActor + bactor.start + bactor !! Credit("a-123", 5000) + assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) + + val failer = new PersistentFailerActor + failer.start + try { + bactor !! Debit("a-123", 7000, failer) + fail("should throw exception") + } catch { case e: RuntimeException => {}} + + assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) + + // should not count the failed one + assertEquals(3, (bactor !! LogSize).get) + } + + @Test + def testUnsuccessfulMultiDebit = { + val bactor = new AccountActor + bactor.start + bactor !! Credit("a-123", 5000) + + assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) + + val failer = new PersistentFailerActor + failer.start + try { + bactor !! MultiDebit("a-123", List(500, 2000, 1000, 3000), failer) + fail("should throw exception") + } catch { case e: RuntimeException => {}} + + assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) + + // should not count the failed one + assertEquals(3, (bactor !! LogSize).get) + } +} diff --git a/akka-persistence-redis/src/test/scala/RedisStorageBackendTest.scala b/akka-persistence-redis/src/test/scala/RedisStorageBackendTest.scala new file mode 100644 index 0000000000..262bda47eb --- /dev/null +++ b/akka-persistence-redis/src/test/scala/RedisStorageBackendTest.scala @@ -0,0 +1,139 @@ +package se.scalablesolutions.akka.state + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.serialization.Serializable + +import RedisStorageBackend._ + +@RunWith(classOf[JUnitRunner]) +class RedisStorageBackendTest extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + override def beforeAll { + flushDB + println("** destroyed database") + } + + override def afterAll { + flushDB + println("** destroyed database") + } + + describe("Store and query in maps") { + it("should enter 4 entries in redis for transaction T-1") { + insertMapStorageEntryFor("T-1", "debasish.company".getBytes, "anshinsoft".getBytes) + insertMapStorageEntryFor("T-1", "debasish.language".getBytes, "java".getBytes) + insertMapStorageEntryFor("T-1", "debasish.age".getBytes, "44".getBytes) + insertMapStorageEntryFor("T-1", "debasish.spouse".getBytes, "paramita".getBytes) + + getMapStorageSizeFor("T-1") should equal(4) + new String(getMapStorageEntryFor( + "T-1", "debasish.language".getBytes).get) should equal("java") + } + + it("should enter a custom object for transaction T-1") { + val n = Name(100, "debasish", "kolkata") + insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, n.toBytes) + getMapStorageSizeFor("T-1") should equal(5) + } + + it("should enter key/values for another transaction T-2") { + insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes) + insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes) + getMapStorageSizeFor("T-1") should equal(5) + getMapStorageSizeFor("T-2") should equal(2) + } + + it("should remove map storage for T-1 and T2") { + removeMapStorageFor("T-1") + removeMapStorageFor("T-2") + } + } + + describe("Range query in maps") { + it("should enter 7 entries in redis for transaction T-5") { + insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes) + insertMapStorageEntryFor("T-5", "trade.instrument".getBytes, "IBM".getBytes) + insertMapStorageEntryFor("T-5", "trade.type".getBytes, "BUY".getBytes) + insertMapStorageEntryFor("T-5", "trade.account".getBytes, "A-123".getBytes) + insertMapStorageEntryFor("T-5", "trade.amount".getBytes, "1000000".getBytes) + insertMapStorageEntryFor("T-5", "trade.quantity".getBytes, "1000".getBytes) + insertMapStorageEntryFor("T-5", "trade.broker".getBytes, "Nomura".getBytes) + getMapStorageSizeFor("T-5") should equal(7) + + getMapStorageRangeFor("T-5", + Some("trade.account".getBytes), + None, 3).map(e => (new String(e._1), new String(e._2))).size should equal(3) + + getMapStorageRangeFor("T-5", + Some("trade.account".getBytes), + Some("trade.type".getBytes), 3).map(e => (new String(e._1), new String(e._2))).size should equal(6) + + getMapStorageRangeFor("T-5", + Some("trade.account".getBytes), + Some("trade.type".getBytes), 0).map(e => (new String(e._1), new String(e._2))).size should equal(6) + + getMapStorageRangeFor("T-5", + Some("trade.account".getBytes), + None, 0).map(e => (new String(e._1), new String(e._2))).size should equal(7) + } + it("should remove map storage for T5") { + removeMapStorageFor("T-5") + } + } + + describe("Store and query in vectors") { + it("should write 4 entries in a vector for transaction T-3") { + insertVectorStorageEntryFor("T-3", "debasish".getBytes) + insertVectorStorageEntryFor("T-3", "maulindu".getBytes) + val n = Name(100, "debasish", "kolkata") + insertVectorStorageEntryFor("T-3", n.toBytes) + insertVectorStorageEntryFor("T-3", "1200".getBytes) + getVectorStorageSizeFor("T-3") should equal(4) + } + } + + describe("Store and query in ref") { + it("should write 4 entries in 4 refs for transaction T-4") { + insertRefStorageFor("T-4", "debasish".getBytes) + insertRefStorageFor("T-4", "maulindu".getBytes) + + insertRefStorageFor("T-4", "1200".getBytes) + new String(getRefStorageFor("T-4").get) should equal("1200") + + val n = Name(100, "debasish", "kolkata") + insertRefStorageFor("T-4", n.toBytes) + n.fromBytes(getRefStorageFor("T-4").get) should equal(n) + } + } +} + +case class Name(id: Int, name: String, address: String) + extends Serializable.SBinary[Name] { + import sbinary.DefaultProtocol._ + + def this() = this(0, null, null) + + implicit object NameFormat extends Format[Name] { + def reads(in : Input) = Name( + read[Int](in), + read[String](in), + read[String](in)) + def writes(out: Output, value: Name) = { + write[Int](out, value.id) + write[String](out, value.name) + write[String](out, value.address) + } + } + + def fromBytes(bytes: Array[Byte]) = fromByteArray[Name](bytes) + + def toBytes: Array[Byte] = toByteArray(this) +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 7bdfa6b4f1..799d18f61d 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -77,5 +77,10 @@ port = 27017 dbname = "mydb" + + + hostname = "127.0.0.1" # IP address or hostname of the Redis instance + port = 27017 + diff --git a/embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar b/embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar new file mode 100644 index 0000000000..d4a293effa Binary files /dev/null and b/embedded-repo/com/redis/redisclient/1.0.1/redisclient-1.0.1.jar differ