diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index ffccee269b..3b4efac5e9 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -15,48 +15,29 @@ import com.redis._ class RedisBasedMailboxException(message: String) extends AkkaException(message) -trait Base64StringEncoder { - def byteArrayToString(bytes: Array[Byte]): String - def stringToByteArray(str: String): Array[Byte] -} - -object CommonsCodec { - import org.apache.commons.codec.binary.Base64 - import org.apache.commons.codec.binary.Base64._ - - val b64 = new Base64(true) - - trait CommonsCodecBase64StringEncoder { - def byteArrayToString(bytes: Array[Byte]) = encodeBase64URLSafeString(bytes) - def stringToByteArray(str: String) = b64.decode(str) - } - - object Base64StringEncoder extends Base64StringEncoder with CommonsCodecBase64StringEncoder -} - -import CommonsCodec._ -import CommonsCodec.Base64StringEncoder._ - /** * @author Jonas Bonér */ class RedisBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(owner) { - val nodes = config.getList("akka.persistence.redis.cluster") // need an explicit definition in akka-conf - @volatile - private var db = connect() //review Is the Redis connection thread safe? + private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling def enqueue(message: MessageInvocation) = { EventHandler.debug(this, "\nENQUEUING message in redis-based mailbox [%s]".format(message)) withErrorHandling { - db.rpush(name, byteArrayToString(serialize(message))) + clients.withClient { client ⇒ + client.rpush(name, serialize(message)) + } } } def dequeue: MessageInvocation = withErrorHandling { try { - val item = db.lpop(name).map(stringToByteArray(_)).getOrElse(throw new NoSuchElementException(name + " not present")) + import serialization.Parse.Implicits.parseByteArray + val item = clients.withClient { client ⇒ + client.lpop[Array[Byte]](name).getOrElse(throw new NoSuchElementException(name + " not present")) + } val messageInvocation = deserialize(item) EventHandler.debug(this, "\nDEQUEUING message in redis-based mailbox [%s]".format(messageInvocation)) @@ -70,34 +51,25 @@ class RedisBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(ow } def size: Int = withErrorHandling { - db.llen(name).getOrElse(throw new NoSuchElementException(name + " not present")) + clients.withClient { client ⇒ + client.llen(name).getOrElse(throw new NoSuchElementException(name + " not present")) + } } def isEmpty: Boolean = size == 0 //TODO review find other solution, this will be very expensive - private[akka] def connect() = - nodes match { - case Seq() ⇒ - // no cluster defined - new RedisClient( - config.getString("akka.actor.mailbox.redis.hostname", "127.0.0.1"), - config.getInt("akka.actor.mailbox.redis.port", 6379)) - - case s ⇒ - // with cluster - import com.redis.cluster._ - EventHandler.info(this, "Running on Redis cluster") - new RedisCluster(nodes: _*) { - val keyTag = Some(NoOpKeyTag) - } - } + private[akka] def connect() = { + new RedisClientPool( + config.getString("akka.actor.mailbox.redis.hostname", "127.0.0.1"), + config.getInt("akka.actor.mailbox.redis.port", 6379)) + } private def withErrorHandling[T](body: ⇒ T): T = { try { body } catch { case e: RedisConnectionException ⇒ { - db = connect() + clients = connect() body } case e ⇒ @@ -107,3 +79,4 @@ class RedisBasedMailbox(val owner: ActorRef) extends DurableExecutableMailbox(ow } } } +