diff --git a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala
index bf3a5c169f..12f28a4990 100644
--- a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala
+++ b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala
@@ -18,13 +18,10 @@ import collection.mutable.ArrayBuffer
 import java.util.{Properties, Map => JMap}
 import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
 import collection.immutable._
-import com.trifork.riak.{RiakObject, RiakClient}
 import com.google.protobuf.ByteString
 import com.google.protobuf.ByteString._
-/*
- RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores
- In this case all VoldemortBackend operations can be retried until successful, and data should remain consistent
- */
+import com.trifork.riak.{RequestMeta, RiakObject, RiakClient}
+
 
 private[akka] object RiakStorageBackend extends
 MapStorageBackend[Array[Byte], Array[Byte]] with
@@ -32,17 +29,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
 RefStorageBackend[Array[Byte]] with
 QueueStorageBackend[Array[Byte]] with
 Logging {
-  val bootstrapUrlsProp = "bootstrap_urls"
-  val clientConfig = config.getConfigMap("akka.storage.riak.client") match {
-    case Some(configMap) => getClientConfig(configMap.asMap)
-    case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666"))
-  }
   val refBucket = config.getString("akka.storage.riak.bucket.ref", "Refs")
   val mapBucket = config.getString("akka.storage.riak.bucket.map", "Maps")
   val vectorBucket = config.getString("akka.storage.riak.bucket.vector", "Vectors")
   val queueBucket = config.getString("akka.storage.riak.bucket.queue", "Queues")
-
-  var riakClient: RiakClient = new RiakClient("localhost");
+  val clientHost = config.getString("akka.storage.riak.client.host", "localhost")
+  val clientPort = config.getInt("akka.storage.riak.client.port", 8087)
+  var riakClient: RiakClient = new RiakClient(clientHost, clientPort);
 
   val nullMapValueHeader = 0x00.byteValue
   val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
@@ -124,6 +117,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
     keys.foreach {
       key =>
         MapClient.delete(getKey(name, key))
+        log.debug("deleted key %s for %s", key, name)
     }
     MapClient.delete(getKey(name, mapKeysIndex))
   }
@@ -469,17 +463,24 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
 
   trait RiakAccess {
     def bucket: String
-
+    // Special Riak PB quorum values, see http://www.mail-archive.com/riak-users@lists.basho.com/msg01013.html
+    val quorum: Int = 0xfffffffd
+    val one: Int = 0xfffffffe
+    val all: Int = 0xfffffffc
+    val default: Int = 0xfffffffb
 
     def put(key: Array[Byte], value: Array[Byte]) = {
-      riakClient.store(new RiakObject(bucket, key, value))
+      riakClient.store(new RiakObject(bucket, key, value), new RequestMeta().w(quorum).dw(quorum))
     }
 
     def getValue(key: Array[Byte]): Array[Byte] = {
-      val objs = riakClient.fetch(bucket, key)
+      val objs = riakClient.fetch(bucket, key, quorum)
       objs.size match {
         case 0 => null;
-        case _ => objs.last.getValue
+        case _ => objs.last.getValue.isEmpty match {
+          case true => null
+          case false => objs.last.getValue
+        }
       }
     }
 
@@ -493,19 +494,26 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
 
     def getAll(keys: Traversable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = {
      var result = new HashMap[Array[Byte], Array[Byte]]
      keys.foreach {
-        key => result += key -> getValue(key)
+        key =>
+          val value = getValue(key)
+          Option(value) match {
+            case Some(value) => result += key -> value
+            case None => ()
+          }
      }
      result
     }
 
     def delete(key: Array[Byte]) = {
-      riakClient.delete(bucket, key)
+      riakClient.delete(bucket, key, quorum)
     }
 
     def drop() {
-      JavaConversions.asIterable(riakClient.listKeys(bucket)) foreach {
+      val keys = riakClient.listKeys(bucket)
+      JavaConversions.asIterable(keys) foreach {
        delete(_)
      }
+      keys.close
     }
   }
diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala
index b26903c708..e72cc28ba4 100644
--- a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala
+++ b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala
@@ -8,17 +8,46 @@ import se.scalablesolutions.akka.util.{Logging}
 import collection.immutable.TreeSet
 import scala.None
 import org.scalatest.{Spec, FunSuite}
-import com.trifork.riak.RiakClient
+import com.trifork.riak.{RiakObject, RiakClient}
+import collection.JavaConversions
 
 @RunWith(classOf[JUnitRunner])
 class RiakStorageBackendTestIntegration extends Spec with ShouldMatchers with Logging {
+  import se.scalablesolutions.akka.persistence.riak.RiakStorageBackend.RiakAccess._
 
   describe("successfuly configuring the riak pb client"){
     it("should connect to riak, if riak is running"){
       val riakClient = new RiakClient("localhost");
+      val props = riakClient.getServerInfo
+      JavaConversions.asMap(props) foreach {
+        _ match {
+          case (k, v) => log.info("%s -> %s", k, v)
+        }
+      }
+      val maps = riakClient.getBucketProperties("Maps")
+      debug(maps)
       riakClient.listBuckets should not be (null)
+      riakClient.store(new RiakObject("Maps", "testkey", "testvalue"))
+      riakClient.fetch("Maps", "testkey").isEmpty should be (false)
+      riakClient.fetch("Maps", "testkey")(0).getValue.toStringUtf8 should be("testvalue")
+      //riakClient.delete("Maps","testkey")
+      RiakStorageBackend.MapClient.delete("testkey")
+      RiakStorageBackend.VectorClient.quorum
+      riakClient.fetch("Maps", "testkey").isEmpty should be (true)
+      riakClient.store(new RiakObject("Maps", "testkey", "testvalue"))
+      riakClient.fetch("Maps", "testkey").isEmpty should be (false)
+      riakClient.fetch("Maps", "testkey")(0).getValue.toStringUtf8 should be("testvalue")
+      //riakClient.delete("Maps","testkey")
+      RiakStorageBackend.MapClient.delete("testkey")
+
+      riakClient.fetch("Maps", "testkey").isEmpty should be (true)
     }
   }
+
+  // Logs whatever value is passed in (handy for eyeballing bucket properties above).
+  def debug(any: Any) {
+    log.debug("%s", any)
+  }
+
 }
\ No newline at end of file
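
Note for reviewers: below is a rough standalone sketch of how the quorum handling introduced in this patch is meant to be used. It assumes a local Riak node reachable on the protocol-buffers port 8087 (the new akka.storage.riak.client.host/.port defaults); the object name RiakQuorumSketch, the key bytes, and the reuse of the "Maps" bucket are purely illustrative, and only client calls that already appear in the patch (RiakClient, RiakObject, RequestMeta, and store/fetch/delete with a quorum argument) are exercised. Treat it as an informal sketch, not as committed code.

import com.trifork.riak.{RequestMeta, RiakObject, RiakClient}

object RiakQuorumSketch {
  // "quorum" special value, as defined on RiakAccess in the patch.
  val Quorum = 0xfffffffd

  def main(args: Array[String]) {
    // Assumed: a local Riak node on the default PB port configured above.
    val client = new RiakClient("localhost", 8087)
    val key = "sketchkey".getBytes("UTF-8")
    val value = "sketchvalue".getBytes("UTF-8")

    // Write with w = dw = quorum, mirroring RiakAccess.put.
    client.store(new RiakObject("Maps", key, value), new RequestMeta().w(Quorum).dw(Quorum))

    // Read with a quorum of readers, mirroring RiakAccess.getValue:
    // an empty result or an empty value is treated as "not found".
    val objs = client.fetch("Maps", key, Quorum)
    if (!objs.isEmpty && !objs.last.getValue.isEmpty)
      println(objs.last.getValue.toStringUtf8)

    // Delete with the same quorum, mirroring RiakAccess.delete.
    client.delete("Maps", key, Quorum)
  }
}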