Don't advertise temporary references in the compression table #28822
This commit is contained in:
parent
f31b58b61a
commit
4c81ef838b
1 changed files with 25 additions and 10 deletions
|
|
@ -7,12 +7,10 @@ package akka.remote.artery.compress
|
|||
import java.util.function.LongFunction
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
import akka.actor.{ ActorRef, ActorSystem, Address, InternalActorRef }
|
||||
import akka.event.Logging
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.pattern.PromiseActorRef
|
||||
import akka.remote.artery._
|
||||
import akka.util.{ unused, OptionVal }
|
||||
import org.agrona.collections.Long2ObjectHashMap
|
||||
|
|
@ -192,6 +190,21 @@ private[remote] final class InboundActorRefCompression(
|
|||
outboundContext.sendControl(
|
||||
CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, table))
|
||||
}
|
||||
|
||||
override protected def buildTableForAdvertisement(elements: Iterator[ActorRef]): Map[ActorRef, Int] = {
|
||||
val mb = Map.newBuilder[ActorRef, Int]
|
||||
var idx = 0
|
||||
elements.foreach {
|
||||
case ref: InternalActorRef =>
|
||||
val isTemporaryRef = (ref.isLocal && ref.isInstanceOf[PromiseActorRef]) ||
|
||||
(!ref.isLocal && ref.path.elements.head == "temp")
|
||||
if (!isTemporaryRef) {
|
||||
mb += ref -> idx
|
||||
idx += 1
|
||||
}
|
||||
}
|
||||
mb.result()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -493,15 +506,17 @@ private[remote] abstract class InboundCompression[T >: Null](
|
|||
protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit
|
||||
|
||||
private def prepareCompressionAdvertisement(nextTableVersion: Byte): CompressionTable[T] = {
|
||||
// TODO optimised somewhat, check if still to heavy; could be encoded into simple array
|
||||
val mappings: Map[T, Int] = {
|
||||
val mb = Map.newBuilder[T, Int]
|
||||
mb ++= heavyHitters.iterator.zipWithIndex
|
||||
mb.result()
|
||||
}
|
||||
val mappings: Map[T, Int] = buildTableForAdvertisement(heavyHitters.iterator)
|
||||
CompressionTable(originUid, nextTableVersion, mappings)
|
||||
}
|
||||
|
||||
protected def buildTableForAdvertisement(elements: Iterator[T]): Map[T, Int] = {
|
||||
// TODO optimised somewhat, check if still to heavy; could be encoded into simple array
|
||||
val mb = Map.newBuilder[T, Int]
|
||||
mb ++= elements.zipWithIndex
|
||||
mb.result()
|
||||
}
|
||||
|
||||
override def toString =
|
||||
s"""${Logging.simpleName(getClass)}(countMinSketch: $cms, heavyHitters: $heavyHitters)"""
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue