diff --git a/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala index 42e5fa8819..2dd9147d74 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala @@ -14,9 +14,11 @@ import collection.immutable._ private[akka] trait CommonStorageBackendAccess { + import CommonStorageBackend._ /*abstract*/ + def getValue(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte] def put(owner: String, key: Array[Byte], value: Array[Byte]): Unit @@ -28,6 +30,7 @@ private[akka] trait CommonStorageBackendAccess { def drop(): Unit /*concrete*/ + def decodeKey(owner: String, key: Array[Byte]) = key def delete(owner: String, index: Int): Unit = delete(owner, IntSerializer.toBytes(index)) @@ -40,6 +43,7 @@ private[akka] trait CommonStorageBackendAccess { } private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess { + import CommonStorageBackend._ import KVStorageBackend._ @@ -85,7 +89,7 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess { override def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { - getAll(keys.map { + getAll(keys.map{ getKey(owner, _) }) } @@ -109,8 +113,25 @@ private[akka] object CommonStorageBackend { val nullMapValueHeader = 0x00.byteValue val nullMapValue: Array[Byte] = Array(nullMapValueHeader) val notNullMapValueHeader: Byte = 0xff.byteValue + val mapKeySetKeyHeader = 0x00.byteValue + val mapKeyHeader = 0xff.byteValue + val mapKeysIndex: Array[Byte] = new Array[Byte](1).padTo(1, mapKeySetKeyHeader) + val mapKeysWrapperPad: Array[Byte] = new Array[Byte](1).padTo(1, mapKeyHeader) - def getStoredMapValue(value: Array[Byte]): Array[Byte] = { + def wrapMapKey(key: Array[Byte]): Array[Byte] = { + val wrapped = new Array[Byte](key.length + mapKeysWrapperPad.length) + System.arraycopy(mapKeysWrapperPad, 0, wrapped, 0, mapKeysWrapperPad.length) + System.arraycopy(key, 0, wrapped, mapKeysWrapperPad.length, key.length) + wrapped + } + + def unwrapMapKey(key: Array[Byte]): Array[Byte] = { + val unwrapped = new Array[Byte](key.length - mapKeysWrapperPad.length) + System.arraycopy(key, mapKeysWrapperPad.length, unwrapped, 0, unwrapped.length) + unwrapped + } + + def getStoredMapValue(value: Array[Byte]): Array[Byte] = { value match { case null => nullMapValue case value => { @@ -190,7 +211,9 @@ private[akka] object CommonStorageBackend { } private[akka] object KVStorageBackend { - import CommonStorageBackend._ + + import CommonStorageBackend._ + /** * Concat the ownerlenght+owner+key+ of owner so owned data will be colocated * Store the length of owner as first byte to work around the rare case @@ -214,8 +237,9 @@ private[akka] object KVStorageBackend { } private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging { + import CommonStorageBackend._ - val mapKeysIndex:Array[Byte] = new Array[Byte](1).padTo(1,1.asInstanceOf[Byte]) + val vectorHeadIndex = IntSerializer.toBytes(-1) val vectorTailIndex = IntSerializer.toBytes(-2) val queueHeadIndex = IntSerializer.toBytes(-1) @@ -261,15 +285,15 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = { val all: Map[Array[Byte], Array[Byte]] = - mapAccess.getAll(name, keys) + mapAccess.getAll(name, keys) var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering) - all.foreach { + all.foreach{ (entry) => { entry match { case (namePlusKey: Array[Byte], value: Array[Byte]) => { //need to fix here - returned += mapAccess.decodeKey(name, namePlusKey) -> getMapValueFromStored(value) + returned += mapAccess.decodeKey(name, unwrapMapKey(namePlusKey)) -> getMapValueFromStored(value) } } } @@ -283,7 +307,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { - val result: Array[Byte] = mapAccess.getValue(name, key) + val result: Array[Byte] = mapAccess.getValue(name, wrapMapKey(key)) result match { case null => None case _ => Some(getMapValueFromStored(result)) @@ -291,15 +315,16 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def removeMapStorageFor(name: String, key: Array[Byte]) = { + val wrapped = wrapMapKey(key) var keys = getMapKeys(name) - keys -= key + keys -= wrapped putMapKeys(name, keys) - mapAccess.delete(name, key) + mapAccess.delete(name, wrapped) } def removeMapStorageFor(name: String) = { val keys = getMapKeys(name) - keys.foreach { + keys.foreach{ key => mapAccess.delete(name, key) log.debug("deleted key %s for %s", key, name) @@ -308,17 +333,19 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = { - mapAccess.put(name, key, getStoredMapValue(value)) + val wrapped = wrapMapKey(key) + mapAccess.put(name, wrapped, getStoredMapValue(value)) var keys = getMapKeys(name) - keys += key + keys += wrapped putMapKeys(name, keys) } def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = { - val newKeys = entries.map { + val newKeys = entries.map{ case (key, value) => { - mapAccess.put(name, key, getStoredMapValue(value)) - key + val wrapped = wrapMapKey(key) + mapAccess.put(name, wrapped, getStoredMapValue(value)) + wrapped } } var keys = getMapKeys(name) @@ -334,6 +361,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], SortedSetSerializer.fromBytes(mapAccess.getValue(name, mapKeysIndex, Array.empty[Byte])) } + def getVectorStorageSizeFor(name: String): Int = { getVectorMetadata(name).size } @@ -343,12 +371,12 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], val st = start.getOrElse(0) var cnt = - if (finish.isDefined) { - val f = finish.get - if (f >= st) (f - st) else count - } else { - count - } + if (finish.isDefined) { + val f = finish.get + if (f >= st) (f - st) else count + } else { + count + } if (cnt > (mdata.size - st)) { cnt = mdata.size - st } @@ -384,7 +412,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], } def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = { - elements.foreach { + elements.foreach{ insertVectorStorageEntryFor(name, _) } diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala index 51f3de40d0..4900ea7695 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala @@ -69,7 +69,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter it("should insert multiple map storage elements properly") { val mapName = "insertMultipleTest" val rand = new Random(3).nextInt(100) - val entries = (1 to rand).toList.map { + val entries = (1 to rand).toList.map{ index => (("insertMultipleTestKey" + index).getBytes -> ("insertMutlipleTestValue" + index).getBytes) } @@ -97,7 +97,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter it("should accurately track the number of key value pairs in a map") { val mapName = "sizeTest" val rand = new Random(3).nextInt(100) - val entries = (1 to rand).toList.map { + val entries = (1 to rand).toList.map{ index => (("sizeTestKey" + index).getBytes -> ("sizeTestValue" + index).getBytes) } @@ -112,7 +112,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter val mapName = "allTest" val rand = new Random(3).nextInt(100) var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering) - (1 to rand).foreach { + (1 to rand).foreach{ index => entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes) } @@ -124,12 +124,20 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter - val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} - val retrievedMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}} + val entryMap = new HashMap[String, String] ++ entries.map{ + _ match { + case (k, v) => (new String(k), new String(v)) + } + } + val retrievedMap = new HashMap[String, String] ++ entries.map{ + _ match { + case (k, v) => (new String(k), new String(v)) + } + } entryMap should equal(retrievedMap) - (0 until rand).foreach { + (0 until rand).foreach{ i: Int => { new String(entries.toList(i)._1) should be(new String(retrieved(i)._1)) } @@ -155,6 +163,14 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter storage.getMapStorageSizeFor("nonExistent") should be(0) } + it("should not stomp on the map keyset when a map key of 0xff is used") { + val mapName = "keySetStomp" + val key = CommonStorageBackend.mapKeysIndex + storage.insertMapStorageEntryFor(mapName, key, key) + storage.getMapStorageSizeFor(mapName) should be(1) + storage.getMapStorageEntryFor(mapName,key).get should be (key) + } + }