Optimize recipient serialization cache (#31084)

* New benchmark specifically for SerializationFormatCache
* avoid caching serialized form of temporary actors
This commit is contained in:
Johan Andrén 2022-01-25 15:41:03 +01:00 committed by GitHub
parent 609a7c5f8d
commit 7ff3d9fbff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 184 additions and 31 deletions

View file

@@ -505,7 +505,7 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend
*
* INTERNAL API
*/
private[akka] final class PromiseActorRef private (
private[akka] final class PromiseActorRef(
val provider: ActorRefProvider,
val result: Promise[Any],
_mcn: String,

View file

@@ -0,0 +1,105 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery.compress
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Props
import akka.pattern.PromiseActorRef
import akka.remote.artery.SerializationFormatCache
import akka.serialization.Serialization
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.Fork
import org.openjdk.jmh.annotations.Level
import org.openjdk.jmh.annotations.OperationsPerInvocation
import org.openjdk.jmh.annotations.Param
import org.openjdk.jmh.annotations.Scope
import org.openjdk.jmh.annotations.Setup
import org.openjdk.jmh.annotations.State
import org.openjdk.jmh.annotations.TearDown
import org.openjdk.jmh.infra.Blackhole
import scala.annotation.nowarn
import scala.concurrent.Promise
/**
 * Actually more like specific benchmarks for the few concrete LRU cache usages
 */
@Fork(1)
@State(Scope.Benchmark)
@nowarn
class SerializationFormatCacheBenchmark {

  // every N-th serialized ref is a long-lived top level actor, the rest are temporary refs
  // (note: 1 means only top level, no temporary at all)
  @Param(Array("1", "2", "5", "10"))
  private var everyNToToplevel = 0

  // a few "normal" top level actors communicating
  @Param(Array("100"))
  private var uniqueTopLevelRefs = 0

  // we want to simulate one per request-response, but create upfront, so very high number
  @Param(Array("100000"))
  private var uniqueTemporaryRefs = 0

  private var system: ActorSystem = _

  // hardcoded capacity of 1024
  // note that this is not quite realistic, with a single cache,
  // in practice there are N caches, one in each outgoing artery lane
  private var cache: SerializationFormatCache = _
  private var temporaryActorRefs: Array[ActorRef] = _
  private var topLevelActorRefs: Array[ActorRef] = _

  object Parent {
    def props(childCount: Int, childProps: Props): Props = Props(new Parent(childCount, childProps))
  }

  // NOTE(review): not referenced by the benchmark methods below; kept for manual experiments
  class Parent(childCount: Int, childProps: Props) extends Actor {
    // `until` so exactly childCount children are spawned (`0 to childCount` was an
    // off-by-one creating childCount + 1 children)
    val children = (0 until childCount).map(_ => context.actorOf(childProps))
    def receive = PartialFunction.empty
  }

  /** Creates the actor system and pre-creates all refs used during measurement. */
  @Setup
  def init(): Unit = {
    system = ActorSystem("SerializationFormatCacheBenchmark")
    temporaryActorRefs = Array.tabulate(uniqueTemporaryRefs)(
      n =>
        new PromiseActorRef(
          system.asInstanceOf[ExtendedActorSystem].provider,
          Promise(),
          "Any",
          // request path is encoded in this string
          s"_user_region_shard${n % 100}_entitypretendid${n}"))
    topLevelActorRefs = Array.tabulate(uniqueTopLevelRefs)(n => system.actorOf(Props.empty, s"actor_$n"))
  }

  // new empty cache instance each iteration to have more control over cached contents
  @Setup(Level.Iteration)
  def perRunSetup(): Unit = {
    cache = new SerializationFormatCache
  }

  /** Shuts the actor system down after the whole benchmark run (fire-and-forget terminate). */
  @TearDown
  def shutdown(): Unit = {
    system.terminate()
  }

  /**
   * Looks up 2000 refs, mixing cacheable top level refs and non-cacheable temporary
   * refs according to everyNToToplevel.
   */
  @Benchmark
  @OperationsPerInvocation(2000)
  def useCache(blackhole: Blackhole): Unit = {
    // serialization requires this
    Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
      var i: Int = 0
      while (i < 2000) {
        val actorRef =
          if (i % everyNToToplevel == 0) topLevelActorRefs(i % uniqueTopLevelRefs)
          else temporaryActorRefs(i % uniqueTemporaryRefs)
        blackhole.consume(cache.getOrCompute(actorRef))
        i += 1
      }
    }
  }
}

View file

@@ -44,6 +44,7 @@ class LruBoundedCacheBench {
override protected def compute(k: String): String = k
override protected def hash(k: String): Int = k.hashCode
override protected def isCacheable(v: String): Boolean = true
override protected def isKeyCacheable(k: String): Boolean = true
}
// Loading

View file

@@ -0,0 +1,2 @@
# Issue #31080, internal changes
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.LruBoundedCache.isKeyCacheable")

View file

@@ -346,6 +346,8 @@ private[remote] final class ActorRefResolveCacheWithAddress(
override protected def compute(k: String): InternalActorRef =
provider.resolveActorRefWithLocalAddress(k, localAddress.address)
override protected def isKeyCacheable(k: String): Boolean = true
}
/**

View file

@@ -5,10 +5,9 @@
package akka.remote.artery
import java.nio.{ ByteBuffer, ByteOrder }
import org.agrona.concurrent.{ ManyToManyConcurrentArrayQueue, UnsafeBuffer }
import akka.actor.ActorRef
import akka.actor.InternalActorRef
import akka.io.DirectByteBufferPool
import akka.remote.artery.compress.{ CompressionTable, InboundCompressions, NoInboundCompressions }
import akka.serialization.Serialization
@@ -198,6 +197,9 @@ private[remote] sealed trait HeaderBuilder {
}
/**
* Cache of the serialized path for the most used actor refs, performance critical as applied once for every
* outgoing message. Depends on the Serialization#withTransportInformation thread local to be set for serialization.
*
* INTERNAL API
*/
private[remote] final class SerializationFormatCache
@@ -209,6 +211,10 @@ private[remote] final class SerializationFormatCache
// Refs with ActorCell.undefinedUid will now collide all the time, but this is not a usual scenario anyway.
override protected def hash(ref: ActorRef): Int = ref.path.uid
override protected def isKeyCacheable(k: ActorRef): Boolean =
// "temp" only for one request-response interaction so don't cache
!InternalActorRef.isTemporaryRef(k)
override protected def isCacheable(v: String): Boolean = true
}

View file

@@ -76,41 +76,44 @@ private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag]
CacheStatistics(count, max, sum.toDouble / count)
}
final def getOrCompute(k: K): V = {
val h = hash(k)
epoch += 1
final def getOrCompute(k: K): V =
if (!isKeyCacheable(k)) {
compute(k)
} else {
val h = hash(k)
epoch += 1
@tailrec def findOrCalculate(position: Int, probeDistance: Int): V = {
if (values(position) eq null) {
val value = compute(k)
if (isCacheable(value)) {
keys(position) = k
values(position) = value
hashes(position) = h
epochs(position) = epoch
}
value
} else {
val otherProbeDistance = probeDistanceOf(position)
// If probe distance of the element we try to get is larger than the current slot's, then the element cannot be in
// the table since because of the Robin-Hood property we would have swapped it with the current element.
if (probeDistance > otherProbeDistance) {
@tailrec def findOrCalculate(position: Int, probeDistance: Int): V = {
if (values(position) eq null) {
val value = compute(k)
if (isCacheable(value)) move(position, k, h, value, epoch, probeDistance)
if (isCacheable(value)) {
keys(position) = k
values(position) = value
hashes(position) = h
epochs(position) = epoch
}
value
} else if (hashes(position) == h && k == keys(position)) {
// Update usage
epochs(position) = epoch
values(position)
} else {
// This is not our slot yet
findOrCalculate((position + 1) & Mask, probeDistance + 1)
val otherProbeDistance = probeDistanceOf(position)
// If probe distance of the element we try to get is larger than the current slot's, then the element cannot be in
// the table since because of the Robin-Hood property we would have swapped it with the current element.
if (probeDistance > otherProbeDistance) {
val value = compute(k)
if (isCacheable(value)) move(position, k, h, value, epoch, probeDistance)
value
} else if (hashes(position) == h && k == keys(position)) {
// Update usage
epochs(position) = epoch
values(position)
} else {
// This is not our slot yet
findOrCalculate((position + 1) & Mask, probeDistance + 1)
}
}
}
}
findOrCalculate(position = h & Mask, probeDistance = 0)
}
findOrCalculate(position = h & Mask, probeDistance = 0)
}
@tailrec private def removeAt(position: Int): Unit = {
val next = (position + 1) & Mask
@@ -182,6 +185,7 @@ private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag]
protected def hash(k: K): Int
protected def isKeyCacheable(k: K): Boolean
protected def isCacheable(v: V): Boolean
override def toString =

View file

@@ -95,6 +95,7 @@ private[akka] abstract class AbstractActorRefResolveCache[R <: ActorRef: ClassTa
override protected def hash(k: String): Int = Unsafe.fastHash(k)
override protected def isKeyCacheable(k: String): Boolean = true
override protected def isCacheable(ref: R): Boolean =
ref match {
case _: EmptyLocalActorRef => false

View file

@@ -27,6 +27,7 @@ class LruBoundedCacheSpec extends AkkaSpec {
override protected def hash(k: String): Int = Unsafe.fastHash(hashSeed + k + hashSeed)
override protected def isCacheable(v: String): Boolean = !v.startsWith("#")
override protected def isKeyCacheable(k: String): Boolean = !k.startsWith("!")
def internalProbeDistanceOf(idealSlot: Int, actualSlot: Int): Int = probeDistanceOf(idealSlot, actualSlot)
@@ -206,6 +207,37 @@ class LruBoundedCacheSpec extends AkkaSpec {
cache.expectCached("E", "E:8")
}
// Keys rejected by the `isKeyCacheable` override above (keys starting with "!") must
// always go through `compute` and must never be stored in, or evict entries from, the cache.
"not cache non cacheable keys" in {
val cache = new TestCache(4, 4)
// NOTE(review): the ":N" suffix looks like a per-cache compute counter — an increasing
// suffix on repeated lookups shows each lookup recomputed (confirm against TestCache.compute)
cache.expectComputedOnly("!A", "!A:0")
cache.expectComputedOnly("!A", "!A:1")
cache.expectComputedOnly("!A", "!A:2")
cache.expectComputedOnly("!A", "!A:3")
// cacheable keys take the normal compute-then-cache path
cache.expectComputed("A", "A:4")
cache.expectComputed("B", "B:5")
cache.expectComputed("C", "C:6")
cache.expectComputed("D", "D:7")
cache.expectComputed("E", "E:8")
cache.expectCached("B", "B:5")
cache.expectCached("C", "C:6")
cache.expectCached("D", "D:7")
cache.expectCached("E", "E:8")
// further lookups of the non-cacheable key: computed every time, still never cached
cache.expectComputedOnly("!A", "!A:9")
cache.expectComputedOnly("!A", "!A:10")
cache.expectComputedOnly("!A", "!A:11")
cache.expectComputedOnly("!A", "!A:12")
// Cacheable values are not affected
cache.expectCached("B", "B:5")
cache.expectCached("C", "C:6")
cache.expectCached("D", "D:7")
cache.expectCached("E", "E:8")
}
"maintain a good average probe distance" in {
for (_ <- 1 to 10) {
val seed = Random.nextInt(1024)