From 2cb5fafce564ae25ef84158f535673738b9f12b9 Mon Sep 17 00:00:00 2001 From: ticktock Date: Mon, 27 Sep 2010 20:23:05 -0400 Subject: [PATCH 01/12] Initial PersistentRef spec --- .../test/scala/RefStorageBackendTest.scala | 49 +++++++++++++++++++ ...emortStorageBackendCompatibilityTest.scala | 16 ++++++ 2 files changed, 65 insertions(+) create mode 100644 akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala create mode 100644 akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala diff --git a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala new file mode 100644 index 0000000000..9625f45ebd --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{Suite, BeforeAndAfterEach, Spec} + +/** + * Implementation Compatibility test for PersistentRef backend implementations. + */ +@RunWith(classOf[JUnitRunner]) +trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + this: Spec => + def storage: RefStorageBackend[Array[Byte]] + + def dropRefs: Unit + + override def beforeEach = { + log.info("beforeEach: dropping refs") + dropRefs + } + + override def afterEach = { + log.info("afterEach: dropping refs") + dropRefs + } + + + describe("A Properly functioning RefStorage Backend") { + it("should successfully insert ref storage") { + val name = "RefStorageTest #1" + val value = name.getBytes + storage.insertRefStorageFor(name, value) + storage.getRefStorageFor(name).get should be(value) + } + + it("should return None when getRefStorage is called when no value has been inserted") { + val name = "RefStorageTest #2" + val value = name.getBytes + storage.getRefStorageFor(name) should be(None) + } + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala new file mode 100644 index 0000000000..095927e8ae --- /dev/null +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala @@ -0,0 +1,16 @@ +package se.scalablesolutions.akka.persistence.voldemort + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import se.scalablesolutions.akka.persistence.common.{RefStorageBackend, RefStorageBackendTest} +import org.scalatest.Spec + + +@RunWith(classOf[JUnitRunner]) +class VoldemortStorageBackendCompatibilityTest extends RefStorageBackendTest with EmbeddedVoldemort { + def dropRefs: Unit = {} + + + def storage: RefStorageBackend[Array[Byte]] = {VoldemortStorageBackend} +} \ No newline at end of file From b234bd621a05bfffa899520e91504223406a7b7d Mon Sep 17 00:00:00 2001 From: ticktock Date: Tue, 28 Sep 2010 09:49:58 -0400 Subject: [PATCH 02/12] Initial Sketch of Persistence Compatibility Tests --- .../src/test/scala/RefStorageBackendTest.scala | 3 +-- .../test/scala/VoldemortStorageBackendCompatibilityTest.scala | 4 +++- 
project/build/AkkaProject.scala | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala index 9625f45ebd..2a4cf85f21 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala @@ -13,9 +13,8 @@ import org.scalatest.{Suite, BeforeAndAfterEach, Spec} /** * Implementation Compatibility test for PersistentRef backend implementations. */ -@RunWith(classOf[JUnitRunner]) + trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { - this: Spec => def storage: RefStorageBackend[Array[Byte]] def dropRefs: Unit diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala index 095927e8ae..16d0a0c572 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala @@ -9,7 +9,9 @@ import org.scalatest.Spec @RunWith(classOf[JUnitRunner]) class VoldemortStorageBackendCompatibilityTest extends RefStorageBackendTest with EmbeddedVoldemort { - def dropRefs: Unit = {} + def dropRefs: Unit = { + //drop Refs Impl + } def storage: RefStorageBackend[Array[Byte]] = {VoldemortStorageBackend} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index a1c2a9fdf6..3fc6df6617 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -595,7 +595,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val dbcp = Dependencies.dbcp val sjson = Dependencies.sjson_test - override def testOptions = createTestFilter( _.endsWith("Suite")) + override def testOptions = createTestFilter({ s:String=> s.endsWith("Suite") || s.endsWith("Test")}) } From 2218198909c2bf76f99075155cb6e7722b10d5fc Mon Sep 17 00:00:00 2001 From: ticktock Date: Tue, 28 Sep 2010 18:38:15 -0400 Subject: [PATCH 03/12] Persistence Compatibility Test Harness and Voldemort Implementation Kept each of the datastructure test classes in seperate files as comprehensive tests will probably be relatively long --- .../test/scala/MapStorageBackendTest.scala | 35 +++++++++++++++ .../test/scala/QueueStorageBackendTest.scala | 35 +++++++++++++++ .../test/scala/RefStorageBackendTest.scala | 6 +-- .../scala/SortedSetStorageBackendTest.scala | 35 +++++++++++++++ .../test/scala/VectorStorageBackendTest.scala | 35 +++++++++++++++ ...emortStorageBackendCompatibilityTest.scala | 45 ++++++++++++++++--- 6 files changed, 180 insertions(+), 11 deletions(-) create mode 100644 akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala create mode 100644 akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala create mode 100644 akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala create mode 100644 akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala 
b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala new file mode 100644 index 0000000000..aa9a417094 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} + +/** + * Implementation Compatibility test for PersistentMap backend implementations. + */ + +trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: MapStorageBackend[Array[Byte], Array[Byte]] + + def dropMaps: Unit + + override def beforeEach = { + log.info("beforeEach: dropping maps") + dropMaps + } + + override def afterEach = { + log.info("afterEach: dropping maps") + dropMaps + } + + + describe("A Properly functioning MapStorageBackend") { + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala new file mode 100644 index 0000000000..7b2acc22b1 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} + +/** + * Implementation Compatibility test for PersistentQueue backend implementations. + */ + +trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: QueueStorageBackend[Array[Byte]] + + def dropQueues: Unit + + override def beforeEach = { + log.info("beforeEach: dropping queues") + dropQueues + } + + override def afterEach = { + log.info("afterEach: dropping queues") + dropQueues + } + + + describe("A Properly functioning QueueStorage Backend") { + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala index 2a4cf85f21..96451bc063 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala @@ -4,11 +4,9 @@ package se.scalablesolutions.akka.persistence.common -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith import org.scalatest.matchers.ShouldMatchers import se.scalablesolutions.akka.util.Logging -import org.scalatest.{Suite, BeforeAndAfterEach, Spec} +import org.scalatest.{BeforeAndAfterEach, Spec} /** * Implementation Compatibility test for PersistentRef backend implementations. 
@@ -30,7 +28,7 @@ trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter } - describe("A Properly functioning RefStorage Backend") { + describe("A Properly functioning RefStorageBackend") { it("should successfully insert ref storage") { val name = "RefStorageTest #1" val value = name.getBytes diff --git a/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala new file mode 100644 index 0000000000..2a9d3ab324 --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/SortedSetStorageBackendTest.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} + +/** + * Implementation Compatibility test for PersistentSortedSet backend implementations. + */ + +trait SortedSetStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: SortedSetStorageBackend[Array[Byte]] + + def dropSortedSets: Unit + + override def beforeEach = { + log.info("beforeEach: dropping sorted sets") + dropSortedSets + } + + override def afterEach = { + log.info("afterEach: dropping sorted sets") + dropSortedSets + } + + + describe("A Properly functioning SortedSetStorageBackend Backend") { + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala new file mode 100644 index 0000000000..8b0008ea5a --- /dev/null +++ b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.common + +import org.scalatest.matchers.ShouldMatchers +import se.scalablesolutions.akka.util.Logging +import org.scalatest.{BeforeAndAfterEach, Spec} + +/** + * Implementation Compatibility test for PersistentVector backend implementations. 
+ */ + +trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfterEach with Logging { + def storage: VectorStorageBackend[Array[Byte]] + + def dropVectors: Unit + + override def beforeEach = { + log.info("beforeEach: dropping vectors") + dropVectors + } + + override def afterEach = { + log.info("afterEach: dropping vectors") + dropVectors + } + + + describe("A Properly functioning VectorStorageBackend") { + + } + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala index 16d0a0c572..a7bcd9afff 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala @@ -3,16 +3,47 @@ package se.scalablesolutions.akka.persistence.voldemort import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner -import se.scalablesolutions.akka.persistence.common.{RefStorageBackend, RefStorageBackendTest} -import org.scalatest.Spec - +import se.scalablesolutions.akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest} @RunWith(classOf[JUnitRunner]) -class VoldemortStorageBackendCompatibilityTest extends RefStorageBackendTest with EmbeddedVoldemort { - def dropRefs: Unit = { +class VoldemortRefStorageBackendTest extends RefStorageBackendTest with EmbeddedVoldemort { + def dropRefs = { //drop Refs Impl } - def storage: RefStorageBackend[Array[Byte]] = {VoldemortStorageBackend} -} \ No newline at end of file + def storage = VoldemortStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class VoldemortMapStorageBackendTest extends MapStorageBackendTest with EmbeddedVoldemort { + def dropMaps = { + //drop Maps Impl + } + + + def storage = VoldemortStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class VoldemortVectorStorageBackendTest extends VectorStorageBackendTest with EmbeddedVoldemort { + def dropVectors = { + //drop Maps Impl + } + + + def storage = VoldemortStorageBackend +} + + +@RunWith(classOf[JUnitRunner]) +class VoldemortQueueStorageBackendTest extends QueueStorageBackendTest with EmbeddedVoldemort { + def dropQueues = { + //drop Maps Impl + } + + + def storage = VoldemortStorageBackend +} + + From 77bda9f582897f85c2677087206733733b41cb8a Mon Sep 17 00:00:00 2001 From: ticktock Date: Tue, 28 Sep 2010 22:06:06 -0400 Subject: [PATCH 04/12] Initial Spec for MapStorageBackend --- .../test/scala/MapStorageBackendTest.scala | 108 ++++++++++++++++++ .../src/test/scala/EmbeddedVoldemort.scala | 10 +- ...emortStorageBackendCompatibilityTest.scala | 8 +- 3 files changed, 118 insertions(+), 8 deletions(-) diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala index aa9a417094..5bab5b8939 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -7,6 +7,8 @@ package se.scalablesolutions.akka.persistence.common import org.scalatest.matchers.ShouldMatchers import se.scalablesolutions.akka.util.Logging import org.scalatest.{BeforeAndAfterEach, Spec} +import scala.util.Random +import 
collection.immutable.{HashMap, HashSet} /** * Implementation Compatibility test for PersistentMap backend implementations. @@ -29,6 +31,112 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter describe("A Properly functioning MapStorageBackend") { + it("should remove map storage properly") { + val mapName = "removeTest" + val mkey = "removeTestKey".getBytes + val value = "removeTestValue".getBytes + + storage.insertMapStorageEntryFor(mapName, mkey, value) + storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true) + storage.removeMapStorageFor(mapName, mkey) + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + + storage.insertMapStorageEntryFor(mapName, mkey, value) + storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true) + storage.removeMapStorageFor(mapName) + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + } + + it("should insert a single map storage element properly") { + val mapName = "insertSingleTest" + val mkey = "insertSingleTestKey".getBytes + val value = "insertSingleTestValue".getBytes + + storage.insertMapStorageEntryFor(mapName, mkey, value) + storage.getMapStorageEntryFor(mapName, mkey).get should be(value) + storage.removeMapStorageFor(mapName, mkey) + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + + storage.insertMapStorageEntryFor(mapName, mkey, value) + storage.getMapStorageEntryFor(mapName, mkey).get should be(value) + storage.removeMapStorageFor(mapName) + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + } + + + it("should insert multiple map storage elements properly") { + val mapName = "insertMultipleTest" + val rand = new Random(2).nextInt(100) + val entries = (1 to rand).toList.map { + index => + (("insertMultipleTestKey" + index).getBytes -> ("insertMutlipleTestValue" + index).getBytes) + } + + storage.insertMapStorageEntriesFor(mapName, entries) + entries foreach { + _ match { + case (mkey, value) => { + storage.getMapStorageEntryFor(mapName, mkey).isDefined should be(true) + storage.getMapStorageEntryFor(mapName, mkey).get should be(value) + } + } + } + storage.removeMapStorageFor(mapName) + entries foreach { + _ match { + case (mkey, value) => { + storage.getMapStorageEntryFor(mapName, mkey) should be(None) + } + } + } + } + + + it("should accurately track the number of key value pairs in a map") { + val mapName = "sizeTest" + val rand = new Random(3).nextInt(100) + val entries = (1 to rand).toList.map { + index => + (("sizeTestKey" + index).getBytes -> ("sizeTestValue" + index).getBytes) + } + + storage.insertMapStorageEntriesFor(mapName, entries) + storage.getMapStorageSizeFor(mapName) should be(rand) + } + + + + it("should return all the key value pairs in the map (in the correct order?) when getMapStorageFor(name) is called") { + val mapName = "allTest" + val rand = new Random(3).nextInt(100) + val entries = (1 to rand).toList.map { + index => + (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes) + } + + storage.insertMapStorageEntriesFor(mapName, entries) + val retrieved = storage.getMapStorageFor(mapName) + retrieved.size should be(rand) + entries.size should be(rand) + + + val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} + val retrievedMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} + + entryMap should equal(retrievedMap) + //Should the ordering of key-vals returned be enforced? 
+ //ordered by key? + //using what comaparison? + + } + + it("should not throw an exception when size is called on a non existent map?") { + storage.getMapStorageSizeFor("nonExistent") should be(0) + } + + it("should behave properly when getMapStorageRange is called?") { + //No current code calls getMapStorageRangeFor + } } diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala index ce87309fb9..d0f40f1a03 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/EmbeddedVoldemort.scala @@ -1,20 +1,20 @@ package se.scalablesolutions.akka.persistence.voldemort -import org.scalatest.matchers.ShouldMatchers import voldemort.server.{VoldemortServer, VoldemortConfig} -import org.scalatest.{Suite, BeforeAndAfterAll, FunSuite} +import org.scalatest.{Suite, BeforeAndAfterAll} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner -import voldemort.utils.Utils import java.io.File import se.scalablesolutions.akka.util.{Logging} import collection.JavaConversions import voldemort.store.memory.InMemoryStorageConfiguration +import voldemort.client.protocol.admin.{AdminClientConfig, AdminClient} + -@RunWith(classOf[JUnitRunner]) trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { this: Suite => var server: VoldemortServer = null + var admin: AdminClient = null override protected def beforeAll(): Unit = { @@ -28,6 +28,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { server = new VoldemortServer(config) server.start VoldemortStorageBackend.initStoreClients + admin = new AdminClient(VoldemortStorageBackend.clientConfig.getProperty(VoldemortStorageBackend.bootstrapUrlsProp), new AdminClientConfig) log.info("Started") } catch { case e => log.error(e, "Error Starting Voldemort") @@ -36,6 +37,7 @@ trait EmbeddedVoldemort extends BeforeAndAfterAll with Logging { } override protected def afterAll(): Unit = { + admin.stop server.stop } } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala index a7bcd9afff..b9b3ea4ed1 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendCompatibilityTest.scala @@ -8,7 +8,7 @@ import se.scalablesolutions.akka.persistence.common.{QueueStorageBackendTest, Ve @RunWith(classOf[JUnitRunner]) class VoldemortRefStorageBackendTest extends RefStorageBackendTest with EmbeddedVoldemort { def dropRefs = { - //drop Refs Impl + admin.truncate(0, VoldemortStorageBackend.refStore) } @@ -18,7 +18,7 @@ class VoldemortRefStorageBackendTest extends RefStorageBackendTest with Embedded @RunWith(classOf[JUnitRunner]) class VoldemortMapStorageBackendTest extends MapStorageBackendTest with EmbeddedVoldemort { def dropMaps = { - //drop Maps Impl + admin.truncate(0, VoldemortStorageBackend.mapStore) } @@ -28,7 +28,7 @@ class VoldemortMapStorageBackendTest extends MapStorageBackendTest with Embedded @RunWith(classOf[JUnitRunner]) class VoldemortVectorStorageBackendTest extends VectorStorageBackendTest with EmbeddedVoldemort { def dropVectors = { - //drop Maps Impl + admin.truncate(0, 
VoldemortStorageBackend.vectorStore) } @@ -39,7 +39,7 @@ class VoldemortVectorStorageBackendTest extends VectorStorageBackendTest with Em @RunWith(classOf[JUnitRunner]) class VoldemortQueueStorageBackendTest extends QueueStorageBackendTest with EmbeddedVoldemort { def dropQueues = { - //drop Maps Impl + admin.truncate(0, VoldemortStorageBackend.queueStore) } From 46f1f97b95a904ae68cab61d47abf31a113b0d65 Mon Sep 17 00:00:00 2001 From: ticktock Date: Wed, 29 Sep 2010 20:12:55 -0400 Subject: [PATCH 05/12] Initial QueueStorageBackend Spec --- .../test/scala/MapStorageBackendTest.scala | 2 +- .../test/scala/QueueStorageBackendTest.scala | 83 ++++++++++++++++++- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala index 5bab5b8939..38c2e9e45a 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -66,7 +66,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter it("should insert multiple map storage elements properly") { val mapName = "insertMultipleTest" - val rand = new Random(2).nextInt(100) + val rand = new Random(3).nextInt(100) val entries = (1 to rand).toList.map { index => (("insertMultipleTestKey" + index).getBytes -> ("insertMutlipleTestValue" + index).getBytes) diff --git a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala index 7b2acc22b1..88ddaeea28 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.persistence.common import org.scalatest.matchers.ShouldMatchers import se.scalablesolutions.akka.util.Logging import org.scalatest.{BeforeAndAfterEach, Spec} +import scala.util.Random /** * Implementation Compatibility test for PersistentQueue backend implementations. 
@@ -28,8 +29,88 @@ trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAft } - describe("A Properly functioning QueueStorage Backend") { + describe("A Properly functioning QueueStorage Backend") { + it("should enqueue properly when there is capacity in the queue") { + val queue = "enqueueTest" + val value = "enqueueTestValue".getBytes + storage.size(queue) should be(0) + storage.enqueue(queue, value).get should be(1) + storage.size(queue) should be(1) + } + + it("should return None when enqueue is called on a full queue?") { + + } + + it("should dequeue properly when the queue is empty") { + val queue = "dequeueTest" + val value = "dequeueTestValue".getBytes + storage.size(queue) should be(0) + storage.enqueue(queue, value) + storage.size(queue) should be(1) + storage.dequeue(queue).get should be(value) + } + + it("should return None when dequeue is called on an empty queue") { + val queue = "dequeueTest2" + val value = "dequeueTestValue2".getBytes + storage.size(queue) should be(0) + storage.dequeue(queue) should be(None) + } + + it("should accurately reflect the size of the queue") { + val queue = "sizeTest" + val rand = new Random(3).nextInt(100) + val values = (1 to rand).toList.map {i: Int => ("sizeTestValue" + i).getBytes} + values.foreach {storage.enqueue(queue, _)} + storage.size(queue) should be(rand) + val drand = new Random(3).nextInt(rand) + (1 to drand).foreach { + i: Int => { + storage.dequeue(queue).isDefined should be(true) + storage.size(queue) should be(rand - i) + } + } + } + + it("should support peek properly") { + val queue = "sizeTest" + val rand = new Random(3).nextInt(100) + val values = (1 to rand).toList.map {i: Int => ("peekTestValue" + i)} + storage.remove(queue) + values.foreach {s: String => storage.enqueue(queue, s.getBytes)} + (1 to rand).foreach { + index => { + val peek = storage.peek(queue, 0, index).map {new String(_)} + peek.size should be(index) + values.dropRight(values.size - index).equals(peek) should be(true) + } + } + (0 until rand).foreach { + index => { + val peek = storage.peek(queue, index, rand - index).map {new String(_)} + peek.size should be(rand - index) + values.drop(index).equals(peek) should be(true) + } + } + + //Should we test counts greater than queue size? or greater than queue size - count??? 
+ } + + it("should not throw an exception when remove is called on a non-existent queue") { + storage.remove("exceptionTest") + } + + it("should remove queue storage properly") { + val queue = "removeTest" + val rand = new Random(3).nextInt(100) + val values = (1 to rand).toList.map {i: Int => ("removeValue" + i).getBytes} + values.foreach {storage.enqueue(queue, _)} + storage.size(queue) should be(rand) + storage.remove(queue) + storage.size(queue) should be(0) + } } } \ No newline at end of file From 7c2c550e216d0a43228829e1f6bf24402486fe3c Mon Sep 17 00:00:00 2001 From: ticktock Date: Wed, 29 Sep 2010 20:26:30 -0400 Subject: [PATCH 06/12] Initial QueueStorageBackend Spec --- .../src/test/scala/QueueStorageBackendTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala index 88ddaeea28..61730a42d3 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala @@ -43,7 +43,7 @@ trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAft } - it("should dequeue properly when the queue is empty") { + it("should dequeue properly when the queue is not empty") { val queue = "dequeueTest" val value = "dequeueTestValue".getBytes storage.size(queue) should be(0) From 984de304a8436034cd151ce1dcdd23bdec89e6f0 Mon Sep 17 00:00:00 2001 From: ticktock Date: Thu, 30 Sep 2010 13:43:59 -0400 Subject: [PATCH 07/12] Moved implicit Ordering(ArraySeq[Byte]) to a new PersistentMapBinary companion object and created an implicit Ordering(Array[Byte]) that can be used on the backends too Finished Map Backend Spec that tests proper ordering of retrieved k->v pairs and fixed Voldemort to work properly --- .../src/main/scala/Storage.scala | 245 ++++++++++-------- .../test/scala/MapStorageBackendTest.scala | 32 ++- .../main/scala/VoldemortStorageBackend.scala | 29 ++- ...oldemortPersistentDatastructureSuite.scala | 16 +- 4 files changed, 192 insertions(+), 130 deletions(-) diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index e75fd9581c..08d42c9148 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.persistence.common import se.scalablesolutions.akka.stm._ import se.scalablesolutions.akka.stm.TransactionManagement.transaction import se.scalablesolutions.akka.util.Logging +import collection.mutable.ArraySeq // FIXME move to 'stm' package + add message with more info class NoTransactionInScopeException extends RuntimeException @@ -47,26 +48,38 @@ trait Storage { type ElementType def newMap: PersistentMap[ElementType, ElementType] + def newVector: PersistentVector[ElementType] + def newRef: PersistentRef[ElementType] + def newQueue: PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet: PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException def getMap(id: String): PersistentMap[ElementType, ElementType] + def getVector(id: String): PersistentVector[ElementType] + def getRef(id: String): 
PersistentRef[ElementType] + def getQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def getSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException def newMap(id: String): PersistentMap[ElementType, ElementType] + def newVector(id: String): PersistentVector[ElementType] + def newRef(id: String): PersistentRef[ElementType] + def newQueue(id: String): PersistentQueue[ElementType] = // only implemented for redis throw new UnsupportedOperationException + def newSortedSet(id: String): PersistentSortedSet[ElementType] = // only implemented for redis throw new UnsupportedOperationException } @@ -90,7 +103,7 @@ private[akka] object PersistentMap { * @author Jonas Bonér */ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] - with Transactional with Committable with Abortable with Logging { + with Transactional with Committable with Abortable with Logging { //Import Ops import PersistentMap._ @@ -118,7 +131,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] protected def clearDistinctKeys = keysInCurrentTx.clear protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] = - appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true)) + appendOnlyTxLog filter (e => e.key.map(equal(_, key)).getOrElse(true)) // need to get current value considering the underlying storage as well as the transaction log protected def getCurrentValue(key: K): Option[V] = { @@ -129,7 +142,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] // get the snapshot from the underlying store for this key val underlying = try { storage.getMapStorageEntryFor(uuid, key) - } catch { case e: Exception => None } + } catch {case e: Exception => None} if (txEntries.isEmpty) underlying else txEntries.last match { @@ -146,12 +159,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] case None => Map.empty[K, V] case Some(v) => Map((key, v)) } - txEntries.foreach {case LogEntry(k, v, o) => o match { - case PUT => m.put(k.get, v.get) - case REM => m -= k.get - case UPD => m.update(k.get, v.get) - case CLR => Map.empty[K, V] - }} + txEntries.foreach { + case LogEntry(k, v, o) => o match { + case PUT => m.put(k.get, v.get) + case REM => m -= k.get + case UPD => m.update(k.get, v.get) + case CLR => Map.empty[K, V] + } + } m get key } @@ -159,12 +174,14 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] val storage: MapStorageBackend[K, V] def commit = { - appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match { - case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) - case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) - case REM => storage.removeMapStorageFor(uuid, k.get) - case CLR => storage.removeMapStorageFor(uuid) - }} + appendOnlyTxLog.foreach { + case LogEntry(k, v, o) => o match { + case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case REM => storage.removeMapStorageFor(uuid, k.get) + case CLR => storage.removeMapStorageFor(uuid) + } + } appendOnlyTxLog.clear clearDistinctKeys @@ -180,8 +197,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] this } - override def +=(kv : (K,V)) = { - put(kv._1,kv._2) + override def +=(kv: (K, V)) = { + put(kv._1, kv._2) this } @@ -230,10 +247,10 @@ trait PersistentMap[K, V] extends 
scala.collection.mutable.Map[K, V] case Seq() => // current tx doesn't use this storage.getMapStorageEntryFor(uuid, key).isDefined // check storage case txs => // present in log - val lastOp = txs.last.op + val lastOp = txs.last.op lastOp != REM && lastOp != CLR // last entry cannot be a REM - } - } catch { case e: Exception => false } + } + } catch {case e: Exception => false} protected def existsInStorage(key: K): Option[V] = try { storage.getMapStorageEntryFor(uuid, key) @@ -243,33 +260,33 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] override def size: Int = try { // partition key set affected in current tx into those which r added & which r deleted - val (keysAdded, keysRemoved) = keysInCurrentTx.map { + val (keysAdded, keysRemoved) = keysInCurrentTx.map { case (kseq, k) => ((kseq, k), getCurrentValue(k)) }.partition(_._2.isDefined) // keys which existed in storage but removed in current tx - val inStorageRemovedInTx = - keysRemoved.keySet - .map(_._2) - .filter(k => existsInStorage(k).isDefined) - .size + val inStorageRemovedInTx = + keysRemoved.keySet + .map(_._2) + .filter(k => existsInStorage(k).isDefined) + .size // all keys in storage - val keysInStorage = - storage.getMapStorageFor(uuid) - .map { case (k, v) => toEquals(k) } - .toSet + val keysInStorage = + storage.getMapStorageFor(uuid) + .map {case (k, v) => toEquals(k)} + .toSet // (keys that existed UNION keys added ) - (keys removed) (keysInStorage union keysAdded.keySet.map(_._1)).size - inStorageRemovedInTx - } catch { - case e: Exception => 0 + } catch { + case e: Exception => 0 } // get must consider underlying storage & current uncommitted tx log override def get(key: K): Option[V] = getCurrentValue(key) - def iterator: Iterator[Tuple2[K, V]] + def iterator: Iterator[Tuple2[K, V]] private def register = { if (transaction.get.isEmpty) throw new NoTransactionInScopeException @@ -277,38 +294,50 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] } } +object PersistentMapBinary { + object COrdering { + //frontend + implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] { + def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) = + ArrayOrdering.compare(o1.toArray, o2.toArray) + } + //backend + implicit object ArrayOrdering extends Ordering[Array[Byte]] { + def compare(o1: Array[Byte], o2: Array[Byte]) = + new String(o1) compare new String(o2) + } + } +} + trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { import scala.collection.mutable.ArraySeq type T = ArraySeq[Byte] + def toEquals(k: Array[Byte]) = ArraySeq(k: _*) + override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2 - object COrdering { - implicit object ArraySeqOrdering extends Ordering[ArraySeq[Byte]] { - def compare(o1: ArraySeq[Byte], o2: ArraySeq[Byte]) = - new String(o1.toArray) compare new String(o2.toArray) - } - } + import scala.collection.immutable.{TreeMap, SortedMap} private def replayAllKeys: SortedMap[ArraySeq[Byte], Array[Byte]] = { - import COrdering._ + import PersistentMapBinary.COrdering._ // need ArraySeq for ordering - val fromStorage = - TreeMap(storage.getMapStorageFor(uuid).map { case (k, v) => (ArraySeq(k: _*), v) }: _*) + val fromStorage = + TreeMap(storage.getMapStorageFor(uuid).map {case (k, v) => (ArraySeq(k: _*), v)}: _*) - val (keysAdded, keysRemoved) = keysInCurrentTx.map { + val (keysAdded, keysRemoved) = keysInCurrentTx.map { case (_, k) => (k, getCurrentValue(k)) }.partition(_._2.isDefined) - val inStorageRemovedInTx = - 
keysRemoved.keySet - .filter(k => existsInStorage(k).isDefined) - .map(k => ArraySeq(k: _*)) + val inStorageRemovedInTx = + keysRemoved.keySet + .filter(k => existsInStorage(k).isDefined) + .map(k => ArraySeq(k: _*)) - (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map { case (k, v) => (ArraySeq(k: _*), v.get) } + (fromStorage -- inStorageRemovedInTx) ++ keysAdded.map {case (k, v) => (ArraySeq(k: _*), v.get)} } override def slice(start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = try { @@ -317,51 +346,53 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { if (newMap isEmpty) List[(Array[Byte], Array[Byte])]() val startKey = - start match { - case Some(bytes) => Some(ArraySeq(bytes: _*)) - case None => None - } + start match { + case Some(bytes) => Some(ArraySeq(bytes: _*)) + case None => None + } val endKey = - finish match { - case Some(bytes) => Some(ArraySeq(bytes: _*)) - case None => None - } + finish match { + case Some(bytes) => Some(ArraySeq(bytes: _*)) + case None => None + } ((startKey, endKey, count): @unchecked) match { case ((Some(s), Some(e), _)) => newMap.range(s, e) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList case ((Some(s), None, c)) if c > 0 => newMap.from(s) - .iterator - .take(count) - .map(e => (e._1.toArray, e._2)) - .toList + .iterator + .take(count) + .map(e => (e._1.toArray, e._2)) + .toList case ((Some(s), None, _)) => newMap.from(s) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList case ((None, Some(e), _)) => newMap.until(e) - .toList - .map(e => (e._1.toArray, e._2)) - .toList + .toList + .map(e => (e._1.toArray, e._2)) + .toList } - } catch { case e: Exception => Nil } + } catch {case e: Exception => Nil} - override def iterator: Iterator[(Array[Byte], Array[Byte])] = { + override def iterator: Iterator[(Array[Byte], Array[Byte])] = { new Iterator[(Array[Byte], Array[Byte])] { private var elements = replayAllKeys + override def next: (Array[Byte], Array[Byte]) = synchronized { val (k, v) = elements.head elements = elements.tail (k.toArray, v) } - override def hasNext: Boolean = synchronized { !elements.isEmpty } + + override def hasNext: Boolean = synchronized {!elements.isEmpty} } } } @@ -394,7 +425,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa val storage: VectorStorageBackend[T] def commit = { - for(entry <- appendOnlyTxLog) { + for (entry <- appendOnlyTxLog) { (entry: @unchecked) match { case LogEntry(_, Some(v), ADD) => storage.insertVectorStorageEntryFor(uuid, v) case LogEntry(Some(i), Some(v), UPD) => storage.updateVectorStorageEntryFor(uuid, i, v) @@ -412,7 +443,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa import scala.collection.mutable.ArrayBuffer var elemsStorage = ArrayBuffer(storage.getVectorStorageRangeFor(uuid, None, None, storage.getVectorStorageSizeFor(uuid)).reverse: _*) - for(entry <- appendOnlyTxLog) { + for (entry <- appendOnlyTxLog) { (entry: @unchecked) match { case LogEntry(_, Some(v), ADD) => elemsStorage += v case LogEntry(Some(i), Some(v), UPD) => elemsStorage.update(i, v) @@ -446,11 +477,11 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa val curr = replay val s = if (start.isDefined) start.get else 0 val cnt = - if (finish.isDefined) { - val f = finish.get - if (f >= s) (f - s) else count - } - else count + if 
(finish.isDefined) { + val f = finish.get + if (f >= s) (f - s) else count + } + else count if (s == 0 && cnt == 0) List().toIndexedSeq else curr.slice(s, s + cnt).toIndexedSeq } @@ -519,12 +550,12 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { } } - private[akka] object PersistentQueue { - //Operations for PersistentQueue - sealed trait QueueOp - case object ENQ extends QueueOp - case object DEQ extends QueueOp - } +private[akka] object PersistentQueue { + //Operations for PersistentQueue + sealed trait QueueOp + case object ENQ extends QueueOp + case object DEQ extends QueueOp +} /** * Implementation of PersistentQueue for every concrete @@ -552,7 +583,7 @@ trait PersistentRef[T] extends Transactional with Committable with Abortable { * @author Debasish Ghosh */ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] - with Transactional with Committable with Abortable with Logging { + with Transactional with Committable with Abortable with Logging { //Import Ops import PersistentQueue._ @@ -575,11 +606,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] val storage: QueueStorageBackend[A] def commit = { - enqueuedNDequeuedEntries.toList.foreach { e => - e._2 match { - case ENQ => storage.enqueue(uuid, e._1.get) - case DEQ => storage.dequeue(uuid) - } + enqueuedNDequeuedEntries.toList.foreach { + e => + e._2 match { + case ENQ => storage.enqueue(uuid, e._1.get) + case DEQ => storage.dequeue(uuid) + } } if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) { storage.remove(uuid) @@ -635,7 +667,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] override def size: Int = try { storage.size(uuid) + localQ.get.length - } catch { case e: Exception => 0 } + } catch {case e: Exception => 0} override def isEmpty: Boolean = size == 0 @@ -644,10 +676,12 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] enqueue(elem) this } + def ++=(elems: Iterator[A]) = { enqueue(elems.toList: _*) this } + def ++=(elems: Iterable[A]): Unit = this ++= elems.iterator override def dequeueFirst(p: A => Boolean): Option[A] = @@ -670,24 +704,24 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] *

* zscore can be implemented in a variety of ways by the calling class:
 * <pre>
- * trait ZScorable {
+ * trait ZScorable       {
  *   def toZScore: Float
  * }
  *
- * class Foo extends ZScorable {
+ * class Foo extends ZScorable       {
 *   //.. implementation
  * }
 * </pre>
 * Or we can also use views:
 * <pre>
- * class Foo {
+ * class Foo       {
  *   //..
  * }
  *
- * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable {
- *   def toZScore = {
+ * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable       {
+ *   def toZScore =       {
  *     //..
- *   }
+ * }
  * }
 * </pre>
* @@ -696,7 +730,6 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] * @author */ trait PersistentSortedSet[A] extends Transactional with Committable with Abortable { - protected val newElems = TransactionalMap[A, Float]() protected val removedElems = TransactionalVector[A]() @@ -729,8 +762,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab } private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match { - case Some(s) => Some(s.toFloat) - case None => None + case Some(s) => Some(s.toFloat) + case None => None } def contains(elem: A): Boolean = { @@ -758,8 +791,8 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab def compare(that: (A, Float)) = x._2 compare that._2 } - implicit def ordering = new scala.math.Ordering[(A,Float)] { - def compare(x: (A, Float),y : (A,Float)) = x._2 compare y._2 + implicit def ordering = new scala.math.Ordering[(A, Float)] { + def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2 } @@ -773,9 +806,9 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab // -1 means the last element, -2 means the second last val s = if (start < 0) start + l else start val e = - if (end < 0) end + l - else if (end >= l) (l - 1) - else end + if (end < 0) end + l + else if (end >= l) (l - 1) + else end // slice is open at the end, we need a closed end range ts.iterator.slice(s, e + 1).toList } diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala index 38c2e9e45a..f2203cd282 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -8,7 +8,9 @@ import org.scalatest.matchers.ShouldMatchers import se.scalablesolutions.akka.util.Logging import org.scalatest.{BeforeAndAfterEach, Spec} import scala.util.Random -import collection.immutable.{HashMap, HashSet} +import collection.immutable.{TreeMap, HashMap, HashSet} +import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + /** * Implementation Compatibility test for PersistentMap backend implementations. @@ -106,38 +108,44 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter - it("should return all the key value pairs in the map (in the correct order?) 
when getMapStorageFor(name) is called") { + it("should return all the key value pairs in the map in the correct order when getMapStorageFor(name) is called") { val mapName = "allTest" val rand = new Random(3).nextInt(100) - val entries = (1 to rand).toList.map { + var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering) + (1 to rand).foreach { index => - (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes) + entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes) } - storage.insertMapStorageEntriesFor(mapName, entries) + storage.insertMapStorageEntriesFor(mapName, entries.toList) val retrieved = storage.getMapStorageFor(mapName) retrieved.size should be(rand) entries.size should be(rand) + val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} val retrievedMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} entryMap should equal(retrievedMap) - //Should the ordering of key-vals returned be enforced? - //ordered by key? - //using what comaparison? + (0 until rand).foreach { + i: Int => { + new String(entries.toList(i)._1) should be(new String(retrieved(i)._1)) + } + } + + } + + it("should return all the key->value pairs that exist in the map that are between start and end, up to count pairs when getMapStorageRangeFor is called") { + //implement if this method will be used } it("should not throw an exception when size is called on a non existent map?") { storage.getMapStorageSizeFor("nonExistent") should be(0) } - it("should behave properly when getMapStorageRange is called?") { - //No current code calls getMapStorageRangeFor - } - + } } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala index 20b9804ed4..da8fe9c1b6 100644 --- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala +++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala @@ -17,9 +17,10 @@ import voldemort.versioning.Versioned import collection.JavaConversions import java.nio.ByteBuffer import collection.Map -import collection.immutable.{IndexedSeq, SortedSet, TreeSet, HashMap} import collection.mutable.{Set, HashSet, ArrayBuffer} import java.util.{Properties, Map => JMap} +import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ +import collection.immutable._ /* RequiredReads + RequiredWrites should be > ReplicationFactor for all Voldemort Stores @@ -54,11 +55,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val vectorSizeIndex = getIndexedBytes(-1) val queueHeadIndex = getIndexedBytes(-1) val queueTailIndex = getIndexedBytes(-2) - - - implicit val byteOrder = new Ordering[Array[Byte]] { - override def compare(x: Array[Byte], y: Array[Byte]) = ByteUtils.compare(x, y) - } + //explicit implicit :) + implicit val ordering = ArrayOrdering def getRefStorageFor(name: String): Option[Array[Byte]] = { @@ -90,17 +88,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with mapKey => getKey(name, mapKey) })) - val buf = new ArrayBuffer[(Array[Byte], Array[Byte])](all.size) + var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering) JavaConversions.asMap(all).foreach { (entry) => { entry match { - case (key: Array[Byte], versioned: Versioned[Array[Byte]]) => { - buf += key -> 
versioned.getValue + case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => { + returned += getMapKeyFromKey(name, namePlusKey) -> versioned.getValue } } } } - buf.toList + returned.toList } def getMapStorageSizeFor(name: String): Int = { @@ -263,7 +261,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with try { queueClient.delete(key) } catch { - //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around + //a failure to delete is ok, just leaves a K-V in Voldemort that will be overwritten if the queue ever wraps around case e: Exception => log.warn(e, "caught an exception while deleting a dequeued element, however this will not cause any inconsistency in the queue") } } @@ -332,6 +330,13 @@ MapStorageBackend[Array[Byte], Array[Byte]] with IntSerializer.fromBytes(indexBytes) } + def getMapKeyFromKey(owner: String, key: Array[Byte]): Array[Byte] = { + val mapKeyLength = key.length - IntSerializer.bytesPerInt - owner.getBytes("UTF-8").length + val mapkey = new Array[Byte](mapKeyLength) + System.arraycopy(key, key.length - mapKeyLength, mapkey, 0, mapKeyLength) + mapkey + } + def getClientConfig(configMap: Map[String, String]): Properties = { val properites = new Properties @@ -450,6 +455,8 @@ MapStorageBackend[Array[Byte], Array[Byte]] with } def fromBytes(bytes: Array[Byte]): SortedSet[Array[Byte]] = { + import se.scalablesolutions.akka.persistence.common.PersistentMapBinary.COrdering._ + var set = new TreeSet[Array[Byte]] if (bytes.length > IntSerializer.bytesPerInt) { var pos = 0 diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala index 76bb989ac9..b283cad692 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala @@ -5,7 +5,7 @@ import org.scalatest.matchers.ShouldMatchers import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._ -import se.scalablesolutions.akka.actor.{newUuid,Uuid} +import se.scalablesolutions.akka.actor.{newUuid, Uuid} import collection.immutable.TreeSet import VoldemortStorageBackendSuite._ @@ -84,4 +84,18 @@ class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers } + test("Persistent Maps work as expected") { + atomic { + val map = VoldemortStorage.getMap("map") + map.put("mapTest".getBytes, null) + } + + atomic { + val map = VoldemortStorage.getMap("map") + map.get("mapTest".getBytes).get should be(null) + } + + + } + } \ No newline at end of file From 81407006c16ec7916967d793c2145149be81d49e Mon Sep 17 00:00:00 2001 From: ticktock Date: Thu, 30 Sep 2010 19:13:19 -0400 Subject: [PATCH 08/12] Map Spec --- .../src/test/scala/MapStorageBackendTest.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala index f2203cd282..2021c1d3aa 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -141,11 +141,16 @@ trait 
MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter //implement if this method will be used } + + it("should return Some(null), not None, for a key that has had the value null set?") { + + } + it("should not throw an exception when size is called on a non existent map?") { storage.getMapStorageSizeFor("nonExistent") should be(0) } - + } } \ No newline at end of file From f8d77e04ea572873efe73dce01e56b8a096e045a Mon Sep 17 00:00:00 2001 From: ticktock Date: Thu, 30 Sep 2010 19:13:59 -0400 Subject: [PATCH 09/12] More VectorStorageBackend tests plus an abstract Ticket343Test with a working VoldemortImpl --- .../src/main/scala/Storage.scala | 10 +- .../src/test/scala/Ticket343Test.scala | 362 ++++++++++++++++++ .../test/scala/VectorStorageBackendTest.scala | 45 ++- .../main/scala/VoldemortStorageBackend.scala | 13 +- .../test/scala/VoldemortTicket343Test.scala | 22 ++ 5 files changed, 443 insertions(+), 9 deletions(-) create mode 100644 akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala create mode 100644 akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index 08d42c9148..9d98095045 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -704,22 +704,22 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] *

* zscore can be implemented in a variety of ways by the calling class:
 * <pre>
- * trait ZScorable       {
+ * trait ZScorable        {
  *   def toZScore: Float
  * }
  *
- * class Foo extends ZScorable       {
+ * class Foo extends ZScorable        {
 *   //.. implementation
  * }
 * </pre>
 * Or we can also use views:
 * <pre>
- * class Foo       {
+ * class Foo        {
  *   //..
  * }
  *
- * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable       {
- *   def toZScore =       {
+ * implicit def Foo2Scorable(foo: Foo): ZScorable = new ZScorable        {
+ *   def toZScore =        {
  *     //..
  * }
  * }
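
The abstract Ticket343Test added below keeps the regression suite backend-agnostic: its StorageObj fixture exposes getMap/getVector hooks, and per the commit message a working Voldemort implementation (VoldemortTicket343Test.scala) accompanies it. A minimal sketch of how a concrete suite might wire itself in (class and backend-object names are illustrative assumptions, not code from this patch):

@RunWith(classOf[JUnitRunner])
class SomeBackendTicket343Test extends Ticket343Test {
  // point the shared fixture at this backend's storage factories
  StorageObj.getMap = SomeBackendStorage.getMap _
  StorageObj.getVector = SomeBackendStorage.getVector _
}
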
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala
new file mode 100644
index 0000000000..eb724144a2
--- /dev/null
+++ b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala
@@ -0,0 +1,362 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB 
+ */
+
+package se.scalablesolutions.akka.persistence.common
+
+import org.scalatest.Spec
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import se.scalablesolutions.akka.actor.{Actor, ActorRef}
+import se.scalablesolutions.akka.config.OneForOneStrategy
+import Actor._
+import se.scalablesolutions.akka.stm.global._
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util.Logging
+import StorageObj._
+
+
+case class GET(k: String)
+case class SET(k: String, v: String)
+case class REM(k: String)
+case class CONTAINS(k: String)
+case object MAP_SIZE
+case class MSET(kvs: List[(String, String)])
+case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String])
+case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)])
+case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int)
+case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int)
+
+case class VADD(v: String)
+case class VUPD(i: Int, v: String)
+case class VUPD_AND_ABORT(i: Int, v: String)
+case class VGET(i: Int)
+case object VSIZE
+case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
+case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
+
+
+object StorageObj {
+  var getMap: String => PersistentMap[Array[Byte], Array[Byte]] = _
+  var getVector: String => PersistentVector[Array[Byte]] = _
+
+  class SampleMapStorage extends Actor {
+    self.lifeCycle = Some(LifeCycle(Permanent))
+    val FOO_MAP = "akka.sample.map"
+
+    private var fooMap = atomic {StorageObj.getMap(FOO_MAP)}
+
+    def receive = {
+      case SET(k, v) =>
+        atomic {
+          fooMap += (k.getBytes, v.getBytes)
+        }
+        self.reply((k, v))
+
+      case GET(k) =>
+        val v = atomic {
+          fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found")
+        }
+        self.reply(v)
+
+      case REM(k) =>
+        val v = atomic {
+          fooMap -= k.getBytes
+        }
+        self.reply(k)
+
+      case CONTAINS(k) =>
+        val v = atomic {
+          fooMap contains k.getBytes
+        }
+        self.reply(v)
+
+      case MAP_SIZE =>
+        val v = atomic {
+          fooMap.size
+        }
+        self.reply(v)
+
+      case MSET(kvs) => atomic {
+        kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes)}
+      }
+      self.reply(kvs.size)
+
+      case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic {
+        kvs2add.foreach {
+          kv =>
+            fooMap += (kv._1.getBytes, kv._2.getBytes)
+        }
+
+        ks2rem.foreach {
+          k =>
+            fooMap -= k.getBytes
+        }
+      }
+      self.reply(fooMap.size)
+
+      case CLEAR_AFTER_PUT(kvs2add) => atomic {
+        kvs2add.foreach {
+          kv =>
+            fooMap += (kv._1.getBytes, kv._2.getBytes)
+        }
+        fooMap.clear
+      }
+      self.reply(true)
+
+      case PUT_WITH_SLICE(kvs2add, from, cnt) =>
+        val v = atomic {
+          kvs2add.foreach {
+            kv =>
+              fooMap += (kv._1.getBytes, kv._2.getBytes)
+          }
+          fooMap.slice(Some(from.getBytes), cnt)
+        }
+        self.reply(v: List[(Array[Byte], Array[Byte])])
+
+      case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) =>
+        val v = atomic {
+          kvs2add.foreach {
+            kv =>
+              fooMap += (kv._1.getBytes, kv._2.getBytes)
+          }
+          ks2rem.foreach {
+            k =>
+              fooMap -= k.getBytes
+          }
+          fooMap.slice(Some(from.getBytes), cnt)
+        }
+        self.reply(v: List[(Array[Byte], Array[Byte])])
+    }
+  }
+
+  class SampleVectorStorage extends Actor {
+    self.lifeCycle = Some(LifeCycle(Permanent))
+    val FOO_VECTOR = "akka.sample.vector"
+
+    private var fooVector = atomic {StorageObj.getVector(FOO_VECTOR)}
+
+    def receive = {
+      case VADD(v) =>
+        val size =
+        atomic {
+          fooVector + v.getBytes
+          fooVector length
+        }
+        self.reply(size)
+
+      case VGET(index) =>
+        val ind =
+        atomic {
+          fooVector get index
+        }
+        self.reply(ind)
+
+      case VGET_AFTER_VADD(vs, is) =>
+        val els =
+        atomic {
+          vs.foreach(fooVector + _.getBytes)
+          (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_))
+        }
+        self.reply(els)
+
+      case VUPD_AND_ABORT(index, value) =>
+        val l =
+        atomic {
+          fooVector.update(index, value.getBytes)
+          // force fail
+          fooVector get 100
+        }
+        self.reply(index)
+
+      case VADD_WITH_SLICE(vs, s, c) =>
+        val l =
+        atomic {
+          vs.foreach(fooVector + _.getBytes)
+          fooVector.slice(Some(s), None, c)
+        }
+        self.reply(l.map(new String(_)))
+    }
+  }
+}
+
+
+
+trait Ticket343Test extends Spec with ShouldMatchers with BeforeAndAfterEach {
+  def getMap: String => PersistentMap[Array[Byte], Array[Byte]]
+
+  def getVector: String => PersistentVector[Array[Byte]]
+
+
+  def dropMapsAndVectors: Unit
+
+  override def beforeEach {
+    StorageObj.getMap = getMap
+    StorageObj.getVector = getVector
+    dropMapsAndVectors
+    println("** dropMapsAndVectors")
+  }
+
+  override def afterEach {
+    dropMapsAndVectors
+    println("** dropMapsAndVectors")
+  }
+
+  describe("Ticket 343 Issue #1") {
+    it("remove after put should work within the same transaction") {
+      val proc = actorOf[SampleMapStorage]
+      proc.start
+
+      (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+      (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+      (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+      (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+
+      (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+      (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+      (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+
+      (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+      val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+      val rem = List("a", "debasish")
+      (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5)
+
+      (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found")
+      (proc !! GET("a")).getOrElse("a not found") should equal("a Not found")
+
+      (proc !! GET("b")).getOrElse("b not found") should equal("2")
+
+      (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true)
+      (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false)
+      (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5)
+      proc.stop
+    }
+  }
+
+  describe("Ticket 343 Issue #2") {
+    it("clear after put should work within the same transaction") {
+      val proc = actorOf[SampleMapStorage]
+      proc.start
+
+      (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+      (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+      (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+      val add = List(("a", "1"), ("b", "2"), ("c", "3"))
+      (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
+
+      (proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
+      proc.stop
+    }
+  }
+
+  describe("Ticket 343 Issue #3") {
+    it("map size should change after the transaction") {
+      val proc = actorOf[SampleMapStorage]
+      proc.start
+
+      (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+      (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+      (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+      (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+      (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+      (proc !! GET("dg")).getOrElse("Get failed") should equal("1")
+      (proc !! GET("mc")).getOrElse("Get failed") should equal("2")
+      (proc !! GET("nd")).getOrElse("Get failed") should equal("3")
+      proc.stop
+    }
+  }
+
+  describe("slice test") {
+    it("should pass") {
+      val proc = actorOf[SampleMapStorage]
+      proc.start
+
+      (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft"))
+      (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft")
+      // (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
+
+      (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3)
+      (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4)
+
+      (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map {case (k, v) => (new String(k), new String(v))} should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10")))
+
+      (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map {case (k, v) => (new String(k), new String(v))} should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3")))
+      proc.stop
+    }
+  }
+
+  describe("Ticket 343 Issue #4") {
+    it("vector get should not ignore elements that were in vector before transaction") {
+
+      val proc = actorOf[SampleVectorStorage]
+      proc.start
+
+      // add 4 elements in separate transactions
+      (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+      (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+      (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+      (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+      new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan")
+      new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]]) should equal("ramanendu")
+      new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]]) should equal("maulindu")
+      new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]]) should equal("debasish")
+
+      // now add 3 more and do gets in the same transaction
+      (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu"))
+      proc.stop
+    }
+  }
+
+  describe("Ticket 343 Issue #6") {
+    it("vector update should not ignore transaction") {
+      val proc = actorOf[SampleVectorStorage]
+      proc.start
+
+      // add 4 elements in separate transactions
+      (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+      (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+      (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+      (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+      evaluating {
+        (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed")
+      } should produce[Exception]
+
+      // update aborts and hence values will remain unchanged
+      new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan")
+      proc.stop
+    }
+  }
+
+  describe("Ticket 343 Issue #5") {
+    it("vector slice() should not ignore elements added in current transaction") {
+      val proc = actorOf[SampleVectorStorage]
+      proc.start
+
+      // add 4 elements in separate transactions
+      (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
+      (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
+      (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
+      (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
+
+      // slice with no new elements added in current transaction
+      (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
+
+      // slice with new elements added in current transaction
+      (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
+      proc.stop
+    }
+  }
+}
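
A note on what the vector specs above pin down: adds behave as logical prepends, so index 0 is always the most recently added element. A minimal sketch of that rule in plain Scala (a List standing in for the PersistentVector; names are illustrative, not part of the patch):

    object VectorSemanticsSketch extends App {
      var vector = List.empty[String] // head of the list == logical index 0
      def vadd(v: String): Int = { vector = v :: vector; vector.length }
      def vslice(start: Int, count: Int): List[String] = vector.slice(start, start + count)

      List("debasish", "maulindu", "ramanendu", "nilanjan").foreach(vadd)
      assert(vector(0) == "nilanjan")                      // matches VGET(0) above
      assert(vslice(2, 2) == List("maulindu", "debasish")) // matches VADD_WITH_SLICE(List(), 2, 2)
    }
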
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
index 8b0008ea5a..87b20566c6 100644
--- a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
+++ b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
@@ -7,6 +7,7 @@ package se.scalablesolutions.akka.persistence.common
 import org.scalatest.matchers.ShouldMatchers
 import se.scalablesolutions.akka.util.Logging
 import org.scalatest.{BeforeAndAfterEach, Spec}
+import scala.util.Random
 
 /**
  * Implementation Compatibility test for PersistentVector backend implementations.
@@ -28,8 +29,50 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf
   }
 
 
-  describe("A Properly functioning VectorStorageBackend") {
 
+  describe("A Properly functioning VectorStorageBackend") {
+    it("should insertVectorStorageEntry as a logical prepend operation to the existing list") {
+      val vector = "insertSingleTest"
+      val rand = new Random(3).nextInt(100)
+      val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+      storage.getVectorStorageSizeFor(vector) should be(0)
+      values.foreach {s: String => storage.insertVectorStorageEntryFor(vector, s.getBytes)}
+      val shouldRetrieve = values.reverse
+      (0 to rand).foreach {
+        i: Int => {
+          shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i)))
+        }
+      }
+    }
+
+    it("should insertVectorStorageEntries as a logical prepend operation to the existing list") {
+      val vector = "insertMultiTest"
+      val rand = new Random(3).nextInt(100)
+      val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+      storage.getVectorStorageSizeFor(vector) should be(0)
+      storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
+      val shouldRetrieve = values.reverse
+      (0 to rand).foreach {
+        i: Int => {
+          shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i)))
+        }
+      }
+    }
+
+    it("should successfully update entries") {
+      val vector = "updateTest"
+      val rand = new Random(3).nextInt(100)
+      val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+      val urand = new Random(3).nextInt(rand)
+      storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
+      val toUpdate = "updated" + values.reverse(urand)
+      storage.updateVectorStorageEntryFor(vector, urand, toUpdate.getBytes)
+      toUpdate should be(new String(storage.getVectorStorageEntryFor(vector, urand)))
+    }
+
+    
+
+    //getStorageEntry for a non existent entry?
   }
 
 }
\ No newline at end of file
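
The VoldemortStorageBackend hunks that follow implement this prepend contract without rewriting existing entries: writes stay append-only and reads translate the logical index. A short sketch of the mapping (assuming entries were appended at physical slots 0 through size - 1):

    // logical index 0 (the newest element) maps to the highest physical slot
    def physicalIndex(size: Int, logical: Int): Int = (size - 1) - logical

    assert(physicalIndex(4, 0) == 3) // newest, appended last
    assert(physicalIndex(4, 3) == 0) // oldest, appended first
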
diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
index da8fe9c1b6..5cbe0097df 100644
--- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
@@ -174,9 +174,11 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
     } else {
       count
     }
+
+
     val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
       index => getIndexedKey(name, index)
-    }
+    }.reverse //read backwards
 
     val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq))
 
@@ -197,12 +199,17 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
 
 
   def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
-    vectorClient.getValue(getIndexedKey(name, index), Array.empty[Byte])
+    val size = getVectorStorageSizeFor(name)
+    if (size > 0) {
+      vectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
+    } else {
+      Array.empty[Byte] //is this what to return?
+    }
   }
 
   def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
     val size = getVectorStorageSizeFor(name)
-    vectorClient.put(getIndexedKey(name, index), elem)
+    vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem)
     if (size < index + 1) {
       vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
     }
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala
new file mode 100644
index 0000000000..b170f949cf
--- /dev/null
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortTicket343Test.scala
@@ -0,0 +1,22 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB 
+ */
+
+package se.scalablesolutions.akka.persistence.voldemort
+
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import se.scalablesolutions.akka.persistence.common._
+
+@RunWith(classOf[JUnitRunner])
+class VoldemortTicket343Test extends Ticket343Test with EmbeddedVoldemort {
+  def dropMapsAndVectors: Unit = {
+    admin.truncate(0, VoldemortStorageBackend.mapStore)
+    admin.truncate(0, VoldemortStorageBackend.vectorStore)
+  }
+
+  def getVector: (String) => PersistentVector[Array[Byte]] = VoldemortStorage.getVector
+
+  def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = VoldemortStorage.getMap
+}
\ No newline at end of file
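
Any other backend should be able to reuse the abstract spec the same way: mix Ticket343Test into a concrete class and supply the factory functions. A hedged sketch, where EmbeddedMyBackend and MyBackendStorage are hypothetical stand-ins for a real module (imports would mirror VoldemortTicket343Test above):

    @RunWith(classOf[JUnitRunner])
    class MyBackendTicket343Test extends Ticket343Test with EmbeddedMyBackend {
      // hypothetical cleanup hook; a real implementation truncates its map and vector stores
      def dropMapsAndVectors: Unit = MyBackendStorage.truncateAll()

      def getVector: (String) => PersistentVector[Array[Byte]] = MyBackendStorage.getVector

      def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = MyBackendStorage.getMap
    }
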

From 3411ad6be1b0a6e4bff824255cc5f399e4f95575 Mon Sep 17 00:00:00 2001
From: ticktock 
Date: Thu, 30 Sep 2010 19:35:10 -0400
Subject: [PATCH 10/12] two more stub tests in Vector Spec

---
 .../test/scala/VectorStorageBackendTest.scala | 27 +++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
index 87b20566c6..c88795cc23 100644
--- a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
+++ b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
@@ -70,9 +70,32 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf
       toUpdate should be(new String(storage.getVectorStorageEntryFor(vector, urand)))
     }
 
-    
+    it("should return the correct value from getVectorStorageEntryFor") {
+      val vector = "getTest"
+      val rand = new Random(3).nextInt(100)
+      val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+      val urand = new Random(3).nextInt(rand)
+      storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
+      values.reverse(urand) should be(new String(storage.getVectorStorageEntryFor(vector, urand)))
+    }
+
+    it("should return the correct values from getVectorStorageRangeFor") {
+      val vector = "getTest"
+      val rand = new Random(3).nextInt(100)
+      val drand = new Random(3).nextInt(rand)
+      val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
+      storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
+      values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map {b: Array[Byte] => new String(b)})
+    }
+
+    it("should behave properly when the range used in getVectorStorageRangeFor has indexes outside the current size of the vector") {
+      //what is proper?
+    }
+
+    it("should behave properly when getStorageEntry for a non existent entry?") {
+      //What is proper?
+    }
 
-    //getStorageEntry for a non existent entry?
   }
 
 }
\ No newline at end of file
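
A worked mini-example of the ordering these tests pin down: entries are inserted oldest-first but read newest-first, so the element at logical index i is values(values.length - 1 - i):

    val values = (0 to 3).toList.map("value" + _) // inserted in order: value0..value3
    val logicalView = values.reverse              // what a full range read returns
    assert(logicalView(0) == "value3")
    assert(logicalView(2) == values(values.length - 1 - 2)) // "value1"
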

From 97a5c05b53f51a7681cce38085ca1f4c742f9f12 Mon Sep 17 00:00:00 2001
From: ticktock 
Date: Fri, 1 Oct 2010 16:04:22 -0400
Subject: [PATCH 11/12] Added tests of proper null handling for
 Ref, Vector, Map, Queue and Voldemort impl tweaks

---
 .../test/scala/MapStorageBackendTest.scala    |   9 +-
 .../test/scala/QueueStorageBackendTest.scala  |   7 ++
 .../test/scala/RefStorageBackendTest.scala    |   6 ++
 .../test/scala/VectorStorageBackendTest.scala |  24 ++++-
 .../src/main/scala/VoldemortStorage.scala     |   8 ++
 .../main/scala/VoldemortStorageBackend.scala  |  79 ++++++++++----
 ...oldemortPersistentDatastructureSuite.scala | 101 ------------------
 .../scala/VoldemortStorageBackendSuite.scala  |   5 +-
 8 files changed, 112 insertions(+), 127 deletions(-)
 delete mode 100644 akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala

diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala
index 2021c1d3aa..395d0ef269 100644
--- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala
+++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala
@@ -142,8 +142,13 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
     }
 
 
-    it("should return Some(null), not None, for a key that has had the value null set?") {
-
+    it("should return Some(null), not None, for a key that has had the value null set and None for a key with no value set") {
+      val mapName = "nullTest"
+      val key = "key".getBytes
+      storage.insertMapStorageEntryFor(mapName, key, null)
+      storage.getMapStorageEntryFor(mapName, key).get should be(null)
+      storage.removeMapStorageFor(mapName, key)
+      storage.getMapStorageEntryFor(mapName, key) should be(None)
     }
 
     it("should not throw an exception when size is called on a non existent map?") {
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala
index 61730a42d3..3eb89e3db5 100644
--- a/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala
+++ b/akka-persistence/akka-persistence-common/src/test/scala/QueueStorageBackendTest.scala
@@ -111,6 +111,13 @@ trait QueueStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAft
       storage.remove(queue)
       storage.size(queue) should be(0)
     }
+
+    it("should accept null as a value to enqueue and return Some(null) when that value is dequeued") {
+      val queue = "nullTest"
+      storage.enqueue(queue, null).get should be(1)
+      storage.dequeue(queue).get should be(null)
+      storage.dequeue(queue) should be(None)
+    }
   }
 
 }
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala
index 96451bc063..37902cf7c9 100644
--- a/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala
+++ b/akka-persistence/akka-persistence-common/src/test/scala/RefStorageBackendTest.scala
@@ -41,6 +41,12 @@ trait RefStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
       val value = name.getBytes
       storage.getRefStorageFor(name) should be(None)
     }
+
+    it("should return None, not Some(null), when getRefStorageFor is called after null has been set") {
+      val name = "RefStorageTest #3"
+      storage.insertRefStorageFor(name, null)
+      storage.getRefStorageFor(name) should be(None)
+    }
   }
 
 }
\ No newline at end of file
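
The ref rule being tested is the simplest of the null-handling contracts: writing null behaves as a delete, and a missing key reads back as None. A minimal model in plain Scala (an in-memory Map standing in for the store client; not the real backend code):

    object RefNullSketch extends App {
      var store = Map.empty[String, Array[Byte]]

      def insertRefStorageFor(name: String, element: Array[Byte]): Unit =
        if (element == null) store -= name else store += name -> element

      def getRefStorageFor(name: String): Option[Array[Byte]] =
        Option(store.getOrElse(name, null)) // Option(null) collapses to None

      insertRefStorageFor("ref", "x".getBytes)
      assert(getRefStorageFor("ref").isDefined)
      insertRefStorageFor("ref", null)
      assert(getRefStorageFor("ref") == None)
    }
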
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
index c88795cc23..e677f8fe66 100644
--- a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
+++ b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala
@@ -86,14 +86,36 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf
       val values = (0 to rand).toList.map {i: Int => vector + "value" + i}
       storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes})
       values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map {b: Array[Byte] => new String(b)})
+      (0 to drand).foreach {
+        i: Int => {
+          val value: String = vector + "value" + (rand - i)
+          log.debug(value)
+          List(value) should be(storage.getVectorStorageRangeFor(vector, Some(i), None, 1).map {b: Array[Byte] => new String(b)})
+        }
+      }
     }
 
     it("should behave properly when the range used in getVectorStorageRangeFor has indexes outside the current size of the vector") {
       //what is proper?
     }
 
-    it("should behave properly when getStorageEntry for a non existent entry?") {
+    it("should return null when getStorageEntry is called on a null entry") {
       //What is proper?
+      val vector = "nullTest"
+      storage.insertVectorStorageEntryFor(vector, null)
+      storage.getVectorStorageEntryFor(vector, 0) should be(null)
+    }
+
+    it("should throw a StorageException when there is an attempt to retrieve an index larger than the Vector") {
+      val vector = "tooLargeRetrieve"
+      storage.insertVectorStorageEntryFor(vector, null)
+      evaluating {storage.getVectorStorageEntryFor(vector, 9)} should produce[StorageException]
+    }
+
+    it("should throw a StorageException when there is an attempt to update an index larger than the Vector") {
+      val vector = "tooLargeUpdate"
+      storage.insertVectorStorageEntryFor(vector, null)
+      evaluating {storage.updateVectorStorageEntryFor(vector, 9, null)} should produce[StorageException]
     }
 
   }
diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala
index 4e237267a5..2a9c3c5717 100644
--- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorage.scala
@@ -15,14 +15,17 @@ object VoldemortStorage extends Storage {
   def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString)
   def newVector: PersistentVector[ElementType] = newVector(newUuid.toString)
   def newRef: PersistentRef[ElementType] = newRef(newUuid.toString)
+  override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString)
 
   def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
   def getVector(id: String): PersistentVector[ElementType] = newVector(id)
   def getRef(id: String): PersistentRef[ElementType] = newRef(id)
+  override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
 
   def newMap(id: String): PersistentMap[ElementType, ElementType] = new VoldemortPersistentMap(id)
   def newVector(id: String): PersistentVector[ElementType] = new VoldemortPersistentVector(id)
   def newRef(id: String): PersistentRef[ElementType] = new VoldemortPersistentRef(id)
+  override def newQueue(id:String): PersistentQueue[ElementType] = new VoldemortPersistentQueue(id)
 }
 
 
@@ -41,3 +44,8 @@ class VoldemortPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
   val uuid = id
   val storage = VoldemortStorageBackend
 }
+
+class VoldemortPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
+  val uuid = id
+  val storage = VoldemortStorageBackend
+}
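
The queue contract asserted in the QueueStorageBackendTest hunk above, and implemented in the backend diff below, is subtler: enqueuing null advances the tail without writing a value, and dequeuing that slot yields Some(null), while an empty queue yields None. A toy model of that bookkeeping (mutable state in place of the real store client):

    object QueueNullSketch extends App {
      var store = Map.empty[Int, Array[Byte]]
      var head, tail = 0

      def enqueue(item: Array[Byte]): Option[Int] = {
        if (item == null) store -= tail else store += tail -> item
        tail += 1
        Some(tail - head) // new size
      }

      def dequeue(): Option[Array[Byte]] =
        if (head == tail) None
        else {
          val v = store.getOrElse(head, null) // missing slot reads as null
          store -= head
          head += 1
          Some(v)
        }

      assert(enqueue(null) == Some(1))
      assert(dequeue() == Some(null))
      assert(dequeue() == None)
    }
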
diff --git a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
index 5cbe0097df..abc7855d9c 100644
--- a/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/main/scala/VoldemortStorageBackend.scala
@@ -50,6 +50,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
   var queueClient: StoreClient[Array[Byte], Array[Byte]] = null
   initStoreClients
 
+  val nullMapValueHeader = 0x00.byteValue
+  val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
+  val notNullMapValueHeader: Byte = 0xff.byteValue
   val underscoreBytesUTF8 = "_".getBytes("UTF-8")
   val mapKeysIndex = getIndexedBytes(-1)
   val vectorSizeIndex = getIndexedBytes(-1)
@@ -61,14 +64,14 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
 
   def getRefStorageFor(name: String): Option[Array[Byte]] = {
     val result: Array[Byte] = refClient.getValue(name)
-    result match {
-      case null => None
-      case _ => Some(result)
-    }
+    Option(result)
   }
 
   def insertRefStorageFor(name: String, element: Array[Byte]) = {
-    refClient.put(name, element)
+    element match {
+      case null => refClient.delete(name)
+      case _ => refClient.put(name, element)
+    }
   }
 
   def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[(Array[Byte], Array[Byte])] = {
@@ -93,7 +96,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
       (entry) => {
         entry match {
           case (namePlusKey: Array[Byte], versioned: Versioned[Array[Byte]]) => {
-            returned += getMapKeyFromKey(name, namePlusKey) -> versioned.getValue
+            returned += getMapKeyFromKey(name, namePlusKey) -> getMapValueFromStored(versioned.getValue)
           }
         }
       }
@@ -110,7 +113,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
     val result: Array[Byte] = mapClient.getValue(getKey(name, key))
     result match {
       case null => None
-      case _ => Some(result)
+      case _ => Some(getMapValueFromStored(result))
     }
   }
 
@@ -132,7 +135,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
   }
 
   def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
-    mapClient.put(getKey(name, key), value)
+    mapClient.put(getKey(name, key), getStoredMapValue(value))
     var keys = getMapKeys(name)
     keys += key
     putMapKeys(name, keys)
@@ -141,7 +144,7 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
   def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
     val newKeys = entries.map {
       case (key, value) => {
-        mapClient.put(getKey(name, key), value)
+        mapClient.put(getKey(name, key), getStoredMapValue(value))
         key
       }
     }
@@ -167,18 +170,21 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
   def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
     val size = getVectorStorageSizeFor(name)
     val st = start.getOrElse(0)
-    val cnt =
+    var cnt =
     if (finish.isDefined) {
       val f = finish.get
       if (f >= st) (f - st) else count
     } else {
       count
     }
+    if (cnt > (size - st)) {
+      cnt = size - st
+    }
 
 
     val seq: IndexedSeq[Array[Byte]] = (st until st + cnt).map {
-      index => getIndexedKey(name, index)
-    }.reverse //read backwards
+      index => getIndexedKey(name, (size - 1) - index)
+    } //read backwards
 
     val all: JMap[Array[Byte], Versioned[Array[Byte]]] = vectorClient.getAll(JavaConversions.asIterable(seq))
 
@@ -200,18 +206,22 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
 
   def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
     val size = getVectorStorageSizeFor(name)
-    if (size > 0) {
+    if (size > 0 && index < size) {
       vectorClient.getValue(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
     } else {
-      Array.empty[Byte] //is this what to return?
+      throw new StorageException("In Vector:" + name + " No such Index:" + index)
     }
   }
 
   def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) = {
     val size = getVectorStorageSizeFor(name)
-    vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem)
-    if (size < index + 1) {
-      vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(index + 1))
+    if (size > 0 && index < size) {
+      elem match {
+        case null => vectorClient.delete(getIndexedKey(name, /*read backwards*/ (size - 1) - index))
+        case _ => vectorClient.put(getIndexedKey(name, /*read backwards*/ (size - 1) - index), elem)
+      }
+    } else {
+      throw new StorageException("In Vector:" + name + " No such Index:" + index)
     }
   }
 
@@ -219,7 +229,9 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
     var size = getVectorStorageSizeFor(name)
     elements.foreach {
       element =>
-        vectorClient.put(getIndexedKey(name, size), element)
+        if (element != null) {
+          vectorClient.put(getIndexedKey(name, size), element)
+        }
         size += 1
     }
     vectorClient.put(getKey(name, vectorSizeIndex), IntSerializer.toBytes(size))
@@ -281,7 +293,10 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
     val mdata = getQueueMetadata(name)
     if (mdata.canEnqueue) {
       val key = getIndexedKey(name, mdata.tail)
-      queueClient.put(key, item)
+      item match {
+        case null => queueClient.delete(key)
+        case _ => queueClient.put(key, item)
+      }
       queueClient.put(getKey(name, queueTailIndex), IntSerializer.toBytes(mdata.nextEnqueue))
       Some(mdata.size + 1)
     } else {
@@ -344,6 +359,32 @@ MapStorageBackend[Array[Byte], Array[Byte]] with
     mapkey
   }
 
+  //wrapper for null
+  def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
+    value match {
+      case null => nullMapValue
+      case value => {
+        val stored = new Array[Byte](value.length + 1)
+        stored(0) = notNullMapValueHeader
+        System.arraycopy(value, 0, stored, 1, value.length)
+        stored
+      }
+    }
+  }
+
+  def getMapValueFromStored(value: Array[Byte]): Array[Byte] = {
+
+    if (value(0) == nullMapValueHeader) {
+      null
+    } else if (value(0) == notNullMapValueHeader) {
+      val returned = new Array[Byte](value.length - 1)
+      System.arraycopy(value, 1, returned, 0, value.length - 1)
+      returned
+    } else {
+      throw new StorageException("unknown header byte on map value:" + value(0))
+    }
+  }
+
 
   def getClientConfig(configMap: Map[String, String]): Properties = {
     val properites = new Properties
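
Because the map store cannot distinguish "stored null" from "no entry", the backend above frames every stored value with a header byte. A standalone round-trip sketch of that framing (mirroring getStoredMapValue and getMapValueFromStored, not the actual methods):

    val nullHeader: Byte = 0x00
    val notNullHeader: Byte = 0xff.toByte

    def wrap(value: Array[Byte]): Array[Byte] =
      if (value == null) Array(nullHeader) else notNullHeader +: value

    def unwrap(stored: Array[Byte]): Array[Byte] =
      if (stored(0) == nullHeader) null else stored.drop(1)

    assert(unwrap(wrap(null)) == null)
    assert(unwrap(wrap("abc".getBytes)).sameElements("abc".getBytes))
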
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala
deleted file mode 100644
index b283cad692..0000000000
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentDatastructureSuite.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-package se.scalablesolutions.akka.persistence.voldemort
-
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import se.scalablesolutions.akka.persistence.voldemort.VoldemortStorageBackend._
-import se.scalablesolutions.akka.actor.{newUuid, Uuid}
-import collection.immutable.TreeSet
-import VoldemortStorageBackendSuite._
-
-import se.scalablesolutions.akka.stm._
-import se.scalablesolutions.akka.stm.global._
-import se.scalablesolutions.akka.config.ScalaConfig._
-import se.scalablesolutions.akka.persistence.common._
-import se.scalablesolutions.akka.util.Logging
-import se.scalablesolutions.akka.config.Config.config
-
-@RunWith(classOf[JUnitRunner])
-class VoldemortPersistentDatastructureSuite extends FunSuite with ShouldMatchers with EmbeddedVoldemort with Logging {
-  test("persistentRefs work as expected") {
-    val name = newUuid.toString
-    val one = "one".getBytes
-    atomic {
-      val ref = VoldemortStorage.getRef(name)
-      ref.isDefined should be(false)
-      ref.swap(one)
-      ref.get match {
-        case Some(bytes) => bytes should be(one)
-        case None => true should be(false)
-      }
-    }
-    val two = "two".getBytes
-    atomic {
-      val ref = VoldemortStorage.getRef(name)
-      ref.isDefined should be(true)
-      ref.swap(two)
-      ref.get match {
-        case Some(bytes) => bytes should be(two)
-        case None => true should be(false)
-      }
-    }
-  }
-
-
-  test("Persistent Vectors function as expected") {
-    val name = newUuid.toString
-    val one = "one".getBytes
-    val two = "two".getBytes
-    atomic {
-      val vec = VoldemortStorage.getVector(name)
-      vec.add(one)
-    }
-    atomic {
-      val vec = VoldemortStorage.getVector(name)
-      vec.size should be(1)
-      vec.add(two)
-    }
-    atomic {
-      val vec = VoldemortStorage.getVector(name)
-
-      vec.get(0) should be(one)
-      vec.get(1) should be(two)
-      vec.size should be(2)
-      vec.update(0, two)
-    }
-
-    atomic {
-      val vec = VoldemortStorage.getVector(name)
-      vec.get(0) should be(two)
-      vec.get(1) should be(two)
-      vec.size should be(2)
-      vec.update(0, Array.empty[Byte])
-      vec.update(1, Array.empty[Byte])
-    }
-
-    atomic {
-      val vec = VoldemortStorage.getVector(name)
-      vec.get(0) should be(Array.empty[Byte])
-      vec.get(1) should be(Array.empty[Byte])
-      vec.size should be(2)
-    }
-
-
-  }
-
-  test("Persistent Maps work as expected") {
-    atomic {
-      val map = VoldemortStorage.getMap("map")
-      map.put("mapTest".getBytes, null)
-    }
-
-    atomic {
-      val map = VoldemortStorage.getMap("map")
-      map.get("mapTest".getBytes).get should be(null)
-    }
-
-
-  }
-
-}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
index 8ac3d306c4..b28ea90171 100644
--- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
+++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortStorageBackendSuite.scala
@@ -103,10 +103,7 @@ class VoldemortStorageBackendSuite extends FunSuite with ShouldMatchers with Emb
     vectorClient.delete(getKey(key, vectorSizeIndex))
     vectorClient.delete(getIndexedKey(key, 0))
     vectorClient.delete(getIndexedKey(key, 1))
-    getVectorStorageEntryFor(key, 0) should be(empty)
-    getVectorStorageEntryFor(key, 1) should be(empty)
-    getVectorStorageRangeFor(key, None, None, 1).head should be(empty)
-
+    
     insertVectorStorageEntryFor(key, value)
     //again
     insertVectorStorageEntryFor(key, value)
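
One more detail from this patch worth a worked example: getVectorStorageRangeFor now clamps the requested count so a range can never run past the end of the vector. A small sketch of the arithmetic (standalone, same logic as the hunk above):

    def clampedCount(size: Int, start: Int, finish: Option[Int], count: Int): Int = {
      var cnt = finish match {
        case Some(f) if f >= start => f - start
        case _ => count
      }
      if (cnt > size - start) cnt = size - start
      cnt
    }

    assert(clampedCount(size = 5, start = 3, finish = None, count = 10) == 2)
    assert(clampedCount(size = 5, start = 0, finish = Some(4), count = 0) == 4)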

From 0fc3f2fbd4772e9041bea814d41ea6b6eb25a077 Mon Sep 17 00:00:00 2001
From: ticktock 
Date: Mon, 4 Oct 2010 19:47:23 -0400
Subject: [PATCH 12/12] porting a ticket 450 change over

---
 .../src/test/scala/Ticket343Test.scala                        | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala
index eb724144a2..14eba7d4e3 100644
--- a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala
+++ b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala
@@ -44,7 +44,7 @@ object StorageObj {
   var getVector: String => PersistentVector[Array[Byte]] = _
 
   class SampleMapStorage extends Actor {
-    self.lifeCycle = Some(LifeCycle(Permanent))
+    self.lifeCycle = Permanent
     val FOO_MAP = "akka.sample.map"
 
     private var fooMap = atomic {StorageObj.getMap(FOO_MAP)}
@@ -134,7 +134,7 @@ object StorageObj {
   }
 
   class SampleVectorStorage extends Actor {
-    self.lifeCycle = Some(LifeCycle(Permanent))
+    self.lifeCycle = Permanent
     val FOO_VECTOR = "akka.sample.vector"
 
     private var fooVector = atomic {StorageObj.getVector(FOO_VECTOR)}