From 642a145ca1f5a180de343ca4b9f32b2dfef04b1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Dubiel?= Date: Fri, 22 Jul 2016 21:40:14 +0200 Subject: [PATCH] =art Heap structure in heavy hitters --- .../compress/HeavyHittersBenchmark.scala | 4 +- .../artery/compress/TopHeavyHitters.scala | 258 +++++++++++++----- .../artery/compress/HeavyHittersSpec.scala | 20 +- 3 files changed, 203 insertions(+), 79 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala index b6e14f9465..6db11656b7 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/compress/HeavyHittersBenchmark.scala @@ -81,9 +81,9 @@ class HeavyHittersBenchmark { @Benchmark @OperationsPerInvocation(8192) def updateExistingHitter(blackhole: Blackhole): Unit = { - var i = 0 + var i: Int = 0 while (i < 8192) { - blackhole.consume(topN.update("HEAVY_HITTER", Long.MaxValue)) // definitely a heavy hitter + blackhole.consume(topN.update(preallocatedStrings(i % 16), Long.MaxValue)) // definitely a heavy hitter i += 1 } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala index 4f6a9e0a15..715b081b71 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala @@ -3,13 +3,10 @@ */ package akka.remote.artery.compress -import java.util import java.util.Objects -import akka.japi.Util - import scala.annotation.{ switch, tailrec } -import scala.collection.immutable +import scala.reflect.ClassTag /** * INTERNAL API @@ -22,31 +19,48 @@ import scala.collection.immutable * for a discussion about the assumptions made and guarantees about the Heavy Hitters made in this model. * We assume the Cash Register model in which there are only additions, which simplifies HH detecion significantly. */ -private[remote] final class TopHeavyHitters[T](val max: Int) { - import TopHeavyHitters._ - private[this] var _lowestHitterIdx: Int = 0 +private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit classTag: ClassTag[T]) { - private[this] val hashes: Array[Int] = Array.ofDim(max) - private[this] val items: Array[T] = Array.ofDim[Object](max).asInstanceOf[Array[T]] - private[this] val weights: Array[Long] = Array.ofDim(max) + require((max & (max - 1)) == 0, "Maximum numbers of heavy hitters should be in form of 2^k for any natural k") + + val capacity = max * 2 + val mask = capacity - 1 + + import TopHeavyHitters._ + + private[this] val hashes: Array[Int] = Array.ofDim(capacity) + private[this] val items: Array[T] = Array.ofDim[T](capacity) + private[this] val weights: Array[Long] = Array.ofDim(capacity) + private[this] val heapIndex: Array[Int] = Array.fill(capacity)(-1) + private[this] val heap: Array[Int] = Array.fill(max)(-1) // TODO think if we could get away without copy - /** Returns copy(!) of items which are currently considered to be heavy hitters. */ + /** Returns the current heavy hitters, order is not of significance */ def snapshot: Array[T] = { val snap = Array.ofDim(max).asInstanceOf[Array[T]] - System.arraycopy(items, 0, snap, 0, items.length) + var i = 0 + while (i < max) { + val index = heap(i) + val value = + if (index < 0) null + else items(index) + snap(i) = value + i += 1 + } snap } def toDebugString = s"""TopHeavyHitters( - | max: $max, - | lowestHitterIdx: $lowestHitterIdx (weight: $lowestHitterWeight) - | - | hashes: ${hashes.toList.mkString("[", ", ", "]")} - | weights: ${weights.toList.mkString("[", ", ", "]")} - | items: ${items.toList.mkString("[", ", ", "]")} - |)""".stripMargin + | max: $max, + | lowestHitterIdx: $lowestHitterIndex (weight: $lowestHitterWeight) + | + | hashes: ${hashes.toList.mkString("[", ", ", "]")} + | weights: ${weights.toList.mkString("[", ", ", "]")} + | items: ${items.toList.mkString("[", ", ", "]")} + | heapIndex: ${heapIndex.toList.mkString("[", ", ", "]")} + | heap: ${heap.toList.mkString("[", ", ", "]")} + |)""".stripMargin /** * Attempt adding item to heavy hitters set, if it does not fit in the top yet, @@ -56,30 +70,48 @@ private[remote] final class TopHeavyHitters[T](val max: Int) { */ // TODO possibly can be optimised further? (there is a benchmark) def update(item: T, count: Long): Boolean = - isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway + isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode - (findHashIdx(0, hashCode): @switch) match { // worst case O(n), can't really bin search here since indexes are kept in synch with other arrays hmm... + val startIndex = hashCode.get & mask + (findHashIdx(startIndex, hashCode): @switch) match { // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm... case -1 ⇒ // not previously heavy hitter - insertKnownNewHeavy(hashCode, item, count) // O(1) + rarely O(n) if needs to update lowest hitter - + insertKnownNewHeavy(hashCode, item, count) // O(log n + alpha) + true case potentialIndexGuess ⇒ // the found index could be one of many which hash to the same value (we're using open-addressing), - // so it is only used as hint for the replace call. If the value matches, we're good, if not we need to search from here onwards. + // so it is only used as hint for the replace call. If the value matches, we're good, if not we need to search from here onwards. val actualIdx = findItemIdx(potentialIndexGuess, hashCode, item) - if (actualIdx == -1) insertKnownNewHeavy(hashCode, item, count) // O(1) + O(n), we simply replace the current lowest heavy hitter - else replaceExistingHeavyHitter(actualIdx, hashCode, item, count) // usually O(1), worst case O(n) if we need to scan due to hash conflicts + if (actualIdx == -1) { + insertKnownNewHeavy(hashCode, item, count) // O(1 + log n), we simply replace the current lowest heavy hitter + true + } else replaceExistingHeavyHitter(actualIdx, hashCode, item, count) // usually O(1), worst case O(n) if we need to scan due to hash conflicts } } def isHeavy(count: Long): Boolean = count > lowestHitterWeight - @tailrec private def findItemIdx(searchFromIndex: Int, hashCode: HashCodeVal, o: T): Int = + private def findItemIdx(searchFromIndex: Int, hashCode: HashCodeVal, o: T): Int = { + @tailrec def loop(index: Int, start: Int, hashCodeVal: HashCodeVal, o: T): Int = { + if (index == start) -1 + else if (hashCodeVal.get == hashes(index)) { + val item: T = items(index) + if (item == o) { + index + } else { + loop((index + 1) & mask, start, hashCodeVal, o) + } + } else { + loop((index + 1) & mask, start, hashCodeVal, o) + } + } + if (searchFromIndex == -1) -1 else if (Objects.equals(items(searchFromIndex), o)) searchFromIndex - else findItemIdx(findHashIdx(searchFromIndex + 1, hashCode), hashCode, o) + else loop((searchFromIndex + 1) & mask, searchFromIndex, hashCode, o) + } /** * Replace existing heavy hitter – give it a new `count` value. @@ -91,72 +123,162 @@ private[remote] final class TopHeavyHitters[T](val max: Int) { @tailrec private def replaceExistingHeavyHitter(foundHashIndex: Int, hashCode: HashCodeVal, item: T, count: Long): Boolean = if (foundHashIndex == -1) throw new NoSuchElementException(s"Item $item is not present in HeavyHitters, can not replace it!") else if (Objects.equals(items(foundHashIndex), item)) { - putCount(foundHashIndex, count) // we don't need to change `hashCode` or `item`, those remain the same - if (foundHashIndex == lowestHitterIdx) updateLowestHitterIdx() // need to update the lowestHitter since we just bumped its count - false // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before) + updateCount(foundHashIndex, count) // we don't need to change `hashCode` or `item`, those remain the same + fixHeap(heapIndex(foundHashIndex)) + false // not a "new" heavy hitter, since we only replaced it (so it was signaled as new once before) } else replaceExistingHeavyHitter(findHashIdx(foundHashIndex + 1, hashCode), hashCode, item, count) // recurse private def findHashIdx(searchFromIndex: Int, hashCode: HashCodeVal): Int = findEqIndex(hashes, searchFromIndex, hashCode.get) + /** + * Fix heap property on `heap` array + * @param index place to check and fix + */ + @tailrec + private def fixHeap(index: Int): Unit = { + val leftIndex = index * 2 + 1 + val rightIndex = index * 2 + 2 + val currentWeights: Long = weights(heap(index)) + if (rightIndex < max) { + val leftValueIndex: Int = heap(leftIndex) + val rightValueIndex: Int = heap(rightIndex) + if (leftValueIndex < 0) { + swapHeapNode(index, leftIndex) + fixHeap(leftIndex) + } else if (rightValueIndex < 0) { + swapHeapNode(index, rightIndex) + fixHeap(rightIndex) + } else { + val rightWeights: Long = weights(rightValueIndex) + val leftWeights: Long = weights(leftValueIndex) + if (leftWeights < rightWeights) { + if (currentWeights > leftWeights) { + swapHeapNode(index, leftIndex) + fixHeap(leftIndex) + } + } else { + if (currentWeights > rightWeights) { + swapHeapNode(index, rightIndex) + fixHeap(rightIndex) + } + } + } + } else if (leftIndex < max) { + val leftValueIndex: Int = heap(leftIndex) + if (leftValueIndex < 0) { + swapHeapNode(index, leftIndex) + fixHeap(leftIndex) + } else { + val leftWeights: Long = weights(leftValueIndex) + if (currentWeights > leftWeights) { + swapHeapNode(index, leftIndex) + fixHeap(leftIndex) + } + } + } + } + + /** + * Swaps two elements in `heap` array and maintain correct index in `heapIndex`. + * + * @param a index of first element + * @param b index of second element + */ + private def swapHeapNode(a: Int, b: Int): Unit = { + if (heap(a) >= 0) { + heapIndex(heap(a)) = b + } + if (heap(b) >= 0) { + heapIndex(heap(b)) = a + } + val temp = heap(a) + heap(a) = heap(b) + heap(b) = temp + } + /** * Puts the item and additional information into the index of the current lowest hitter. * * @return index at which the insertion was performed */ - private def insertKnownNewHeavy(hashCode: HashCodeVal, item: T, count: Long): Boolean = { - put(_lowestHitterIdx, hashCode, item, count) - updateLowestHitterIdx() - true + private def insertKnownNewHeavy(hashCode: HashCodeVal, item: T, count: Long): Unit = { + removeHash(lowestHitterIndex) + lowestHitterIndex = insert(hashCode, item, count) + } + + /** + * Remove value from hash-table based on position. + * + * @param index position to remove + */ + private def removeHash(index: Int): Unit = { + if (index > 0) { + items(index) = null + hashes(index) = 0 + weights(index) = 0 + } } /** * Only update the count for a given index, e.g. if value and hashCode remained the same. */ - private def putCount(idx: Int, count: Long): Unit = + private def updateCount(idx: Int, count: Long): Unit = weights(idx) = count - private def put(idx: Int, hashCode: HashCodeVal, item: T, count: Long): Unit = { - hashes(idx) = hashCode.get - items(idx) = item - weights(idx) = count - } - - /** Perform a scan for the lowest hitter (by weight). */ - private def updateLowestHitterIdx(): Int = { - _lowestHitterIdx = findIndexOfMinimum(weights) - _lowestHitterIdx + /** + * Insert value in hash-table. + * + * Using open addressing for resolving collisions. + * Initial index is reminder in division hashCode and table size. + * + * @param hashCode hashCode of item + * @param item value which should be added to hash-table + * @param count count associated to value + * @return Index in hash-table where was inserted + */ + private def insert(hashCode: HashCodeVal, item: T, count: Long): Int = { + var index: Int = hashCode.get & mask + while (items(index) != null) { + index = (index + 1) & mask + } + hashes(index) = hashCode.get + items(index) = item + weights(index) = count + index } /** Weight of lowest heavy hitter, if a new inserted item has a weight greater than this it is a heavy hitter. */ - def lowestHitterWeight: Long = - weights(_lowestHitterIdx) - - // do not expose we're array based - private def lowestHitterIdx: Int = - _lowestHitterIdx - - private def findEqIndex(hashes: Array[Int], searchFromIndex: Int, hashCode: Int): Int = { - var i: Int = searchFromIndex - while (i < hashes.length) { - if (hashes(i) == hashCode) return i - i += 1 + def lowestHitterWeight: Long = { + val index: Int = lowestHitterIndex + if (index > 0) { + weights(index) + } else { + 0 } - -1 + } - private def findIndexOfMinimum(weights: Array[Long]): Int = { - var _lowestHitterIdx: Int = -1 - var min: Long = Long.MaxValue + private def lowestHitterIndex: Int = { + heap(0) + } + + private def lowestHitterIndex_=(index: Int): Unit = { + heap(0) = index + heapIndex(index) = 0 + fixHeap(0) + } + + private def findEqIndex(hashes: Array[Int], searchFromIndex: Int, hashCode: Int): Int = { var i: Int = 0 - while (i < weights.length) { - if (weights(i) < min) { - min = weights(i) - _lowestHitterIdx = i + while (i < hashes.length) { + val index = (i + searchFromIndex) & mask + if (hashes(index) == hashCode) { + return index } i += 1 } - _lowestHitterIdx + -1 } override def toString = @@ -164,8 +286,10 @@ private[remote] final class TopHeavyHitters[T](val max: Int) { } object TopHeavyHitters { + /** Value class to avoid mixing up count and hashCode in APIs. */ private[compress] final class HashCodeVal(val get: Int) extends AnyVal { def isEmpty = false } + } diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala index 647d81f318..ba18e46449 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HeavyHittersSpec.scala @@ -10,7 +10,7 @@ class HeavyHittersSpec extends WordSpecLike with Matchers { "TopHeavyHitters" must { "should work" in { - val hitters = new TopHeavyHitters[String](3) + val hitters = new TopHeavyHitters[String](4) hitters.update("A", 10) shouldBe true hitters.snapshot.filter(_ ne null).toSet should ===(Set("A")) @@ -21,20 +21,20 @@ class HeavyHittersSpec extends WordSpecLike with Matchers { hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "C")) hitters.update("D", 100) shouldBe true - hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "D")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "D", "C")) hitters.update("E", 200) shouldBe true - hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "D", "E")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "D", "E")) hitters.update("BB", 22) shouldBe true - hitters.snapshot.filter(_ ne null).toSet should ===(Set("BB", "D", "E")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "BB", "D", "E")) hitters.update("a", 1) shouldBe false - hitters.snapshot.filter(_ ne null).toSet should ===(Set("BB", "D", "E")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "BB", "D", "E")) } "correctly replace a hitter" in { - val hitters = new TopHeavyHitters[String](3) + val hitters = new TopHeavyHitters[String](4) hitters.update("A", 10) shouldBe true hitters.snapshot.filter(_ ne null).toSet should ===(Set("A")) @@ -44,7 +44,7 @@ class HeavyHittersSpec extends WordSpecLike with Matchers { } "correctly drop least heavy hitter when more than N are inserted" in { - val hitters = new TopHeavyHitters[String](3) + val hitters = new TopHeavyHitters[String](4) hitters.update("A", 1) shouldBe true hitters.snapshot.filter(_ ne null).toSet should ===(Set("A")) @@ -54,15 +54,15 @@ class HeavyHittersSpec extends WordSpecLike with Matchers { hitters.update("C", 33) shouldBe true hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "C")) - hitters.lowestHitterWeight should ===(1) + hitters.lowestHitterWeight should ===(0) // first item which forces dropping least heavy hitter hitters.update("D", 100) shouldBe true - hitters.snapshot.filter(_ ne null).toSet should ===(Set("B", "C", "D")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("A", "B", "C", "D")) // second item which forces dropping least heavy hitter hitters.update("X", 999) shouldBe true - hitters.snapshot.filter(_ ne null).toSet should ===(Set("X", "C", "D")) + hitters.snapshot.filter(_ ne null).toSet should ===(Set("X", "B", "C", "D")) } "replace the right item even when hashCodes collide" in {