Tweaking the encoding of map keys so that there is no possibility of stomping on the key used to hold the maps keyset
This commit is contained in:
parent
ad6252c023
commit
ba4b240122
2 changed files with 73 additions and 29 deletions
|
|
@ -14,9 +14,11 @@ import collection.immutable._
|
|||
|
||||
|
||||
private[akka] trait CommonStorageBackendAccess {
|
||||
|
||||
import CommonStorageBackend._
|
||||
|
||||
/*abstract*/
|
||||
|
||||
def getValue(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte]
|
||||
|
||||
def put(owner: String, key: Array[Byte], value: Array[Byte]): Unit
|
||||
|
|
@ -28,6 +30,7 @@ private[akka] trait CommonStorageBackendAccess {
|
|||
def drop(): Unit
|
||||
|
||||
/*concrete*/
|
||||
|
||||
def decodeKey(owner: String, key: Array[Byte]) = key
|
||||
|
||||
def delete(owner: String, index: Int): Unit = delete(owner, IntSerializer.toBytes(index))
|
||||
|
|
@ -40,6 +43,7 @@ private[akka] trait CommonStorageBackendAccess {
|
|||
}
|
||||
|
||||
private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess {
|
||||
|
||||
import CommonStorageBackend._
|
||||
import KVStorageBackend._
|
||||
|
||||
|
|
@ -85,7 +89,7 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess {
|
|||
|
||||
|
||||
override def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = {
|
||||
getAll(keys.map {
|
||||
getAll(keys.map{
|
||||
getKey(owner, _)
|
||||
})
|
||||
}
|
||||
|
|
@ -109,8 +113,25 @@ private[akka] object CommonStorageBackend {
|
|||
val nullMapValueHeader = 0x00.byteValue
|
||||
val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
|
||||
val notNullMapValueHeader: Byte = 0xff.byteValue
|
||||
val mapKeySetKeyHeader = 0x00.byteValue
|
||||
val mapKeyHeader = 0xff.byteValue
|
||||
val mapKeysIndex: Array[Byte] = new Array[Byte](1).padTo(1, mapKeySetKeyHeader)
|
||||
val mapKeysWrapperPad: Array[Byte] = new Array[Byte](1).padTo(1, mapKeyHeader)
|
||||
|
||||
def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
|
||||
def wrapMapKey(key: Array[Byte]): Array[Byte] = {
|
||||
val wrapped = new Array[Byte](key.length + mapKeysWrapperPad.length)
|
||||
System.arraycopy(mapKeysWrapperPad, 0, wrapped, 0, mapKeysWrapperPad.length)
|
||||
System.arraycopy(key, 0, wrapped, mapKeysWrapperPad.length, key.length)
|
||||
wrapped
|
||||
}
|
||||
|
||||
def unwrapMapKey(key: Array[Byte]): Array[Byte] = {
|
||||
val unwrapped = new Array[Byte](key.length - mapKeysWrapperPad.length)
|
||||
System.arraycopy(key, mapKeysWrapperPad.length, unwrapped, 0, unwrapped.length)
|
||||
unwrapped
|
||||
}
|
||||
|
||||
def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
|
||||
value match {
|
||||
case null => nullMapValue
|
||||
case value => {
|
||||
|
|
@ -190,7 +211,9 @@ private[akka] object CommonStorageBackend {
|
|||
}
|
||||
|
||||
private[akka] object KVStorageBackend {
|
||||
import CommonStorageBackend._
|
||||
|
||||
import CommonStorageBackend._
|
||||
|
||||
/**
|
||||
* Concat the ownerlenght+owner+key+ of owner so owned data will be colocated
|
||||
* Store the length of owner as first byte to work around the rare case
|
||||
|
|
@ -214,8 +237,9 @@ private[akka] object KVStorageBackend {
|
|||
}
|
||||
|
||||
private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging {
|
||||
|
||||
import CommonStorageBackend._
|
||||
val mapKeysIndex:Array[Byte] = new Array[Byte](1).padTo(1,1.asInstanceOf[Byte])
|
||||
|
||||
val vectorHeadIndex = IntSerializer.toBytes(-1)
|
||||
val vectorTailIndex = IntSerializer.toBytes(-2)
|
||||
val queueHeadIndex = IntSerializer.toBytes(-1)
|
||||
|
|
@ -261,15 +285,15 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
|
|||
|
||||
private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = {
|
||||
val all: Map[Array[Byte], Array[Byte]] =
|
||||
mapAccess.getAll(name, keys)
|
||||
mapAccess.getAll(name, keys)
|
||||
|
||||
var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering)
|
||||
all.foreach {
|
||||
all.foreach{
|
||||
(entry) => {
|
||||
entry match {
|
||||
case (namePlusKey: Array[Byte], value: Array[Byte]) => {
|
||||
//need to fix here
|
||||
returned += mapAccess.decodeKey(name, namePlusKey) -> getMapValueFromStored(value)
|
||||
returned += mapAccess.decodeKey(name, unwrapMapKey(namePlusKey)) -> getMapValueFromStored(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -283,7 +307,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
|
|||
}
|
||||
|
||||
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
|
||||
val result: Array[Byte] = mapAccess.getValue(name, key)
|
||||
val result: Array[Byte] = mapAccess.getValue(name, wrapMapKey(key))
|
||||
result match {
|
||||
case null => None
|
||||
case _ => Some(getMapValueFromStored(result))
|
||||
|
|
@ -291,15 +315,16 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
|
|||
}
|
||||
|
||||
def removeMapStorageFor(name: String, key: Array[Byte]) = {
|
||||
val wrapped = wrapMapKey(key)
|
||||
var keys = getMapKeys(name)
|
||||
keys -= key
|
||||
keys -= wrapped
|
||||
putMapKeys(name, keys)
|
||||
mapAccess.delete(name, key)
|
||||
mapAccess.delete(name, wrapped)
|
||||
}
|
||||
|
||||
def removeMapStorageFor(name: String) = {
|
||||
val keys = getMapKeys(name)
|
||||
keys.foreach {
|
||||
keys.foreach{
|
||||
key =>
|
||||
mapAccess.delete(name, key)
|
||||
log.debug("deleted key %s for %s", key, name)
|
||||
|
|
@ -308,17 +333,19 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
|
|||
}
|
||||
|
||||
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
|
||||
mapAccess.put(name, key, getStoredMapValue(value))
|
||||
val wrapped = wrapMapKey(key)
|
||||
mapAccess.put(name, wrapped, getStoredMapValue(value))
|
||||
var keys = getMapKeys(name)
|
||||
keys += key
|
||||
keys += wrapped
|
||||
putMapKeys(name, keys)
|
||||
}
|
||||
|
||||
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
|
||||
val newKeys = entries.map {
|
||||
val newKeys = entries.map{
|
||||
case (key, value) => {
|
||||
mapAccess.put(name, key, getStoredMapValue(value))
|
||||
key
|
||||
val wrapped = wrapMapKey(key)
|
||||
mapAccess.put(name, wrapped, getStoredMapValue(value))
|
||||
wrapped
|
||||
}
|
||||
}
|
||||
var keys = getMapKeys(name)
|
||||
|
|
@ -334,6 +361,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
|
|||
SortedSetSerializer.fromBytes(mapAccess.getValue(name, mapKeysIndex, Array.empty[Byte]))
|
||||
}
|
||||
|
||||
|
||||
def getVectorStorageSizeFor(name: String): Int = {
|
||||
getVectorMetadata(name).size
|
||||
}
|
||||
|
|
@ -343,12 +371,12 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
|
|||
|
||||
val st = start.getOrElse(0)
|
||||
var cnt =
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get
|
||||
if (f >= st) (f - st) else count
|
||||
} else {
|
||||
count
|
||||
}
|
||||
if (finish.isDefined) {
|
||||
val f = finish.get
|
||||
if (f >= st) (f - st) else count
|
||||
} else {
|
||||
count
|
||||
}
|
||||
if (cnt > (mdata.size - st)) {
|
||||
cnt = mdata.size - st
|
||||
}
|
||||
|
|
@ -384,7 +412,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
|
|||
}
|
||||
|
||||
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
|
||||
elements.foreach {
|
||||
elements.foreach{
|
||||
insertVectorStorageEntryFor(name, _)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
|
|||
it("should insert multiple map storage elements properly") {
|
||||
val mapName = "insertMultipleTest"
|
||||
val rand = new Random(3).nextInt(100)
|
||||
val entries = (1 to rand).toList.map {
|
||||
val entries = (1 to rand).toList.map{
|
||||
index =>
|
||||
(("insertMultipleTestKey" + index).getBytes -> ("insertMutlipleTestValue" + index).getBytes)
|
||||
}
|
||||
|
|
@ -97,7 +97,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
|
|||
it("should accurately track the number of key value pairs in a map") {
|
||||
val mapName = "sizeTest"
|
||||
val rand = new Random(3).nextInt(100)
|
||||
val entries = (1 to rand).toList.map {
|
||||
val entries = (1 to rand).toList.map{
|
||||
index =>
|
||||
(("sizeTestKey" + index).getBytes -> ("sizeTestValue" + index).getBytes)
|
||||
}
|
||||
|
|
@ -112,7 +112,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
|
|||
val mapName = "allTest"
|
||||
val rand = new Random(3).nextInt(100)
|
||||
var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering)
|
||||
(1 to rand).foreach {
|
||||
(1 to rand).foreach{
|
||||
index =>
|
||||
entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes)
|
||||
}
|
||||
|
|
@ -124,12 +124,20 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
|
|||
|
||||
|
||||
|
||||
val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}}
|
||||
val retrievedMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}}
|
||||
val entryMap = new HashMap[String, String] ++ entries.map{
|
||||
_ match {
|
||||
case (k, v) => (new String(k), new String(v))
|
||||
}
|
||||
}
|
||||
val retrievedMap = new HashMap[String, String] ++ entries.map{
|
||||
_ match {
|
||||
case (k, v) => (new String(k), new String(v))
|
||||
}
|
||||
}
|
||||
|
||||
entryMap should equal(retrievedMap)
|
||||
|
||||
(0 until rand).foreach {
|
||||
(0 until rand).foreach{
|
||||
i: Int => {
|
||||
new String(entries.toList(i)._1) should be(new String(retrieved(i)._1))
|
||||
}
|
||||
|
|
@ -155,6 +163,14 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
|
|||
storage.getMapStorageSizeFor("nonExistent") should be(0)
|
||||
}
|
||||
|
||||
it("should not stomp on the map keyset when a map key of 0xff is used") {
|
||||
val mapName = "keySetStomp"
|
||||
val key = CommonStorageBackend.mapKeysIndex
|
||||
storage.insertMapStorageEntryFor(mapName, key, key)
|
||||
storage.getMapStorageSizeFor(mapName) should be(1)
|
||||
storage.getMapStorageEntryFor(mapName,key).get should be (key)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue