diff --git a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala index c05f710e8c..a387836547 100644 --- a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala +++ b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala @@ -11,7 +11,7 @@ import net.spy.memcached.transcoders._ import collection.JavaConversions import java.lang.String import collection.immutable.{TreeMap, Iterable} -import java.util.concurrent.TimeUnit +import java.util.concurrent.{TimeoutException, Future, TimeUnit} private[akka] object MemcachedStorageBackend extends CommonStorageBackend { @@ -57,8 +57,9 @@ private[akka] object MemcachedStorageBackend extends CommonStorageBackend { def drop() = client.flush() def delete(key: Array[Byte]) = { - val deleted = client.delete(keyStr(encodeKey(key))).get(5L, TimeUnit.SECONDS); - () + retry(5, (1L, TimeUnit.SECONDS), false) { + client.delete(keyStr(encodeKey(key))) + } } def getAll(keys: Iterable[Array[Byte]]) = { @@ -84,8 +85,30 @@ private[akka] object MemcachedStorageBackend extends CommonStorageBackend { def put(key: Array[Byte], value: Array[Byte]) = { - client.set(keyStr(encodeKey(key)), Integer.MAX_VALUE, value).get(5L, TimeUnit.SECONDS); - () + retry(5, (1L, TimeUnit.SECONDS), true) { + client.set(keyStr(encodeKey(key)), Integer.MAX_VALUE, value) + } + + } + + private def retry(tries: Int, waitFor: (Long, TimeUnit), tillTrue: Boolean)(action: => Future[java.lang.Boolean]): Unit = { + if (tries == 0) { + throw new TimeoutException("Exahusted all retries performing an operation on memcached") + } else { + val future = action + try + { + if (future.get(waitFor._1, waitFor._2).equals(false) && tillTrue) { + log.debug("memcached future returned false, operation failed. retrying") + retry(tries - 1, waitFor, tillTrue)(action) + } + } catch { + case te: TimeoutException => { + log.debug("memcached future timed out. retrying") + retry(tries - 1, waitFor, tillTrue)(action) + } + } + } } }