Quarantine gracefully downed node after some time (#21534)
* New setting for quarantining after graceful leave
This commit is contained in:
parent
1012248c0b
commit
0f376e751e
3 changed files with 27 additions and 7 deletions
|
|
@ -52,9 +52,12 @@ private[cluster] class ClusterRemoteWatcher(
|
|||
unreachableReaperInterval,
|
||||
heartbeatExpectedResponseAfter) {
|
||||
|
||||
private val arteryEnabled = RARP(context.system).provider.remoteSettings.Artery.Enabled
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.selfAddress
|
||||
|
||||
private final case class DelayedQuarantine(m: Member, previousStatus: MemberStatus) extends NoSerializationVerificationNeeded
|
||||
|
||||
var clusterNodes: Set[Address] = Set.empty
|
||||
|
||||
override def preStart(): Unit = {
|
||||
|
|
@ -74,10 +77,11 @@ private[cluster] class ClusterRemoteWatcher(
|
|||
clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address }
|
||||
clusterNodes foreach takeOverResponsibility
|
||||
unreachable = unreachable diff clusterNodes
|
||||
case MemberUp(m) ⇒ memberUp(m)
|
||||
case MemberWeaklyUp(m) ⇒ memberUp(m)
|
||||
case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus)
|
||||
case _: MemberEvent ⇒ // not interesting
|
||||
case MemberUp(m) ⇒ memberUp(m)
|
||||
case MemberWeaklyUp(m) ⇒ memberUp(m)
|
||||
case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus)
|
||||
case _: MemberEvent ⇒ // not interesting
|
||||
case DelayedQuarantine(m, previousStatus) ⇒ delayedQuarantine(m, previousStatus)
|
||||
}
|
||||
|
||||
def memberUp(m: Member): Unit =
|
||||
|
|
@ -90,14 +94,22 @@ private[cluster] class ClusterRemoteWatcher(
|
|||
def memberRemoved(m: Member, previousStatus: MemberStatus): Unit =
|
||||
if (m.address != selfAddress) {
|
||||
clusterNodes -= m.address
|
||||
// The reason we don't quarantine gracefully removed members (leaving) is that
|
||||
// Cluster Singleton need to exchange TakeOver/HandOver messages.
|
||||
|
||||
if (previousStatus == MemberStatus.Down) {
|
||||
quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
|
||||
quarantine(m.address, Some(m.uniqueAddress.uid), s"Cluster member removed, previous status [$previousStatus]")
|
||||
} else if (arteryEnabled) {
|
||||
// don't quarantine gracefully removed members (leaving) directly,
|
||||
// give Cluster Singleton some time to exchange TakeOver/HandOver messages.
|
||||
import context.dispatcher
|
||||
context.system.scheduler.scheduleOnce(cluster.settings.QuarantineRemovedNodeAfter, self, DelayedQuarantine(m, previousStatus))
|
||||
}
|
||||
|
||||
publishAddressTerminated(m.address)
|
||||
}
|
||||
|
||||
def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit =
|
||||
quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]")
|
||||
|
||||
override def watchNode(watchee: InternalActorRef) =
|
||||
if (!clusterNodes(watchee.path.address)) super.watchNode(watchee)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue