diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 40d67d7161..709f82a5e8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -256,7 +256,7 @@ object Gossip { * When convergence is reached the leader change status of `members` from `Joining` * to `Up`. * - * When failure detector consider a node as unavailble it will be moved from + * When failure detector consider a node as unavailable it will be moved from * `members` to `overview.unreachable`. * * When a node is downed, either manually or automatically, its status is changed to `Down`. @@ -555,12 +555,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } - private val state = { + private def createCleanState: State = { // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet - new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers))) + State(Gossip(members = Gossip.emptyMembers)) } + private val state = new AtomicReference[State](createCleanState) + // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' if (AutoJoin) joinSeedNode() @@ -735,8 +737,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) @tailrec final def join(address: Address): Unit = { val localState = state.get - val newState = localState copy (joinInProgress = localState.joinInProgress + - (address -> (Deadline.now + JoinTimeout))) + // wipe our state + val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout))) + // wipe the failure detector + failureDetector.reset() if (!state.compareAndSet(localState, newState)) join(address) // recur else { val connection = clusterCommandConnectionFor(address) @@ -818,6 +822,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) + // remove the node from the failure detector if it is a DOWN node that is rejoining cluster + if (localUnreachable.size > newUnreachableMembers.size) failureDetector.remove(node) + // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) @@ -998,6 +1005,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) latestGossip = winningGossip seen selfAddress, joinInProgress = newJoinInProgress) + // for all new joining nodes we optimistically remove them from the failure detector, since if we wait until + // we have won the CAS, then the node might be picked up by the reapUnreachableMembers task and moved to + // unreachable before we can remove the node from the failure detector + (newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach { + case node ⇒ failureDetector.remove(node.address) + } + // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update else {