fixed redis based serialization logic, added RedisClientPool, which is needed to handle multiple asynchronous message persistence over single threaded Redis - now runs all test cases in DurableMailboxSpec

This commit is contained in:
Debasish Ghosh 2011-07-31 01:59:06 +05:30
parent 02aeec6b57
commit 433c83c0db

View file

@ -15,48 +15,29 @@ import com.redis._
class RedisBasedMailboxException(message: String) extends AkkaException(message)
trait Base64StringEncoder {
def byteArrayToString(bytes: Array[Byte]): String
def stringToByteArray(str: String): Array[Byte]
}
object CommonsCodec {
import org.apache.commons.codec.binary.Base64
import org.apache.commons.codec.binary.Base64._
val b64 = new Base64(true)
trait CommonsCodecBase64StringEncoder {
def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes)
def stringToByteArray(str: String) = b64.decode(str)
}
object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder
}
import CommonsCodec._
import CommonsCodec.Base64StringEncoder._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RedisBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) {
val nodes = config.getList("akka.persistence.redis.cluster") // need an explicit definition in akka-conf
@volatile
private var db = connect() //review Is the Redis connection thread safe?
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling
def enqueue(message: MessageInvocation) = {
EventHandler.debug(this,
"\nENQUEUING message in redis-based mailbox [%s]".format(message))
withErrorHandling {
db.rpush(name, byteArrayToString(serialize(message)))
clients.withClient { client
client.rpush(name, serialize(message))
}
}
}
def dequeue: MessageInvocation = withErrorHandling {
try {
val item = db.lpop(name).map(stringToByteArray(_)).getOrElse(throw new NoSuchElementException(name + " not present"))
import serialization.Parse.Implicits.parseByteArray
val item = clients.withClient { client
client.lpop[Array[Byte]](name).getOrElse(throw new NoSuchElementException(name + " not present"))
}
val messageInvocation = deserialize(item)
EventHandler.debug(this,
"\nDEQUEUING message in redis-based mailbox [%s]".format(messageInvocation))
@ -70,34 +51,25 @@ class RedisBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(ow
}
def size: Int = withErrorHandling {
db.llen(name).getOrElse(throw new NoSuchElementException(name + " not present"))
clients.withClient { client
client.llen(name).getOrElse(throw new NoSuchElementException(name + " not present"))
}
}
def isEmpty: Boolean = size == 0 //TODO review find other solution, this will be very expensive
private[akka] def connect() =
nodes match {
case Seq()
// no cluster defined
new RedisClient(
config.getString("akka.actor.mailbox.redis.hostname", "127.0.0.1"),
config.getInt("akka.actor.mailbox.redis.port", 6379))
case s
// with cluster
import com.redis.cluster._
EventHandler.info(this, "Running on Redis cluster")
new RedisCluster(nodes: _*) {
val keyTag = Some(NoOpKeyTag)
}
}
private[akka] def connect() = {
new RedisClientPool(
config.getString("akka.actor.mailbox.redis.hostname", "127.0.0.1"),
config.getInt("akka.actor.mailbox.redis.port", 6379))
}
private def withErrorHandling[T](body: T): T = {
try {
body
} catch {
case e: RedisConnectionException {
db = connect()
clients = connect()
body
}
case e
@ -107,3 +79,4 @@ class RedisBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(ow
}
}
}