=rem 3933: Quarantine piercing should not lose quarantine marker
(cherry picked from commit 7c8160b)
This commit is contained in:
parent
e7b37e0ed5
commit
f69a94a3f4
2 changed files with 120 additions and 15 deletions
|
|
@ -332,8 +332,18 @@ private[remote] object EndpointManager {
|
|||
def isReadOnly(endpoint: ActorRef): Boolean = readonlyToAddress contains endpoint
|
||||
|
||||
def isQuarantined(address: Address, uid: Int): Boolean = writableEndpointWithPolicyFor(address) match {
|
||||
case Some(Quarantined(`uid`, timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft()
|
||||
case _ ⇒ false
|
||||
// timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
|
||||
// known fact that it is quarantined.
|
||||
case Some(Quarantined(`uid`, _)) ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
def refuseUid(address: Address): Option[Int] = writableEndpointWithPolicyFor(address) match {
|
||||
// timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
|
||||
// known fact that it is quarantined.
|
||||
case Some(Quarantined(uid, _)) ⇒ Some(uid)
|
||||
case Some(Pass(_, uidOption)) ⇒ uidOption
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -397,21 +407,33 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]()
|
||||
var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]()
|
||||
|
||||
def keepQuarantinedOr(remoteAddress: Address)(body: ⇒ Unit): Unit = endpoints.refuseUid(remoteAddress) match {
|
||||
case Some(uid) ⇒
|
||||
log.info("Quarantined address [{}] is still unreachable or has not been restarted. Keeping it quarantined.", remoteAddress)
|
||||
// Restoring Quarantine marker overwritten by a Pass(endpoint, refuseUid) pair while probing remote system.
|
||||
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
|
||||
case None ⇒ body
|
||||
}
|
||||
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(loggingEnabled = false) {
|
||||
case e @ InvalidAssociation(localAddress, remoteAddress, reason) ⇒
|
||||
log.warning("Tried to associate with unreachable remote address [{}]. " +
|
||||
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
keepQuarantinedOr(remoteAddress) {
|
||||
log.warning("Tried to associate with unreachable remote address [{}]. " +
|
||||
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
}
|
||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
case ShutDownAssociation(localAddress, remoteAddress, _) ⇒
|
||||
log.debug("Remote system with address [{}] has shut down. " +
|
||||
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
keepQuarantinedOr(remoteAddress) {
|
||||
log.debug("Remote system with address [{}] has shut down. " +
|
||||
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
}
|
||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
|
|
@ -426,10 +448,12 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
Stop
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
|
||||
log.warning("Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
keepQuarantinedOr(remoteAddress) {
|
||||
log.warning("Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
}
|
||||
AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress))
|
||||
Stop
|
||||
|
||||
|
|
@ -514,7 +538,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef =
|
||||
endpoints.registerWritableEndpoint(
|
||||
recipientAddress,
|
||||
None,
|
||||
refuseUid,
|
||||
createEndpoint(
|
||||
recipientAddress,
|
||||
recipientRef.localAddressToUse,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue