Node that joins again should be ignored, see #2184
This commit is contained in:
parent
7e55c8424b
commit
dbac17621f
2 changed files with 45 additions and 16 deletions
|
|
@ -571,27 +571,28 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
val localState = state.get
|
||||
val localGossip = localState.latestGossip
|
||||
val localMembers = localGossip.members
|
||||
val localOverview = localGossip.overview
|
||||
val localUnreachableMembers = localOverview.unreachable
|
||||
|
||||
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
|
||||
val newUnreachableMembers = localUnreachableMembers filterNot { _.address == node }
|
||||
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
|
||||
if (!localMembers.exists(_.address == node)) {
|
||||
|
||||
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
|
||||
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
||||
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
|
||||
val newUnreachableMembers = localGossip.overview.unreachable filterNot { _.address == node }
|
||||
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
|
||||
|
||||
val versionedGossip = newGossip + vclockNode
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
|
||||
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
||||
|
||||
val newState = localState copy (latestGossip = seenVersionedGossip)
|
||||
val versionedGossip = newGossip + vclockNode
|
||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
||||
|
||||
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
|
||||
else {
|
||||
if (node != selfAddress) failureDetector heartbeat node
|
||||
val newState = localState copy (latestGossip = seenVersionedGossip)
|
||||
|
||||
if (convergence(newState.latestGossip).isDefined) {
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
|
||||
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
|
||||
else {
|
||||
if (node != selfAddress) failureDetector heartbeat node
|
||||
|
||||
if (convergence(newState.latestGossip).isDefined) {
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue