diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index a81f94b53f..a9b7c9e9ea 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -44,11 +44,26 @@ private [akka] object RedisStorageBackend extends SortedSetStorageBackend[Array[Byte]] with Logging { - val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1") - val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379) + // need an explicit definition in akka-conf + val nodes = config.getList("akka.storage.redis.cluster") + + val db = + nodes match { + case Seq() => + // no cluster defined + val REDIS_SERVER_HOSTNAME = config.getString("akka.storage.redis.hostname", "127.0.0.1") + val REDIS_SERVER_PORT = config.getInt("akka.storage.redis.port", 6379) + new RedisClient(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT) + + case s => + // with cluster + import com.redis.cluster._ + println("Running suite on Redis cluster") + new RedisCluster(nodes: _*) { + val keyTag = Some(NoOpKeyTag) + } + } - val db = new RedisClient(REDIS_SERVER_HOSTNAME, REDIS_SERVER_PORT) - /** * Map storage in Redis. *
@@ -107,16 +122,16 @@ private [akka] object RedisStorageBackend extends } def removeMapStorageFor(name: String): Unit = withErrorHandling { - db.keys("%s:*".format(encode(name.getBytes))) match { + db.keys("%s:*".format(new String(encode(name.getBytes)))) match { case None => throw new NoSuchElementException(name + " not present") case Some(keys) => - keys.foreach(db.delete(_)) + keys.foreach(db.del(_)) } } def removeMapStorageFor(name: String, key: Array[Byte]): Unit = withErrorHandling { - db.delete(makeRedisKey(name, key)) + db.del(makeRedisKey(name, key)) } def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling { @@ -333,7 +348,7 @@ private [akka] object RedisStorageBackend extends // completely delete the queue def remove(name: String): Boolean = withErrorHandling { - db.delete(new String(encode(name.getBytes))) match { + db.del(new String(encode(name.getBytes))) match { case Some(1) => true case _ => false } diff --git a/config/akka.conf b/config/akka.conf index 84b9bfbbcf..71670837d3 100644 --- a/config/akka.conf +++ b/config/akka.conf @@ -3,3 +3,10 @@ include "akka-reference.conf" # In this file you can override any option defined in the 'akka-reference.conf' file. # Copy in all or parts of the 'akka-reference.conf' file and modify as you please. +#