Initial Port of the Voldemort Backend to Riak
this is using the protobuf interface now, but may have to try to the http interface, since there seems to be some race conditions internally to riak during rapid updates/deletes, which may not happen with http Basically single threaded tests that involve rapid updates/deletes fail. When the same tests are run in a debugger, and breakpoints are set such that there is time in between riak requests, the tests pass. :(
This commit is contained in:
parent
a3a9dcddbb
commit
20007f7355
2 changed files with 57 additions and 20 deletions
|
|
@ -18,13 +18,10 @@ import collection.mutable.ArrayBuffer
|
|||
import java.util.{Properties, Map => JMap}
|
||||
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
|
||||
import collection.immutable._
|
||||
import com.trifork.riak.{RiakObject, RiakClient}
|
||||
import com.google.protobuf.ByteString
|
||||
import com.google.protobuf.ByteString._
|
||||
/*
|
||||
RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores
|
||||
In this case all VoldemortBackend operations can be retried until successful, and data should remain consistent
|
||||
*/
|
||||
import com.trifork.riak.{RequestMeta, RiakObject, RiakClient}
|
||||
|
||||
|
||||
private[akka] object RiakStorageBackend extends
|
||||
MapStorageBackend[Array[Byte], Array[Byte]] with
|
||||
|
|
@ -32,17 +29,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
RefStorageBackend[Array[Byte]] with
|
||||
QueueStorageBackend[Array[Byte]] with
|
||||
Logging {
|
||||
val bootstrapUrlsProp = "bootstrap_urls"
|
||||
val clientConfig = config.getConfigMap("akka.storage.riak.client") match {
|
||||
case Some(configMap) => getClientConfig(configMap.asMap)
|
||||
case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666"))
|
||||
}
|
||||
val refBucket = config.getString("akka.storage.riak.bucket.ref", "Refs")
|
||||
val mapBucket = config.getString("akka.storage.riak.bucket.map", "Maps")
|
||||
val vectorBucket = config.getString("akka.storage.riak.bucket.vector", "Vectors")
|
||||
val queueBucket = config.getString("akka.storage.riak.bucket.queue", "Queues")
|
||||
|
||||
var riakClient: RiakClient = new RiakClient("localhost");
|
||||
val clientHost = config.getString("akka.storage.riak.client.host", "localhost")
|
||||
val clientPort = config.getInt("akka.storage.riak.client.port", 8087)
|
||||
var riakClient: RiakClient = new RiakClient(clientHost, clientPort);
|
||||
|
||||
val nullMapValueHeader = 0x00.byteValue
|
||||
val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
|
||||
|
|
@ -124,6 +117,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
keys.foreach {
|
||||
key =>
|
||||
MapClient.delete(getKey(name, key))
|
||||
log.debug("deleted key %s for %s", key, name)
|
||||
}
|
||||
MapClient.delete(getKey(name, mapKeysIndex))
|
||||
}
|
||||
|
|
@ -469,17 +463,24 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
|
||||
trait RiakAccess {
|
||||
def bucket: String
|
||||
|
||||
//http://www.mail-archive.com/riak-users@lists.basho.com/msg01013.html
|
||||
val quorum: Int = 0xfffffffd
|
||||
val one: Int = 0xfffffffe
|
||||
val all: Int = 0xfffffffc
|
||||
val default: Int = 0xfffffffb
|
||||
|
||||
def put(key: Array[Byte], value: Array[Byte]) = {
|
||||
riakClient.store(new RiakObject(bucket, key, value))
|
||||
riakClient.store(new RiakObject(bucket, key, value), new RequestMeta().w(quorum).dw(quorum))
|
||||
}
|
||||
|
||||
def getValue(key: Array[Byte]): Array[Byte] = {
|
||||
val objs = riakClient.fetch(bucket, key)
|
||||
val objs = riakClient.fetch(bucket, key, quorum)
|
||||
objs.size match {
|
||||
case 0 => null;
|
||||
case _ => objs.last.getValue
|
||||
case _ => objs.last.getValue.isEmpty match {
|
||||
case true => null
|
||||
case false => objs(0).getValue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -493,19 +494,26 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
|
|||
def getAll(keys: Traversable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = {
|
||||
var result = new HashMap[Array[Byte], Array[Byte]]
|
||||
keys.foreach {
|
||||
key => result += key -> getValue(key)
|
||||
key =>
|
||||
val value = getValue(key)
|
||||
Option(value) match {
|
||||
case Some(value) => result += key -> value
|
||||
case None => ()
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
def delete(key: Array[Byte]) = {
|
||||
riakClient.delete(bucket, key)
|
||||
riakClient.delete(bucket, key, quorum)
|
||||
}
|
||||
|
||||
def drop() {
|
||||
JavaConversions.asIterable(riakClient.listKeys(bucket)) foreach {
|
||||
val keys = riakClient.listKeys(bucket)
|
||||
JavaConversions.asIterable(keys) foreach {
|
||||
delete(_)
|
||||
}
|
||||
keys.close
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,17 +8,46 @@ import se.scalablesolutions.akka.util.{Logging}
|
|||
import collection.immutable.TreeSet
|
||||
import scala.None
|
||||
import org.scalatest.{Spec, FunSuite}
|
||||
import com.trifork.riak.RiakClient
|
||||
import com.trifork.riak.{RiakObject, RiakClient}
|
||||
import collection.JavaConversions
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class RiakStorageBackendTestIntegration extends Spec with ShouldMatchers with Logging {
|
||||
|
||||
import se.scalablesolutions.akka.persistence.riak.RiakStorageBackend.RiakAccess._
|
||||
|
||||
describe("successfuly configuring the riak pb client"){
|
||||
it("should connect to riak, if riak is running"){
|
||||
val riakClient = new RiakClient("localhost");
|
||||
val props = riakClient.getServerInfo
|
||||
JavaConversions.asMap(props) foreach {
|
||||
_ match {
|
||||
case (k,v) => log.info("%s -> %s",k,v)
|
||||
}
|
||||
}
|
||||
val maps = riakClient.getBucketProperties("Maps")
|
||||
debug(maps)
|
||||
riakClient.listBuckets should not be (null)
|
||||
riakClient.store(new RiakObject("Maps", "testkey", "testvalue"))
|
||||
riakClient.fetch("Maps", "testkey").isEmpty should be (false)
|
||||
riakClient.fetch("Maps", "testkey")(0).getValue.toStringUtf8 should be("testvalue")
|
||||
//riakClient.delete("Maps","testkey")
|
||||
RiakStorageBackend.MapClient.delete("testkey")
|
||||
RiakStorageBackend.VectorClient.quorum
|
||||
riakClient.fetch("Maps", "testkey").isEmpty should be (true)
|
||||
riakClient.store(new RiakObject("Maps", "testkey", "testvalue"))
|
||||
riakClient.fetch("Maps", "testkey").isEmpty should be (false)
|
||||
riakClient.fetch("Maps", "testkey")(0).getValue.toStringUtf8 should be("testvalue")
|
||||
//riakClient.delete("Maps","testkey")
|
||||
RiakStorageBackend.MapClient.delete("testkey")
|
||||
|
||||
riakClient.fetch("Maps", "testkey").isEmpty should be (true)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def debug(ani:Any){
|
||||
log.debug("ani")
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue