Don't cache temp actors in ActorRefResolveCache, #29828 (#29834)

* problem described in in issue
* temp (ask) ActorRef shouldn't be cached
* similar to to the ActorRef compression that was fixed
  in https://github.com/akka/akka/pull/28823
* temp ActorRef doesn't contain the ActorRef uid so it can
  be resolved to an ActorRef with an old cached Association
* additionally, invalidate the cached Association if it has
  been removed after quarantine
* test that reproduces the problem in the issue
* also verified with the Main example in the issue
This commit is contained in:
Patrik Nordwall 2020-12-04 08:14:58 +01:00 committed by GitHub
parent 3602ffa4d9
commit 021e7d58bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 148 additions and 38 deletions

View file

@ -18,6 +18,7 @@ import akka.event.AddressTerminatedTopic
import akka.event.EventStream
import akka.event.Logging
import akka.event.MarkerLoggingAdapter
import akka.pattern.PromiseActorRef
import akka.serialization.JavaSerializer
import akka.serialization.Serialization
import akka.util.OptionVal
@ -213,13 +214,26 @@ private[akka] trait RepointableRef extends ActorRefScope {
def isStarted: Boolean
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object InternalActorRef {
def isTemporaryRef(ref: ActorRef): Boolean =
ref match {
case i: InternalActorRef =>
(i.isLocal && i.isInstanceOf[PromiseActorRef]) ||
(!i.isLocal && i.path.elements.head == "temp")
}
}
/**
* Internal trait for assembling all the functionality needed internally on
* ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
*
* DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA!
*/
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope =>
@InternalApi private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope =>
/*
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
*/

View file

@ -0,0 +1,3 @@
# #29828 internal changes to ActorRefResolveCache
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.ActorRefResolveCacheWithAddress.isCacheable")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.remote.serialization.ActorRefResolveCache.isCacheable")

View file

@ -514,8 +514,10 @@ private[akka] class RemoteActorRefProvider(
// using thread local LRU cache, which will call internalResolveActorRef
// if the value is not cached
actorRefResolveThreadLocalCache match {
case null => internalResolveActorRef(path) // not initialized yet
case c => c.threadLocalCache(this).getOrCompute(path)
case null =>
internalResolveActorRef(path) // not initialized yet
case c =>
c.threadLocalCache(this).resolve(path)
}
}

View file

