Merge pull request #21414 from akka/wip-21403-banned-resolveActorRef-patriknw
Try the delayed retry resolve of remote deployed refs only once, #21403
This commit is contained in:
commit
e2ea9fc46a
1 changed files with 28 additions and 7 deletions
|
|
@ -232,6 +232,7 @@ private[remote] class Decoder(
|
||||||
private val headerBuilder = HeaderBuilder.in(compression)
|
private val headerBuilder = HeaderBuilder.in(compression)
|
||||||
private val actorRefResolver: ActorRefResolveCache =
|
private val actorRefResolver: ActorRefResolveCache =
|
||||||
new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
|
new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
|
||||||
|
private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String]
|
||||||
|
|
||||||
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
|
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
|
||||||
private val retryResolveRemoteDeployedRecipientAttempts = 20
|
private val retryResolveRemoteDeployedRecipientAttempts = 20
|
||||||
|
|
@ -322,11 +323,23 @@ private[remote] class Decoder(
|
||||||
association)
|
association)
|
||||||
|
|
||||||
if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
|
if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
|
||||||
// the remote deployed actor might not be created yet when resolving the
|
|
||||||
// recipient for the first message that is sent to it, best effort retry
|
// The remote deployed actor might not be created yet when resolving the
|
||||||
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
// recipient for the first message that is sent to it, best effort retry.
|
||||||
retryResolveRemoteDeployedRecipientAttempts,
|
// However, if the retried resolve isn't successful the ref is banned and
|
||||||
headerBuilder.recipientActorRefPath.get, decoded), retryResolveRemoteDeployedRecipientInterval)
|
// we will not do the delayed retry resolve again. The reason is that
|
||||||
|
// if many messages are sent to such dead refs, the resolve process will slow
|
||||||
|
// down other messages.
|
||||||
|
val recipientActorRefPath = headerBuilder.recipientActorRefPath.get
|
||||||
|
if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) {
|
||||||
|
log.debug(
|
||||||
|
"Dropping message for banned (terminated) remote deployed recipient [{}].",
|
||||||
|
recipientActorRefPath)
|
||||||
|
pull(in)
|
||||||
|
} else
|
||||||
|
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
||||||
|
retryResolveRemoteDeployedRecipientAttempts,
|
||||||
|
recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval)
|
||||||
} else
|
} else
|
||||||
push(out, decoded)
|
push(out, decoded)
|
||||||
}
|
}
|
||||||
|
|
@ -336,7 +349,6 @@ private[remote] class Decoder(
|
||||||
actorRefResolver.getOrCompute(path) match {
|
actorRefResolver.getOrCompute(path) match {
|
||||||
case empty: EmptyLocalActorRef ⇒
|
case empty: EmptyLocalActorRef ⇒
|
||||||
val pathElements = empty.path.elements
|
val pathElements = empty.path.elements
|
||||||
// FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer)
|
|
||||||
if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None
|
if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None
|
||||||
else OptionVal(empty)
|
else OptionVal(empty)
|
||||||
case ref ⇒ OptionVal(ref)
|
case ref ⇒ OptionVal(ref)
|
||||||
|
|
@ -372,8 +384,17 @@ private[remote] class Decoder(
|
||||||
attemptsLeft - 1,
|
attemptsLeft - 1,
|
||||||
recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
|
recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
|
||||||
else {
|
else {
|
||||||
|
// No more attempts left. If the retried resolve isn't successful the ref is banned and
|
||||||
|
// we will not do the delayed retry resolve again. The reason is that
|
||||||
|
// if many messages are sent to such dead refs, the resolve process will slow
|
||||||
|
// down other messages.
|
||||||
|
if (bannedRemoteDeployedActorRefs.size >= 100) {
|
||||||
|
// keep it bounded
|
||||||
|
bannedRemoteDeployedActorRefs.clear()
|
||||||
|
}
|
||||||
|
bannedRemoteDeployedActorRefs.add(recipientPath)
|
||||||
|
|
||||||
val recipient = actorRefResolver.getOrCompute(recipientPath)
|
val recipient = actorRefResolver.getOrCompute(recipientPath)
|
||||||
// FIXME only retry for the first message, need to keep them in a cache
|
|
||||||
push(out, inboundEnvelope.withRecipient(recipient))
|
push(out, inboundEnvelope.withRecipient(recipient))
|
||||||
}
|
}
|
||||||
case OptionVal.Some(recipient) ⇒
|
case OptionVal.Some(recipient) ⇒
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue