diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index df74040b68..7e6a95f9a1 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -38,7 +38,7 @@ trait RefStorageBackend[T] extends StorageBackend { // for Queue trait QueueStorageBackend[T] extends StorageBackend { // add to the end of the queue - def enqueue(name: String, item: T): Boolean + def enqueue(name: String, item: T): Option[Int] // pop from the front of the queue def dequeue(name: String): Option[T] diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index eef60784a0..9200393ef9 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -11,34 +11,17 @@ import se.scalablesolutions.akka.config.Config.config import com.redis._ -trait Base64Encoder { - def encode(bytes: Array[Byte]): Array[Byte] - def decode(bytes: Array[Byte]): Array[Byte] -} - trait Base64StringEncoder { def byteArrayToString(bytes: Array[Byte]): String def stringToByteArray(str: String): Array[Byte] } -trait NullBase64 { - def encode(bytes: Array[Byte]): Array[Byte] = bytes - def decode(bytes: Array[Byte]): Array[Byte] = bytes -} - object CommonsCodec { import org.apache.commons.codec.binary.Base64 import org.apache.commons.codec.binary.Base64._ val b64 = new Base64(true) - trait CommonsCodecBase64 { - def encode(bytes: Array[Byte]): Array[Byte] = encodeBase64(bytes) - def decode(bytes: Array[Byte]): Array[Byte] = decodeBase64(bytes) - } - - object Base64Encoder extends Base64Encoder with CommonsCodecBase64 - trait CommonsCodecBase64StringEncoder { def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes) def stringToByteArray(str: String) = b64.decode(str) @@ -48,7 +31,6 @@ object CommonsCodec { } import CommonsCodec._ -import CommonsCodec.Base64Encoder._ import CommonsCodec.Base64StringEncoder._ /** @@ -94,27 +76,7 @@ private [akka] object RedisStorageBackend extends /** * Map storage in Redis. *

- * Maps are stored as key/value pairs in redis. Redis keys cannot contain spaces. But with - * our use case, the keys will be specified by the user. Hence we need to encode the key - * ourselves before sending to Redis. We use base64 encoding. - *

- * Also since we are storing the key/value in the global namespace, we need to construct the - * key suitably so as to avoid namespace clash. The following strategy is used: - * - * Unique identifier for the map = T1 (say) - *

-   * Map(
-   *   "debasish.address" -> "kolkata, India",
-   *   "debasish.company" -> "anshinsoft",
-   *   "debasish.programming_language" -> "scala",
-   * )
- * will be stored as the following key-value pair in Redis: - * - * - * base64(T1):base64("debasish.address") -> "kolkata, India" - * base64(T1):base64("debasish.company") -> "anshinsoft" - * base64(T1):base64("debasish.programming_language") -> "scala" - * + * Maps are stored as key/value pairs in redis. */ def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]): Unit = withErrorHandling { insertMapStorageEntriesFor(name, List((key, value))) @@ -134,12 +96,12 @@ private [akka] object RedisStorageBackend extends *
  • both parts of the key need to be based64 encoded since there can be spaces within each of them
  • */ private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling { - "%s:%s".format(new String(encode(name.getBytes)), new String(encode(key))) + "%s:%s".format(name, byteArrayToString(key)) } private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling { - val nk = redisKey.split(':').map{e: String => decode(e.getBytes)} - (nk(0), nk(1)) + val nk = redisKey.split(':') + (nk(0), stringToByteArray(nk(1))) } private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling { @@ -149,11 +111,11 @@ private [akka] object RedisStorageBackend extends } def removeMapStorageFor(name: String): Unit = withErrorHandling { - db.keys("%s:*".format(new String(encode(name.getBytes)))) match { + db.keys("%s:*".format(name)) match { case None => throw new NoSuchElementException(name + " not present") case Some(keys) => - keys.foreach(db.del(_)) + keys.foreach(k => db.del(k.get)) } } @@ -170,19 +132,18 @@ private [akka] object RedisStorageBackend extends } def getMapStorageSizeFor(name: String): Int = withErrorHandling { - db.keys("%s:*".format(new String(encode(name.getBytes)))) match { + db.keys("%s:*".format(name)) match { case None => 0 - case Some(keys) => - keys.length + case Some(keys) => keys.length } } def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling { - db.keys("%s:*".format(new String(encode(name.getBytes)))) match { + db.keys("%s:*".format(name)) match { case None => throw new NoSuchElementException(name + " not present") case Some(keys) => - keys.map(key => (makeKeyFromRedisKey(key)._2, stringToByteArray(db.get(key).get))).toList + keys.map(key => (makeKeyFromRedisKey(key.get)._2, stringToByteArray(db.get(key.get).get))).toList } } @@ -234,7 +195,7 @@ private [akka] object RedisStorageBackend extends } def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.lpush(new String(encode(name.getBytes)), byteArrayToString(element)) + db.lpush(name, byteArrayToString(element)) } def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling { @@ -242,11 +203,11 @@ private [akka] object RedisStorageBackend extends } def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling { - db.lset(new String(encode(name.getBytes)), index, byteArrayToString(elem)) + db.lset(name, index, byteArrayToString(elem)) } def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling { - db.lindex(new String(encode(name.getBytes)), index) match { + db.lindex(name, index) match { case None => throw new NoSuchElementException(name + " does not have element at " + index) case Some(e) => @@ -270,33 +231,28 @@ private [akka] object RedisStorageBackend extends else count if (s == 0 && cnt == 0) List() else - db.lrange(new String(encode(name.getBytes)), s, s + cnt - 1) match { + db.lrange(name, s, s + cnt - 1) match { case None => throw new NoSuchElementException(name + " does not have elements in the range specified") case Some(l) => - l map ( e => stringToByteArray(e.get)) + l map (e => stringToByteArray(e.get)) } } def getVectorStorageSizeFor(name: String): Int = withErrorHandling { - db.llen(new String(encode(name.getBytes))) match { - case None => - throw new NoSuchElementException(name + " not present") - case Some(l) => - l - } + db.llen(name).getOrElse { throw new NoSuchElementException(name + " not present") } } def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling { - db.set(new String(encode(name.getBytes)), byteArrayToString(element)) + db.set(name, byteArrayToString(element)) } def insertRefStorageFor(name: String, element: String): Unit = withErrorHandling { - db.set(new String(encode(name.getBytes)), element) + db.set(name, element) } def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling { - db.get(new String(encode(name.getBytes))) match { + db.get(name) match { case None => throw new NoSuchElementException(name + " not present") case Some(s) => Some(stringToByteArray(s)) @@ -304,13 +260,13 @@ private [akka] object RedisStorageBackend extends } // add to the end of the queue - def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.rpush(new String(encode(name.getBytes)), byteArrayToString(item)) + def enqueue(name: String, item: Array[Byte]): Option[Int] = withErrorHandling { + db.rpush(name, byteArrayToString(item)) } // pop from the front of the queue def dequeue(name: String): Option[Array[Byte]] = withErrorHandling { - db.lpop(new String(encode(name.getBytes))) match { + db.lpop(name) match { case None => throw new NoSuchElementException(name + " not present") case Some(s) => Some(stringToByteArray(s)) @@ -319,11 +275,7 @@ private [akka] object RedisStorageBackend extends // get the size of the queue def size(name: String): Int = withErrorHandling { - db.llen(new String(encode(name.getBytes))) match { - case None => - throw new NoSuchElementException(name + " not present") - case Some(l) => l - } + db.llen(name).getOrElse { throw new NoSuchElementException(name + " not present") } } // return an array of items currently stored in the queue @@ -331,14 +283,14 @@ private [akka] object RedisStorageBackend extends def peek(name: String, start: Int, count: Int): List[Array[Byte]] = withErrorHandling { count match { case 1 => - db.lindex(new String(encode(name.getBytes)), start) match { + db.lindex(name, start) match { case None => throw new NoSuchElementException("No element at " + start) case Some(s) => List(stringToByteArray(s)) } case n => - db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { + db.lrange(name, start, start + count - 1) match { case None => throw new NoSuchElementException( "No element found between " + start + " and " + (start + count - 1)) @@ -350,7 +302,7 @@ private [akka] object RedisStorageBackend extends // completely delete the queue def remove(name: String): Boolean = withErrorHandling { - db.del(new String(encode(name.getBytes))) match { + db.del(name) match { case Some(1) => true case _ => false } @@ -358,7 +310,7 @@ private [akka] object RedisStorageBackend extends // add item to sorted set identified by name def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zadd(new String(encode(name.getBytes)), zscore, byteArrayToString(item)) match { + db.zadd(name, zscore, byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -366,7 +318,7 @@ private [akka] object RedisStorageBackend extends // remove item from sorted set identified by name def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling { - db.zrem(new String(encode(name.getBytes)), byteArrayToString(item)) match { + db.zrem(name, byteArrayToString(item)) match { case Some(1) => true case _ => false } @@ -374,22 +326,18 @@ private [akka] object RedisStorageBackend extends // cardinality of the set identified by name def zcard(name: String): Int = withErrorHandling { - db.zcard(new String(encode(name.getBytes))) match { - case None => - throw new NoSuchElementException(name + " not present") - case Some(l) => l - } + db.zcard(name).getOrElse { throw new NoSuchElementException(name + " not present") } } def zscore(name: String, item: Array[Byte]): Option[Float] = withErrorHandling { - db.zscore(new String(encode(name.getBytes)), byteArrayToString(item)) match { + db.zscore(name, byteArrayToString(item)) match { case Some(s) => Some(s.toFloat) case None => None } } def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling { - db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match { + db.zrange(name, start.toString, end.toString, RedisClient.ASC, false) match { case None => throw new NoSuchElementException(name + " not present") case Some(s) => @@ -399,7 +347,7 @@ private [akka] object RedisStorageBackend extends def zrangeWithScore(name: String, start: Int, end: Int): List[(Array[Byte], Float)] = withErrorHandling { db.zrangeWithScore( - new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC) match { + name, start.toString, end.toString, RedisClient.ASC) match { case None => throw new NoSuchElementException(name + " not present") case Some(l) => diff --git a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar b/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar deleted file mode 100644 index 5d2a6a3632..0000000000 Binary files a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom b/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom deleted file mode 100755 index 16dd81402a..0000000000 --- a/embedded-repo/com/redis/redisclient/1.1/redisclient-1.1.pom +++ /dev/null @@ -1,8 +0,0 @@ - - - 4.0.0 - com.redis - redisclient - 1.1 - jar - diff --git a/embedded-repo/com/redis/redisclient/1.2/redisclient-1.2.jar b/embedded-repo/com/redis/redisclient/1.2/redisclient-1.2.jar deleted file mode 100644 index 91ff84b97c..0000000000 Binary files a/embedded-repo/com/redis/redisclient/1.2/redisclient-1.2.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0-2.0/redisclient-2.8.0-2.0.jar b/embedded-repo/com/redis/redisclient/2.8.0-2.0/redisclient-2.8.0-2.0.jar new file mode 100644 index 0000000000..66c18b6fbf Binary files /dev/null and b/embedded-repo/com/redis/redisclient/2.8.0-2.0/redisclient-2.8.0-2.0.jar differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.jar b/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.jar deleted file mode 100644 index 3f1593380b..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.pom b/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.pom deleted file mode 100755 index 68f3763187..0000000000 --- a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.2/redisclient-2.8.0.Beta1-1.2.pom +++ /dev/null @@ -1,8 +0,0 @@ - - - 4.0.0 - com.redis - redisclient - 2.8.0.Beta1-1.2 - jar - diff --git a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar b/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar deleted file mode 100644 index 0daede37f0..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.Beta1-1.3-SNAPSHOT/redisclient-2.8.0.Beta1-1.3-SNAPSHOT.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC2-1.4-SNAPSHOT/redisclient-2.8.0.RC2-1.4-SNAPSHOT.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC2-1.4-SNAPSHOT/redisclient-2.8.0.RC2-1.4-SNAPSHOT.jar deleted file mode 100644 index 261b5cc1be..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.RC2-1.4-SNAPSHOT/redisclient-2.8.0.RC2-1.4-SNAPSHOT.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar deleted file mode 100644 index d939a49d7c..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4-SNAPSHOT/redisclient-2.8.0.RC3-1.4-SNAPSHOT.jar and /dev/null differ diff --git a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar b/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar deleted file mode 100644 index 351ff49c9d..0000000000 Binary files a/embedded-repo/com/redis/redisclient/2.8.0.RC3-1.4/redisclient-2.8.0.RC3-1.4.jar and /dev/null differ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 15f7ee7a99..393b531dd6 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -176,7 +176,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile" - lazy val redis = "com.redis" % "redisclient" % "2.8.0-1.4" % "compile" + lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0" % "compile" lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"