diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala index 638806bb58..f36fdc48b8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/TopHeavyHitters.scala @@ -4,6 +4,7 @@ package akka.remote.artery.compress import java.util.Objects +import java.util.concurrent.atomic.AtomicReference import scala.annotation.{ switch, tailrec } import scala.reflect.ClassTag @@ -34,9 +35,18 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl private[this] val heapIndex: Array[Int] = Array.fill(capacity)(-1) private[this] val heap: Array[Int] = Array.fill(max)(-1) + // TODO: Workaround for races. This location gets a snapshot automatically after certain amount of changes + // in this table. This is a workaround until we make this threadsafe by moving InboundCompression to the Codec + // to fully own it. + private[this] val lastSnapshot: AtomicReference[Array[T]] = new AtomicReference[Array[T]](Array.empty) + // TODO think if we could get away without copy /** Returns the current heavy hitters, order is not of significance */ - def snapshot: Array[T] = { + // TODO: Workaround for races until we make this threadsafe by moving InboundCompression to the Codec + // to fully own it. + def snapshot: Array[T] = lastSnapshot.get() + + private def takeSnapshot(): Unit = { val snap = Array.ofDim(max).asInstanceOf[Array[T]] var i = 0 while (i < max) { @@ -47,7 +57,7 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl snap(i) = value i += 1 } - snap + lastSnapshot.set(snap) } def toDebugString = @@ -69,11 +79,13 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl * @return `true` if the added item has become a heavy hitter. */ // TODO possibly can be optimised further? (there is a benchmark) - def update(item: T, count: Long): Boolean = - isHeavy(count) && { // O(1) terminate execution ASAP if known to not be a heavy hitter anyway + def update(item: T, count: Long): Boolean = { + val result = isHeavy(count) && { + // O(1) terminate execution ASAP if known to not be a heavy hitter anyway val hashCode = new HashCodeVal(item.hashCode()) // avoid re-calculating hashCode val startIndex = hashCode.get & mask - (findHashIdx(startIndex, hashCode): @switch) match { // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm... + (findHashIdx(startIndex, hashCode): @switch) match { + // worst case O(n), common O(1 + alpha), can't really bin search here since indexes are kept in synch with other arrays hmm... case -1 ⇒ // not previously heavy hitter insertKnownNewHeavy(hashCode, item, count) // O(log n + alpha) @@ -89,6 +101,8 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl } else replaceExistingHeavyHitter(actualIdx, hashCode, item, count) // usually O(1), worst case O(n) if we need to scan due to hash conflicts } } + result + } def isHeavy(count: Long): Boolean = count > lowestHitterWeight @@ -205,6 +219,7 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl private def insertKnownNewHeavy(hashCode: HashCodeVal, item: T, count: Long): Unit = { removeHash(lowestHitterIndex) lowestHitterIndex = insert(hashCode, item, count) + takeSnapshot() } /**