From 48f88772f17c7a93fe571f99f08a15e0904a45d8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 9 Nov 2020 11:50:45 +0100 Subject: [PATCH] cleanup associations when handshake didn't complete, #29615 (#29763) * symptom was continously logged "Quarantine of [] ignored because unknown UID", triggered from the Association.setupIdleTimer task * when handshake wasn't completed for some reason the UID is unknown and then this task continues forever * abort such associations and cancel the idle task in that case * also cleanup such associations from AssociationRegistry.removeUnusedQuarantined * 3 state ADT for the uniqueAddress --- .../akka/remote/artery/ArteryTransport.scala | 35 ++++++++++++------ .../akka/remote/artery/Association.scala | 37 ++++++++++++++----- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index a01baf53da..1bba99ee88 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -106,6 +106,12 @@ private[remote] object AssociationState { private final case class UniqueRemoteAddressValue( uniqueRemoteAddress: Option[UniqueAddress], listeners: List[UniqueAddress => Unit]) + + sealed trait UniqueRemoteAddressState + case object UidKnown extends UniqueRemoteAddressState + case object UidUnknown extends UniqueRemoteAddressState + case object UidQuarantined extends UniqueRemoteAddressState + } /** @@ -118,8 +124,7 @@ private[remote] final class AssociationState( val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp], _uniqueRemoteAddress: AtomicReference[AssociationState.UniqueRemoteAddressValue]) { - import AssociationState.QuarantinedTimestamp - import AssociationState.UniqueRemoteAddressValue + import AssociationState._ /** * Full outbound address with UID for this association. @@ -127,6 +132,23 @@ private[remote] final class AssociationState( */ def uniqueRemoteAddress(): Option[UniqueAddress] = _uniqueRemoteAddress.get().uniqueRemoteAddress + def uniqueRemoteAddressState(): UniqueRemoteAddressState = { + uniqueRemoteAddress() match { + case Some(a) if isQuarantined(a.uid) => UidQuarantined + case Some(_) => UidKnown + case None => UidUnknown // handshake not completed yet + } + } + + def isQuarantined(): Boolean = { + uniqueRemoteAddress() match { + case Some(a) => isQuarantined(a.uid) + case None => false // handshake not completed yet + } + } + + def isQuarantined(uid: Long): Boolean = quarantined.contains(uid) + @tailrec def completeUniqueRemoteAddress(peer: UniqueAddress): Unit = { val current = _uniqueRemoteAddress.get() if (current.uniqueRemoteAddress.isEmpty) { @@ -176,15 +198,6 @@ private[remote] final class AssociationState( case None => this } - def isQuarantined(): Boolean = { - uniqueRemoteAddress() match { - case Some(a) => isQuarantined(a.uid) - case None => false // handshake not completed yet - } - } - - def isQuarantined(uid: Long): Boolean = quarantined.contains(uid) - def withControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): AssociationState = new AssociationState( incarnation, diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 72eba60688..1a4c19c5b1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -666,6 +666,15 @@ private[remote] class Association( // If idle longer than quarantine-idle-outbound-after and the low frequency HandshakeReq // doesn't get through it will be quarantined to cleanup lingering associations to crashed systems. quarantine(s"Idle longer than quarantine-idle-outbound-after [${QuarantineIdleOutboundAfter.pretty}]") + associationState.uniqueRemoteAddressState() match { + case AssociationState.UidQuarantined => // quarantined as expected + case AssociationState.UidKnown => // must be new uid, keep as is + case AssociationState.UidUnknown => + val newLastUsedDurationNanos = System.nanoTime() - associationState.lastUsedTimestamp.get + // quarantine ignored due to unknown UID, have to stop this task anyway + if (newLastUsedDurationNanos >= QuarantineIdleOutboundAfter.toNanos) + abortQuarantined() // quarantine ignored due to unknown UID, have to stop this task anyway + } } else if (lastUsedDurationNanos >= StopIdleOutboundAfter.toNanos) { streamMatValues.get.foreach { case (queueIndex, OutboundStreamMatValues(streamKillSwitch, _, stopping)) => @@ -1166,10 +1175,14 @@ private[remote] class AssociationRegistry(createAssociation: Address => Associat val remove = currentMap.foldLeft(Map.empty[Address, Association]) { case (acc, (address, association)) => val state = association.associationState - if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos)) - acc.updated(address, association) - else + if ((now - state.lastUsedTimestamp.get) >= afterNanos) { + state.uniqueRemoteAddressState() match { + case AssociationState.UidQuarantined | AssociationState.UidUnknown => acc.updated(address, association) + case AssociationState.UidKnown => acc + } + } else { acc + } } if (remove.nonEmpty) { val newMap = currentMap -- remove.keysIterator @@ -1184,12 +1197,18 @@ private[remote] class AssociationRegistry(createAssociation: Address => Associat val now = System.nanoTime() val afterNanos = after.toNanos val currentMap = associationsByUid.get - var remove = Map.empty[Long, Association] - currentMap.keysIterator.foreach { uid => - val association = currentMap.get(uid).get - val state = association.associationState - if (state.isQuarantined() && ((now - state.lastUsedTimestamp.get) >= afterNanos)) - remove = remove.updated(uid, association) + val remove = currentMap.keysIterator.foldLeft(Map.empty[Long, Association]) { + case (acc, uid) => + val association = currentMap.get(uid).get + val state = association.associationState + if ((now - state.lastUsedTimestamp.get) >= afterNanos) { + state.uniqueRemoteAddressState() match { + case AssociationState.UidQuarantined | AssociationState.UidUnknown => acc.updated(uid, association) + case AssociationState.UidKnown => acc + } + } else { + acc + } } if (remove.nonEmpty) { val newMap = remove.keysIterator.foldLeft(currentMap)((acc, uid) => acc.remove(uid))