@ -6,29 +6,35 @@ package akka.remote.artery
import java.util.concurrent.TimeUnit
import scala.concurrent.{ Future, Promise }
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.Done
import akka.actor.{ EmptyLocalActorRef, _ }
import akka.actor.EmptyLocalActorRef
import akka.actor._
import akka.event.Logging
import akka.pattern.PromiseActorRef
import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress }
import akka.remote.artery.Decoder.{
AdvertiseActorRefsCompressionTable,
AdvertiseClassManifestsCompressionTable,
InboundCompressionAccess,
InboundCompressionAccessImpl
}
import akka.remote.MessageSerializer
import akka.remote.OversizedPayloadException
import akka.remote.RemoteActorRefProvider
import akka.remote.UniqueAddress
import akka.remote.artery.Decoder.AdvertiseActorRefsCompressionTable
import akka.remote.artery.Decoder.AdvertiseClassManifestsCompressionTable
import akka.remote.artery.Decoder.InboundCompressionAccess
import akka.remote.artery.Decoder.InboundCompressionAccessImpl
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.remote.artery.compress._
import akka.remote.artery.compress.CompressionProtocol._
import akka.serialization.{ Serialization, SerializationExtension, Serializers }
import akka.remote.artery.compress._
import akka.remote.serialization.AbstractActorRefResolveCache
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.serialization.Serializers
import akka.stream._
import akka.stream.stage._
import akka.util.{ unused, OptionVal, Unsafe }
import akka.util.OptionVal
import akka.util.unused
/**
* INTERNAL API
@ -335,21 +341,10 @@ private[remote] object Decoder {
private[remote] final class ActorRefResolveCacheWithAddress(
provider: RemoteActorRefProvider,
localAddress: UniqueAddress)
extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) {
extends AbstractActorRefResolveCache[InternalActorRef] {
override protected def compute(k: String): InternalActorRef =
provider.resolveActorRefWithLocalAddress(k, localAddress.address)
override protected def hash(k: String): Int = Unsafe.fastHash(k)
override protected def isCacheable(v: InternalActorRef): Boolean =
v match {
case _: EmptyLocalActorRef => false
case _: PromiseActorRef =>
// each of these are only for one request-response interaction so don't cache
false
case _ => true
}
}
/**
@ -437,7 +432,7 @@ private[remote] class Decoder(
case OptionVal.Some(ref) =>
OptionVal(ref.asInstanceOf[InternalActorRef])
case OptionVal.None if headerBuilder.senderActorRefPath.isDefined =>
OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get))
OptionVal(actorRefResolver.resolve(headerBuilder.senderActorRefPath.get))
case _ =>
OptionVal.None
} catch {

View file

@ -10,12 +10,15 @@ import scala.annotation.tailrec
import org.agrona.collections.Long2ObjectHashMap
import akka.actor.{ ActorRef, ActorSystem, Address, InternalActorRef }
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.InternalActorRef
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.pattern.PromiseActorRef
import akka.remote.artery._
import akka.util.{ unused, OptionVal }
import akka.util.OptionVal
import akka.util.unused
/**
* INTERNAL API
@ -198,9 +201,7 @@ private[remote] final class InboundActorRefCompression(
var idx = 0
elements.foreach {
case ref: InternalActorRef =>
val isTemporaryRef = (ref.isLocal && ref.isInstanceOf[PromiseActorRef]) ||
(!ref.isLocal && ref.path.elements.head == "temp")
if (!isTemporaryRef) {
if (!InternalActorRef.isTemporaryRef(ref)) {
mb += ref -> idx
idx += 1
}

View file

@ -4,6 +4,8 @@
package akka.remote.serialization
import scala.reflect.ClassTag
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
@ -12,9 +14,12 @@ import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.InternalActorRef
import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
import akka.remote.artery.LruBoundedCache
import akka.util.{ unused, Unsafe }
import akka.util.Unsafe
import akka.util.unused
/**
* INTERNAL API: Thread local cache per actor system
@ -58,12 +63,43 @@ private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSys
* INTERNAL API
*/
private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider)
extends LruBoundedCache[String, ActorRef](capacity = 1024, evictAgeThreshold = 600) {
extends AbstractActorRefResolveCache[ActorRef] {
override protected def compute(k: String): ActorRef =
provider.internalResolveActorRef(k)
}
/**
* INTERNAL API
*/
private[akka] abstract class AbstractActorRefResolveCache[R <: ActorRef: ClassTag]
extends LruBoundedCache[String, R](capacity = 1024, evictAgeThreshold = 600) {
/**
* Compared to `getOrCompute` this will also invalidate cachedAssociation of RemoteActorRef
* if the `Association` is removed.
*/
def resolve(k: String): R = {
val ref = getOrCompute(k)
ref match {
case r: RemoteActorRef =>
val cachedAssociation = r.cachedAssociation
if (cachedAssociation != null && cachedAssociation.isRemovedAfterQuarantined())
r.cachedAssociation = null
case _ =>
}
ref
}
override protected def compute(k: String): R
override protected def hash(k: String): Int = Unsafe.fastHash(k)
override protected def isCacheable(v: ActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
override protected def isCacheable(ref: R): Boolean =
ref match {
case _: EmptyLocalActorRef => false
case _ =>
// "temp" only for one request-response interaction so don't cache
!InternalActorRef.isTemporaryRef(ref)
}
}

View file

@ -0,0 +1,59 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.remote.RARP
import akka.testkit.DeadLettersFilter
import akka.testkit.ImplicitSender
import akka.testkit.TestActors
import akka.testkit.TestEvent.Mute
import akka.pattern.ask
import akka.util.Timeout
/**
* Reproducer of issue #29828
*/
class ActorRefResolveCacheQuarantineSpec
extends ArteryMultiNodeSpec("""
akka.remote.artery.advanced.remove-quarantined-association-after = 2 seconds
""")
with ImplicitSender {
import RemoteFailureSpec._
private implicit val timeout: Timeout = 3.seconds
system.eventStream.publish(Mute(DeadLettersFilter(classOf[Ping])(occurrences = Int.MaxValue)))
"ActorRefResolveCache" should {
"not use cached quarantined association" in {
system.actorOf(TestActors.echoActorProps, name = "echo")
val clientSystem1 = newRemoteSystem()
val remoteSelection1 = clientSystem1.actorSelection(rootActorPath(system) / "user" / "echo")
// PromiseActorRef (temp) doesn't include a uid in the ActorRef
val reply1 = remoteSelection1 ? "hello-1"
reply1.futureValue shouldBe "hello-1"
shutdown(clientSystem1)
// wait for it to be removed fully, remove-quarantined-association-after
Thread.sleep(4000)
val port1 = RARP(clientSystem1).provider.getDefaultAddress.getPort().get
val clientSystem2 =
newRemoteSystem(
name = Some(clientSystem1.name),
extraConfig = Some(s"akka.remote.artery.canonical.port = $port1"))
val remoteSelection2 = clientSystem2.actorSelection(rootActorPath(system) / "user" / "echo")
val reply2 = remoteSelection2 ? "hello-2"
reply2.futureValue shouldBe "hello-2"
}
}
}