diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 187f9f1389..7b8b871941 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -21,6 +21,8 @@ import akka.remote.artery.ArteryTransport import akka.util.OptionVal import akka.remote.artery.OutboundEnvelope import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope +import akka.remote.serialization.ActorRefResolveCache +import akka.remote.serialization.ActorRefResolveThreadLocalCache /** * INTERNAL API @@ -175,9 +177,13 @@ private[akka] class RemoteActorRefProvider( @volatile private var remoteDeploymentWatcher: ActorRef = _ + @volatile private var actorRefResolveThreadLocalCache: ActorRefResolveThreadLocalCache = _ + def init(system: ActorSystemImpl): Unit = { local.init(system) + actorRefResolveThreadLocalCache = ActorRefResolveThreadLocalCache(system) + remotingTerminator = system.systemActorOf( remoteSettings.configureDispatcher(Props(classOf[RemotingTerminator], local.systemGuardian)), "remoting-terminator") @@ -389,7 +395,20 @@ private[akka] class RemoteActorRefProvider( } } - def resolveActorRef(path: String): ActorRef = path match { + def resolveActorRef(path: String): ActorRef = { + // using thread local LRU cache, which will call internalRresolveActorRef + // if the value is not cached + actorRefResolveThreadLocalCache match { + case null ⇒ internalResolveActorRef(path) // not initalized yet + case c ⇒ c.threadLocalCache(this).getOrCompute(path) + } + } + + /** + * INTERNAL API: This is used by the `ActorRefResolveCache` via the + * public `resolveActorRef(path: String)`. + */ + private[akka] def internalResolveActorRef(path: String): ActorRef = path match { case ActorPathExtractor(address, elems) ⇒ if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems) else { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index b59ac96d51..26b2b0bb43 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -255,7 +255,7 @@ private[remote] object Decoder { /** * INTERNAL API */ -private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider, localAddress: UniqueAddress) +private[akka] final class ActorRefResolveCacheWithAddress(provider: RemoteActorRefProvider, localAddress: UniqueAddress) extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) { override protected def compute(k: String): InternalActorRef = @@ -286,8 +286,8 @@ private[remote] class Decoder( import Decoder.RetryResolveRemoteDeployedRecipient private val localAddress = inboundContext.localAddress.address private val headerBuilder = HeaderBuilder.in(compression) - private val actorRefResolver: ActorRefResolveCache = - new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress) + private val actorRefResolver: ActorRefResolveCacheWithAddress = + new ActorRefResolveCacheWithAddress(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress) private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String] private val retryResolveRemoteDeployedRecipientInterval = 50.millis diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala new file mode 100644 index 0000000000..4ced76fe04 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.serialization + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.EmptyLocalActorRef +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.remote.RemoteActorRefProvider +import akka.remote.artery.FastHash +import akka.remote.artery.LruBoundedCache + +/** + * INTERNAL API: Thread local cache per actor system + */ +private[akka] object ActorRefResolveThreadLocalCache + extends ExtensionId[ActorRefResolveThreadLocalCache] with ExtensionIdProvider { + + override def get(system: ActorSystem): ActorRefResolveThreadLocalCache = super.get(system) + + override def lookup = ActorRefResolveThreadLocalCache + + override def createExtension(system: ExtendedActorSystem): ActorRefResolveThreadLocalCache = + new ActorRefResolveThreadLocalCache(system) +} + +/** + * INTERNAL API + */ +private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSystem) extends Extension { + + private val provider = system.provider match { + case r: RemoteActorRefProvider ⇒ r + case _ ⇒ throw new IllegalArgumentException( + "ActorRefResolveThreadLocalCache can only be used with RemoteActorRefProvider, " + + s"not with ${system.provider.getClass}") + } + + private val current = new ThreadLocal[ActorRefResolveCache] { + override def initialValue: ActorRefResolveCache = new ActorRefResolveCache(provider) + } + + def threadLocalCache(provider: RemoteActorRefProvider): ActorRefResolveCache = + current.get + +} + +/** + * INTERNAL API + */ +private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider) + extends LruBoundedCache[String, ActorRef](capacity = 1024, evictAgeThreshold = 600) { + + override protected def compute(k: String): ActorRef = + provider.internalResolveActorRef(k) + + override protected def hash(k: String): Int = FastHash.ofString(k) + + override protected def isCacheable(v: ActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef] +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala index 96f7cef146..586f41e8cb 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteActorRefProviderSpec.scala @@ -6,6 +6,9 @@ package akka.remote.artery import akka.actor.{ EmptyLocalActorRef, InternalActorRef } import akka.remote.RemoteActorRef import akka.testkit.{ EventFilter, TestActors } +import akka.actor.Props +import akka.actor.ExtendedActorSystem +import akka.actor.ActorRefScope class RemoteActorRefProviderSpec extends ArteryMultiNodeSpec { @@ -29,6 +32,37 @@ class RemoteActorRefProviderSpec extends ArteryMultiNodeSpec { sel.anchor.asInstanceOf[InternalActorRef].isLocal should be(false) } + "cache resolveActorRef for local ref" in { + val provider = localSystem.asInstanceOf[ExtendedActorSystem].provider + val path = s"akka://${system.name}@${addressA.host.get}:${addressA.port.get}/user/echo" + val ref1 = provider.resolveActorRef(path) + ref1.getClass should !==(classOf[EmptyLocalActorRef]) + ref1.asInstanceOf[ActorRefScope].isLocal should ===(true) + + val ref2 = provider.resolveActorRef(path) + ref1 should be theSameInstanceAs (ref2) + } + + "not cache resolveActorRef for unresolved ref" in { + val provider = localSystem.asInstanceOf[ExtendedActorSystem].provider + val path = s"akka://${system.name}@${addressA.host.get}:${addressA.port.get}/user/doesNotExist" + val ref1 = provider.resolveActorRef(path) + ref1.getClass should ===(classOf[EmptyLocalActorRef]) + + val ref2 = provider.resolveActorRef(path) + ref1 should not be theSameInstanceAs(ref2) + } + + "cache resolveActorRef for remote ref" in { + val provider = localSystem.asInstanceOf[ExtendedActorSystem].provider + val path = s"akka://${systemB.name}@${addressB.host.get}:${addressB.port.get}/user/echo" + val ref1 = provider.resolveActorRef(path) + ref1.getClass should ===(classOf[RemoteActorRef]) + + val ref2 = provider.resolveActorRef(path) + ref1 should be theSameInstanceAs (ref2) + } + "detect wrong protocol" in { EventFilter[IllegalArgumentException](start = "No root guardian at", occurrences = 1).intercept { val sel = system.actorSelection(s"akka.tcp://${systemB.name}@${addressB.host.get}:${addressB.port.get}/user/echo")