From 7b665f705e3b71dd5d5b264de8fa03dfcedd2997 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 9 Sep 2016 10:15:12 +0200 Subject: [PATCH] try delayed retry resolve of remote deployed refs only once, #21403 * If the retried resolve isn't successful the ref is banned and we will not do the delayed retry resolve again. The reason for that is if many messages are sent to such dead refs the resolve process will slow down other messages. --- .../scala/akka/remote/artery/Codecs.scala | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index e62d6de02e..e2ee25c58e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -232,6 +232,7 @@ private[remote] class Decoder( private val headerBuilder = HeaderBuilder.in(compression) private val actorRefResolver: ActorRefResolveCache = new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress) + private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String] private val retryResolveRemoteDeployedRecipientInterval = 50.millis private val retryResolveRemoteDeployedRecipientAttempts = 20 @@ -322,11 +323,23 @@ private[remote] class Decoder( association) if (recipient.isEmpty && !headerBuilder.isNoRecipient) { - // the remote deployed actor might not be created yet when resolving the - // recipient for the first message that is sent to it, best effort retry - scheduleOnce(RetryResolveRemoteDeployedRecipient( - retryResolveRemoteDeployedRecipientAttempts, - headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) + + // The remote deployed actor might not be created yet when resolving the + // recipient for the first message that is sent to it, best effort retry. + // However, if the retried resolve isn't successful the ref is banned and + // we will not do the delayed retry resolve again. The reason for that is + // if many messages are sent to such dead refs the resolve process will slow + // down other messages. + val recipientActorRefPath = headerBuilder.recipientActorRefPath.get + if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) { + log.debug( + "Dropping message for banned (terminated) remote deployed recipient [{}].", + recipientActorRefPath) + pull(in) + } else + scheduleOnce(RetryResolveRemoteDeployedRecipient( + retryResolveRemoteDeployedRecipientAttempts, + recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval) } else push(out, decoded) } @@ -336,7 +349,6 @@ private[remote] class Decoder( actorRefResolver.getOrCompute(path) match { case empty: EmptyLocalActorRef ⇒ val pathElements = empty.path.elements - // FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer) if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None else OptionVal(empty) case ref ⇒ OptionVal(ref) @@ -372,8 +384,17 @@ private[remote] class Decoder( attemptsLeft - 1, recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval) else { + // No more attempts left. If the retried resolve isn't successful the ref is banned and + // we will not do the delayed retry resolve again. The reason for that is + // if many messages are sent to such dead refs the resolve process will slow + // down other messages. + if (bannedRemoteDeployedActorRefs.size >= 100) { + // keep it bounded + bannedRemoteDeployedActorRefs.clear() + } + bannedRemoteDeployedActorRefs.add(recipientPath) + val recipient = actorRefResolver.getOrCompute(recipientPath) - // FIXME only retry for the first message, need to keep them in a cache push(out, inboundEnvelope.withRecipient(recipient)) } case OptionVal.Some(recipient) ⇒