diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 60dd3dbe99..73d5a4d3d6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -7,12 +7,10 @@ package akka.remote.artery.compress import java.util.function.LongFunction import scala.annotation.tailrec - -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.Address +import akka.actor.{ ActorRef, ActorSystem, Address, InternalActorRef } import akka.event.Logging import akka.event.LoggingAdapter +import akka.pattern.PromiseActorRef import akka.remote.artery._ import akka.util.{ unused, OptionVal } import org.agrona.collections.Long2ObjectHashMap @@ -192,6 +190,21 @@ private[remote] final class InboundActorRefCompression( outboundContext.sendControl( CompressionProtocol.ActorRefCompressionAdvertisement(inboundContext.localAddress, table)) } + + override protected def buildTableForAdvertisement(elements: Iterator[ActorRef]): Map[ActorRef, Int] = { + val mb = Map.newBuilder[ActorRef, Int] + var idx = 0 + elements.foreach { + case ref: InternalActorRef => + val isTemporaryRef = (ref.isLocal && ref.isInstanceOf[PromiseActorRef]) || + (!ref.isLocal && ref.path.elements.head == "temp") + if (!isTemporaryRef) { + mb += ref -> idx + idx += 1 + } + } + mb.result() + } } /** @@ -493,15 +506,17 @@ private[remote] abstract class InboundCompression[T >: Null]( protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit private def prepareCompressionAdvertisement(nextTableVersion: Byte): CompressionTable[T] = { - // TODO optimised somewhat, check if still to heavy; could be encoded into simple array - val mappings: Map[T, Int] = { - val mb = Map.newBuilder[T, Int] - mb ++= heavyHitters.iterator.zipWithIndex - mb.result() - } + val mappings: Map[T, Int] = buildTableForAdvertisement(heavyHitters.iterator) CompressionTable(originUid, nextTableVersion, mappings) } + protected def buildTableForAdvertisement(elements: Iterator[T]): Map[T, Int] = { + // TODO optimised somewhat, check if still to heavy; could be encoded into simple array + val mb = Map.newBuilder[T, Int] + mb ++= elements.zipWithIndex + mb.result() + } + override def toString = s"""${Logging.simpleName(getClass)}(countMinSketch: $cms, heavyHitters: $heavyHitters)"""