From 11fceb41213c5dba79f2b14f3fd0b1cbcfb5363b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 2 Sep 2016 18:09:43 +0200 Subject: [PATCH] 21203: Cache/memoize ActorRef resolution --- .../akka/remote/artery/CodecBenchmark.scala | 4 +- .../akka/util/LruBoundedCacheBench.scala | 78 ++++++ .../akka/remote/artery/ArteryTransport.scala | 5 +- .../scala/akka/remote/artery/BufferPool.scala | 22 +- .../scala/akka/remote/artery/Codecs.scala | 38 ++- .../akka/remote/artery/LruBoundedCache.scala | 221 +++++++++++++++++ .../remote/artery/LruBoundedCacheSpec.scala | 225 ++++++++++++++++++ 7 files changed, 575 insertions(+), 18 deletions(-) create mode 100644 akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala create mode 100644 akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index b2b9cf121a..76c52c01a5 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -167,7 +167,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool)) + uniqueLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map { _ => @@ -208,7 +208,7 @@ class CodecBenchmark { val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool)) + uniqueLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool)) Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) diff --git a/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala b/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala new file mode 100644 index 0000000000..cc89de28d9 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.util + +import java.util +import java.util.concurrent.TimeUnit + +import akka.remote.artery.LruBoundedCache +import org.openjdk.jmh.annotations.{ Param, _ } + +import scala.util.Random + +@State(Scope.Benchmark) +@Measurement(timeUnit = TimeUnit.MICROSECONDS) +class LruBoundedCacheBench { + + var javaHashMap: java.util.HashMap[String, String] = _ + + @Param(Array("1024", "8192")) + var count = 0 + + @Param(Array("128", "256")) + var stringSize = 0 + var lruCache: LruBoundedCache[String, String] = _ + + @Param(Array("90", "99")) + var loadFactor: Int = _ + + var toAdd: String = _ + var toRemove: String = _ + var toGet: String = _ + + @Setup + def setup(): Unit = { + val loadF: Double = loadFactor / 100.0 + val threshold = (loadF * count).toInt + + val random = Random + javaHashMap = new util.HashMap[String, String](count) + lruCache = new LruBoundedCache[String, String](count, threshold) { + override protected def compute(k: String): String = k + override protected def hash(k: String): Int = k.hashCode + override protected def isCacheable(v: String): Boolean = true + } + + // Loading + for (i <- 1 to threshold) { + val value = random.nextString(stringSize) + if (i == 1) toGet = value + toRemove = value + javaHashMap.put(value, value) + lruCache.get(value) + } + + toAdd = random.nextString(stringSize) + + } + + @Benchmark + def addOne_lruCache(): String = { + lruCache.getOrCompute(toAdd) + } + + @Benchmark + def addOne_hashMap(): String = { + javaHashMap.put(toAdd, toAdd) + javaHashMap.get(toAdd) + } + + @Benchmark + def addOne_hashMap_remove_put_get(): String = { + javaHashMap.remove(toRemove) + javaHashMap.put(toAdd, toAdd) + javaHashMap.get(toAdd) + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 41c7c74332..8ddb7e17a7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -914,10 +914,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } def createDecoder(compression: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { - val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = { - recipient ⇒ provider.resolveActorRefWithLocalAddress(recipient, localAddress.address) - } - Flow.fromGraph(new Decoder(this, system, resolveActorRefWithLocalAddress, compression, bufferPool, + Flow.fromGraph(new Decoder(this, system, localAddress, compression, bufferPool, inboundEnvelopePool)) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index 6773dfdeb8..745417daa1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -7,7 +7,7 @@ package akka.remote.artery import java.nio.charset.Charset import java.nio.{ ByteBuffer, ByteOrder } -import akka.actor.{ ActorRef, Address } +import akka.actor.{ ActorPath, ChildActorPath, ActorRef, Address } import akka.remote.artery.compress.CompressionProtocol._ import akka.remote.artery.compress.{ CompressionTable, InboundCompressions } import akka.serialization.Serialization @@ -147,6 +147,21 @@ private[remote] sealed trait HeaderBuilder { def manifest(originUid: Long): String } +/** + * INTERNAL API + */ +private[remote] final class SerializationFormatCache + extends LruBoundedCache[ActorRef, String](capacity = 1024, evictAgeThreshold = 600) { + + override protected def compute(ref: ActorRef): String = ref.path.toSerializationFormat + + // Not calling ref.hashCode since it does a path.hashCode if ActorCell.undefinedUid is encountered. + // Refs with ActorCell.undefinedUid will now collide all the time, but this is not a usual scenario anyway. + override protected def hash(ref: ActorRef): Int = ref.path.uid + + override protected def isCacheable(v: String): Boolean = true +} + /** * INTERNAL API */ @@ -154,6 +169,9 @@ private[remote] final class HeaderBuilderImpl( inboundCompression: InboundCompressions, var _outboundActorRefCompression: CompressionTable[ActorRef], var _outboundClassManifestCompression: CompressionTable[String]) extends HeaderBuilder { + + private[this] val toSerializationFormat: SerializationFormatCache = new SerializationFormatCache + // Fields only available for EnvelopeBuffer var _version: Int = _ var _uid: Long = _ @@ -215,7 +233,7 @@ private[remote] final class HeaderBuilderImpl( def setRecipientActorRef(ref: ActorRef): Unit = { _recipientActorRefIdx = outboundActorRefCompression.compress(ref) if (_recipientActorRefIdx == -1) { - _recipientActorRef = ref.path.toSerializationFormat + _recipientActorRef = toSerializationFormat.getOrCompute(ref) } } def recipientActorRef(originUid: Long): OptionVal[ActorRef] = diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 04081b6ebe..b9ded54aed 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -6,7 +6,7 @@ package akka.remote.artery import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.actor._ -import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress } +import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress } import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ @@ -16,10 +16,12 @@ import akka.actor.EmptyLocalActorRef import akka.remote.artery.compress.InboundCompressions import akka.stream.stage.TimerGraphStageLogic import java.util.concurrent.TimeUnit + import scala.concurrent.Future import akka.remote.artery.compress.CompressionTable import akka.Done import akka.stream.stage.GraphStageWithMaterializedValue + import scala.concurrent.Promise import java.util.concurrent.atomic.AtomicInteger @@ -194,16 +196,30 @@ private[remote] object Decoder { private object Tick } +/** + * INTERNAL API + */ +private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider, localAddress: UniqueAddress) + extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) { + + override protected def compute(k: String): InternalActorRef = + provider.resolveActorRefWithLocalAddress(k, localAddress.address) + + override protected def hash(k: String): Int = FastHash.ofString(k) + + override protected def isCacheable(v: InternalActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef] +} + /** * INTERNAL API */ private[remote] class Decoder( - inboundContext: InboundContext, - system: ExtendedActorSystem, - resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, - compression: InboundCompressions, - bufferPool: EnvelopeBufferPool, - inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { + inboundContext: InboundContext, + system: ExtendedActorSystem, + uniqueLocalAddress: UniqueAddress, + compression: InboundCompressions, + bufferPool: EnvelopeBufferPool, + inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { import Decoder.Tick val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out") @@ -214,6 +230,8 @@ private[remote] class Decoder( import Decoder.RetryResolveRemoteDeployedRecipient private val localAddress = inboundContext.localAddress.address private val headerBuilder = HeaderBuilder.in(compression) + private val actorRefResolver: ActorRefResolveCache = + new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress) private val retryResolveRemoteDeployedRecipientInterval = 50.millis private val retryResolveRemoteDeployedRecipientAttempts = 20 @@ -260,7 +278,7 @@ private[remote] class Decoder( case OptionVal.Some(ref) ⇒ OptionVal(ref.asInstanceOf[InternalActorRef]) case OptionVal.None if headerBuilder.senderActorRefPath.isDefined ⇒ - OptionVal(resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get)) + OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get)) case _ ⇒ OptionVal.None } @@ -315,7 +333,7 @@ private[remote] class Decoder( } private def resolveRecipient(path: String): OptionVal[InternalActorRef] = { - resolveActorRefWithLocalAddress(path) match { + actorRefResolver.getOrCompute(path) match { case empty: EmptyLocalActorRef ⇒ val pathElements = empty.path.elements // FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer) @@ -354,7 +372,7 @@ private[remote] class Decoder( attemptsLeft - 1, recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval) else { - val recipient = resolveActorRefWithLocalAddress(recipientPath) + val recipient = actorRefResolver.getOrCompute(recipientPath) // FIXME only retry for the first message, need to keep them in a cache push(out, inboundEnvelope.withRecipient(recipient)) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala b/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala new file mode 100644 index 0000000000..40e382348f --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala @@ -0,0 +1,221 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.util.{ OptionVal, Unsafe } + +import scala.annotation.tailrec +import scala.reflect.ClassTag + +object FastHash { + + // Fast hash based on the 128 bit Xorshift128+ PRNG. Mixes in character bits into the random generator state. + def ofString(s: String): Int = { + val chars = Unsafe.instance.getObject(s, EnvelopeBuffer.StringValueFieldOffset).asInstanceOf[Array[Char]] + var s0: Long = 391408 + var s1: Long = 601258 + var i = 0 + + while (i < chars.length) { + var x = s0 ^ chars(i).toLong // Mix character into PRNG state + var y = s1 + + // Xorshift128+ round + s0 = y + x ^= x << 23 + y ^= (y >>> 26) + x ^= (x >>> 17) + s1 = x ^ y + + i += 1 + } + + (s0 + s1).toInt + } + +} + +/** + * INTERNAL API + */ +private[akka] case class CacheStatistics(entries: Int, maxProbeDistance: Int, averageProbeDistance: Double) + +/** + * INTERNAL API + * + * This class is based on a Robin-Hood hashmap + * (http://www.sebastiansylvan.com/post/robin-hood-hashing-should-be-your-default-hash-table-implementation/) + * with backshift (http://codecapsule.com/2013/11/17/robin-hood-hashing-backward-shift-deletion/). + * + * The main modification compared to an RH hashmap is that it never grows the map (no rehashes) instead it is allowed + * to kick out entires that are considered old. The implementation tries to keep the map close to full, only evicting + * old entries when needed. + */ +private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag](capacity: Int, evictAgeThreshold: Int) { + require(capacity > 0, "Capacity must be larger than zero") + require((capacity & (capacity - 1)) == 0, "Capacity must be power of two") + require(evictAgeThreshold <= capacity, "Age threshold must be less than capacity.") + + private[this] val Mask = capacity - 1 + + // Practically guarantee an overflow + private[this] var epoch = Int.MaxValue - 1 + + private[this] val keys = Array.ofDim[K](capacity) + private[this] val values = Array.ofDim[V](capacity) + private[this] val hashes = Array.ofDim[Int](capacity) + private[this] val epochs = Array.fill[Int](capacity)(epoch - evictAgeThreshold) // Guarantee existing "values" are stale + + final def get(k: K): Option[V] = { + val h = hash(k) + + @tailrec def find(position: Int, probeDistance: Int): Option[V] = { + val otherProbeDistance = probeDistanceOf(position) + if (values(position) eq null) { + None + } else if (probeDistance > otherProbeDistance) { + None + } else if (hashes(position) == h && k == keys(position)) { + Some(values(position)) + } else { + find((position + 1) & Mask, probeDistance + 1) + } + } + + find(position = h & Mask, probeDistance = 0) + } + + final def stats: CacheStatistics = { + var i = 0 + var sum = 0 + var count = 0 + var max = 0 + while (i < hashes.length) { + if (values(i) ne null) { + val dist = probeDistanceOf(i) + sum += dist + count += 1 + max = math.max(dist, max) + } + i += 1 + } + CacheStatistics(count, max, sum.toDouble / count) + } + + final def getOrCompute(k: K): V = { + val h = hash(k) + epoch += 1 + + @tailrec def findOrCalculate(position: Int, probeDistance: Int): V = { + if (values(position) eq null) { + val value = compute(k) + if (isCacheable(value)) { + keys(position) = k + values(position) = value + hashes(position) = h + epochs(position) = epoch + } + value + } else { + val otherProbeDistance = probeDistanceOf(position) + // If probe distance of the element we try to get is larger than the current slot's, then the element cannot be in + // the table since because of the Robin-Hood property we would have swapped it with the current element. + if (probeDistance > otherProbeDistance) { + val value = compute(k) + if (isCacheable(value)) move(position, k, h, value, epoch, probeDistance) + value + } else if (hashes(position) == h && k == keys(position)) { + // Update usage + epochs(position) = epoch + values(position) + } else { + // This is not our slot yet + findOrCalculate((position + 1) & Mask, probeDistance + 1) + } + } + } + + findOrCalculate(position = h & Mask, probeDistance = 0) + } + + @tailrec private def removeAt(position: Int): Unit = { + val next = (position + 1) & Mask + if ((values(next) eq null) || probeDistanceOf(next) == 0) { + // Next is not movable, just empty this slot + values(position) = null.asInstanceOf[V] + } else { + // Shift the next slot here + keys(position) = keys(next) + values(position) = values(next) + hashes(position) = hashes(next) + epochs(position) = epochs(next) + // remove the shifted slot + removeAt(next) + } + } + + // Wraparound distance of the element that is in this slot. (X + capacity) & Mask ensures that there are no + // negative numbers on wraparound + private def probeDistanceOf(slot: Int): Int = probeDistanceOf(idealSlot = hashes(slot) & Mask, actualSlot = slot) + + // Protected for exposing it to unit tests + protected def probeDistanceOf(idealSlot: Int, actualSlot: Int) = ((actualSlot - idealSlot) + capacity) & Mask + + @tailrec private def move(position: Int, k: K, h: Int, value: V, elemEpoch: Int, probeDistance: Int): Unit = { + if (values(position) eq null) { + // Found an empty place, done. + keys(position) = k + values(position) = value + hashes(position) = h + epochs(position) = elemEpoch // Do NOT update the epoch of the elem. It was not touched, just moved + } else { + val otherEpoch = epochs(position) + // Check if the current entry is too old + if (epoch - otherEpoch >= evictAgeThreshold) { + // Remove old entry to make space + removeAt(position) + // Try to insert our element in hand to its ideal slot + move(h & Mask, k, h, value, elemEpoch, 0) + } else { + val otherProbeDistance = probeDistanceOf(position) + val otherEpoch = epochs(position) + + // Check whose probe distance is larger. The one with the larger one wins the slot. + if (probeDistance > otherProbeDistance) { + // Due to the Robin-Hood property, we now take away this slot from the "richer" and take it for ourselves + val otherKey = keys(position) + val otherValue = values(position) + val otherHash = hashes(position) + + keys(position) = k + values(position) = value + hashes(position) = h + epochs(position) = elemEpoch + + // Move out the old one + move((position + 1) & Mask, otherKey, otherHash, otherValue, otherEpoch, otherProbeDistance + 1) + } else { + // We are the "richer" so we need to find another slot + move((position + 1) & Mask, k, h, value, elemEpoch, probeDistance + 1) + } + + } + } + + } + + protected def compute(k: K): V + + protected def hash(k: K): Int + + protected def isCacheable(v: V): Boolean + + override def toString = + s"LruBoundedCache(" + + s" values = ${values.mkString("[", ",", "]")}," + + s" hashes = ${hashes.map(_ & Mask).mkString("[", ",", "]")}," + + s" epochs = ${epochs.mkString("[", ",", "]")}," + + s" distances = ${(0 until hashes.length).map(probeDistanceOf).mkString("[", ",", "]")}," + + s" $epoch)" +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala new file mode 100644 index 0000000000..1ae4329585 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/LruBoundedCacheSpec.scala @@ -0,0 +1,225 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.testkit.AkkaSpec + +import scala.util.Random + +class LruBoundedCacheSpec extends AkkaSpec { + + class TestCache(_capacity: Int, threshold: Int, hashSeed: String = "") extends LruBoundedCache[String, String](_capacity, threshold) { + private var cntr = 0 + + override protected def compute(k: String): String = { + val id = cntr + cntr += 1 + k + ":" + id + } + override protected def hash(k: String): Int = FastHash.ofString(hashSeed + k + hashSeed) + + override protected def isCacheable(v: String): Boolean = !v.startsWith("#") + + def internalProbeDistanceOf(idealSlot: Int, actualSlot: Int): Int = probeDistanceOf(idealSlot, actualSlot) + + def expectComputed(key: String, value: String): Unit = { + get(key) should ===(None) + getOrCompute(key) should ===(value) + get(key) should ===(Some(value)) + } + + def expectCached(key: String, value: String): Unit = { + get(key) should ===(Some(value)) + getOrCompute(key) should ===(value) + get(key) should ===(Some(value)) + } + + def expectComputedOnly(key: String, value: String): Unit = { + get(key) should ===(None) + getOrCompute(key) should ===(value) + get(key) should ===(None) + } + } + + final class BrokenHashFunctionTestCache(_capacity: Int, _threshold: Int) extends TestCache(_capacity, _threshold) { + override protected def hash(k: String): Int = 0 + } + + "LruBoundedCache" must { + + "work in the happy case" in { + val cache = new TestCache(4, 4) + + cache.expectComputed("A", "A:0") + cache.expectComputed("B", "B:1") + cache.expectComputed("C", "C:2") + cache.expectComputed("D", "D:3") + + cache.expectCached("A", "A:0") + cache.expectCached("B", "B:1") + cache.expectCached("C", "C:2") + cache.expectCached("D", "D:3") + } + + "evict oldest when full" in { + for (_ ← 1 to 10) { + val seed = Random.nextInt(1024) + info(s"Variant $seed") + val cache = new TestCache(4, 4, seed.toString) + + cache.expectComputed("A", "A:0") + cache.expectComputed("B", "B:1") + cache.expectComputed("C", "C:2") + cache.expectComputed("D", "D:3") + cache.expectComputed("E", "E:4") + + cache.expectCached("B", "B:1") + cache.expectCached("C", "C:2") + cache.expectCached("D", "D:3") + cache.expectCached("E", "E:4") + + cache.expectComputed("A", "A:5") + cache.expectComputed("B", "B:6") + cache.expectComputed("C", "C:7") + cache.expectComputed("D", "D:8") + cache.expectComputed("E", "E:9") + + cache.expectCached("B", "B:6") + cache.expectCached("C", "C:7") + cache.expectCached("D", "D:8") + cache.expectCached("E", "E:9") + } + } + + "work with low quality hash function" in { + val cache = new BrokenHashFunctionTestCache(4, 4) + + cache.expectComputed("A", "A:0") + cache.expectComputed("B", "B:1") + cache.expectComputed("C", "C:2") + cache.expectComputed("D", "D:3") + cache.expectComputed("E", "E:4") + + cache.expectCached("B", "B:1") + cache.expectCached("C", "C:2") + cache.expectCached("D", "D:3") + cache.expectCached("E", "E:4") + + cache.expectComputed("A", "A:5") + cache.expectComputed("B", "B:6") + cache.expectComputed("C", "C:7") + cache.expectComputed("D", "D:8") + cache.expectComputed("E", "E:9") + + cache.expectCached("B", "B:6") + cache.expectCached("C", "C:7") + cache.expectCached("D", "D:8") + cache.expectCached("E", "E:9") + } + + "calculate probe distance correctly" in { + val cache = new TestCache(4, 4) + + cache.internalProbeDistanceOf(0, 0) should ===(0) + cache.internalProbeDistanceOf(0, 1) should ===(1) + cache.internalProbeDistanceOf(0, 2) should ===(2) + cache.internalProbeDistanceOf(0, 3) should ===(3) + + cache.internalProbeDistanceOf(1, 1) should ===(0) + cache.internalProbeDistanceOf(1, 2) should ===(1) + cache.internalProbeDistanceOf(1, 3) should ===(2) + cache.internalProbeDistanceOf(1, 0) should ===(3) + + cache.internalProbeDistanceOf(2, 2) should ===(0) + cache.internalProbeDistanceOf(2, 3) should ===(1) + cache.internalProbeDistanceOf(2, 0) should ===(2) + cache.internalProbeDistanceOf(2, 1) should ===(3) + + cache.internalProbeDistanceOf(3, 3) should ===(0) + cache.internalProbeDistanceOf(3, 0) should ===(1) + cache.internalProbeDistanceOf(3, 1) should ===(2) + cache.internalProbeDistanceOf(3, 2) should ===(3) + } + + "work with a lower age threshold" in { + for (_ ← 1 to 10) { + val seed = Random.nextInt(1024) + info(s"Variant $seed") + val cache = new TestCache(4, 2, seed.toString) + + cache.expectComputed("A", "A:0") + cache.expectComputed("B", "B:1") + cache.expectComputed("C", "C:2") + cache.expectComputed("D", "D:3") + cache.expectComputed("E", "E:4") + + cache.expectCached("D", "D:3") + cache.expectCached("E", "E:4") + + cache.expectComputed("F", "F:5") + cache.expectComputed("G", "G:6") + cache.expectComputed("H", "H:7") + cache.expectComputed("I", "I:8") + cache.expectComputed("J", "J:9") + + cache.expectCached("I", "I:8") + cache.expectCached("J", "J:9") + } + } + + "must not cache noncacheable values" in { + val cache = new TestCache(4, 4) + + cache.expectComputedOnly("#A", "#A:0") + cache.expectComputedOnly("#A", "#A:1") + cache.expectComputedOnly("#A", "#A:2") + cache.expectComputedOnly("#A", "#A:3") + + cache.expectComputed("A", "A:4") + cache.expectComputed("B", "B:5") + cache.expectComputed("C", "C:6") + cache.expectComputed("D", "D:7") + cache.expectComputed("E", "E:8") + + cache.expectCached("B", "B:5") + cache.expectCached("C", "C:6") + cache.expectCached("D", "D:7") + cache.expectCached("E", "E:8") + + cache.expectComputedOnly("#A", "#A:9") + cache.expectComputedOnly("#A", "#A:10") + cache.expectComputedOnly("#A", "#A:11") + cache.expectComputedOnly("#A", "#A:12") + + // Cacheable values are not affected + cache.expectCached("B", "B:5") + cache.expectCached("C", "C:6") + cache.expectCached("D", "D:7") + cache.expectCached("E", "E:8") + } + + "maintain a good average probe distance" in { + for (_ ← 1 to 10) { + val seed = Random.nextInt(1024) + info(s"Variant $seed") + // Cache emulating 60% fill rate + val cache = new TestCache(1024, 600, seed.toString) + + // Fill up cache + for (_ ← 1 to 10000) cache.getOrCompute(Random.nextString(32)) + + val stats = cache.stats + // Have not seen lower than 890 + stats.entries should be > 750 + // Have not seen higher than 1.8 + stats.averageProbeDistance should be < 2.5 + // Have not seen higher than 15 + stats.maxProbeDistance should be < 25 + } + + } + + } + +}