Merge with master

This commit is contained in:
Viktor Klang 2012-06-29 16:07:36 +02:00
commit e62a0eee1c
15 changed files with 562 additions and 224 deletions

View file

@ -30,6 +30,7 @@ import MemberStatus._
import scala.annotation.tailrec
import scala.collection.immutable.{ Map, SortedSet }
import scala.collection.GenTraversableOnce
import java.util.concurrent.atomic.AtomicLong
/**
* Interface for membership change listener.
@ -257,7 +258,7 @@ object Gossip {
* When convergence is reached the leader change status of `members` from `Joining`
* to `Up`.
*
* When failure detector consider a node as unavailble it will be moved from
* When failure detector consider a node as unavailable it will be moved from
* `members` to `overview.unreachable`.
*
* When a node is downed, either manually or automatically, its status is changed to `Down`.
@ -556,12 +557,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
private val state = {
private def createCleanState: State = {
// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers)))
State(Gossip(members = Gossip.emptyMembers))
}
private val state = new AtomicReference[State](createCleanState)
// try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
if (AutoJoin) joinSeedNode()
@ -736,8 +739,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
@tailrec
final def join(address: Address): Unit = {
val localState = state.get
val newState = localState copy (joinInProgress = localState.joinInProgress +
(address -> (Deadline.now + JoinTimeout)))
// wipe our state since a node that joins a cluster must be empty
val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout)),
memberMembershipChangeListeners = localState.memberMembershipChangeListeners)
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
if (!state.compareAndSet(localState, newState)) join(address) // recur
else {
val connection = clusterCommandConnectionFor(address)
@ -816,9 +822,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (!alreadyMember && !isUnreachable) {
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
// remove the node from the failure detector if it is a DOWN node that is rejoining cluster
if (rejoiningMember.nonEmpty) failureDetector.remove(node)
// add joining node as Joining
// add self in case someone else joins before self has joined (Set discards duplicates)
val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
@ -833,7 +842,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
else {
log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
if (node != selfAddress) failureDetector heartbeat node
if (node != selfAddress) {
failureDetector heartbeat node
gossipTo(node)
}
notifyMembershipChangeListeners(localState, newState)
}
}
@ -950,6 +962,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
// Can be removed when gossip has been optimized
private val _receivedGossipCount = new AtomicLong
/**
* INTERNAL API.
*/
private[cluster] def receivedGossipCount: Long = _receivedGossipCount.get
/**
* INTERNAL API.
*
@ -968,10 +987,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip :+ vclockNode
log.debug(
"""Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
remoteGossip, localGossip, versionedMergedGossip)
versionedMergedGossip
} else if (remoteGossip.version < localGossip.version) {
@ -993,11 +1008,31 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
latestGossip = winningGossip seen selfAddress,
joinInProgress = newJoinInProgress)
// for all new joining nodes we optimistically remove them from the failure detector, since if we wait until
// we have won the CAS, then the node might be picked up by the reapUnreachableMembers task and moved to
// unreachable before we can remove the node from the failure detector
(newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach {
case node failureDetector.remove(node.address)
}
// if we won the race then update else try again
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
else {
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if ((winningGossip ne localGossip) && (winningGossip ne remoteGossip))
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
_receivedGossipCount.incrementAndGet()
notifyMembershipChangeListeners(localState, newState)
if ((winningGossip ne remoteGossip) || (newState.latestGossip ne remoteGossip)) {
// send back gossip to sender when sender had different view, i.e. merge, or sender had
// older or sender had newer
gossipTo(from)
}
}
}
}
@ -1068,8 +1103,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq
val localUnreachableSize = localUnreachableMembers.size
// 1. gossip to alive members
val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)
// 1. gossip to a random alive member with preference to a member
// with older or newer gossip version
val nodesWithdifferentView = {
val localMemberAddressesSet = localGossip.members map { _.address }
for {
(address, version) localGossip.overview.seen
if localMemberAddressesSet contains address
if version != localGossip.version
} yield address
}
val gossipedToAlive =
if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability)
gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq)
else
gossipToRandomNodeOf(localMemberAddresses)
// 2. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodes(localMemberAddresses)