Merge branch 'master' into wip-2077-gossip-merge-patriknw

Conflicts:
	akka-cluster/src/main/scala/akka/cluster/Cluster.scala
This commit is contained in:
Patrik Nordwall 2012-06-13 17:04:09 +02:00
commit c5164085b2
7 changed files with 132 additions and 110 deletions

View file

@ -240,9 +240,9 @@ case class Gossip(
/**
* Increments the version for this 'Node'.
*/
def +(node: VectorClock.Node): Gossip = copy(version = version + node)
def :+(node: VectorClock.Node): Gossip = copy(version = version :+ node)
def +(member: Member): Gossip = {
def :+(member: Member): Gossip = {
if (members contains member) this
else this copy (members = members + member)
}
@ -474,7 +474,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private val state = {
val member = Member(selfAddress, Joining)
val versionedGossip = Gossip(members = Gossip.emptyMembers + member) + vclockNode // add me as member and update my vector clock
val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock
val seenVersionedGossip = versionedGossip seen selfAddress
new AtomicReference[State](State(seenVersionedGossip))
}
@ -714,7 +714,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newMembers = localMembers + Member(node, Joining) // add joining node as Joining
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
@ -742,7 +742,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newMembers = localMembers + Member(address, Leaving) // mark node as LEAVING
val newGossip = localGossip copy (members = newMembers)
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
@ -822,7 +822,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
val newState = localState copy (latestGossip = versionedGossip seen selfAddress)
if (!state.compareAndSet(localState, newState)) downing(address) // recur if we fail the update
@ -843,7 +843,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (remoteGossip.version <> localGossip.version) {
// concurrent
val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip + vclockNode
val versionedMergedGossip = mergedGossip :+ vclockNode
// FIXME change to debug log level, when failure detector is stable
log.info(
@ -1012,7 +1012,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
// updating vclock and 'seen' table
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
val newState = localState copy (latestGossip = seenVersionedGossip)
@ -1135,7 +1135,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ----------------------
// 5. Updating the vclock version for the changes
// ----------------------
val versionedGossip = newGossip + vclockNode
val versionedGossip = newGossip :+ vclockNode
// ----------------------
// 6. Updating the 'seen' table
@ -1162,23 +1162,39 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private def convergence(gossip: Gossip): Option[Gossip] = {
val overview = gossip.overview
val unreachable = overview.unreachable
val seen = overview.seen
// First check that:
// 1. we don't have any members that are unreachable (unreachable.isEmpty == true), or
// 1. we don't have any members that are unreachable, or
// 2. all unreachable members in the set have status DOWN
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
if (unreachable.isEmpty || !unreachable.exists { m
// and that all members exists in seen table
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { m
m.status != Down && m.status != Removed
}) {
val seen = gossip.overview.seen
val views = Set.empty[VectorClock] ++ seen.values
}
val allMembersInSeen = gossip.members.forall(m seen.contains(m.address))
if (views.size == 1) {
if (hasUnreachable) {
log.debug("Cluster Node [{}] - No cluster convergence, due to unreachable nodes [{}].", selfAddress, unreachable)
None
} else if (!allMembersInSeen) {
log.debug("Cluster Node [{}] - No cluster convergence, due to members not in seen table [{}].", selfAddress,
gossip.members.map(_.address) -- seen.keySet)
None
} else {
val views = seen.values.toSet.size
if (views == 1) {
log.debug("Cluster Node [{}] - Cluster convergence reached: [{}]", selfAddress, gossip.members.mkString(", "))
Some(gossip)
} else None
} else None
} else {
log.debug("Cluster Node [{}] - No cluster convergence, since not all nodes have seen the same state yet. [{} of {}]",
selfAddress, views, seen.values.size)
None
}
}
}
private def isAvailable(state: State): Boolean = !isUnavailable(state)