adding compatibility tests for cassandra (failing currently) and refactor KVStorageBackend to make some functionality easier to get at

This commit is contained in:
ticktock 2010-10-25 20:03:21 -04:00
parent 9dc370f838
commit f26fc4bdbf
4 changed files with 157 additions and 84 deletions

View file

@ -0,0 +1,41 @@
package se.scalablesolutions.akka.persistence.cassandra
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.persistence.common.{VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest}
@RunWith(classOf[JUnitRunner])
class CassandraRefStorageBackendTestIntegration extends RefStorageBackendTest {
def dropRefs = {
//
}
def storage = CassandraStorageBackend
}
@RunWith(classOf[JUnitRunner])
class CassandraMapStorageBackendTestIntegration extends MapStorageBackendTest {
def dropMaps = {
//
}
def storage = CassandraStorageBackend
}
@RunWith(classOf[JUnitRunner])
class CassandraVectorStorageBackendTestIntegration extends VectorStorageBackendTest {
def dropVectors = {
//
}
def storage = CassandraStorageBackend
}

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.persistence.cassandra
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.persistence.common._
@RunWith(classOf[JUnitRunner])
class CassandraTicket343TestIntegration extends Ticket343Test {
def dropMapsAndVectors: Unit = {
//
}
def getVector: (String) => PersistentVector[Array[Byte]] = CassandraStorage.getVector
def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = CassandraStorage.getMap
}

View file

@ -40,11 +40,95 @@ private[akka] object KVAccess {
}
}
private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging {
private[akka] object KVStorageBackend {
val nullMapValueHeader = 0x00.byteValue
val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
val notNullMapValueHeader: Byte = 0xff.byteValue
def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
value match {
case null => nullMapValue
case value => {
val stored = new Array[Byte](value.length + 1)
stored(0) = notNullMapValueHeader
System.arraycopy(value, 0, stored, 1, value.length)
stored
}
}
}
def getMapValueFromStored(value: Array[Byte]): Array[Byte] = {
if (value(0) == nullMapValueHeader) {
null
} else if (value(0) == notNullMapValueHeader) {
val returned = new Array[Byte](value.length - 1)
System.arraycopy(value, 1, returned, 0, value.length - 1)
returned
} else {
throw new StorageException("unknown header byte on map value:" + value(0))
}
}
object IntSerializer {
val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE
def toBytes(i: Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array()
def fromBytes(bytes: Array[Byte]) = ByteBuffer.wrap(bytes).getInt()
def toString(obj: Int) = obj.toString
def fromString(str: String) = str.toInt
}
object SortedSetSerializer {
def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = {
val length = set.foldLeft(0) {
(total, bytes) => {
total + bytes.length + IntSerializer.bytesPerInt
}
}
val allBytes = new Array[Byte](length)
val written = set.foldLeft(0) {
(total, bytes) => {
val sizeBytes = IntSerializer.toBytes(bytes.length)
System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length)
System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length)
total + sizeBytes.length + bytes.length
}
}
require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length))
allBytes
}
def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = {
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
var set = new TreeSet[Array[Byte]]
if (bytes.length > IntSerializer.bytesPerInt) {
var pos = 0
while (pos < bytes.length) {
val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt)
System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt)
pos += IntSerializer.bytesPerInt
val length = IntSerializer.fromBytes(lengthBytes)
val item = new Array[Byte](length)
System.arraycopy(bytes, pos, item, 0, length)
set = set + item
pos += length
}
}
set
}
}
}
private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging {
val underscoreBytesUTF8 = "_".getBytes("UTF-8")
val mapKeysIndex = getIndexedBytes(-1)
val vectorHeadIndex = getIndexedBytes(-1)
@ -54,6 +138,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
implicit val ordering = ArrayOrdering
import KVStorageBackend._
import KVAccess._
@ -239,13 +324,14 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
override def removeVectorStorageEntryFor(name: String) = {
val mdata = getVectorMetadata(name)
if(mdata.canRemove){
if (mdata.canRemove) {
val key = getIndexedKey(name, mdata.tail)
vectorAccess.put(getKey(name, vectorTailIndex), IntSerializer.toBytes(mdata.nextRemove))
try{
vectorAccess.delete(key)
try
{
vectorAccess.delete(key)
} catch {
case e:Exception => log.warn("Exception while trying to clean up a popped element from the vector, this is acceptable")
case e: Exception => log.warn("Exception while trying to clean up a popped element from the vector, this is acceptable")
}
} else {
@ -385,30 +471,6 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
//wrapper for null
def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
value match {
case null => nullMapValue
case value => {
val stored = new Array[Byte](value.length + 1)
stored(0) = notNullMapValueHeader
System.arraycopy(value, 0, stored, 1, value.length)
stored
}
}
}
def getMapValueFromStored(value: Array[Byte]): Array[Byte] = {
if (value(0) == nullMapValueHeader) {
null
} else if (value(0) == notNullMapValueHeader) {
val returned = new Array[Byte](value.length - 1)
System.arraycopy(value, 1, returned, 0, value.length - 1)
returned
} else {
throw new StorageException("unknown header byte on map value:" + value(0))
}
}
case class QueueMetadata(head: Int, tail: Int) {
//queue is an sequence with indexes from 0 to Int.MAX_VALUE
@ -476,7 +538,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
def size = {
if (head >= tail) {
head - tail
head - tail
} else {
//queue has wrapped
(Integer.MAX_VALUE - tail) + (head + 1)
@ -530,58 +592,5 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra
}
}
object IntSerializer {
val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE
def toBytes(i: Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array()
def fromBytes(bytes: Array[Byte]) = ByteBuffer.wrap(bytes).getInt()
def toString(obj: Int) = obj.toString
def fromString(str: String) = str.toInt
}
object SortedSetSerializer {
def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = {
val length = set.foldLeft(0) {
(total, bytes) => {
total + bytes.length + IntSerializer.bytesPerInt
}
}
val allBytes = new Array[Byte](length)
val written = set.foldLeft(0) {
(total, bytes) => {
val sizeBytes = IntSerializer.toBytes(bytes.length)
System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length)
System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length)
total + sizeBytes.length + bytes.length
}
}
require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length))
allBytes
}
def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = {
import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._
var set = new TreeSet[Array[Byte]]
if (bytes.length > IntSerializer.bytesPerInt) {
var pos = 0
while (pos < bytes.length) {
val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt)
System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt)
pos += IntSerializer.bytesPerInt
val length = IntSerializer.fromBytes(lengthBytes)
val item = new Array[Byte](length)
System.arraycopy(bytes, pos, item, 0, length)
set = set + item
pos += length
}
}
set
}
}
}

View file

@ -5,6 +5,7 @@ import org.scalatest.matchers.ShouldMatchers
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
import se.scalablesolutions.akka.persistence.common.KVStorageBackend._
import se.scalablesolutions.akka.util.{Logging}
import collection.immutable.TreeSet
import VoldemortStorageBackendSuite._