diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index a5226eb1a4..df74040b68 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -33,14 +33,6 @@ trait VectorStorageBackend[T] extends StorageBackend { trait RefStorageBackend[T] extends StorageBackend { def insertRefStorageFor(name: String, element: T) def getRefStorageFor(name: String): Option[T] - def incrementAtomically(name: String): Option[Int] = - throw new UnsupportedOperationException // only for redis - def incrementByAtomically(name: String, by: Int): Option[Int] = - throw new UnsupportedOperationException // only for redis - def decrementAtomically(name: String): Option[Int] = - throw new UnsupportedOperationException // only for redis - def decrementByAtomically(name: String, by: Int): Option[Int] = - throw new UnsupportedOperationException // only for redis } // for Queue diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index b1973c3c7b..ad758f9999 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -11,20 +11,45 @@ import se.scalablesolutions.akka.config.Config.config import com.redis._ -trait Encoder { +trait Base64Encoder { def encode(bytes: Array[Byte]): Array[Byte] def decode(bytes: Array[Byte]): Array[Byte] } -trait CommonsCodecBase64 { - import org.apache.commons.codec.binary.Base64._ - - def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes) - def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes) +trait Base64StringEncoder { + def byteArrayToString(bytes: Array[Byte]): String + def stringToByteArray(str: String): Array[Byte] } -object Base64Encoder extends Encoder with CommonsCodecBase64 -import Base64Encoder._ +trait NullBase64 { + def encode(bytes: Array[Byte]): Array[Byte] = bytes + def decode(bytes: Array[Byte]): Array[Byte] = bytes +} + +object CommonsCodec { + import org.apache.commons.codec.binary.Base64 + import org.apache.commons.codec.binary.Base64._ + + val b64 = new Base64(true) + + trait CommonsCodecBase64 { + def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes) + def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes) + } + + object Base64Encoder extends Base64Encoder with CommonsCodecBase64 + + trait CommonsCodecBase64StringEncoder { + def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes) + def stringToByteArray(str: String) = b64.decode(str) + } + + object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder +} + +import CommonsCodec._ +import CommonsCodec.Base64Encoder._ +import CommonsCodec.Base64StringEncoder._ /** * A module for supporting Redis based persistence. @@ -95,7 +120,7 @@ private [akka] object RedisStorageBackend extends def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling { mset(entries.map(e => - (makeRedisKey(name, e._1), new String(e._2)))) + (makeRedisKey(name, e._1), byteArrayToString(e._2)))) } /** @@ -138,7 +163,7 @@ private [akka] object RedisStorageBackend extends db.get(makeRedisKey(name, key)) match { case None => throw new NoSuchElementException(new String(key) + " not present") - case Some(s) => Some(s.getBytes) + case Some(s) => Some(stringToByteArray(s)) } } @@ -155,7 +180,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(keys) => - keys.map(key => (makeKeyFromRedisKey(key)._2, db.get(key).get.getBytes)).toList + keys.map(key => (makeKeyFromRedisKey(key)._2, stringToByteArray(db.get(key).get))).toList } } @@ -207,7 +232,7 @@ private [akka] object RedisStorageBackend extends } def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.lpush(new String(encode(name.getBytes)), new String(element)) + db.lpush(new String(encode(name.getBytes)), byteArrayToString(element)) } def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling { @@ -215,14 +240,15 @@ private [akka] object RedisStorageBackend extends } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling { - db.lset(new String(encode(name.getBytes)), index, new String(elem)) + db.lset(new String(encode(name.getBytes)), index, byteArrayToString(elem)) } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling { db.lindex(new String(encode(name.getBytes)), index) match { case None => throw new NoSuchElementException(name + " does not have element at " + index) - case Some(e) => e.getBytes + case Some(e) => + stringToByteArray(e) } } @@ -246,75 +272,46 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " does not have elements in the range specified") case Some(l) => - l map (_.get.getBytes) + l map ( e => stringToByteArray(e.get)) } } - def getVectorStorageSizeFor(name: String): Int = { + def getVectorStorageSizeFor(name: String): Int = withErrorHandling { db.llen(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(l) => l + case Some(l) => + l } } def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.set(new String(encode(name.getBytes)), new String(element)) + db.set(new String(encode(name.getBytes)), byteArrayToString(element)) + } + + def insertRefStorageFor(name: String, element: String): Unit = withErrorHandling { + db.set(new String(encode(name.getBytes)), element) } def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling { db.get(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(s) => Some(s.getBytes) - } - } - - override def incrementAtomically(name: String): Option[Int] = withErrorHandling { - db.incr(new String(encode(name.getBytes))) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in incr") - } - } - - override def incrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { - db.incrby(new String(encode(name.getBytes)), by) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in incrby") - } - } - - override def decrementAtomically(name: String): Option[Int] = withErrorHandling { - db.decr(new String(encode(name.getBytes))) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in decr") - } - } - - override def decrementByAtomically(name: String, by: Int): Option[Int] = withErrorHandling { - db.decrby(new String(encode(name.getBytes)), by) match { - case Some(i) => Some(i) - case None => - throw new IllegalArgumentException(name + " exception in decrby") + case Some(s) => Some(stringToByteArray(s)) } } // add to the end of the queue def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.rpush(new String(encode(name.getBytes)), new String(item)) + db.rpush(new String(encode(name.getBytes)), byteArrayToString(item)) } - // pop from the front of the queue def dequeue(name: String): Option[Array[Byte]] = withErrorHandling { db.lpop(new String(encode(name.getBytes))) match { case None => throw new NoSuchElementException(name + " not present") - case Some(s) => - Some(s.getBytes) + case Some(s) => Some(stringToByteArray(s)) } } @@ -336,7 +333,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException("No element at " + start) case Some(s) => - List(s.getBytes) + List(stringToByteArray(s)) } case n => db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { @@ -344,7 +341,7 @@ private [akka] object RedisStorageBackend extends throw new NoSuchElementException( "No element found between " + start + " and " + (start + count - 1)) case Some(es) => - es.map(_.get.getBytes) + es.map(e => stringToByteArray(e.get)) } } } @@ -359,7 +356,7 @@ private [akka] object RedisStorageBackend extends // add item to sorted set identified by name def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match { + db.zadd(new String(encode(name.getBytes)), zscore, byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -367,7 +364,7 @@ private [akka] object RedisStorageBackend extends // remove item from sorted set identified by name def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zrem(new String(encode(name.getBytes)), new String(item)) match { + db.zrem(new String(encode(name.getBytes)), byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -383,7 +380,7 @@ private [akka] object RedisStorageBackend extends } def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling { - db.zscore(new String(encode(name.getBytes)), new String(item)) match { + db.zscore(new String(encode(name.getBytes)), byteArrayToString(item)) match { case Some(s) => Some(s.toFloat) case None => None } @@ -394,7 +391,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(s) => - s.map(_.get.getBytes) + s.map(e => stringToByteArray(e.get)) } } @@ -404,7 +401,7 @@ private [akka] object RedisStorageBackend extends case None => throw new NoSuchElementException(name + " not present") case Some(l) => - l.map{ case (elem, score) => (elem.get.getBytes, score.get.toFloat) } + l.map{ case (elem, score) => (stringToByteArray(elem.get), score.get.toFloat) } } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala index d8d79d7f2a..8a8021b3c5 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisStorageBackendSpec.scala @@ -9,6 +9,11 @@ import org.junit.runner.RunWith import se.scalablesolutions.akka.serialization.Serializable import se.scalablesolutions.akka.serialization.Serializer._ +import sbinary._ +import sbinary.Operations._ +import sbinary.DefaultProtocol._ +import java.util.{Calendar, Date} + import RedisStorageBackend._ @RunWith(classOf[JUnitRunner]) @@ -39,15 +44,6 @@ class RedisStorageBackendSpec extends "T-1", "debasish.language".getBytes).get) should equal("java") } - /** - it("should enter a custom object for transaction T-1") { - val n = Name(100, "debasish", "kolkata") - // insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, Java.out(n)) - // insertMapStorageEntryFor("T-1", "debasish.identity".getBytes, n.toBytes) - getMapStorageSizeFor("T-1") should equal(5) - } - **/ - it("should enter key/values for another transaction T-2") { insertMapStorageEntryFor("T-2", "debasish.age".getBytes, "49".getBytes) insertMapStorageEntryFor("T-2", "debasish.spouse".getBytes, "paramita".getBytes) @@ -61,6 +57,21 @@ class RedisStorageBackendSpec extends } } + describe("Store and query long value in maps") { + it("should enter 4 entries in redis for transaction T-1") { + val d = Calendar.getInstance.getTime.getTime + insertMapStorageEntryFor("T-11", "debasish".getBytes, + toByteArray[Long](d)) + + getMapStorageSizeFor("T-11") should equal(1) + fromByteArray[Long](getMapStorageEntryFor("T-11", "debasish".getBytes).get) should equal(d) + } + + it("should remove map storage for T-1 and T2") { + removeMapStorageFor("T-11") + } + } + describe("Range query in maps") { it("should enter 7 entries in redis for transaction T-5") { insertMapStorageEntryFor("T-5", "trade.refno".getBytes, "R-123".getBytes) @@ -93,73 +104,61 @@ class RedisStorageBackendSpec extends } } + describe("Store and query objects in maps") { + import NameSerialization._ + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + insertMapStorageEntryFor("T-31", "debasish".getBytes, toByteArray[Name](n)) + getMapStorageSizeFor("T-31") should equal(1) + fromByteArray[Name](getMapStorageEntryFor("T-31", "debasish".getBytes).get) should equal(n) + } + it("should remove map storage for T31") { + removeMapStorageFor("T-31") + } + } + describe("Store and query in vectors") { it("should write 4 entries in a vector for transaction T-3") { insertVectorStorageEntryFor("T-3", "debasish".getBytes) insertVectorStorageEntryFor("T-3", "maulindu".getBytes) - val n = Name(100, "debasish", "kolkata") - // insertVectorStorageEntryFor("T-3", Java.out(n)) - // insertVectorStorageEntryFor("T-3", n.toBytes) insertVectorStorageEntryFor("T-3", "1200".getBytes) - getVectorStorageSizeFor("T-3") should equal(3) + + val dt = Calendar.getInstance.getTime.getTime + insertVectorStorageEntryFor("T-3", toByteArray[Long](dt)) + getVectorStorageSizeFor("T-3") should equal(4) + fromByteArray[Long](getVectorStorageEntryFor("T-3", 0)) should equal(dt) + getVectorStorageSizeFor("T-3") should equal(4) + } + } + + describe("Store and query objects in vectors") { + import NameSerialization._ + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + + insertVectorStorageEntryFor("T-31", toByteArray[Name](n)) + getVectorStorageSizeFor("T-31") should equal(1) + fromByteArray[Name](getVectorStorageEntryFor("T-31", 0)) should equal(n) } } describe("Store and query in ref") { + import NameSerialization._ it("should write 4 entries in 4 refs for transaction T-4") { insertRefStorageFor("T-4", "debasish".getBytes) insertRefStorageFor("T-4", "maulindu".getBytes) insertRefStorageFor("T-4", "1200".getBytes) new String(getRefStorageFor("T-4").get) should equal("1200") - - // val n = Name(100, "debasish", "kolkata") - // insertRefStorageFor("T-4", Java.out(n)) - // insertRefStorageFor("T-4", n.toBytes) - // Java.in(getRefStorageFor("T-4").get, Some(classOf[Name])).asInstanceOf[Name] should equal(n) - // n.fromBytes(getRefStorageFor("T-4").get) should equal(n) } - } - - describe("atomic increment in ref") { - it("should increment an existing key value by 1") { - insertRefStorageFor("T-4-1", "1200".getBytes) - new String(getRefStorageFor("T-4-1").get) should equal("1200") - incrementAtomically("T-4-1").get should equal(1201) - } - it("should create and increment a non-existing key value by 1") { - incrementAtomically("T-4-2").get should equal(1) - new String(getRefStorageFor("T-4-2").get) should equal("1") - } - it("should increment an existing key value by the amount specified") { - insertRefStorageFor("T-4-3", "1200".getBytes) - new String(getRefStorageFor("T-4-3").get) should equal("1200") - incrementByAtomically("T-4-3", 50).get should equal(1250) - } - it("should create and increment a non-existing key value by the amount specified") { - incrementByAtomically("T-4-4", 20).get should equal(20) - new String(getRefStorageFor("T-4-4").get) should equal("20") - } - } - - describe("atomic decrement in ref") { - it("should decrement an existing key value by 1") { - insertRefStorageFor("T-4-5", "1200".getBytes) - new String(getRefStorageFor("T-4-5").get) should equal("1200") - decrementAtomically("T-4-5").get should equal(1199) - } - it("should create and decrement a non-existing key value by 1") { - decrementAtomically("T-4-6").get should equal(-1) - new String(getRefStorageFor("T-4-6").get) should equal("-1") - } - it("should decrement an existing key value by the amount specified") { - insertRefStorageFor("T-4-7", "1200".getBytes) - new String(getRefStorageFor("T-4-7").get) should equal("1200") - decrementByAtomically("T-4-7", 50).get should equal(1150) - } - it("should create and decrement a non-existing key value by the amount specified") { - decrementByAtomically("T-4-8", 20).get should equal(-20) - new String(getRefStorageFor("T-4-8").get) should equal("-20") + it("should write a Name object and fetch it properly") { + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + insertRefStorageFor("T-4", toByteArray[Name](n)) + fromByteArray[Name](getRefStorageFor("T-4").get) should equal(n) } } @@ -185,6 +184,14 @@ class RedisStorageBackendSpec extends new String(l(1)) should equal("yukihiro matsumoto") new String(l(2)) should equal("claude shannon") } + it("should write a Name object and fetch it properly") { + import NameSerialization._ + val dtb = Calendar.getInstance.getTime + val n = Name(100, "debasish ghosh", "kolkata", dtb, Some(dtb)) + enqueue("T-5-1", toByteArray[Name](n)) + fromByteArray[Name](peek("T-5-1", 0, 1).head) should equal(n) + fromByteArray[Name](dequeue("T-5-1").get) should equal(n) + } } describe("store and query in sorted set") { @@ -221,27 +228,18 @@ class RedisStorageBackendSpec extends } } -case class Name(id: Int, name: String, address: String) - extends Serializable.SBinary[Name] { - import sbinary._ - import sbinary.Operations._ - import sbinary.DefaultProtocol._ +object NameSerialization { + implicit object DateFormat extends Format[Date] { + def reads(in : Input) = + new Date(read[Long](in)) - def this() = this(0, null, null) - - implicit object NameFormat extends Format[Name] { - def reads(in : Input) = Name( - read[Int](in), - read[String](in), - read[String](in)) - def writes(out: Output, value: Name) = { - write[Int](out, value.id) - write[String](out, value.name) - write[String](out, value.address) - } + def writes(out: Output, value: Date) = + write[Long](out, value.getTime) } - def fromBytes(bytes: Array[Byte]) = fromByteArray[Name](bytes) + case class Name(id: Int, name: String, + address: String, dateOfBirth: Date, dateDied: Option[Date]) - def toBytes: Array[Byte] = toByteArray(this) + implicit val NameFormat: Format[Name] = + asProduct5(Name)(Name.unapply(_).get) } diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index a9db966298..9c7af71867 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -241,6 +241,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { class AkkaRedisProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { val redis = "com.redis" % "redisclient" % "2.8.0.RC3-1.4-SNAPSHOT" % "compile" + val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil }