=art Heap structure in heavy hitters

This commit is contained in:
Łukasz Dubiel 2016-07-22 21:40:14 +02:00 committed by Johan Andrén
parent 08670ca155
commit 642a145ca1
3 changed files with 203 additions and 79 deletions

View file

@ -81,9 +81,9 @@ class HeavyHittersBenchmark {
@Benchmark
@OperationsPerInvocation(8192)
def updateExistingHitter(blackhole: Blackhole): Unit = {
var i = 0
var i: Int = 0
while (i < 8192) {
blackhole.consume(topN.update("HEAVY_HITTER", Long.MaxValue)) // definitely a heavy hitter
blackhole.consume(topN.update(preallocatedStrings(i % 16), Long.MaxValue)) // definitely a heavy hitter
i += 1
}
}

View file

@ -3,13 +3,10 @@
*/
package akka.remote.artery.compress
import java.util
import java.util.Objects
import akka.japi.Util
import scala.annotation.{ switch, tailrec }
import scala.collection.immutable
import scala.reflect.ClassTag
/**
* INTERNAL API
@ -22,31 +19,48 @@ import scala.collection.immutable
* for a discussion about the assumptions made and guarantees about the Heavy Hitters made in this model.
* We assume the Cash Register model in which there are only additions, which simplifies HH detecion significantly.
*/
private[remote] final class TopHeavyHitters[T](val max: Int) {
import TopHeavyHitters._
private[this] var _lowestHitterIdx: Int = 0
private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit classTag: ClassTag[T]) {
private[this] val hashes: Array[Int] = Array.ofDim(max)
private[this] val items: Array[T] = Array.ofDim[Object](max).asInstanceOf[Array[T]]
private[this] val weights: Array[Long] = Array.ofDim(max)
require((max & (max - 1)) == 0, "Maximum numbers of heavy hitters should be in form of 2^k for any natural k")
val capacity = max * 2
val mask = capacity - 1
import TopHeavyHitters._
private[this] val hashes: Array[Int] = Array.ofDim(capacity)
private[this] val items: Array[T] = Array.ofDim[T](capacity)
private[this] val weights: Array[Long] = Array.ofDim(capacity)
private[this] val heapIndex: Array[Int] = Array.fill(capacity)(-1)
private[this] val heap: Array[Int] = Array.fill(max)(-1)
// TODO think if we could get away without copy
/** Returns copy(!) of items which are currently considered to be heavy hitters. */
/** Returns the current heavy hitters, order is not of significance */
def snapshot: Array[T] = {
val snap = Array.ofDim(max).asInstanceOf[Array[T]]
System.arraycopy(items, 0, snap, 0, items.length)
var i = 0
while (i < max) {
val index = heap(i)
val value =
if (index < 0) null
else items(index)
snap(i) = value
i += 1
}
snap
}
def toDebugString =
s"""TopHeavyHitters(
| max: $max,
| lowestHitterIdx: $lowestHitterIdx (weight: $lowestHitterWeight)
|
| hashes: ${hashes.toList.mkString("[", ", ", "]")}
| weights: ${weights.toList.mkString("[", ", ", "]")}
| items: ${items.toList.mkString("[", ", ", "]")}
|)""".stripMargin
| max: $max,
| lowestHitterIdx: $lowestHitterIndex (weight: $lowestHitterWeight)
|
| hashes: ${hashes.toList.mkString("[", ", ", "]")}
| weights: ${weights.toList.mkString("[", ", ", "]")}
| items: ${items.toList.mkString("[", ", ", "]")}
| heapIndex: ${heapIndex.toList.mkString("[", ", ", "]")}
| heap: ${heap.toList.mkString("[", ", ", "]")}
|)""".stripMargin
/**
* Attempt adding item to heavy hitters set, if it does not fit in the top yet,
@ -56,30 +70,48 @@ private[remote] final class TopHeavyHitters[T](val max: Int) {
*/
// TODO possibly can be optimised further? (there is a benchmark)
def update(item: T, count: Long): Boolean =
isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway
isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway
val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode
(findHashIdx(0, hashCode): @switch) match { // worst case O(n), can't really bin search here since indexes are kept in synch with other arrays hmm...
val startIndex = hashCode.get & mask
(findHashIdx(startIndex, hashCode): @switch) match { // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm...
case -1
// not previously heavy hitter
insertKnownNewHeavy(hashCode, item, count) // O(1) + rarely O(n) if needs to update lowest hitter
insertKnownNewHeavy(hashCode, item, count) // O(log n + alpha)
true
case potentialIndexGuess
// the found index could be one of many which hash to the same value (we're using open-addressing),
// so it is only used as hint for the replace call. If the value matches, we're good, if not we need to search from here onwards.
// so it is only used as hint for the replace call. If the value matches, we're good, if not we need to search from here onwards.
val actualIdx = findItemIdx(potentialIndexGuess, hashCode, item)
if (actualIdx == -1) insertKnownNewHeavy(hashCode, item, count) // O(1) + O(n), we simply replace the current lowest heavy hitter
else replaceExistingHeavyHitter(actualIdx, hashCode, item, count) // usually O(1), worst case O(n) if we need to scan due to hash conflicts
if (actualIdx == -1) {
insertKnownNewHeavy(hashCode, item, count) // O(1 + log n), we simply replace the current lowest heavy hitter
true
} else replaceExistingHeavyHitter(actualIdx, hashCode, item, count) // usually O(1), worst case O(n) if we need to scan due to hash conflicts
}
}
def isHeavy(count: Long): Boolean =
count > lowestHitterWeight
@tailrec private def findItemIdx(searchFromIndex: Int, hashCode: HashCodeVal, o: T): Int =
private def findItemIdx(searchFromIndex: Int, hashCode: HashCodeVal, o: T): Int = {
@tailrec def loop(index: Int, start: Int, hashCodeVal: HashCodeVal, o: T): Int = {
if (index == start) -1
else if (hashCodeVal.get == hashes(index)) {
val item: T = items(index)
if (item == o) {
index
} else {
loop((index + 1) & mask, start, hashCodeVal, o)
}
} else {
loop((index + 1) & mask, start, hashCodeVal, o)
}
}
if (searchFromIndex == -1) -1
else if (Objects.equals(items(searchFromIndex), o)) searchFromIndex
else findItemIdx(findHashIdx(searchFromIndex + 1, hashCode), hashCode, o)
else loop((searchFromIndex + 1) & mask, searchFromIndex, hashCode, o)
}
/**
* Replace existing heavy hitter give it a new `count` value.
@ -91,72 +123,162 @@ private[remote] final class TopHeavyHitters[T](val max: Int) {
@tailrec private def replaceExistingHeavyHitter(foundHashIndex: Int, hashCode: HashCodeVal, item: T, count: Long): Boolean =
if (foundHashIndex == -1) throw new NoSuchElementException(s"Item $item is not present in HeavyHitters, can not replace it!")
else if (Objects.equals(items(foundHashIndex), item)) {
putCount(foundHashIndex, count) // we don't need to change `hashCode` or `item`, those remain the same
if (foundHashIndex == lowestHitterIdx) updateLowestHitterIdx() // need to update the lowestHitter since we just bumped its count
false // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before)
updateCount(foundHashIndex, count) // we don't need to change `hashCode` or `item`, those remain the same
fixHeap(heapIndex(foundHashIndex))
false // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before)
} else replaceExistingHeavyHitter(findHashIdx(foundHashIndex + 1, hashCode), hashCode, item, count) // recurse
private def findHashIdx(searchFromIndex: Int, hashCode: HashCodeVal): Int =
findEqIndex(hashes, searchFromIndex, hashCode.get)
/**
* Fix heap property on `heap` array
* @param index place to check and fix
*/
@tailrec
private def fixHeap(index: Int): Unit = {
val leftIndex = index * 2 + 1
val rightIndex = index * 2 + 2
val currentWeights: Long = weights(heap(index))
if (rightIndex < max) {
val leftValueIndex: Int = heap(leftIndex)
val rightValueIndex: Int = heap(rightIndex)
if (leftValueIndex < 0) {
swapHeapNode(index, leftIndex)
fixHeap(leftIndex)
} else if (rightValueIndex < 0) {
swapHeapNode(index, rightIndex)
fixHeap(rightIndex)
} else {
val rightWeights: Long = weights(rightValueIndex)
val leftWeights: Long = weights(leftValueIndex)
if (leftWeights < rightWeights) {
if (currentWeights > leftWeights) {
swapHeapNode(index, leftIndex)
fixHeap(leftIndex)
}
} else {
if (currentWeights > rightWeights) {
swapHeapNode(index, rightIndex)
fixHeap(rightIndex)
}
}
}
} else if (leftIndex < max) {
val leftValueIndex: Int = heap(leftIndex)
if (leftValueIndex < 0) {
swapHeapNode(index, leftIndex)
fixHeap(leftIndex)
} else {
val leftWeights: Long = weights(leftValueIndex)
if (currentWeights > leftWeights) {
swapHeapNode(index, leftIndex)
fixHeap(leftIndex)
}
}
}
}
/**
* Swaps two elements in `heap` array and maintain correct index in `heapIndex`.
*
* @param a index of first element
* @param b index of second element
*/
private def swapHeapNode(a: Int, b: Int): Unit = {
if (heap(a) >= 0) {
heapIndex(heap(a)) = b
}
if (heap(b) >= 0) {
heapIndex(heap(b)) = a
}
val temp = heap(a)
heap(a) = heap(b)
heap(b) = temp
}
/**
* Puts the item and additional information into the index of the current lowest hitter.
*
* @return index at which the insertion was performed
*/
private def insertKnownNewHeavy(hashCode: HashCodeVal, item: T, count: Long): Boolean = {
put(_lowestHitterIdx, hashCode, item, count)
updateLowestHitterIdx()
true
private def insertKnownNewHeavy(hashCode: HashCodeVal, item: T, count: Long): Unit = {
removeHash(lowestHitterIndex)
lowestHitterIndex = insert(hashCode, item, count)
}
/**
* Remove value from hash-table based on position.
*
* @param index position to remove
*/
private def removeHash(index: Int): Unit = {
if (index > 0) {
items(index) = null
hashes(index) = 0
weights(index) = 0
}
}
/**
* Only update the count for a given index, e.g. if value and hashCode remained the same.
*/
private def putCount(idx: Int, count: Long): Unit =
private def updateCount(idx: Int, count: Long): Unit =
weights(idx) = count
private def put(idx: Int, hashCode: HashCodeVal, item: T, count: Long): Unit = {
hashes(idx) = hashCode.get
items(idx) = item
weights(idx) = count
}
/** Perform a scan for the lowest hitter (by weight). */
private def updateLowestHitterIdx(): Int = {
_lowestHitterIdx = findIndexOfMinimum(weights)
_lowestHitterIdx
/**
* Insert value in hash-table.
*
* Using open addressing for resolving collisions.
* Initial index is reminder in division hashCode and table size.
*
* @param hashCode hashCode of item
* @param item value which should be added to hash-table
* @param count count associated to value
* @return Index in hash-table where was inserted
*/
private def insert(hashCode: HashCodeVal, item: T, count: Long): Int = {
var index: Int = hashCode.get & mask
while (items(index) != null) {
index = (index + 1) & mask
}
hashes(index) = hashCode.get
items(index) = item
weights(index) = count
index
}
/** Weight of lowest heavy hitter, if a new inserted item has a weight greater than this it is a heavy hitter. */
def lowestHitterWeight: Long =
weights(_lowestHitterIdx)
// do not expose we're array based
private def lowestHitterIdx: Int =
_lowestHitterIdx
private def findEqIndex(hashes: Array[Int], searchFromIndex: Int, hashCode: Int): Int = {
var i: Int = searchFromIndex
while (i < hashes.length) {
if (hashes(i) == hashCode) return i
i += 1
def lowestHitterWeight: Long = {
val index: Int = lowestHitterIndex
if (index > 0) {
weights(index)
} else {
0
}
-1
}
private def findIndexOfMinimum(weights: Array[Long]): Int = {
var _lowestHitterIdx: Int = -1
var min: Long = Long.MaxValue
private def lowestHitterIndex: Int = {
heap(0)
}
private def lowestHitterIndex_=(index: Int): Unit = {
heap(0) = index
heapIndex(index) = 0
fixHeap(0)
}
private def findEqIndex(hashes: Array[Int], searchFromIndex: Int, hashCode: Int): Int = {
var i: Int = 0
while (i < weights.length) {
if (weights(i) < min) {
min = weights(i)
_lowestHitterIdx = i
while (i < hashes.length) {
val index = (i + searchFromIndex) & mask
if (hashes(index) == hashCode) {
return index
}
i += 1
}
_lowestHitterIdx
-1
}
override def toString =
@ -164,8 +286,10 @@ private[remote] final class TopHeavyHitters[T](val max: Int) {
}
object TopHeavyHitters {
/** Value class to avoid mixing up count and hashCode in APIs. */
private[compress] final class HashCodeVal(val get: Int) extends AnyVal {
def isEmpty = false
}
}

View file

@ -10,7 +10,7 @@ class HeavyHittersSpec extends WordSpecLike with Matchers {
"TopHeavyHitters" must {
"should work" in {
val hitters = new TopHeavyHitters[String](3)
val hitters = new TopHeavyHitters[String](4)
hitters.update("A", 10) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A"))
@ -21,20 +21,20 @@ class HeavyHittersSpec extends WordSpecLike with Matchers {
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "C"))
hitters.update("D", 100) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "D"))
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "D", "C"))
hitters.update("E", 200) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "D", "E"))
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "D", "E"))
hitters.update("BB", 22) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("BB", "D", "E"))
hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "BB", "D", "E"))
hitters.update("a", 1) shouldBe false
hitters.snapshot.filter(_ ne null).toSet should ===(Set("BB", "D", "E"))
hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "BB", "D", "E"))
}
"correctly replace a hitter" in {
val hitters = new TopHeavyHitters[String](3)
val hitters = new TopHeavyHitters[String](4)
hitters.update("A", 10) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A"))
@ -44,7 +44,7 @@ class HeavyHittersSpec extends WordSpecLike with Matchers {
}
"correctly drop least heavy hitter when more than N are inserted" in {
val hitters = new TopHeavyHitters[String](3)
val hitters = new TopHeavyHitters[String](4)
hitters.update("A", 1) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A"))
@ -54,15 +54,15 @@ class HeavyHittersSpec extends WordSpecLike with Matchers {
hitters.update("C", 33) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "C"))
hitters.lowestHitterWeight should ===(1)
hitters.lowestHitterWeight should ===(0)
// first item which forces dropping least heavy hitter
hitters.update("D", 100) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "C", "D"))
hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "C", "D"))
// second item which forces dropping least heavy hitter
hitters.update("X", 999) shouldBe true
hitters.snapshot.filter(_ ne null).toSet should ===(Set("X", "C", "D"))
hitters.snapshot.filter(_ ne null).toSet should ===(Set("X", "B", "C", "D"))
}
"replace the right item even when hashCodes collide" in {