21203: Cache/memoize ActorRef resolution

This commit is contained in:
Endre Sándor Varga 2016-09-02 18:09:43 +02:00
parent 6191f39565
commit 11fceb4121
7 changed files with 575 additions and 18 deletions

View file

@ -167,7 +167,7 @@ class CodecBenchmark {
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
uniqueLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map { _ =>
@ -208,7 +208,7 @@ class CodecBenchmark {
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
uniqueLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(msg outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB)))

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util
import java.util
import java.util.concurrent.TimeUnit
import akka.remote.artery.LruBoundedCache
import org.openjdk.jmh.annotations.{ Param, _ }
import scala.util.Random
@State(Scope.Benchmark)
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
class LruBoundedCacheBench {
var javaHashMap: java.util.HashMap[String, String] = _
@Param(Array("1024", "8192"))
var count = 0
@Param(Array("128", "256"))
var stringSize = 0
var lruCache: LruBoundedCache[String, String] = _
@Param(Array("90", "99"))
var loadFactor: Int = _
var toAdd: String = _
var toRemove: String = _
var toGet: String = _
@Setup
def setup(): Unit = {
val loadF: Double = loadFactor / 100.0
val threshold = (loadF * count).toInt
val random = Random
javaHashMap = new util.HashMap[String, String](count)
lruCache = new LruBoundedCache[String, String](count, threshold) {
override protected def compute(k: String): String = k
override protected def hash(k: String): Int = k.hashCode
override protected def isCacheable(v: String): Boolean = true
}
// Loading
for (i <- 1 to threshold) {
val value = random.nextString(stringSize)
if (i == 1) toGet = value
toRemove = value
javaHashMap.put(value, value)
lruCache.get(value)
}
toAdd = random.nextString(stringSize)
}
@Benchmark
def addOne_lruCache(): String = {
lruCache.getOrCompute(toAdd)
}
@Benchmark
def addOne_hashMap(): String = {
javaHashMap.put(toAdd, toAdd)
javaHashMap.get(toAdd)
}
@Benchmark
def addOne_hashMap_remove_put_get(): String = {
javaHashMap.remove(toRemove)
javaHashMap.put(toAdd, toAdd)
javaHashMap.get(toAdd)
}
}

View file

@ -914,10 +914,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
def createDecoder(compression: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = {
val resolveActorRefWithLocalAddress: String InternalActorRef = {
recipient provider.resolveActorRefWithLocalAddress(recipient, localAddress.address)
}
Flow.fromGraph(new Decoder(this, system, resolveActorRefWithLocalAddress, compression, bufferPool,
Flow.fromGraph(new Decoder(this, system, localAddress, compression, bufferPool,
inboundEnvelopePool))
}

View file

@ -7,7 +7,7 @@ package akka.remote.artery
import java.nio.charset.Charset
import java.nio.{ ByteBuffer, ByteOrder }
import akka.actor.{ ActorRef, Address }
import akka.actor.{ ActorPath, ChildActorPath, ActorRef, Address }
import akka.remote.artery.compress.CompressionProtocol._
import akka.remote.artery.compress.{ CompressionTable, InboundCompressions }
import akka.serialization.Serialization
@ -147,6 +147,21 @@ private[remote] sealed trait HeaderBuilder {
def manifest(originUid: Long): String
}
/**
* INTERNAL API
*/
private[remote] final class SerializationFormatCache
extends LruBoundedCache[ActorRef, String](capacity = 1024, evictAgeThreshold = 600) {
override protected def compute(ref: ActorRef): String = ref.path.toSerializationFormat
// Not calling ref.hashCode since it does a path.hashCode if ActorCell.undefinedUid is encountered.
// Refs with ActorCell.undefinedUid will now collide all the time, but this is not a usual scenario anyway.
override protected def hash(ref: ActorRef): Int = ref.path.uid
override protected def isCacheable(v: String): Boolean = true
}
/**
* INTERNAL API
*/
@ -154,6 +169,9 @@ private[remote] final class HeaderBuilderImpl(
inboundCompression: InboundCompressions,
var _outboundActorRefCompression: CompressionTable[ActorRef],
var _outboundClassManifestCompression: CompressionTable[String]) extends HeaderBuilder {
private[this] val toSerializationFormat: SerializationFormatCache = new SerializationFormatCache
// Fields only available for EnvelopeBuffer
var _version: Int = _
var _uid: Long = _
@ -215,7 +233,7 @@ private[remote] final class HeaderBuilderImpl(
def setRecipientActorRef(ref: ActorRef): Unit = {
_recipientActorRefIdx = outboundActorRefCompression.compress(ref)
if (_recipientActorRefIdx == -1) {
_recipientActorRef = ref.path.toSerializationFormat
_recipientActorRef = toSerializationFormat.getOrCompute(ref)
}
}
def recipientActorRef(originUid: Long): OptionVal[ActorRef] =

View file

@ -6,7 +6,7 @@ package akka.remote.artery
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.actor._
import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress }
import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress }
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._
@ -16,10 +16,12 @@ import akka.actor.EmptyLocalActorRef
import akka.remote.artery.compress.InboundCompressions
import akka.stream.stage.TimerGraphStageLogic
import java.util.concurrent.TimeUnit
import scala.concurrent.Future
import akka.remote.artery.compress.CompressionTable
import akka.Done
import akka.stream.stage.GraphStageWithMaterializedValue
import scala.concurrent.Promise
import java.util.concurrent.atomic.AtomicInteger
@ -194,13 +196,27 @@ private[remote] object Decoder {
private object Tick
}
/**
* INTERNAL API
*/
private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider, localAddress: UniqueAddress)
extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) {
override protected def compute(k: String): InternalActorRef =
provider.resolveActorRefWithLocalAddress(k, localAddress.address)
override protected def hash(k: String): Int = FastHash.ofString(k)
override protected def isCacheable(v: InternalActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
}
/**
* INTERNAL API
*/
private[remote] class Decoder(
inboundContext: InboundContext,
system: ExtendedActorSystem,
resolveActorRefWithLocalAddress: String InternalActorRef,
uniqueLocalAddress: UniqueAddress,
compression: InboundCompressions,
bufferPool: EnvelopeBufferPool,
inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
@ -214,6 +230,8 @@ private[remote] class Decoder(
import Decoder.RetryResolveRemoteDeployedRecipient
private val localAddress = inboundContext.localAddress.address
private val headerBuilder = HeaderBuilder.in(compression)
private val actorRefResolver: ActorRefResolveCache =
new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
private val retryResolveRemoteDeployedRecipientAttempts = 20
@ -260,7 +278,7 @@ private[remote] class Decoder(
case OptionVal.Some(ref)
OptionVal(ref.asInstanceOf[InternalActorRef])
case OptionVal.None if headerBuilder.senderActorRefPath.isDefined
OptionVal(resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get))
OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get))
case _
OptionVal.None
}
@ -315,7 +333,7 @@ private[remote] class Decoder(
}
private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
resolveActorRefWithLocalAddress(path) match {
actorRefResolver.getOrCompute(path) match {
case empty: EmptyLocalActorRef
val pathElements = empty.path.elements
// FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer)
@ -354,7 +372,7 @@ private[remote] class Decoder(
attemptsLeft - 1,
recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
else {
val recipient = resolveActorRefWithLocalAddress(recipientPath)
val recipient = actorRefResolver.getOrCompute(recipientPath)
// FIXME only retry for the first message, need to keep them in a cache
push(out, inboundEnvelope.withRecipient(recipient))
}

View file

@ -0,0 +1,221 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.util.{ OptionVal, Unsafe }
import scala.annotation.tailrec
import scala.reflect.ClassTag
object FastHash {
// Fast hash based on the 128 bit Xorshift128+ PRNG. Mixes in character bits into the random generator state.
def ofString(s: String): Int = {
val chars = Unsafe.instance.getObject(s, EnvelopeBuffer.StringValueFieldOffset).asInstanceOf[Array[Char]]
var s0: Long = 391408
var s1: Long = 601258
var i = 0
while (i < chars.length) {
var x = s0 ^ chars(i).toLong // Mix character into PRNG state
var y = s1
// Xorshift128+ round
s0 = y
x ^= x << 23
y ^= (y >>> 26)
x ^= (x >>> 17)
s1 = x ^ y
i += 1
}
(s0 + s1).toInt
}
}
/**
* INTERNAL API
*/
private[akka] case class CacheStatistics(entries: Int, maxProbeDistance: Int, averageProbeDistance: Double)
/**
* INTERNAL API
*
* This class is based on a Robin-Hood hashmap
* (http://www.sebastiansylvan.com/post/robin-hood-hashing-should-be-your-default-hash-table-implementation/)
* with backshift (http://codecapsule.com/2013/11/17/robin-hood-hashing-backward-shift-deletion/).
*
* The main modification compared to an RH hashmap is that it never grows the map (no rehashes) instead it is allowed
* to kick out entires that are considered old. The implementation tries to keep the map close to full, only evicting
* old entries when needed.
*/
private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag](capacity: Int, evictAgeThreshold: Int) {
require(capacity > 0, "Capacity must be larger than zero")
require((capacity & (capacity - 1)) == 0, "Capacity must be power of two")
require(evictAgeThreshold <= capacity, "Age threshold must be less than capacity.")
private[this] val Mask = capacity - 1
// Practically guarantee an overflow
private[this] var epoch = Int.MaxValue - 1
private[this] val keys = Array.ofDim[K](capacity)
private[this] val values = Array.ofDim[V](capacity)
private[this] val hashes = Array.ofDim[Int](capacity)
private[this] val epochs = Array.fill[Int](capacity)(epoch - evictAgeThreshold) // Guarantee existing "values" are stale
final def get(k: K): Option[V] = {
val h = hash(k)
@tailrec def find(position: Int, probeDistance: Int): Option[V] = {
val otherProbeDistance = probeDistanceOf(position)
if (values(position) eq null) {
None
} else if (probeDistance > otherProbeDistance) {
None
} else if (hashes(position) == h && k == keys(position)) {
Some(values(position))
} else {
find((position + 1) & Mask, probeDistance + 1)
}
}
find(position = h & Mask, probeDistance = 0)
}
final def stats: CacheStatistics = {
var i = 0
var sum = 0
var count = 0
var max = 0
while (i < hashes.length) {
if (values(i) ne null) {
val dist = probeDistanceOf(i)
sum += dist
count += 1
max = math.max(dist, max)
}
i += 1
}
CacheStatistics(count, max, sum.toDouble / count)
}
final def getOrCompute(k: K): V = {
val h = hash(k)
epoch += 1
@tailrec def findOrCalculate(position: Int, probeDistance: Int): V = {
if (values(position) eq null) {
val value = compute(k)
if (isCacheable(value)) {
keys(position) = k
values(position) = value
hashes(position) = h
epochs(position) = epoch
}
value
} else {
val otherProbeDistance = probeDistanceOf(position)
// If probe distance of the element we try to get is larger than the current slot's, then the element cannot be in
// the table since because of the Robin-Hood property we would have swapped it with the current element.
if (probeDistance > otherProbeDistance) {
val value = compute(k)
if (isCacheable(value)) move(position, k, h, value, epoch, probeDistance)
value
} else if (hashes(position) == h && k == keys(position)) {
// Update usage
epochs(position) = epoch
values(position)
} else {
// This is not our slot yet
findOrCalculate((position + 1) & Mask, probeDistance + 1)
}
}
}
findOrCalculate(position = h & Mask, probeDistance = 0)
}
@tailrec private def removeAt(position: Int): Unit = {
val next = (position + 1) & Mask
if ((values(next) eq null) || probeDistanceOf(next) == 0) {
// Next is not movable, just empty this slot
values(position) = null.asInstanceOf[V]
} else {
// Shift the next slot here
keys(position) = keys(next)
values(position) = values(next)
hashes(position) = hashes(next)
epochs(position) = epochs(next)
// remove the shifted slot
removeAt(next)
}
}
// Wraparound distance of the element that is in this slot. (X + capacity) & Mask ensures that there are no
// negative numbers on wraparound
private def probeDistanceOf(slot: Int): Int = probeDistanceOf(idealSlot = hashes(slot) & Mask, actualSlot = slot)
// Protected for exposing it to unit tests
protected def probeDistanceOf(idealSlot: Int, actualSlot: Int) = ((actualSlot - idealSlot) + capacity) & Mask
@tailrec private def move(position: Int, k: K, h: Int, value: V, elemEpoch: Int, probeDistance: Int): Unit = {
if (values(position) eq null) {
// Found an empty place, done.
keys(position) = k
values(position) = value
hashes(position) = h
epochs(position) = elemEpoch // Do NOT update the epoch of the elem. It was not touched, just moved
} else {
val otherEpoch = epochs(position)
// Check if the current entry is too old
if (epoch - otherEpoch >= evictAgeThreshold) {
// Remove old entry to make space
removeAt(position)
// Try to insert our element in hand to its ideal slot
move(h & Mask, k, h, value, elemEpoch, 0)
} else {
val otherProbeDistance = probeDistanceOf(position)
val otherEpoch = epochs(position)
// Check whose probe distance is larger. The one with the larger one wins the slot.
if (probeDistance > otherProbeDistance) {
// Due to the Robin-Hood property, we now take away this slot from the "richer" and take it for ourselves
val otherKey = keys(position)
val otherValue = values(position)
val otherHash = hashes(position)
keys(position) = k
values(position) = value
hashes(position) = h
epochs(position) = elemEpoch
// Move out the old one
move((position + 1) & Mask, otherKey, otherHash, otherValue, otherEpoch, otherProbeDistance + 1)
} else {
// We are the "richer" so we need to find another slot
move((position + 1) & Mask, k, h, value, elemEpoch, probeDistance + 1)
}
}
}
}
protected def compute(k: K): V
protected def hash(k: K): Int
protected def isCacheable(v: V): Boolean
override def toString =
s"LruBoundedCache(" +
s" values = ${values.mkString("[", ",", "]")}," +
s" hashes = ${hashes.map(_ & Mask).mkString("[", ",", "]")}," +
s" epochs = ${epochs.mkString("[", ",", "]")}," +
s" distances = ${(0 until hashes.length).map(probeDistanceOf).mkString("[", ",", "]")}," +
s" $epoch)"
}

View file

@ -0,0 +1,225 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.testkit.AkkaSpec
import scala.util.Random
class LruBoundedCacheSpec extends AkkaSpec {
class TestCache(_capacity: Int, threshold: Int, hashSeed: String = "") extends LruBoundedCache[String, String](_capacity, threshold) {
private var cntr = 0
override protected def compute(k: String): String = {
val id = cntr
cntr += 1
k + ":" + id
}
override protected def hash(k: String): Int = FastHash.ofString(hashSeed + k + hashSeed)
override protected def isCacheable(v: String): Boolean = !v.startsWith("#")
def internalProbeDistanceOf(idealSlot: Int, actualSlot: Int): Int = probeDistanceOf(idealSlot, actualSlot)
def expectComputed(key: String, value: String): Unit = {
get(key) should ===(None)
getOrCompute(key) should ===(value)
get(key) should ===(Some(value))
}
def expectCached(key: String, value: String): Unit = {
get(key) should ===(Some(value))
getOrCompute(key) should ===(value)
get(key) should ===(Some(value))
}
def expectComputedOnly(key: String, value: String): Unit = {
get(key) should ===(None)
getOrCompute(key) should ===(value)
get(key) should ===(None)
}
}
final class BrokenHashFunctionTestCache(_capacity: Int, _threshold: Int) extends TestCache(_capacity, _threshold) {
override protected def hash(k: String): Int = 0
}
"LruBoundedCache" must {
"work in the happy case" in {
val cache = new TestCache(4, 4)
cache.expectComputed("A", "A:0")
cache.expectComputed("B", "B:1")
cache.expectComputed("C", "C:2")
cache.expectComputed("D", "D:3")
cache.expectCached("A", "A:0")
cache.expectCached("B", "B:1")
cache.expectCached("C", "C:2")
cache.expectCached("D", "D:3")
}
"evict oldest when full" in {
for (_ 1 to 10) {
val seed = Random.nextInt(1024)
info(s"Variant $seed")
val cache = new TestCache(4, 4, seed.toString)
cache.expectComputed("A", "A:0")
cache.expectComputed("B", "B:1")
cache.expectComputed("C", "C:2")
cache.expectComputed("D", "D:3")
cache.expectComputed("E", "E:4")
cache.expectCached("B", "B:1")
cache.expectCached("C", "C:2")
cache.expectCached("D", "D:3")
cache.expectCached("E", "E:4")
cache.expectComputed("A", "A:5")
cache.expectComputed("B", "B:6")
cache.expectComputed("C", "C:7")
cache.expectComputed("D", "D:8")
cache.expectComputed("E", "E:9")
cache.expectCached("B", "B:6")
cache.expectCached("C", "C:7")
cache.expectCached("D", "D:8")
cache.expectCached("E", "E:9")
}
}
"work with low quality hash function" in {
val cache = new BrokenHashFunctionTestCache(4, 4)
cache.expectComputed("A", "A:0")
cache.expectComputed("B", "B:1")
cache.expectComputed("C", "C:2")
cache.expectComputed("D", "D:3")
cache.expectComputed("E", "E:4")
cache.expectCached("B", "B:1")
cache.expectCached("C", "C:2")
cache.expectCached("D", "D:3")
cache.expectCached("E", "E:4")
cache.expectComputed("A", "A:5")
cache.expectComputed("B", "B:6")
cache.expectComputed("C", "C:7")
cache.expectComputed("D", "D:8")
cache.expectComputed("E", "E:9")
cache.expectCached("B", "B:6")
cache.expectCached("C", "C:7")
cache.expectCached("D", "D:8")
cache.expectCached("E", "E:9")
}
"calculate probe distance correctly" in {
val cache = new TestCache(4, 4)
cache.internalProbeDistanceOf(0, 0) should ===(0)
cache.internalProbeDistanceOf(0, 1) should ===(1)
cache.internalProbeDistanceOf(0, 2) should ===(2)
cache.internalProbeDistanceOf(0, 3) should ===(3)
cache.internalProbeDistanceOf(1, 1) should ===(0)
cache.internalProbeDistanceOf(1, 2) should ===(1)
cache.internalProbeDistanceOf(1, 3) should ===(2)
cache.internalProbeDistanceOf(1, 0) should ===(3)
cache.internalProbeDistanceOf(2, 2) should ===(0)
cache.internalProbeDistanceOf(2, 3) should ===(1)
cache.internalProbeDistanceOf(2, 0) should ===(2)
cache.internalProbeDistanceOf(2, 1) should ===(3)
cache.internalProbeDistanceOf(3, 3) should ===(0)
cache.internalProbeDistanceOf(3, 0) should ===(1)
cache.internalProbeDistanceOf(3, 1) should ===(2)
cache.internalProbeDistanceOf(3, 2) should ===(3)
}
"work with a lower age threshold" in {
for (_ 1 to 10) {
val seed = Random.nextInt(1024)
info(s"Variant $seed")
val cache = new TestCache(4, 2, seed.toString)
cache.expectComputed("A", "A:0")
cache.expectComputed("B", "B:1")
cache.expectComputed("C", "C:2")
cache.expectComputed("D", "D:3")
cache.expectComputed("E", "E:4")
cache.expectCached("D", "D:3")
cache.expectCached("E", "E:4")
cache.expectComputed("F", "F:5")
cache.expectComputed("G", "G:6")
cache.expectComputed("H", "H:7")
cache.expectComputed("I", "I:8")
cache.expectComputed("J", "J:9")
cache.expectCached("I", "I:8")
cache.expectCached("J", "J:9")
}
}
"must not cache noncacheable values" in {
val cache = new TestCache(4, 4)
cache.expectComputedOnly("#A", "#A:0")
cache.expectComputedOnly("#A", "#A:1")
cache.expectComputedOnly("#A", "#A:2")
cache.expectComputedOnly("#A", "#A:3")
cache.expectComputed("A", "A:4")
cache.expectComputed("B", "B:5")
cache.expectComputed("C", "C:6")
cache.expectComputed("D", "D:7")
cache.expectComputed("E", "E:8")
cache.expectCached("B", "B:5")
cache.expectCached("C", "C:6")
cache.expectCached("D", "D:7")
cache.expectCached("E", "E:8")
cache.expectComputedOnly("#A", "#A:9")
cache.expectComputedOnly("#A", "#A:10")
cache.expectComputedOnly("#A", "#A:11")
cache.expectComputedOnly("#A", "#A:12")
// Cacheable values are not affected
cache.expectCached("B", "B:5")
cache.expectCached("C", "C:6")
cache.expectCached("D", "D:7")
cache.expectCached("E", "E:8")
}
"maintain a good average probe distance" in {
for (_ 1 to 10) {
val seed = Random.nextInt(1024)
info(s"Variant $seed")
// Cache emulating 60% fill rate
val cache = new TestCache(1024, 600, seed.toString)
// Fill up cache
for (_ 1 to 10000) cache.getOrCompute(Random.nextString(32))
val stats = cache.stats
// Have not seen lower than 890
stats.entries should be > 750
// Have not seen higher than 1.8
stats.averageProbeDistance should be < 2.5
// Have not seen higher than 15
stats.maxProbeDistance should be < 25
}
}
}
}