Support for re-JOINING a node that have been DOWN. See #1908

This commit is contained in:
Björn Antonsson 2012-06-27 15:56:45 +02:00
parent db1175e1f3
commit 574ff26bb4

View file

@ -256,7 +256,7 @@ object Gossip {
* When convergence is reached the leader change status of `members` from `Joining`
* to `Up`.
*
* When failure detector consider a node as unavailble it will be moved from
* When failure detector consider a node as unavailable it will be moved from
* `members` to `overview.unreachable`.
*
* When a node is downed, either manually or automatically, its status is changed to `Down`.
@ -555,12 +555,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
private val state = {
private def createCleanState: State = {
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers)))
State(Gossip(members = Gossip.emptyMembers))
}
private val state = new AtomicReference[State](createCleanState)
// try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
if (AutoJoin) joinSeedNode()
@ -735,8 +737,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
@tailrec
final def join(address: Address): Unit = {
val localState = state.get
val newState = localState copy (joinInProgress = localState.joinInProgress +
(address -> (Deadline.now + JoinTimeout)))
// wipe our state
val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout)))
// wipe the failure detector
failureDetector.reset()
if (!state.compareAndSet(localState, newState)) join(address) // recur
else {
val connection = clusterCommandConnectionFor(address)
@ -818,6 +822,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
// remove the node from the failure detector if it is a DOWN node that is rejoining cluster
if (localUnreachable.size > newUnreachableMembers.size) failureDetector.remove(node)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
@ -998,6 +1005,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
latestGossip = winningGossip seen selfAddress,
joinInProgress = newJoinInProgress)
// for all new joining nodes we optimistically remove them from the failure detector, since if we wait until
// we have won the CAS, then the node might be picked up by the reapUnreachableMembers task and moved to
// unreachable before we can remove the node from the failure detector
(newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach {
case node failureDetector.remove(node.address)
}
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
else {