diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index b827771f1a..2eb5dfb337 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -18,6 +18,7 @@ import akka.event.AddressTerminatedTopic import akka.event.EventStream import akka.event.Logging import akka.event.MarkerLoggingAdapter +import akka.pattern.PromiseActorRef import akka.serialization.JavaSerializer import akka.serialization.Serialization import akka.util.OptionVal @@ -213,13 +214,26 @@ private[akka] trait RepointableRef extends ActorRefScope { def isStarted: Boolean } +/** + * INTERNAL API + */ +@InternalApi private[akka] object InternalActorRef { + def isTemporaryRef(ref: ActorRef): Boolean = + ref match { + case i: InternalActorRef => + (i.isLocal && i.isInstanceOf[PromiseActorRef]) || + (!i.isLocal && i.path.elements.head == "temp") + } + +} + /** * Internal trait for assembling all the functionality needed internally on * ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE! * * DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA! */ -private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope => +@InternalApi private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope => /* * Actor life-cycle management, invoked only internally (in response to user requests via ActorContext). */ diff --git a/akka-remote/src/main/mima-filters/2.6.10.backwards.excludes/issue-29828-resolve-cache.excludes b/akka-remote/src/main/mima-filters/2.6.10.backwards.excludes/issue-29828-resolve-cache.excludes new file mode 100644 index 0000000000..e664fc2775 --- /dev/null +++ b/akka-remote/src/main/mima-filters/2.6.10.backwards.excludes/issue-29828-resolve-cache.excludes @@ -0,0 +1,3 @@ +# #29828 internal changes to ActorRefResolveCache +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.ActorRefResolveCacheWithAddress.isCacheable") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.remote.serialization.ActorRefResolveCache.isCacheable") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 3e7b705eca..da42c43268 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -514,8 +514,10 @@ private[akka] class RemoteActorRefProvider( // using thread local LRU cache, which will call internalResolveActorRef // if the value is not cached actorRefResolveThreadLocalCache match { - case null => internalResolveActorRef(path) // not initialized yet - case c => c.threadLocalCache(this).getOrCompute(path) + case null => + internalResolveActorRef(path) // not initialized yet + case c => + c.threadLocalCache(this).resolve(path) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index f505c89cf0..c8ec664671 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -6,29 +6,35 @@ package akka.remote.artery import java.util.concurrent.TimeUnit -import scala.concurrent.{ Future, Promise } +import scala.concurrent.Future +import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.Done -import akka.actor.{ EmptyLocalActorRef, _ } +import akka.actor.EmptyLocalActorRef +import akka.actor._ import akka.event.Logging -import akka.pattern.PromiseActorRef -import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress } -import akka.remote.artery.Decoder.{ - AdvertiseActorRefsCompressionTable, - AdvertiseClassManifestsCompressionTable, - InboundCompressionAccess, - InboundCompressionAccessImpl -} +import akka.remote.MessageSerializer +import akka.remote.OversizedPayloadException +import akka.remote.RemoteActorRefProvider +import akka.remote.UniqueAddress +import akka.remote.artery.Decoder.AdvertiseActorRefsCompressionTable +import akka.remote.artery.Decoder.AdvertiseClassManifestsCompressionTable +import akka.remote.artery.Decoder.InboundCompressionAccess +import akka.remote.artery.Decoder.InboundCompressionAccessImpl import akka.remote.artery.OutboundHandshake.HandshakeReq import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope -import akka.remote.artery.compress._ import akka.remote.artery.compress.CompressionProtocol._ -import akka.serialization.{ Serialization, SerializationExtension, Serializers } +import akka.remote.artery.compress._ +import akka.remote.serialization.AbstractActorRefResolveCache +import akka.serialization.Serialization +import akka.serialization.SerializationExtension +import akka.serialization.Serializers import akka.stream._ import akka.stream.stage._ -import akka.util.{ unused, OptionVal, Unsafe } +import akka.util.OptionVal +import akka.util.unused /** * INTERNAL API @@ -335,21 +341,10 @@ private[remote] object Decoder { private[remote] final class ActorRefResolveCacheWithAddress( provider: RemoteActorRefProvider, localAddress: UniqueAddress) - extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) { + extends AbstractActorRefResolveCache[InternalActorRef] { override protected def compute(k: String): InternalActorRef = provider.resolveActorRefWithLocalAddress(k, localAddress.address) - - override protected def hash(k: String): Int = Unsafe.fastHash(k) - - override protected def isCacheable(v: InternalActorRef): Boolean = - v match { - case _: EmptyLocalActorRef => false - case _: PromiseActorRef => - // each of these are only for one request-response interaction so don't cache - false - case _ => true - } } /** @@ -437,7 +432,7 @@ private[remote] class Decoder( case OptionVal.Some(ref) => OptionVal(ref.asInstanceOf[InternalActorRef]) case OptionVal.None if headerBuilder.senderActorRefPath.isDefined => - OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get)) + OptionVal(actorRefResolver.resolve(headerBuilder.senderActorRefPath.get)) case _ => OptionVal.None } catch { diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 7a57c9c3ac..7160106b4c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -10,12 +10,15 @@ import scala.annotation.tailrec import org.agrona.collections.Long2ObjectHashMap -import akka.actor.{ ActorRef, ActorSystem, Address, InternalActorRef } +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.InternalActorRef import akka.event.Logging import akka.event.LoggingAdapter -import akka.pattern.PromiseActorRef import akka.remote.artery._ -import akka.util.{ unused, OptionVal } +import akka.util.OptionVal +import akka.util.unused /** * INTERNAL API @@ -198,9 +201,7 @@ private[remote] final class InboundActorRefCompression( var idx = 0 elements.foreach { case ref: InternalActorRef => - val isTemporaryRef = (ref.isLocal && ref.isInstanceOf[PromiseActorRef]) || - (!ref.isLocal && ref.path.elements.head == "temp") - if (!isTemporaryRef) { + if (!InternalActorRef.isTemporaryRef(ref)) { mb += ref -> idx idx += 1 } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala index 49984ddc9b..5953242f34 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala @@ -4,6 +4,8 @@ package akka.remote.serialization +import scala.reflect.ClassTag + import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.ClassicActorSystemProvider @@ -12,9 +14,12 @@ import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider +import akka.actor.InternalActorRef +import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.artery.LruBoundedCache -import akka.util.{ unused, Unsafe } +import akka.util.Unsafe +import akka.util.unused /** * INTERNAL API: Thread local cache per actor system @@ -58,12 +63,43 @@ private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSys * INTERNAL API */ private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider) - extends LruBoundedCache[String, ActorRef](capacity = 1024, evictAgeThreshold = 600) { + extends AbstractActorRefResolveCache[ActorRef] { override protected def compute(k: String): ActorRef = provider.internalResolveActorRef(k) +} + +/** + * INTERNAL API + */ +private[akka] abstract class AbstractActorRefResolveCache[R <: ActorRef: ClassTag] + extends LruBoundedCache[String, R](capacity = 1024, evictAgeThreshold = 600) { + + /** + * Compared to `getOrCompute` this will also invalidate cachedAssociation of RemoteActorRef + * if the `Association` is removed. + */ + def resolve(k: String): R = { + val ref = getOrCompute(k) + ref match { + case r: RemoteActorRef => + val cachedAssociation = r.cachedAssociation + if (cachedAssociation != null && cachedAssociation.isRemovedAfterQuarantined()) + r.cachedAssociation = null + case _ => + } + ref + } + + override protected def compute(k: String): R override protected def hash(k: String): Int = Unsafe.fastHash(k) - override protected def isCacheable(v: ActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef] + override protected def isCacheable(ref: R): Boolean = + ref match { + case _: EmptyLocalActorRef => false + case _ => + // "temp" only for one request-response interaction so don't cache + !InternalActorRef.isTemporaryRef(ref) + } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/ActorRefResolveCacheQuarantineSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/ActorRefResolveCacheQuarantineSpec.scala new file mode 100644 index 0000000000..edf2b39a78 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/ActorRefResolveCacheQuarantineSpec.scala @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.remote.artery + +import scala.concurrent.duration._ +import akka.remote.RARP +import akka.testkit.DeadLettersFilter +import akka.testkit.ImplicitSender +import akka.testkit.TestActors +import akka.testkit.TestEvent.Mute +import akka.pattern.ask +import akka.util.Timeout + +/** + * Reproducer of issue #29828 + */ +class ActorRefResolveCacheQuarantineSpec + extends ArteryMultiNodeSpec(""" + akka.remote.artery.advanced.remove-quarantined-association-after = 2 seconds + """) + with ImplicitSender { + import RemoteFailureSpec._ + + private implicit val timeout: Timeout = 3.seconds + + system.eventStream.publish(Mute(DeadLettersFilter(classOf[Ping])(occurrences = Int.MaxValue))) + + "ActorRefResolveCache" should { + + "not use cached quarantined association" in { + system.actorOf(TestActors.echoActorProps, name = "echo") + + val clientSystem1 = newRemoteSystem() + val remoteSelection1 = clientSystem1.actorSelection(rootActorPath(system) / "user" / "echo") + + // PromiseActorRef (temp) doesn't include a uid in the ActorRef + val reply1 = remoteSelection1 ? "hello-1" + reply1.futureValue shouldBe "hello-1" + + shutdown(clientSystem1) + + // wait for it to be removed fully, remove-quarantined-association-after + Thread.sleep(4000) + + val port1 = RARP(clientSystem1).provider.getDefaultAddress.getPort().get + val clientSystem2 = + newRemoteSystem( + name = Some(clientSystem1.name), + extraConfig = Some(s"akka.remote.artery.canonical.port = $port1")) + val remoteSelection2 = clientSystem2.actorSelection(rootActorPath(system) / "user" / "echo") + + val reply2 = remoteSelection2 ? "hello-2" + reply2.futureValue shouldBe "hello-2" + } + + } +}