diff --git a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala index 534d9888eb..bf3a5c169f 100644 --- a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala +++ b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala @@ -1,11 +1,584 @@ /** - * Copyright (C) 2009-2010 Scalable Solutions AB + * Copyright (C) 2009-2010 Scalable Solutions AB */ package se.scalablesolutions.akka.persistence.riak -import se.scalablesolutions.akka.actor.{newUuid} import se.scalablesolutions.akka.stm._ import se.scalablesolutions.akka.persistence.common._ +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.Helpers._ +import se.scalablesolutions.akka.config.Config.config -class RiakStorageBackend \ No newline at end of file +import java.lang.String +import collection.JavaConversions +import java.nio.ByteBuffer +import collection.Map +import collection.mutable.ArrayBuffer +import java.util.{Properties, Map => JMap} +import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ +import collection.immutable._ +import com.trifork.riak.{RiakObject, RiakClient} +import com.google.protobuf.ByteString +import com.google.protobuf.ByteString._ +/* + RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores + In this case all VoldemortBackend operations can be retried until successful, and data should remain consistent + */ + +private[akka] object RiakStorageBackend extends +MapStorageBackend[Array[Byte], Array[Byte]] with + VectorStorageBackend[Array[Byte]] with + RefStorageBackend[Array[Byte]] with + QueueStorageBackend[Array[Byte]] with + Logging { + val bootstrapUrlsProp = "bootstrap_urls" + val clientConfig = config.getConfigMap("akka.storage.riak.client") match { + case Some(configMap) => getClientConfig(configMap.asMap) + case None => getClientConfig(new HashMap[String, String] + (bootstrapUrlsProp -> "tcp://localhost:6666")) + } + val refBucket = config.getString("akka.storage.riak.bucket.ref", "Refs") + val mapBucket = config.getString("akka.storage.riak.bucket.map", "Maps") + val vectorBucket = config.getString("akka.storage.riak.bucket.vector", "Vectors") + val queueBucket = config.getString("akka.storage.riak.bucket.queue", "Queues") + + var riakClient: RiakClient = new RiakClient("localhost"); + + val nullMapValueHeader = 0x00.byteValue + val nullMapValue: Array[Byte] = Array(nullMapValueHeader) + val notNullMapValueHeader: Byte = 0xff.byteValue + val underscoreBytesUTF8 = "_".getBytes("UTF-8") + val mapKeysIndex = getIndexedBytes(-1) + val vectorSizeIndex = getIndexedBytes(-1) + val queueHeadIndex = getIndexedBytes(-1) + val queueTailIndex = getIndexedBytes(-2) + //explicit implicit :) + implicit val ordering = ArrayOrdering + import RiakAccess._ + + def getRefStorageFor(name: String): Option[Array[Byte]] = { + val result: Array[Byte] = RefClient.getValue(name) + Option(result) + } + + def insertRefStorageFor(name: String, element: Array[Byte]) = { + element match { + case null => RefClient.delete(name) + case _ => RefClient.put(name, element) + } + } + + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = { + val allkeys: SortedSet[Array[Byte]] = getMapKeys(name) + val range = allkeys.rangeImpl(start, finish).take(count) + getKeyValues(name, range) + } + + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + val keys = getMapKeys(name) + getKeyValues(name, keys) + } + + private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = { + val all: Map[Array[Byte], Array[Byte]] = + MapClient.getAll(keys.map { + mapKey => getKey(name, mapKey) + }) + + var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering) + all.foreach { + (entry) => { + entry match { + case (namePlusKey: Array[Byte], value: Array[Byte]) => { + returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(value) + } + } + } + } + returned.toList + } + + def getMapStorageSizeFor(name: String): Int = { + val keys = getMapKeys(name) + keys.size + } + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { + val result: Array[Byte] = MapClient.getValue(getKey(name, key)) + result match { + case null => None + case _ => Some(getMapValueFromStored(result)) + } + } + + def removeMapStorageFor(name: String, key: Array[Byte]) = { + var keys = getMapKeys(name) + keys -= key + putMapKeys(name, keys) + MapClient.delete(getKey(name, key)) + } + + + def removeMapStorageFor(name: String) = { + val keys = getMapKeys(name) + keys.foreach { + key => + MapClient.delete(getKey(name, key)) + } + MapClient.delete(getKey(name, mapKeysIndex)) + } + + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { + MapClient.put(getKey(name, key), getStoredMapValue(value)) + var keys = getMapKeys(name) + keys += key + putMapKeys(name, keys) + } + + def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { + val newKeys = entries.map { + case (key, value) => { + MapClient.put(getKey(name, key), getStoredMapValue(value)) + key + } + } + var keys = getMapKeys(name) + keys ++= newKeys + putMapKeys(name, keys) + } + + def putMapKeys(name: String, keys: SortedSet[Array[Byte]]) = { + MapClient.put(getKey(name, mapKeysIndex), SortedSetSerializer.toBytes(keys)) + } + + def getMapKeys(name: String): SortedSet[Array[Byte]] = { + SortedSetSerializer.fromBytes(MapClient.getValue(getKey(name, mapKeysIndex), Array.empty[Byte])) + } + + + def getVectorStorageSizeFor(name: String): Int = { + IntSerializer.fromBytes(VectorClient.getValue(getKey(name, vectorSizeIndex), IntSerializer.toBytes(0))) + } + + + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + val size = getVectorStorageSizeFor(name) + val st = start.getOrElse(0) + var cnt = + if (finish.isDefined) { + val f = finish.get + if (f >= st) (f - st) else count + } else { + count + } + if (cnt > (size - st)) { + cnt = size - st + } + + + val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map { + index => getIndexedKey(name, (size - 1) - index) + } //read backwards + + val all: Map[Array[Byte], Array[Byte]] = VectorClient.getAll(seq) + + var storage = new ArrayBuffer[Array[Byte]](seq.size) + storage = storage.padTo(seq.size, Array.empty[Byte]) + var idx = 0; + seq.foreach { + key => { + if (all.isDefinedAt(key)) { + storage.update(idx, all.get(key).get) + } + idx += 1 + } + } + + storage.toList + } + + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { + val size = getVectorStorageSizeFor(name) + if (size > 0 && index < size) { + VectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) + } else { + throw new StorageException("In Vector:" + name + " No such Index:" + index) + } + } + + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = { + val size = getVectorStorageSizeFor(name) + if (size > 0 && index < size) { + elem match { + case null => VectorClient.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index)) + case _ => VectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem) + } + } else { + throw new StorageException("In Vector:" + name + " No such Index:" + index) + } + } + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { + var size = getVectorStorageSizeFor(name) + elements.foreach { + element => + if (element != null) { + VectorClient.put(getIndexedKey(name, size), element) + } + size += 1 + } + VectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size)) + } + + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { + insertVectorStorageEntriesFor(name, List(element)) + } + + + def remove(name: String): Boolean = { + val mdata = getQueueMetadata(name) + mdata.getActiveIndexes foreach { + index => + QueueClient.delete(getIndexedKey(name, index)) + } + QueueClient.delete(getKey(name, queueHeadIndex)) + QueueClient.delete(getKey(name, queueTailIndex)) + true + } + + def peek(name: String, start: Int, count: Int): List[Array[Byte]] = { + val mdata = getQueueMetadata(name) + val ret = mdata.getPeekIndexes(start, count).toList map { + index: Int => { + log.debug("peeking:" + index) + QueueClient.getValue(getIndexedKey(name, index)) + } + } + ret + } + + def size(name: String): Int = { + getQueueMetadata(name).size + } + + def dequeue(name: String): Option[Array[Byte]] = { + val mdata = getQueueMetadata(name) + if (mdata.canDequeue) { + val key = getIndexedKey(name, mdata.head) + try { + val dequeued = QueueClient.getValue(key) + QueueClient.put(getKey(name, queueHeadIndex), IntSerializer.toBytes(mdata.nextDequeue)) + Some(dequeued) + } + finally { + try { + QueueClient.delete(key) + } catch { + //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around + case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue") + } + } + } else { + None + } + } + + def enqueue(name: String, item: Array[Byte]): Option[Int] = { + val mdata = getQueueMetadata(name) + if (mdata.canEnqueue) { + val key = getIndexedKey(name, mdata.tail) + item match { + case null => QueueClient.delete(key) + case _ => QueueClient.put(key, item) + } + QueueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue)) + Some(mdata.size + 1) + } else { + None + } + } + + + def getQueueMetadata(name: String): QueueMetadata = { + val keys = List(getKey(name, queueHeadIndex), getKey(name, queueTailIndex)) + val qdata = QueueClient.getAll(keys) + val values = keys.map { + qdata.get(_) match { + case Some(value) => IntSerializer.fromBytes(value) + case None => 0 + } + } + QueueMetadata(values.head, values.tail.head) + } + + /** + * Concat the ownerlenght+owner+key+ of owner so owned data will be colocated + * Store the length of owner as first byte to work around the rare case + * where ownerbytes1 + keybytes1 == ownerbytes2 + keybytes2 but ownerbytes1 != ownerbytes2 + */ + + + def getKey(owner: String, key: Array[Byte]): Array[Byte] = { + val ownerBytes: Array[Byte] = owner.getBytes("UTF-8") + val ownerLenghtBytes: Array[Byte] = IntSerializer.toBytes(owner.length) + val theKey = new Array[Byte](ownerLenghtBytes.length + ownerBytes.length + key.length) + System.arraycopy(ownerLenghtBytes, 0, theKey, 0, ownerLenghtBytes.length) + System.arraycopy(ownerBytes, 0, theKey, ownerLenghtBytes.length, ownerBytes.length) + System.arraycopy(key, 0, theKey, ownerLenghtBytes.length + ownerBytes.length, key.length) + theKey + } + + def getIndexedBytes(index: Int): Array[Byte] = { + val indexbytes = IntSerializer.toBytes(index) + val theIndexKey = new Array[Byte](underscoreBytesUTF8.length + indexbytes.length) + System.arraycopy(underscoreBytesUTF8, 0, theIndexKey, 0, underscoreBytesUTF8.length) + System.arraycopy(indexbytes, 0, theIndexKey, underscoreBytesUTF8.length, indexbytes.length) + theIndexKey + } + + def getIndexedKey(owner: String, index: Int): Array[Byte] = { + getKey(owner, getIndexedBytes(index)) + } + + def getIndexFromVectorValueKey(owner: String, key: Array[Byte]): Int = { + val indexBytes = new Array[Byte](IntSerializer.bytesPerInt) + System.arraycopy(key, key.length - IntSerializer.bytesPerInt, indexBytes, 0, IntSerializer.bytesPerInt) + IntSerializer.fromBytes(indexBytes) + } + + def getMapKeyFromKey(owner: String, key: Array[Byte]): Array[Byte] = { + val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length + val mapkey = new Array[Byte](mapKeyLength) + System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength) + mapkey + } + + //wrapper for null + def getStoredMapValue(value: Array[Byte]): Array[Byte] = { + value match { + case null => nullMapValue + case value => { + val stored = new Array[Byte](value.length + 1) + stored(0) = notNullMapValueHeader + System.arraycopy(value, 0, stored, 1, value.length) + stored + } + } + } + + def getMapValueFromStored(value: Array[Byte]): Array[Byte] = { + + if (value(0) == nullMapValueHeader) { + null + } else if (value(0) == notNullMapValueHeader) { + val returned = new Array[Byte](value.length - 1) + System.arraycopy(value, 1, returned, 0, value.length - 1) + returned + } else { + throw new StorageException("unknown header byte on map value:" + value(0)) + } + } + + + def getClientConfig(configMap: Map[String, String]): Properties = { + val properites = new Properties + configMap.foreach { + keyval => keyval match { + case (key, value) => properites.setProperty(key.asInstanceOf[java.lang.String], value.asInstanceOf[java.lang.String]) + } + } + properites + } + + + + + case class QueueMetadata(head: Int, tail: Int) { + //queue is an sequence with indexes from 0 to Int.MAX_VALUE + //wraps around when one pointer gets to max value + //head has an element in it. + //tail is the next slot to write to. + def size = { + if (tail >= head) { + tail - head + } else { + //queue has wrapped + (Integer.MAX_VALUE - head) + (tail + 1) + } + } + + def canEnqueue = { + //the -1 stops the tail from catching the head on a wrap around + size < Integer.MAX_VALUE - 1 + } + + def canDequeue = {size > 0} + + def getActiveIndexes(): IndexedSeq[Int] = { + if (tail >= head) { + Range(head, tail) + } else { + //queue has wrapped + val headRange = Range.inclusive(head, Integer.MAX_VALUE) + (if (tail > 0) {headRange ++ Range(0, tail)} else {headRange}) + } + } + + def getPeekIndexes(start: Int, count: Int): IndexedSeq[Int] = { + val indexes = getActiveIndexes + if (indexes.size < start) + {IndexedSeq.empty[Int]} else + {indexes.drop(start).take(count)} + } + + def nextEnqueue = { + tail match { + case Integer.MAX_VALUE => 0 + case _ => tail + 1 + } + } + + def nextDequeue = { + head match { + case Integer.MAX_VALUE => 0 + case _ => head + 1 + } + } + } + + + + object RiakAccess { + implicit def byteArrayToByteString(ary: Array[Byte]): ByteString = { + ByteString.copyFrom(ary) + } + + implicit def byteStringToByteArray(bs: ByteString): Array[Byte] = { + bs.toByteArray + } + + implicit def stringToByteString(bucket: String): ByteString = { + ByteString.copyFromUtf8(bucket) + } + + implicit def stringToByteArray(st: String): Array[Byte] = { + st.getBytes("UTF-8") + } + } + + trait RiakAccess { + def bucket: String + + + def put(key: Array[Byte], value: Array[Byte]) = { + riakClient.store(new RiakObject(bucket, key, value)) + } + + def getValue(key: Array[Byte]): Array[Byte] = { + val objs = riakClient.fetch(bucket, key) + objs.size match { + case 0 => null; + case _ => objs.last.getValue + } + } + + def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] = { + Option(getValue(key)) match { + case Some(value) => value + case None => default + } + } + + def getAll(keys: Traversable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { + var result = new HashMap[Array[Byte], Array[Byte]] + keys.foreach { + key => result += key -> getValue(key) + } + result + } + + def delete(key: Array[Byte]) = { + riakClient.delete(bucket, key) + } + + def drop() { + JavaConversions.asIterable(riakClient.listKeys(bucket)) foreach { + delete(_) + } + } + } + + object RefClient extends RiakAccess { + def bucket: String = refBucket + } + + object MapClient extends RiakAccess { + def bucket = mapBucket + } + + object VectorClient extends RiakAccess { + def bucket = vectorBucket + } + + object QueueClient extends RiakAccess { + def bucket = queueBucket + } + + + + object IntSerializer { + val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE + + def toBytes(i: Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array() + + def fromBytes(bytes: Array[Byte]) = ByteBuffer.wrap(bytes).getInt() + + def toString(obj: Int) = obj.toString + + def fromString(str: String) = str.toInt + } + + object SortedSetSerializer { + def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = { + val length = set.foldLeft(0) { + (total, bytes) => { + total + bytes.length + IntSerializer.bytesPerInt + } + } + val allBytes = new Array[Byte](length) + val written = set.foldLeft(0) { + (total, bytes) => { + val sizeBytes = IntSerializer.toBytes(bytes.length) + System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length) + System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length) + total + sizeBytes.length + bytes.length + } + } + require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length)) + allBytes + } + + def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { + import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + + var set = new TreeSet[Array[Byte]] + if (bytes.length > IntSerializer.bytesPerInt) { + var pos = 0 + while (pos < bytes.length) { + val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt) + System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt) + pos += IntSerializer.bytesPerInt + val length = IntSerializer.fromBytes(lengthBytes) + val item = new Array[Byte](length) + System.arraycopy(bytes, pos, item, 0, length) + set = set + item + pos += length + } + } + set + } + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendCompatibilityTest.scala new file mode 100644 index 0000000000..473931104a --- /dev/null +++ b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendCompatibilityTest.scala @@ -0,0 +1,49 @@ +package se.scalablesolutions.akka.persistence.riak + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest} + +@RunWith(classOf[JUnitRunner]) +class RiakRefStorageBackendTestIntegration extends RefStorageBackendTest { + def dropRefs = { + RiakStorageBackend.RefClient.drop + } + + + def storage = RiakStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class RiakMapStorageBackendTestIntegration extends MapStorageBackendTest { + def dropMaps = { + RiakStorageBackend.MapClient.drop + } + + + def storage = RiakStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class RiakVectorStorageBackendTestIntegration extends VectorStorageBackendTest { + def dropVectors = { + RiakStorageBackend.VectorClient.drop + } + + + def storage = RiakStorageBackend +} + + +@RunWith(classOf[JUnitRunner]) +class RiakQueueStorageBackendTestIntegration extends QueueStorageBackendTest { + def dropQueues = { + RiakStorageBackend.QueueClient.drop + } + + + def storage = RiakStorageBackend +} + + diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala new file mode 100644 index 0000000000..b26903c708 --- /dev/null +++ b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala @@ -0,0 +1,24 @@ +package se.scalablesolutions.akka.persistence.riak + +import org.scalatest.matchers.ShouldMatchers +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.riak.RiakStorageBackend._ +import se.scalablesolutions.akka.util.{Logging} +import collection.immutable.TreeSet +import scala.None +import org.scalatest.{Spec, FunSuite} +import com.trifork.riak.RiakClient + +@RunWith(classOf[JUnitRunner]) +class RiakStorageBackendTestIntegration extends Spec with ShouldMatchers with Logging { + + + describe("successfuly configuring the riak pb client"){ + it("should connect to riak, if riak is running"){ + val riakClient = new RiakClient("localhost"); + riakClient.listBuckets should not be (null) + } + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala new file mode 100644 index 0000000000..9782730803 --- /dev/null +++ b/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.riak + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common._ + +@RunWith(classOf[JUnitRunner]) +class RiakTicket343Test extends Ticket343Test { + def dropMapsAndVectors: Unit = { + RiakStorageBackend.VectorClient.drop + RiakStorageBackend.MapClient.drop + } + + def getVector: (String) => PersistentVector[Array[Byte]] = RiakStorage.getVector + + def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = RiakStorage.getMap +} \ No newline at end of file