Cleanup and wait/retry on futures returned from spymemcached appropriately

This commit is contained in:
ticktock 2010-11-03 20:27:56 -04:00
parent 1af208532a
commit 7fea2448b9

View file

@ -11,7 +11,7 @@ import net.spy.memcached.transcoders._
import collection.JavaConversions
import java.lang.String
import collection.immutable.{TreeMap, Iterable}
import java.util.concurrent.TimeUnit
import java.util.concurrent.{TimeoutException, Future, TimeUnit}
private[akka] object MemcachedStorageBackend extends CommonStorageBackend {
@ -57,8 +57,9 @@ private[akka] object MemcachedStorageBackend extends CommonStorageBackend {
def drop() = client.flush()
def delete(key: Array[Byte]) = {
val deleted = client.delete(keyStr(encodeKey(key))).get(5L, TimeUnit.SECONDS);
()
retry(5, (1L, TimeUnit.SECONDS), false) {
client.delete(keyStr(encodeKey(key)))
}
}
def getAll(keys: Iterable[Array[Byte]]) = {
@ -84,8 +85,30 @@ private[akka] object MemcachedStorageBackend extends CommonStorageBackend {
def put(key: Array[Byte], value: Array[Byte]) = {
client.set(keyStr(encodeKey(key)), Integer.MAX_VALUE, value).get(5L, TimeUnit.SECONDS);
()
retry(5, (1L, TimeUnit.SECONDS), true) {
client.set(keyStr(encodeKey(key)), Integer.MAX_VALUE, value)
}
}
private def retry(tries: Int, waitFor: (Long, TimeUnit), tillTrue: Boolean)(action: => Future[java.lang.Boolean]): Unit = {
if (tries == 0) {
throw new TimeoutException("Exahusted all retries performing an operation on memcached")
} else {
val future = action
try
{
if (future.get(waitFor._1, waitFor._2).equals(false) && tillTrue) {
log.debug("memcached future returned false, operation failed. retrying")
retry(tries - 1, waitFor, tillTrue)(action)
}
} catch {
case te: TimeoutException => {
log.debug("memcached future timed out. retrying")
retry(tries - 1, waitFor, tillTrue)(action)
}
}
}
}
}