try delayed retry resolve of remote deployed refs only once, #21403
* If the retried resolve isn't successful, the ref is banned and the delayed retry resolve is not attempted again. The reason is that if many messages are sent to such dead refs, the resolve process would slow down other messages.
This commit is contained in:
parent
02de58392a
commit
7b665f705e
1 changed files with 28 additions and 7 deletions
|
|
@ -232,6 +232,7 @@ private[remote] class Decoder(
|
|||
private val headerBuilder = HeaderBuilder.in(compression)
|
||||
private val actorRefResolver: ActorRefResolveCache =
|
||||
new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
|
||||
private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String]
|
||||
|
||||
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
|
||||
private val retryResolveRemoteDeployedRecipientAttempts = 20
|
||||
|
|
@ -322,11 +323,23 @@ private[remote] class Decoder(
|
|||
association)
|
||||
|
||||
if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
|
||||
// the remote deployed actor might not be created yet when resolving the
|
||||
// recipient for the first message that is sent to it, best effort retry
|
||||
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
||||
retryResolveRemoteDeployedRecipientAttempts,
|
||||
headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval)
|
||||
|
||||
// The remote deployed actor might not be created yet when resolving the
|
||||
// recipient for the first message that is sent to it, best effort retry.
|
||||
// However, if the retried resolve isn't successful the ref is banned and
|
||||
// we will not do the delayed retry resolve again. The reason is that
|
||||
// if many messages are sent to such dead refs the resolve process will slow
|
||||
// down other messages.
|
||||
val recipientActorRefPath = headerBuilder.recipientActorRefPath.get
|
||||
if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) {
|
||||
log.debug(
|
||||
"Dropping message for banned (terminated) remote deployed recipient [{}].",
|
||||
recipientActorRefPath)
|
||||
pull(in)
|
||||
} else
|
||||
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
||||
retryResolveRemoteDeployedRecipientAttempts,
|
||||
recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval)
|
||||
} else
|
||||
push(out, decoded)
|
||||
}
|
||||
|
|
@ -336,7 +349,6 @@ private[remote] class Decoder(
|
|||
actorRefResolver.getOrCompute(path) match {
|
||||
case empty: EmptyLocalActorRef ⇒
|
||||
val pathElements = empty.path.elements
|
||||
// FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer)
|
||||
if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None
|
||||
else OptionVal(empty)
|
||||
case ref ⇒ OptionVal(ref)
|
||||
|
|
@ -372,8 +384,17 @@ private[remote] class Decoder(
|
|||
attemptsLeft - 1,
|
||||
recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
|
||||
else {
|
||||
// No more attempts left. If the retried resolve isn't successful the ref is banned and
|
||||
// we will not do the delayed retry resolve again. The reason is that
|
||||
// if many messages are sent to such dead refs the resolve process will slow
|
||||
// down other messages.
|
||||
if (bannedRemoteDeployedActorRefs.size >= 100) {
|
||||
// keep it bounded
|
||||
bannedRemoteDeployedActorRefs.clear()
|
||||
}
|
||||
bannedRemoteDeployedActorRefs.add(recipientPath)
|
||||
|
||||
val recipient = actorRefResolver.getOrCompute(recipientPath)
|
||||
// FIXME only retry for the first message, need to keep them in a cache
|
||||
push(out, inboundEnvelope.withRecipient(recipient))
|
||||
}
|
||||
case OptionVal.Some(recipient) ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue