Workaround for #21475 until it is fully fixed. Makes snapshots thread-safe

This commit is contained in:
drewhk 2016-11-10 15:59:20 +01:00 committed by Johan Andrén
parent 91b522e186
commit 2dd4ac1ef6

View file

@ -4,6 +4,7 @@
package akka.remote.artery.compress
import java.util.Objects
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.{ switch, tailrec }
import scala.reflect.ClassTag
@ -34,9 +35,18 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
private[this] val heapIndex: Array[Int] = Array.fill(capacity)(-1)
private[this] val heap: Array[Int] = Array.fill(max)(-1)
// TODO: Workaround for races. This location gets a snapshot automatically after certain amount of changes
// in this table. This is a workaround until we make this threadsafe by moving InboundCompression to the Codec
// to fully own it.
private[this] val lastSnapshot: AtomicReference[Array[T]] = new AtomicReference[Array[T]](Array.empty)
// TODO think if we could get away without copy
/** Returns the current heavy hitters, order is not of significance */
def snapshot: Array[T] = {
// TODO: Workaround for races until we make this threadsafe by moving InboundCompression to the Codec
// to fully own it.
def snapshot: Array[T] = lastSnapshot.get()
private def takeSnapshot(): Unit = {
val snap = Array.ofDim(max).asInstanceOf[Array[T]]
var i = 0
while (i < max) {
@ -47,7 +57,7 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
snap(i) = value
i += 1
}
snap
lastSnapshot.set(snap)
}
def toDebugString =
@ -69,11 +79,13 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
* @return `true` if the added item has become a heavy hitter.
*/
// TODO possibly can be optimised further? (there is a benchmark)
def update(item: T, count: Long): Boolean =
isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway
def update(item: T, count: Long): Boolean = {
val result = isHeavy(count) && {
// O(1) terminate execution ASAP if known to not be a heavy hitter anyway
val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode
val startIndex = hashCode.get & mask
(findHashIdx(startIndex, hashCode): @switch) match { // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm...
(findHashIdx(startIndex, hashCode): @switch) match {
// worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm...
case -1
// not previously heavy hitter
insertKnownNewHeavy(hashCode, item, count) // O(log n + alpha)
@ -89,6 +101,8 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
} else replaceExistingHeavyHitter(actualIdx, hashCode, item, count) // usually O(1), worst case O(n) if we need to scan due to hash conflicts
}
}
result
}
def isHeavy(count: Long): Boolean =
count > lowestHitterWeight
@ -205,6 +219,7 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
private def insertKnownNewHeavy(hashCode: HashCodeVal, item: T, count: Long): Unit = {
removeHash(lowestHitterIndex)
lowestHitterIndex = insert(hashCode, item, count)
takeSnapshot()
}
/**