cleanup associations when handshake didn't complete, #29615 (#29763)

* symptom was continously logged "Quarantine of [] ignored because unknown UID",
  triggered from the Association.setupIdleTimer task
* when handshake wasn't completed for some reason the UID is unknown and then
  this task continues forever
* abort such associations and cancel the idle task in that case
* also cleanup such associations from AssociationRegistry.removeUnusedQuarantined
* 3 state ADT for the uniqueAddress
This commit is contained in:
Patrik Nordwall 2020-11-09 11:50:45 +01:00 committed by GitHub
parent a57742f31b
commit 48f88772f1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 20 deletions

View file

@ -106,6 +106,12 @@ private[remote] object AssociationState {
private final case class UniqueRemoteAddressValue(
uniqueRemoteAddress: Option[UniqueAddress],
listeners: List[UniqueAddress => Unit])
sealed trait UniqueRemoteAddressState
case object UidKnown extends UniqueRemoteAddressState
case object UidUnknown extends UniqueRemoteAddressState
case object UidQuarantined extends UniqueRemoteAddressState
}
/**
@ -118,8 +124,7 @@ private[remote] final class AssociationState(
val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp],
_uniqueRemoteAddress: AtomicReference[AssociationState.UniqueRemoteAddressValue]) {
import AssociationState.QuarantinedTimestamp
import AssociationState.UniqueRemoteAddressValue
import AssociationState._
/**
* Full outbound address with UID for this association.
@ -127,6 +132,23 @@ private[remote] final class AssociationState(
*/
def uniqueRemoteAddress(): Option[UniqueAddress] = _uniqueRemoteAddress.get().uniqueRemoteAddress
def uniqueRemoteAddressState(): UniqueRemoteAddressState = {
uniqueRemoteAddress() match {
case Some(a) if isQuarantined(a.uid) => UidQuarantined
case Some(_) => UidKnown
case None => UidUnknown // handshake not completed yet
}
}
def isQuarantined(): Boolean = {
uniqueRemoteAddress() match {
case Some(a) => isQuarantined(a.uid)
case None => false // handshake not completed yet
}
}
def isQuarantined(uid: Long): Boolean = quarantined.contains(uid)
@tailrec def completeUniqueRemoteAddress(peer: UniqueAddress): Unit = {
val current = _uniqueRemoteAddress.get()
if (current.uniqueRemoteAddress.isEmpty) {
@ -176,15 +198,6 @@ private[remote] final class AssociationState(
case None => this
}
def isQuarantined(): Boolean = {
uniqueRemoteAddress() match {
case Some(a) => isQuarantined(a.uid)
case None => false // handshake not completed yet
}
}
def isQuarantined(uid: Long): Boolean = quarantined.contains(uid)
def withControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): AssociationState =
new AssociationState(
incarnation,

View file

@ -666,6 +666,15 @@ private[remote] class Association(
// If idle longer than quarantine-idle-outbound-after and the low frequency HandshakeReq
// doesn't get through it will be quarantined to cleanup lingering associations to crashed systems.
quarantine(s"Idle longer than quarantine-idle-outbound-after [${QuarantineIdleOutboundAfter.pretty}]")
associationState.uniqueRemoteAddressState() match {
case AssociationState.UidQuarantined => // quarantined as expected
case AssociationState.UidKnown => // must be new uid, keep as is
case AssociationState.UidUnknown =>
val newLastUsedDurationNanos = System.nanoTime() - associationState.lastUsedTimestamp.get
// quarantine ignored due to unknown UID, have to stop this task anyway
if (newLastUsedDurationNanos >= QuarantineIdleOutboundAfter.toNanos)
abortQuarantined() // quarantine ignored due to unknown UID, have to stop this task anyway
}
} else if (lastUsedDurationNanos >= StopIdleOutboundAfter.toNanos) {
streamMatValues.get.foreach {
case (queueIndex, OutboundStreamMatValues(streamKillSwitch, _, stopping)) =>
@ -1166,10 +1175,14 @@ private[remote] class AssociationRegistry(createAssociation: Address => Associat
val remove = currentMap.foldLeft(Map.empty[Address, Association]) {
case (acc, (address, association)) =>
val state = association.associationState
if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos))
acc.updated(address, association)
else
if ((now - state.lastUsedTimestamp.get) >= afterNanos) {
state.uniqueRemoteAddressState() match {
case AssociationState.UidQuarantined | AssociationState.UidUnknown => acc.updated(address, association)
case AssociationState.UidKnown => acc
}
} else {
acc
}
}
if (remove.nonEmpty) {
val newMap = currentMap -- remove.keysIterator
@ -1184,12 +1197,18 @@ private[remote] class AssociationRegistry(createAssociation: Address => Associat
val now = System.nanoTime()
val afterNanos = after.toNanos
val currentMap = associationsByUid.get
var remove = Map.empty[Long, Association]
currentMap.keysIterator.foreach { uid =>
val association = currentMap.get(uid).get
val state = association.associationState
if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos))
remove = remove.updated(uid, association)
val remove = currentMap.keysIterator.foldLeft(Map.empty[Long, Association]) {
case (acc, uid) =>
val association = currentMap.get(uid).get
val state = association.associationState
if ((now - state.lastUsedTimestamp.get) >= afterNanos) {
state.uniqueRemoteAddressState() match {
case AssociationState.UidQuarantined | AssociationState.UidUnknown => acc.updated(uid, association)
case AssociationState.UidKnown => acc
}
} else {
acc
}
}
if (remove.nonEmpty) {
val newMap = remove.keysIterator.foldLeft(currentMap)((acc, uid) => acc.remove(uid))