diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraStorageBackendCompatibilityTest.scala new file mode 100644 index 0000000000..8149589cc7 --- /dev/null +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraStorageBackendCompatibilityTest.scala @@ -0,0 +1,41 @@ +package se.scalablesolutions.akka.persistence.cassandra + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common.{VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest} + +@RunWith(classOf[JUnitRunner]) +class CassandraRefStorageBackendTestIntegration extends RefStorageBackendTest { + def dropRefs = { + // + } + + + def storage = CassandraStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class CassandraMapStorageBackendTestIntegration extends MapStorageBackendTest { + def dropMaps = { + // + } + + + def storage = CassandraStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class CassandraVectorStorageBackendTestIntegration extends VectorStorageBackendTest { + def dropVectors = { + // + } + + + def storage = CassandraStorageBackend +} + + + + + diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraTicket343TestIntegration.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraTicket343TestIntegration.scala new file mode 100644 index 0000000000..fb727e9d70 --- /dev/null +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraTicket343TestIntegration.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.cassandra + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common._ + +@RunWith(classOf[JUnitRunner]) +class CassandraTicket343TestIntegration extends Ticket343Test { + def dropMapsAndVectors: Unit = { + // + } + + def getVector: (String) => PersistentVector[Array[Byte]] = CassandraStorage.getVector + + def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = CassandraStorage.getMap + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala index 4b26bfb27a..5c040fddec 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala @@ -40,11 +40,95 @@ private[akka] object KVAccess { } } -private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging { - +private[akka] object KVStorageBackend { val nullMapValueHeader = 0x00.byteValue val nullMapValue: Array[Byte] = Array(nullMapValueHeader) val notNullMapValueHeader: Byte = 0xff.byteValue + + def getStoredMapValue(value: Array[Byte]): Array[Byte] = { + value match { + case null => nullMapValue + case value => { + val stored = new Array[Byte](value.length + 1) + stored(0) = notNullMapValueHeader + System.arraycopy(value, 0, stored, 1, value.length) + stored + } + } + } + + def getMapValueFromStored(value: Array[Byte]): Array[Byte] = { + + if (value(0) == nullMapValueHeader) { + null + } else if (value(0) == notNullMapValueHeader) { + val returned = new Array[Byte](value.length - 1) + System.arraycopy(value, 1, returned, 0, value.length - 1) + returned + } else { + throw new StorageException("unknown header byte on map value:" + value(0)) + } + } + + object IntSerializer { + val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE + + def toBytes(i: Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array() + + def fromBytes(bytes: Array[Byte]) = ByteBuffer.wrap(bytes).getInt() + + def toString(obj: Int) = obj.toString + + def fromString(str: String) = str.toInt + } + + object SortedSetSerializer { + def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = { + val length = set.foldLeft(0) { + (total, bytes) => { + total + bytes.length + IntSerializer.bytesPerInt + } + } + val allBytes = new Array[Byte](length) + val written = set.foldLeft(0) { + (total, bytes) => { + val sizeBytes = IntSerializer.toBytes(bytes.length) + System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length) + System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length) + total + sizeBytes.length + bytes.length + } + } + require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length)) + allBytes + } + + def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { + import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + + var set = new TreeSet[Array[Byte]] + if (bytes.length > IntSerializer.bytesPerInt) { + var pos = 0 + while (pos < bytes.length) { + val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt) + System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt) + pos += IntSerializer.bytesPerInt + val length = IntSerializer.fromBytes(lengthBytes) + val item = new Array[Byte](length) + System.arraycopy(bytes, pos, item, 0, length) + set = set + item + pos += length + } + } + set + } + + } + +} + +private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging { + + val underscoreBytesUTF8 = "_".getBytes("UTF-8") val mapKeysIndex = getIndexedBytes(-1) val vectorHeadIndex = getIndexedBytes(-1) @@ -54,6 +138,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra implicit val ordering = ArrayOrdering + import KVStorageBackend._ import KVAccess._ @@ -239,13 +324,14 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra override def removeVectorStorageEntryFor(name: String) = { val mdata = getVectorMetadata(name) - if(mdata.canRemove){ + if (mdata.canRemove) { val key = getIndexedKey(name, mdata.tail) vectorAccess.put(getKey(name, vectorTailIndex), IntSerializer.toBytes(mdata.nextRemove)) - try{ - vectorAccess.delete(key) + try + { + vectorAccess.delete(key) } catch { - case e:Exception => log.warn("Exception while trying to clean up a popped element from the vector, this is acceptable") + case e: Exception => log.warn("Exception while trying to clean up a popped element from the vector, this is acceptable") } } else { @@ -385,30 +471,6 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra //wrapper for null - def getStoredMapValue(value: Array[Byte]): Array[Byte] = { - value match { - case null => nullMapValue - case value => { - val stored = new Array[Byte](value.length + 1) - stored(0) = notNullMapValueHeader - System.arraycopy(value, 0, stored, 1, value.length) - stored - } - } - } - - def getMapValueFromStored(value: Array[Byte]): Array[Byte] = { - - if (value(0) == nullMapValueHeader) { - null - } else if (value(0) == notNullMapValueHeader) { - val returned = new Array[Byte](value.length - 1) - System.arraycopy(value, 1, returned, 0, value.length - 1) - returned - } else { - throw new StorageException("unknown header byte on map value:" + value(0)) - } - } case class QueueMetadata(head: Int, tail: Int) { //queue is an sequence with indexes from 0 to Int.MAX_VALUE @@ -476,7 +538,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra def size = { if (head >= tail) { - head - tail + head - tail } else { //queue has wrapped (Integer.MAX_VALUE - tail) + (head + 1) @@ -530,58 +592,5 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } } - object IntSerializer { - val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE - - def toBytes(i: Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array() - - def fromBytes(bytes: Array[Byte]) = ByteBuffer.wrap(bytes).getInt() - - def toString(obj: Int) = obj.toString - - def fromString(str: String) = str.toInt - } - - object SortedSetSerializer { - def toBytes(set: SortedSet[Array[Byte]]): Array[Byte] = { - val length = set.foldLeft(0) { - (total, bytes) => { - total + bytes.length + IntSerializer.bytesPerInt - } - } - val allBytes = new Array[Byte](length) - val written = set.foldLeft(0) { - (total, bytes) => { - val sizeBytes = IntSerializer.toBytes(bytes.length) - System.arraycopy(sizeBytes, 0, allBytes, total, sizeBytes.length) - System.arraycopy(bytes, 0, allBytes, total + sizeBytes.length, bytes.length) - total + sizeBytes.length + bytes.length - } - } - require(length == written, "Bytes Written Did not equal Calculated Length, written %d, length %d".format(written, length)) - allBytes - } - - def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { - import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ - - var set = new TreeSet[Array[Byte]] - if (bytes.length > IntSerializer.bytesPerInt) { - var pos = 0 - while (pos < bytes.length) { - val lengthBytes = new Array[Byte](IntSerializer.bytesPerInt) - System.arraycopy(bytes, pos, lengthBytes, 0, IntSerializer.bytesPerInt) - pos += IntSerializer.bytesPerInt - val length = IntSerializer.fromBytes(lengthBytes) - val item = new Array[Byte](length) - System.arraycopy(bytes, pos, item, 0, length) - set = set + item - pos += length - } - } - set - } - - } } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala index 0e57a135e0..9f41c3464a 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala @@ -5,6 +5,7 @@ import org.scalatest.matchers.ShouldMatchers import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ +import se.scalablesolutions.akka.persistence.common.KVStorageBackend._ import se.scalablesolutions.akka.util.{Logging} import collection.immutable.TreeSet import VoldemortStorageBackendSuite._