From 75ecbb55987eb73107d1c9898feb776353a8dcbc Mon Sep 17 00:00:00 2001 From: ticktock Date: Wed, 13 Oct 2010 17:03:27 -0400 Subject: [PATCH] Initial Port of the Voldemort Backend to Riak this is using the protobuf interface now, but may have to try to the http interface, since there seems to be some race conditions internally to riak during rapid updates/deletes, which may not happen with http Basically single threaded tests that involve rapid updates/deletes fail. When the same tests are run in a debugger, and breakpoints are set such that there is time in between riak requests, the tests pass. :( --- .../src/main/scala/RiakStorageBackend.scala | 46 +++++++++++-------- .../RiakStorageBackendTestIntegration.scala | 31 ++++++++++++- 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala index bf3a5c169f..12f28a4990 100644 --- a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala +++ b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala @@ -18,13 +18,10 @@ import collection.mutable.ArrayBuffer import java.util.{Properties, Map => JMap} import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ import collection.immutable._ -import com.trifork.riak.{RiakObject, RiakClient} import com.google.protobuf.ByteString import com.google.protobuf.ByteString._ -/* - RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores - In this case all VoldemortBackend operations can be retried until successful, and data should remain consistent - */ +import com.trifork.riak.{RequestMeta, RiakObject, RiakClient} + private[akka] object RiakStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with @@ -32,17 +29,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging { - val bootstrapUrlsProp = "bootstrap_urls" - val clientConfig = config.getConfigMap("akka.storage.riak.client") match { - case Some(configMap) => getClientConfig(configMap.asMap) - case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666")) - } val refBucket = config.getString("akka.storage.riak.bucket.ref", "Refs") val mapBucket = config.getString("akka.storage.riak.bucket.map", "Maps") val vectorBucket = config.getString("akka.storage.riak.bucket.vector", "Vectors") val queueBucket = config.getString("akka.storage.riak.bucket.queue", "Queues") - - var riakClient: RiakClient = new RiakClient("localhost"); + val clientHost = config.getString("akka.storage.riak.client.host", "localhost") + val clientPort = config.getInt("akka.storage.riak.client.port", 8087) + var riakClient: RiakClient = new RiakClient(clientHost, clientPort); val nullMapValueHeader = 0x00.byteValue val nullMapValue: Array[Byte] = Array(nullMapValueHeader) @@ -124,6 +117,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with keys.foreach { key => MapClient.delete(getKey(name, key)) + log.debug("deleted key %s for %s", key, name) } MapClient.delete(getKey(name, mapKeysIndex)) } @@ -469,17 +463,24 @@ MapStorageBackend[Array[Byte], Array[Byte]] with trait RiakAccess { def bucket: String - + //http://www.mail-archive.com/riak-users@lists.basho.com/msg01013.html + val quorum: Int = 0xfffffffd + val one: Int = 0xfffffffe + val all: Int = 0xfffffffc + val default: Int = 0xfffffffb def put(key: Array[Byte], value: Array[Byte]) = { - riakClient.store(new RiakObject(bucket, key, value)) + riakClient.store(new RiakObject(bucket, key, value), new RequestMeta().w(quorum).dw(quorum)) } def getValue(key: Array[Byte]): Array[Byte] = { - val objs = riakClient.fetch(bucket, key) + val objs = riakClient.fetch(bucket, key, quorum) objs.size match { case 0 => null; - case _ => objs.last.getValue + case _ => objs.last.getValue.isEmpty match { + case true => null + case false => objs(0).getValue + } } } @@ -493,19 +494,26 @@ MapStorageBackend[Array[Byte], Array[Byte]] with def getAll(keys: Traversable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { var result = new HashMap[Array[Byte], Array[Byte]] keys.foreach { - key => result += key -> getValue(key) + key => + val value = getValue(key) + Option(value) match { + case Some(value) => result += key -> value + case None => () + } } result } def delete(key: Array[Byte]) = { - riakClient.delete(bucket, key) + riakClient.delete(bucket, key, quorum) } def drop() { - JavaConversions.asIterable(riakClient.listKeys(bucket)) foreach { + val keys = riakClient.listKeys(bucket) + JavaConversions.asIterable(keys) foreach { delete(_) } + keys.close } } diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala index b26903c708..e72cc28ba4 100644 --- a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala +++ b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala @@ -8,17 +8,46 @@ import se.scalablesolutions.akka.util.{Logging} import collection.immutable.TreeSet import scala.None import org.scalatest.{Spec, FunSuite} -import com.trifork.riak.RiakClient +import com.trifork.riak.{RiakObject, RiakClient} +import collection.JavaConversions @RunWith(classOf[JUnitRunner]) class RiakStorageBackendTestIntegration extends Spec with ShouldMatchers with Logging { + import se.scalablesolutions.akka.persistence.riak.RiakStorageBackend.RiakAccess._ describe("successfuly configuring the riak pb client"){ it("should connect to riak, if riak is running"){ val riakClient = new RiakClient("localhost"); + val props = riakClient.getServerInfo + JavaConversions.asMap(props) foreach { + _ match { + case (k,v) => log.info("%s -> %s",k,v) + } + } + val maps = riakClient.getBucketProperties("Maps") + debug(maps) riakClient.listBuckets should not be (null) + riakClient.store(new RiakObject("Maps", "testkey", "testvalue")) + riakClient.fetch("Maps", "testkey").isEmpty should be (false) + riakClient.fetch("Maps", "testkey")(0).getValue.toStringUtf8 should be("testvalue") + //riakClient.delete("Maps","testkey") + RiakStorageBackend.MapClient.delete("testkey") + RiakStorageBackend.VectorClient.quorum + riakClient.fetch("Maps", "testkey").isEmpty should be (true) + riakClient.store(new RiakObject("Maps", "testkey", "testvalue")) + riakClient.fetch("Maps", "testkey").isEmpty should be (false) + riakClient.fetch("Maps", "testkey")(0).getValue.toStringUtf8 should be("testvalue") + //riakClient.delete("Maps","testkey") + RiakStorageBackend.MapClient.delete("testkey") + + riakClient.fetch("Maps", "testkey").isEmpty should be (true) } } + + def debug(ani:Any){ + log.debug("ani") + } + } \ No newline at end of file