=clu #16726 Down member automatically when restarted
* When new uid is seen in join attempt we can down existing member and thereby new restarted node will be able to join in later retried join attempt without relying on auto-down.
This commit is contained in:
parent
f0818882a3
commit
37f6a6581c
3 changed files with 39 additions and 39 deletions
|
|
@ -448,31 +448,36 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
// check by address without uid to make sure that node with same host:port is not allowed
|
||||
// to join until previous node with that host:port has been removed from the cluster
|
||||
val alreadyMember = localMembers.exists(_.address == node.address)
|
||||
val isUnreachable = !latestGossip.overview.reachability.isReachable(node)
|
||||
localMembers.find(_.address == node.address) match {
|
||||
case Some(m) if m.uniqueAddress == node ⇒
|
||||
// node retried join attempt, probably due to lost Welcome message
|
||||
logInfo("Existing member [{}] is joining again.", m)
|
||||
if (node != selfUniqueAddress)
|
||||
sender() ! Welcome(selfUniqueAddress, latestGossip)
|
||||
case Some(m) ⇒
|
||||
// node restarted, same host:port as existing member, but with different uid
|
||||
// safe to down and later remove existing member
|
||||
// new node will retry join
|
||||
logInfo("New incarnation of existing member [{}] is trying to join. " +
|
||||
"Existing will be removed from the cluster and then new member will be allowed to join.", m)
|
||||
if (m.status != Down && m.status != Leaving && m.status != Exiting)
|
||||
downing(m.address)
|
||||
case None ⇒
|
||||
// remove the node from the failure detector
|
||||
failureDetector.remove(node.address)
|
||||
|
||||
if (alreadyMember)
|
||||
logInfo("Existing member [{}] is trying to join, ignoring", node)
|
||||
else if (isUnreachable)
|
||||
logInfo("Unreachable member [{}] is trying to join, ignoring", node)
|
||||
else {
|
||||
// add joining node as Joining
|
||||
// add self in case someone else joins before self has joined (Set discards duplicates)
|
||||
val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
|
||||
val newGossip = latestGossip copy (members = newMembers)
|
||||
|
||||
// remove the node from the failure detector
|
||||
failureDetector.remove(node.address)
|
||||
updateLatestGossip(newGossip)
|
||||
|
||||
// add joining node as Joining
|
||||
// add self in case someone else joins before self has joined (Set discards duplicates)
|
||||
val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles)
|
||||
val newGossip = latestGossip copy (members = newMembers)
|
||||
logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
|
||||
if (node != selfUniqueAddress)
|
||||
sender() ! Welcome(selfUniqueAddress, latestGossip)
|
||||
|
||||
updateLatestGossip(newGossip)
|
||||
|
||||
logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", "))
|
||||
if (node != selfUniqueAddress) {
|
||||
sender() ! Welcome(selfUniqueAddress, latestGossip)
|
||||
}
|
||||
|
||||
publish(latestGossip)
|
||||
publish(latestGossip)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -776,8 +781,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
/**
|
||||
* Leader actions are as follows:
|
||||
* 1. Move JOINING => UP -- When a node joins the cluster
|
||||
* 2. Move LEAVING => EXITING -- When all partition handoff has completed
|
||||
* 3. Non-exiting remain -- When all partition handoff has completed
|
||||
* 2. Move LEAVING => EXITING --
|
||||
* 3. Non-exiting remain --
|
||||
* 4. Move unreachable EXITING => REMOVED -- When all nodes have seen the EXITING node as unreachable (convergence) -
|
||||
* remove the node from the node ring and seen table
|
||||
* 5. Move unreachable DOWN/EXITING => REMOVED -- When all nodes have seen that the node is DOWN/EXITING (convergence) -
|
||||
|
|
@ -792,11 +797,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
val localOverview = localGossip.overview
|
||||
val localSeen = localOverview.seen
|
||||
|
||||
val hasPartionHandoffCompletedSuccessfully: Boolean = {
|
||||
// TODO implement partion handoff and a check if it is completed - now just returns TRUE - e.g. has completed successfully
|
||||
true
|
||||
}
|
||||
|
||||
def enoughMembers: Boolean = {
|
||||
localMembers.size >= MinNrOfMembers && MinNrOfMembersOfRole.forall {
|
||||
case (role, threshold) ⇒ localMembers.count(_.hasRole(role)) >= threshold
|
||||
|
|
@ -827,9 +827,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
}
|
||||
m.copyUp(upNumber)
|
||||
|
||||
case m if m.status == Leaving && hasPartionHandoffCompletedSuccessfully ⇒
|
||||
// Move LEAVING => EXITING (once we have a convergence on LEAVING
|
||||
// *and* if we have a successful partition handoff)
|
||||
case m if m.status == Leaving ⇒
|
||||
// Move LEAVING => EXITING (once we have a convergence on LEAVING)
|
||||
m copy (status = Exiting)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue