diff --git a/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala index f32cd269a3..25e0f9ee81 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/KVStorageBackend.scala @@ -245,7 +245,7 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra try{ vectorAccess.delete(key) } catch { - case e:Exception => log.warn("Exception while trying to clean up a poped element from the vector, this is acceptable") + case e:Exception => log.warn("Exception while trying to clean up a popped element from the vector, this is acceptable") } } else { @@ -253,6 +253,9 @@ private[akka] trait KVStorageBackend extends MapStorageBackend[Array[Byte], Arra } } + + override def supportsRemoveVectorStorageEntry = true + def getVectorMetadata(name: String): VectorMetadata = { val keys = List(getKey(name, vectorHeadIndex), getKey(name, vectorTailIndex)) val vdata = vectorAccess.getAll(keys) diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index e4eeca5aba..76bacde546 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -516,10 +516,14 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa * Removes the tail element of this vector. */ def pop: T = { - register - val curr = replay - appendOnlyTxLog + LogEntry(None, None, POP) - curr.last + if(storage.supportsRemoveVectorStorageEntry){ + register + val curr = replay + appendOnlyTxLog + LogEntry(None, None, POP) + curr.last + } else { + throw new UnsupportedOperationException("Vector pop is not supported by the current backend") + } } def update(index: Int, newElem: T) = { diff --git a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala index 01e14ae595..0f1764774a 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/StorageBackend.scala @@ -28,10 +28,10 @@ trait VectorStorageBackend[T] extends StorageBackend { def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[T] def getVectorStorageSizeFor(name: String): Int def removeVectorStorageEntryFor(name:String):Unit = { - //Unfortunately this is thrown on commit, not at the time of the call to VectorStorage.pop - //Should we add a supportsRemove method that allows an early throw of the exception? + //should remove the "tail" if supported throw new UnsupportedOperationException("VectorStorageBackend.removeVectorStorageEntry is not supported") } + def supportsRemoveVectorStorageEntry:Boolean = false //Allows the Vector frontend to fail on calls to pop, instead of at commit time } // for Ref diff --git a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala index 2f42d4b2b2..9c952f4f92 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala @@ -306,11 +306,13 @@ Spec with val proc = actorOf[SampleVectorStorage] proc.start - // add 4 elements in separate transactions - (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) - (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) - (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) - (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + // add 4 elements in separate transactions //also test add + pop of a 5th element + (proc !! VADD("ticktock")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(4) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(5) + (proc !! VPOP).getOrElse("VPOP failed") should equal("ticktock".getBytes) new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]]) should equal("nilanjan") new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]]) should equal("ramanendu") diff --git a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala index db0f65bb13..f31fc9a8f0 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/VectorStorageBackendTest.scala @@ -29,16 +29,19 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf } - describe("A Properly functioning VectorStorageBackend") { it("should insertVectorStorageEntry as a logical prepend operation to the existing list") { val vector = "insertSingleTest" val rand = new Random(3).nextInt(100) - val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + val values = (0 to rand).toList.map{ + i: Int => vector + "value" + i + } storage.getVectorStorageSizeFor(vector) should be(0) - values.foreach {s: String => storage.insertVectorStorageEntryFor(vector, s.getBytes)} + values.foreach{ + s: String => storage.insertVectorStorageEntryFor(vector, s.getBytes) + } val shouldRetrieve = values.reverse - (0 to rand).foreach { + (0 to rand).foreach{ i: Int => { shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i))) } @@ -48,11 +51,15 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf it("should insertVectorStorageEntries as a logical prepend operation to the existing list") { val vector = "insertMultiTest" val rand = new Random(3).nextInt(100) - val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + val values = (0 to rand).toList.map{ + i: Int => vector + "value" + i + } storage.getVectorStorageSizeFor(vector) should be(0) - storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) + storage.insertVectorStorageEntriesFor(vector, values.map{ + s: String => s.getBytes + }) val shouldRetrieve = values.reverse - (0 to rand).foreach { + (0 to rand).foreach{ i: Int => { shouldRetrieve(i) should be(new String(storage.getVectorStorageEntryFor(vector, i))) } @@ -62,9 +69,13 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf it("should successfully update entries") { val vector = "updateTest" val rand = new Random(3).nextInt(100) - val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + val values = (0 to rand).toList.map{ + i: Int => vector + "value" + i + } val urand = new Random(3).nextInt(rand) - storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) + storage.insertVectorStorageEntriesFor(vector, values.map{ + s: String => s.getBytes + }) val toUpdate = "updated" + values.reverse(urand) storage.updateVectorStorageEntryFor(vector, urand, toUpdate.getBytes) toUpdate should be(new String(storage.getVectorStorageEntryFor(vector, urand))) @@ -73,9 +84,13 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf it("should return the correct value from getVectorStorageFor") { val vector = "getTest" val rand = new Random(3).nextInt(100) - val values = (0 to rand).toList.map {i: Int => vector + "value" + i} + val values = (0 to rand).toList.map{ + i: Int => vector + "value" + i + } val urand = new Random(3).nextInt(rand) - storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) + storage.insertVectorStorageEntriesFor(vector, values.map{ + s: String => s.getBytes + }) values.reverse(urand) should be(new String(storage.getVectorStorageEntryFor(vector, urand))) } @@ -83,35 +98,60 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf val vector = "getTest" val rand = new Random(3).nextInt(100) val drand = new Random(3).nextInt(rand) - val values = (0 to rand).toList.map {i: Int => vector + "value" + i} - storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) - values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map {b: Array[Byte] => new String(b)}) - (0 to drand).foreach { + val values = (0 to rand).toList.map{ + i: Int => vector + "value" + i + } + storage.insertVectorStorageEntriesFor(vector, values.map{ + s: String => s.getBytes + }) + values.reverse should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1).map{ + b: Array[Byte] => new String(b) + }) + (0 to drand).foreach{ i: Int => { val value: String = vector + "value" + (rand - i) log.debug(value) - List(value) should be(storage.getVectorStorageRangeFor(vector, Some(i), None, 1).map {b: Array[Byte] => new String(b)}) + List(value) should be(storage.getVectorStorageRangeFor(vector, Some(i), None, 1).map{ + b: Array[Byte] => new String(b) + }) } } } - it("should support remove properly"){ - val vector = "removeTest" - val rand = new Random(3).nextInt(100) - val values = (0 to rand).toList.map {i: Int => vector + "value" + i} - storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) - storage.getVectorStorageSizeFor(vector) should be (values.size) - values.foreach{ - s => storage.removeVectorStorageEntryFor(vector) + it("should support remove properly") { + if (storage.supportsRemoveVectorStorageEntry) { + val vector = "removeTest" + val rand = new Random(3).nextInt(100) + val values = (0 to rand).toList.map{ + i: Int => vector + "value" + i + } + storage.insertVectorStorageEntriesFor(vector, values.map{ + s: String => s.getBytes + }) + storage.getVectorStorageSizeFor(vector) should be(values.size) + (1 to rand).foreach{ + i: Int => { + storage.removeVectorStorageEntryFor(vector) + values.reverse.dropRight(i) should be(storage.getVectorStorageRangeFor(vector, None, None, rand + 1 - i).map{ + b: Array[Byte] => new String(b) + }) + } + + } + storage.removeVectorStorageEntryFor(vector) + storage.getVectorStorageSizeFor(vector) should be(0) + storage.insertVectorStorageEntriesFor(vector, values.map{ + s: String => s.getBytes + }) + storage.getVectorStorageSizeFor(vector) should be(values.size) + values.foreach{ + s => storage.removeVectorStorageEntryFor(vector) + } + storage.getVectorStorageSizeFor(vector) should be(0) + } else { + log.warn("The current backend being tested does not support removeVectorStorageEntryFor") } - storage.getVectorStorageSizeFor(vector) should be(0) - storage.insertVectorStorageEntriesFor(vector, values.map {s: String => s.getBytes}) - storage.getVectorStorageSizeFor(vector) should be (values.size) - values.foreach{ - s=> storage.removeVectorStorageEntryFor(vector) - } - storage.getVectorStorageSizeFor(vector) should be(0) } @@ -129,13 +169,17 @@ trait VectorStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAf it("shoud throw a Storage exception when there is an attempt to retrieve an index larger than the Vector") { val vector = "tooLargeRetrieve" storage.insertVectorStorageEntryFor(vector, null) - evaluating {storage.getVectorStorageEntryFor(vector, 9)} should produce[StorageException] + evaluating{ + storage.getVectorStorageEntryFor(vector, 9) + } should produce[StorageException] } it("shoud throw a Storage exception when there is an attempt to update an index larger than the Vector") { val vector = "tooLargeUpdate" storage.insertVectorStorageEntryFor(vector, null) - evaluating {storage.updateVectorStorageEntryFor(vector, 9, null)} should produce[StorageException] + evaluating{ + storage.updateVectorStorageEntryFor(vector, 9, null) + } should produce[StorageException] } }