add thread local LRU cache for resolveActorRef

This commit is contained in:
Patrik Nordwall 2016-09-27 16:34:43 +02:00
parent be319afcd2
commit 1cae346f4d
4 changed files with 121 additions and 4 deletions

View file

@ -21,6 +21,8 @@ import akka.remote.artery.ArteryTransport
import akka.util.OptionVal
import akka.remote.artery.OutboundEnvelope
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.remote.serialization.ActorRefResolveCache
import akka.remote.serialization.ActorRefResolveThreadLocalCache
/**
* INTERNAL API
@ -175,9 +177,13 @@ private[akka] class RemoteActorRefProvider(
@volatile private var remoteDeploymentWatcher: ActorRef = _
@volatile private var actorRefResolveThreadLocalCache: ActorRefResolveThreadLocalCache = _
def init(system: ActorSystemImpl): Unit = {
local.init(system)
actorRefResolveThreadLocalCache = ActorRefResolveThreadLocalCache(system)
remotingTerminator = system.systemActorOf(
remoteSettings.configureDispatcher(Props(classOf[RemotingTerminator], local.systemGuardian)),
"remoting-terminator")
@ -389,7 +395,20 @@ private[akka] class RemoteActorRefProvider(
}
}
def resolveActorRef(path: String): ActorRef = path match {
def resolveActorRef(path: String): ActorRef = {
// using thread local LRU cache, which will call internalRresolveActorRef
// if the value is not cached
actorRefResolveThreadLocalCache match {
case null internalResolveActorRef(path) // not initalized yet
case c c.threadLocalCache(this).getOrCompute(path)
}
}
/**
* INTERNAL API: This is used by the `ActorRefResolveCache` via the
* public `resolveActorRef(path: String)`.
*/
private[akka] def internalResolveActorRef(path: String): ActorRef = path match {
case ActorPathExtractor(address, elems)
if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems)
else {

View file

@ -255,7 +255,7 @@ private[remote] object Decoder {
/**
* INTERNAL API
*/
private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider, localAddress: UniqueAddress)
private[akka] final class ActorRefResolveCacheWithAddress(provider: RemoteActorRefProvider, localAddress: UniqueAddress)
extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) {
override protected def compute(k: String): InternalActorRef =
@ -286,8 +286,8 @@ private[remote] class Decoder(
import Decoder.RetryResolveRemoteDeployedRecipient
private val localAddress = inboundContext.localAddress.address
private val headerBuilder = HeaderBuilder.in(compression)
private val actorRefResolver: ActorRefResolveCache =
new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
private val actorRefResolver: ActorRefResolveCacheWithAddress =
new ActorRefResolveCacheWithAddress(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String]
private val retryResolveRemoteDeployedRecipientInterval = 50.millis

View file

@ -0,0 +1,64 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.serialization
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.EmptyLocalActorRef
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.remote.RemoteActorRefProvider
import akka.remote.artery.FastHash
import akka.remote.artery.LruBoundedCache
/**
* INTERNAL API: Thread local cache per actor system
*/
private[akka] object ActorRefResolveThreadLocalCache
extends ExtensionId[ActorRefResolveThreadLocalCache] with ExtensionIdProvider {
override def get(system: ActorSystem): ActorRefResolveThreadLocalCache = super.get(system)
override def lookup = ActorRefResolveThreadLocalCache
override def createExtension(system: ExtendedActorSystem): ActorRefResolveThreadLocalCache =
new ActorRefResolveThreadLocalCache(system)
}
/**
* INTERNAL API
*/
private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSystem) extends Extension {
private val provider = system.provider match {
case r: RemoteActorRefProvider r
case _ throw new IllegalArgumentException(
"ActorRefResolveThreadLocalCache can only be used with RemoteActorRefProvider, " +
s"not with ${system.provider.getClass}")
}
private val current = new ThreadLocal[ActorRefResolveCache] {
override def initialValue: ActorRefResolveCache = new ActorRefResolveCache(provider)
}
def threadLocalCache(provider: RemoteActorRefProvider): ActorRefResolveCache =
current.get
}
/**
* INTERNAL API
*/
private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider)
extends LruBoundedCache[String, ActorRef](capacity = 1024, evictAgeThreshold = 600) {
override protected def compute(k: String): ActorRef =
provider.internalResolveActorRef(k)
override protected def hash(k: String): Int = FastHash.ofString(k)
override protected def isCacheable(v: ActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
}

View file

@ -6,6 +6,9 @@ package akka.remote.artery
import akka.actor.{ EmptyLocalActorRef, InternalActorRef }
import akka.remote.RemoteActorRef
import akka.testkit.{ EventFilter, TestActors }
import akka.actor.Props
import akka.actor.ExtendedActorSystem
import akka.actor.ActorRefScope
class RemoteActorRefProviderSpec extends ArteryMultiNodeSpec {
@ -29,6 +32,37 @@ class RemoteActorRefProviderSpec extends ArteryMultiNodeSpec {
sel.anchor.asInstanceOf[InternalActorRef].isLocal should be(false)
}
"cache resolveActorRef for local ref" in {
val provider = localSystem.asInstanceOf[ExtendedActorSystem].provider
val path = s"akka://${system.name}@${addressA.host.get}:${addressA.port.get}/user/echo"
val ref1 = provider.resolveActorRef(path)
ref1.getClass should !==(classOf[EmptyLocalActorRef])
ref1.asInstanceOf[ActorRefScope].isLocal should ===(true)
val ref2 = provider.resolveActorRef(path)
ref1 should be theSameInstanceAs (ref2)
}
"not cache resolveActorRef for unresolved ref" in {
val provider = localSystem.asInstanceOf[ExtendedActorSystem].provider
val path = s"akka://${system.name}@${addressA.host.get}:${addressA.port.get}/user/doesNotExist"
val ref1 = provider.resolveActorRef(path)
ref1.getClass should ===(classOf[EmptyLocalActorRef])
val ref2 = provider.resolveActorRef(path)
ref1 should not be theSameInstanceAs(ref2)
}
"cache resolveActorRef for remote ref" in {
val provider = localSystem.asInstanceOf[ExtendedActorSystem].provider
val path = s"akka://${systemB.name}@${addressB.host.get}:${addressB.port.get}/user/echo"
val ref1 = provider.resolveActorRef(path)
ref1.getClass should ===(classOf[RemoteActorRef])
val ref2 = provider.resolveActorRef(path)
ref1 should be theSameInstanceAs (ref2)
}
"detect wrong protocol" in {
EventFilter[IllegalArgumentException](start = "No root guardian at", occurrences = 1).intercept {
val sel = system.actorSelection(s"akka.tcp://${systemB.name}@${addressB.host.get}:${addressB.port.get}/user/echo")