diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala
index 2954c966ca..61be651321 100644
--- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala
+++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala
@@ -505,7 +505,7 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend
  *
  * INTERNAL API
  */
-private[akka] final class PromiseActorRef private (
+private[akka] final class PromiseActorRef(
     val provider: ActorRefProvider,
     val result: Promise[Any],
     _mcn: String,
diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/SerializationFormatCacheBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/SerializationFormatCacheBenchmark.scala
new file mode 100644
index 0000000000..71ea8145ab
--- /dev/null
+++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/compress/SerializationFormatCacheBenchmark.scala
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2009-2021 Lightbend Inc.
+ */
+
+package akka.remote.artery.compress
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.ExtendedActorSystem
+import akka.actor.Props
+import akka.pattern.PromiseActorRef
+import akka.remote.artery.SerializationFormatCache
+import akka.serialization.Serialization
+import org.openjdk.jmh.annotations.Benchmark
+import org.openjdk.jmh.annotations.Fork
+import org.openjdk.jmh.annotations.Level
+import org.openjdk.jmh.annotations.OperationsPerInvocation
+import org.openjdk.jmh.annotations.Param
+import org.openjdk.jmh.annotations.Scope
+import org.openjdk.jmh.annotations.Setup
+import org.openjdk.jmh.annotations.State
+import org.openjdk.jmh.annotations.TearDown
+import org.openjdk.jmh.infra.Blackhole
+
+import scala.annotation.nowarn
+import scala.concurrent.Promise
+
+/**
+ * Actually more like specific benchmarks for the few concrete LRU cache usages
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@nowarn
+class SerializationFormatCacheBenchmark {
+
+  // note: 1 means only top level, no temporary refs at all
+  @Param(Array("1", "2", "5", "10"))
+  private var everyNToToplevel = 0
+  // a few "normal" top level actors communicating
+  @Param(Array("100"))
+  private var uniqueTopLevelRefs = 0
+  // we want to simulate one per request-response, but create upfront, so very high number
+  @Param(Array("100000"))
+  private var uniqueTemporaryRefs = 0
+
+  private var system: ActorSystem = _
+  // hardcoded capacity of 1024
+  // note that this is not quite realistic with a single cache,
+  // in practice there are N caches, one in each outgoing artery lane
+  private var cache: SerializationFormatCache = _
+  private var temporaryActorRefs: Array[ActorRef] = _
+  private var topLevelActorRefs: Array[ActorRef] = _
+
+  object Parent {
+    def props(childCount: Int, childProps: Props) = Props(new Parent(childCount, childProps))
+  }
+  class Parent(childCount: Int, childProps: Props) extends Actor {
+    val children = (0 to childCount).map(_ => context.actorOf(childProps))
+    def receive = PartialFunction.empty
+  }
+
+  @Setup
+  def init(): Unit = {
+    system = ActorSystem("SerializationFormatCacheBenchmark")
+    temporaryActorRefs = Array.tabulate(uniqueTemporaryRefs)(
+      n =>
+        new PromiseActorRef(
+          system.asInstanceOf[ExtendedActorSystem].provider,
+          Promise(),
+          "Any",
+          // request path is encoded in this string
+          s"_user_region_shard${n % 100}_entitypretendid${n}"))
+
+    topLevelActorRefs = Array.tabulate(uniqueTopLevelRefs)(n => system.actorOf(Props.empty, s"actor_$n"))
+  }
+
+  // new empty cache instance each iteration to have more control over cached contents
+  @Setup(Level.Iteration)
+  def perRunSetup(): Unit = {
+    cache = new SerializationFormatCache
+  }
+
+  @TearDown
+  def shutdown(): Unit = {
+    system.terminate()
+  }
+
+  @Benchmark
+  @OperationsPerInvocation(2000)
+  def useCache(blackhole: Blackhole): Unit = {
+    // serialization requires this
+    Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
+      var i: Int = 0
+      while (i < 2000) {
+        val actorRef =
+          if (i % everyNToToplevel == 0) topLevelActorRefs(i % uniqueTopLevelRefs)
+          else temporaryActorRefs(i % uniqueTemporaryRefs)
+        blackhole.consume(cache.getOrCompute(actorRef))
+        i += 1
+      }
+    }
+  }
+
+}
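Editor's note on the parameter mix (not part of the patch): `everyNToToplevel` decides how many of the 2000 lookups per invocation hit the 100 reusable top-level refs versus one-off temporary refs; with the value 1 every lookup is top-level, matching the comment above. A tiny standalone sketch, with an assumed object name, that prints the split for the parameter values used in the benchmark:

```scala
// Standalone sketch, not part of the patch: share of the 2000 lookups per invocation
// that go to reusable top-level refs versus one-off temporary refs.
object BenchmarkMixSketch extends App {
  val lookups = 2000 // mirrors @OperationsPerInvocation(2000)
  for (everyNToToplevel <- Seq(1, 2, 5, 10)) {
    val topLevel = (0 until lookups).count(_ % everyNToToplevel == 0)
    println(f"everyNToToplevel=$everyNToToplevel%2d -> $topLevel%4d top-level, ${lookups - topLevel}%4d temporary lookups")
  }
}
```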
diff --git a/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala b/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala
index 42c1aed63d..61fd2c6ca5 100644
--- a/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala
+++ b/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala
@@ -44,6 +44,7 @@ class LruBoundedCacheBench {
       override protected def compute(k: String): String = k
       override protected def hash(k: String): Int = k.hashCode
       override protected def isCacheable(v: String): Boolean = true
+      override protected def isKeyCacheable(k: String): Boolean = true
     }

   // Loading
diff --git a/akka-remote/src/main/mima-filters/2.6.18.backwards.excludes/actor-ref-serialization-cache-optimization.backwards.excludes b/akka-remote/src/main/mima-filters/2.6.18.backwards.excludes/actor-ref-serialization-cache-optimization.backwards.excludes
new file mode 100644
index 0000000000..af775ef8c6
--- /dev/null
+++ b/akka-remote/src/main/mima-filters/2.6.18.backwards.excludes/actor-ref-serialization-cache-optimization.backwards.excludes
@@ -0,0 +1,2 @@
+# Issue #31080, internal changes
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.LruBoundedCache.isKeyCacheable")
\ No newline at end of file
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
index 698b9ccbf0..8a76c17146 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
@@ -346,6 +346,8 @@ private[remote] final class ActorRefResolveCacheWithAddress(

   override protected def compute(k: String): InternalActorRef =
     provider.resolveActorRefWithLocalAddress(k, localAddress.address)
+
+  override protected def isKeyCacheable(k: String): Boolean = true
 }

 /**
diff --git a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala
index dffa58b4aa..ee79b00553 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala
@@ -5,10 +5,9 @@
 package akka.remote.artery

 import java.nio.{ ByteBuffer, ByteOrder }
-
 import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer }
-
 import akka.actor.ActorRef
+import akka.actor.InternalActorRef
 import akka.io.DirectByteBufferPool
 import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, NoInboundCompressions }
 import akka.serialization.Serialization
@@ -198,6 +197,9 @@ private[remote] sealed trait HeaderBuilder {
 }

 /**
+ * Cache of the serialized path for the most used actor refs, performance critical as it is applied once for every
+ * outgoing message. Depends on the Serialization#withTransportInformation thread local being set for serialization.
+ *
  * INTERNAL API
  */
 private[remote] final class SerializationFormatCache
@@ -209,6 +211,10 @@ private[remote] final class SerializationFormatCache

   // Refs with ActorCell.undefinedUid will now collide all the time, but this is not a usual scenario anyway.
   override protected def hash(ref: ActorRef): Int = ref.path.uid

+  override protected def isKeyCacheable(k: ActorRef): Boolean =
+    // "temp" refs live only for one request-response interaction, so don't cache them
+    !InternalActorRef.isTemporaryRef(k)
+
   override protected def isCacheable(v: String): Boolean = true
 }
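The `isKeyCacheable` override above is the core of the change: an ask-style temporary ref gets a fresh path for every request, so a cache entry for its serialized form can never be hit again, while ordinary refs reuse the same path on every message. A self-contained illustration using classic actors (the `TempRefPaths`, `demo` and `echo` names are invented for this sketch and are not part of the patch):

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

object TempRefPaths extends App {
  // replies with the path of the ref that asked, i.e. the temporary PromiseActorRef behind the ask
  class Echo extends Actor {
    def receive = { case _ => sender() ! sender().path.toString }
  }

  implicit val timeout: Timeout = 3.seconds
  val system = ActorSystem("demo")
  val echo = system.actorOf(Props(new Echo), "echo")

  (1 to 3).foreach { _ =>
    // prints a different akka://demo/temp/... path on every iteration
    println(Await.result((echo ? "ping").mapTo[String], 3.seconds))
  }
  println(echo.path) // always akka://demo/user/echo, so caching its serialized form pays off

  Await.result(system.terminate(), 5.seconds)
}
```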
diff --git a/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala b/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala
index 4387648453..e815df83fa 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala
@@ -76,41 +76,44 @@ private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag]
     CacheStatistics(count, max, sum.toDouble / count)
   }

-  final def getOrCompute(k: K): V = {
-    val h = hash(k)
-    epoch += 1
+  final def getOrCompute(k: K): V =
+    if (!isKeyCacheable(k)) {
+      compute(k)
+    } else {
+      val h = hash(k)
+      epoch += 1

-    @tailrec def findOrCalculate(position: Int, probeDistance: Int): V = {
-      if (values(position) eq null) {
-        val value = compute(k)
-        if (isCacheable(value)) {
-          keys(position) = k
-          values(position) = value
-          hashes(position) = h
-          epochs(position) = epoch
-        }
-        value
-      } else {
-        val otherProbeDistance = probeDistanceOf(position)
-        // If probe distance of the element we try to get is larger than the current slot's, then the element cannot be in
-        // the table since because of the Robin-Hood property we would have swapped it with the current element.
-        if (probeDistance > otherProbeDistance) {
+      @tailrec def findOrCalculate(position: Int, probeDistance: Int): V = {
+        if (values(position) eq null) {
           val value = compute(k)
-          if (isCacheable(value)) move(position, k, h, value, epoch, probeDistance)
+          if (isCacheable(value)) {
+            keys(position) = k
+            values(position) = value
+            hashes(position) = h
+            epochs(position) = epoch
+          }
           value
-        } else if (hashes(position) == h && k == keys(position)) {
-          // Update usage
-          epochs(position) = epoch
-          values(position)
         } else {
-          // This is not our slot yet
-          findOrCalculate((position + 1) & Mask, probeDistance + 1)
+          val otherProbeDistance = probeDistanceOf(position)
+          // If probe distance of the element we try to get is larger than the current slot's, then the element cannot be in
+          // the table since because of the Robin-Hood property we would have swapped it with the current element.
+          if (probeDistance > otherProbeDistance) {
+            val value = compute(k)
+            if (isCacheable(value)) move(position, k, h, value, epoch, probeDistance)
+            value
+          } else if (hashes(position) == h && k == keys(position)) {
+            // Update usage
+            epochs(position) = epoch
+            values(position)
+          } else {
+            // This is not our slot yet
+            findOrCalculate((position + 1) & Mask, probeDistance + 1)
+          }
         }
       }
-    }

-    findOrCalculate(position = h & Mask, probeDistance = 0)
-  }
+      findOrCalculate(position = h & Mask, probeDistance = 0)
+    }

   @tailrec private def removeAt(position: Int): Unit = {
     val next = (position + 1) & Mask
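The reshaped `getOrCompute` above adds a single early exit: keys that the concrete cache declares non-cacheable bypass the Robin-Hood table entirely, so they are computed every time and never evict existing entries. A minimal sketch of that control flow using a plain map stand-in (not Akka's open-addressing implementation; `BoundedCacheSketch` and its naive eviction are assumptions for illustration):

```scala
import scala.collection.mutable

// Sketch only: mimics the shape of the new getOrCompute, not the Robin-Hood
// open addressing or epoch-based eviction of the real LruBoundedCache.
abstract class BoundedCacheSketch[K, V](capacity: Int) {
  private val table = mutable.LinkedHashMap.empty[K, V]

  protected def compute(k: K): V
  protected def isKeyCacheable(k: K): Boolean // the hook this change introduces
  protected def isCacheable(v: V): Boolean

  final def getOrCompute(k: K): V =
    if (!isKeyCacheable(k)) {
      // no lookup, no insertion, no eviction pressure on the hot entries
      compute(k)
    } else {
      table.get(k) match {
        case Some(v) => v
        case None =>
          val v = compute(k)
          if (isCacheable(v)) {
            if (table.size >= capacity) table.remove(table.head._1) // naive stand-in for eviction
            table.put(k, v)
          }
          v
      }
    }
}

// Usage mirroring the new spec case below: "!"-prefixed keys are recomputed every time.
object BoundedCacheSketchDemo extends App {
  var computations = 0
  val cache = new BoundedCacheSketch[String, String](capacity = 4) {
    override protected def compute(k: String): String = { computations += 1; s"$k:$computations" }
    override protected def isKeyCacheable(k: String): Boolean = !k.startsWith("!")
    override protected def isCacheable(v: String): Boolean = !v.startsWith("#")
  }
  println(cache.getOrCompute("!A")) // !A:1
  println(cache.getOrCompute("!A")) // !A:2, never cached
  println(cache.getOrCompute("B")) // B:3
  println(cache.getOrCompute("B")) // B:3, served from the cache
}
```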
@@ -182,6 +185,7 @@ private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag]

   protected def hash(k: K): Int

+  protected def isKeyCacheable(k: K): Boolean
   protected def isCacheable(v: V): Boolean

   override def toString =
diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala
index 410938bb9d..7883b35f24 100644
--- a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala
+++ b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala
@@ -95,6 +95,7 @@ private[akka] abstract class AbstractActorRefResolveCache[R <: ActorRef: ClassTa

   override protected def hash(k: String): Int = Unsafe.fastHash(k)

+  override protected def isKeyCacheable(k: String): Boolean = true
   override protected def isCacheable(ref: R): Boolean =
     ref match {
       case _: EmptyLocalActorRef => false
diff --git a/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala
index 3d1c6d33bf..90a96b6ed5 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala
@@ -27,6 +27,7 @@ class LruBoundedCacheSpec extends AkkaSpec {
     override protected def hash(k: String): Int = Unsafe.fastHash(hashSeed + k + hashSeed)

     override protected def isCacheable(v: String): Boolean = !v.startsWith("#")
+    override protected def isKeyCacheable(k: String): Boolean = !k.startsWith("!")

     def internalProbeDistanceOf(idealSlot: Int, actualSlot: Int): Int = probeDistanceOf(idealSlot, actualSlot)

@@ -206,6 +207,37 @@
       cache.expectCached("E", "E:8")
     }

+    "not cache non cacheable keys" in {
+      val cache = new TestCache(4, 4)
+
+      cache.expectComputedOnly("!A", "!A:0")
+      cache.expectComputedOnly("!A", "!A:1")
+      cache.expectComputedOnly("!A", "!A:2")
+      cache.expectComputedOnly("!A", "!A:3")
+
+      cache.expectComputed("A", "A:4")
+      cache.expectComputed("B", "B:5")
+      cache.expectComputed("C", "C:6")
+      cache.expectComputed("D", "D:7")
+      cache.expectComputed("E", "E:8")
+
+      cache.expectCached("B", "B:5")
+      cache.expectCached("C", "C:6")
+      cache.expectCached("D", "D:7")
+      cache.expectCached("E", "E:8")
+
+      cache.expectComputedOnly("!A", "!A:9")
+      cache.expectComputedOnly("!A", "!A:10")
+      cache.expectComputedOnly("!A", "!A:11")
+      cache.expectComputedOnly("!A", "!A:12")
+
+      // Cacheable values are not affected
+      cache.expectCached("B", "B:5")
+      cache.expectCached("C", "C:6")
+      cache.expectCached("D", "D:7")
+      cache.expectCached("E", "E:8")
+    }
+
     "maintain a good average probe distance" in {
       for (_ <- 1 to 10) {
         val seed = Random.nextInt(1024)