new redisclient with support for clustering
This commit is contained in:
parent
183fcfbb9a
commit
d6de99bcee
4 changed files with 31 additions and 9 deletions
|
|
@ -44,11 +44,26 @@ private [akka] object RedisStorageBackend extends
|
|||
SortedSetStorageBackend[Array[Byte]] with
|
||||
Logging {
|
||||
|
||||
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
|
||||
val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379)
|
||||
// need an explicit definition in akka-conf
|
||||
val nodes = config.getList("akka.storage.redis.cluster")
|
||||
|
||||
val db =
|
||||
nodes match {
|
||||
case Seq() =>
|
||||
// no cluster defined
|
||||
val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1")
|
||||
val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379)
|
||||
new RedisClient(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT)
|
||||
|
||||
case s =>
|
||||
// with cluster
|
||||
import com.redis.cluster._
|
||||
println("Running suite on Redis cluster")
|
||||
new RedisCluster(nodes: _*) {
|
||||
val keyTag = Some(NoOpKeyTag)
|
||||
}
|
||||
}
|
||||
|
||||
val db = new RedisClient(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT)
|
||||
|
||||
/**
|
||||
* Map storage in Redis.
|
||||
* <p/>
|
||||
|
|
@ -107,16 +122,16 @@ private [akka] object RedisStorageBackend extends
|
|||
}
|
||||
|
||||
def removeMapStorageFor(name: String): Unit = withErrorHandling {
|
||||
db.keys("%s:*".format(encode(name.getBytes))) match {
|
||||
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
|
||||
case None =>
|
||||
throw new NoSuchElementException(name + " not present")
|
||||
case Some(keys) =>
|
||||
keys.foreach(db.delete(_))
|
||||
keys.foreach(db.del(_))
|
||||
}
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String, key: Array[Byte]): Unit = withErrorHandling {
|
||||
db.delete(makeRedisKey(name, key))
|
||||
db.del(makeRedisKey(name, key))
|
||||
}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling {
|
||||
|
|
@ -333,7 +348,7 @@ private [akka] object RedisStorageBackend extends
|
|||
|
||||
// completely delete the queue
|
||||
def remove(name: String): Boolean = withErrorHandling {
|
||||
db.delete(new String(encode(name.getBytes))) match {
|
||||
db.del(new String(encode(name.getBytes))) match {
|
||||
case Some(1) => true
|
||||
case _ => false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,3 +3,10 @@ include "akka-reference.conf"
|
|||
|
||||
# In this file you can override any option defined in the 'akka-reference.conf' file.
|
||||
# Copy in all or parts of the 'akka-reference.conf' file and modify as you please.
|
||||
# <akka>
|
||||
# <storage>
|
||||
# <redis>
|
||||
# cluster = ["localhost:6379", "localhost:6380", "localhost:6381"]
|
||||
# </redis>
|
||||
# </storage>
|
||||
# </akka>
|
||||
|
|
|
|||
Binary file not shown.
|
|
@ -269,7 +269,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
|
|||
}
|
||||
|
||||
class AkkaRedisProject(info: ProjectInfo) extends DefaultProject(info) {
|
||||
val redis = "com.redis" % "redisclient" % "2.8.0.Beta1-1.2" % "compile"
|
||||
val redis = "com.redis" % "redisclient" % "2.8.0.Beta1-1.3-SNAPSHOT" % "compile"
|
||||
override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil
|
||||
lazy val dist = deployTask(info, distPath, true, true, true) dependsOn(`package`, packageDocs, packageSrc) describedAs("Deploying")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue