diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index e62d6de02e..e2ee25c58e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -232,6 +232,7 @@ private[remote] class Decoder( private val headerBuilder = HeaderBuilder.in(compression) private val actorRefResolver: ActorRefResolveCache = new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress) + private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String] private val retryResolveRemoteDeployedRecipientInterval = 50.millis private val retryResolveRemoteDeployedRecipientAttempts = 20 @@ -322,11 +323,23 @@ private[remote] class Decoder( association) if (recipient.isEmpty && !headerBuilder.isNoRecipient) { - // the remote deployed actor might not be created yet when resolving the - // recipient for the first message that is sent to it, best effort retry - scheduleOnce(RetryResolveRemoteDeployedRecipient( - retryResolveRemoteDeployedRecipientAttempts, - headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval) + + // The remote deployed actor might not be created yet when resolving the + // recipient for the first message that is sent to it, best effort retry. + // However, if the retried resolve isn't successful the ref is banned and + // we will not do the delayed retry resolve again. The reason for that is + // if many messages are sent to such dead refs the resolve process will slow + // down other messages. + val recipientActorRefPath = headerBuilder.recipientActorRefPath.get + if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) { + log.debug( + "Dropping message for banned (terminated) remote deployed recipient [{}].", + recipientActorRefPath) + pull(in) + } else + scheduleOnce(RetryResolveRemoteDeployedRecipient( + retryResolveRemoteDeployedRecipientAttempts, + recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval) } else push(out, decoded) } @@ -336,7 +349,6 @@ private[remote] class Decoder( actorRefResolver.getOrCompute(path) match { case empty: EmptyLocalActorRef ⇒ val pathElements = empty.path.elements - // FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer) if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None else OptionVal(empty) case ref ⇒ OptionVal(ref) @@ -372,8 +384,17 @@ private[remote] class Decoder( attemptsLeft - 1, recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval) else { + // No more attempts left. If the retried resolve isn't successful the ref is banned and + // we will not do the delayed retry resolve again. The reason for that is + // if many messages are sent to such dead refs the resolve process will slow + // down other messages. + if (bannedRemoteDeployedActorRefs.size >= 100) { + // keep it bounded + bannedRemoteDeployedActorRefs.clear() + } + bannedRemoteDeployedActorRefs.add(recipientPath) + val recipient = actorRefResolver.getOrCompute(recipientPath) - // FIXME only retry for the first message, need to keep them in a cache push(out, inboundEnvelope.withRecipient(recipient)) } case OptionVal.Some(recipient) ⇒