Correct? implementation of merge and other actions, see #2077

* Merge unreachable using  highestPriorityOf
* Avoid merge result in node existing in both members and unreachable
* Fix joining only allowed when !alreadyMember && !isUnreachable (non Down)
* Fix filter bug of unreachable in downing and leaderActions
* Minor cleanups
This commit is contained in:
Patrik Nordwall 2012-06-13 09:37:10 +02:00
parent 8d12385a3e
commit 42c5281d5a
2 changed files with 104 additions and 92 deletions

View file

@ -220,25 +220,30 @@ case class Gossip(
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
// 2. group all members by Address => Seq[Member]
val membersGroupedByAddress = (this.members.toSeq ++ that.members.toSeq).groupBy(_.address)
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups
val mergedMembers =
Gossip.emptyMembers ++
membersGroupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members)
acc :+ members.reduceLeft(Member.highestPriorityOf(_, _))
}
// 4. merge meta-data
// 2. merge meta-data
val mergedMeta = this.meta ++ that.meta
// 5. merge gossip overview
val mergedOverview = GossipOverview(
this.overview.seen ++ that.overview.seen,
this.overview.unreachable ++ that.overview.unreachable)
def reduceHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = {
// group all members by Address => Seq[Member]
val groupedByAddress = (a ++ b).groupBy(_.address)
// pick highest MemberStatus
(groupedByAddress.values.foldLeft(Vector.empty[Member]) { (acc, members)
acc :+ members.reduceLeft(Member.highestPriorityOf(_, _))
}).toSet
}
Gossip(mergedOverview, mergedMembers, mergedMeta, mergedVClock)
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = reduceHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq)
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ reduceHighestPriority(this.members.toSeq, that.members.toSeq).
filterNot(m mergedUnreachable.contains(m))
// 5. merge seen (FIXME is this correct?)
val mergedSeen = this.overview.seen ++ that.overview.seen
Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock)
}
override def toString =
@ -648,11 +653,17 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localState = state.get
val localGossip = localState.latestGossip
val localMembers = localGossip.members
val localUnreachable = localGossip.overview.unreachable
if (!localMembers.exists(_.address == node)) {
val alreadyMember = localMembers.exists(_.address == node)
val isUnreachable = localUnreachable.exists { m
m.address == node && m.status != MemberStatus.Down && m.status != MemberStatus.Removed
}
if (!alreadyMember && !isUnreachable) {
// remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
val newUnreachableMembers = localGossip.overview.unreachable filterNot { _.address == node }
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
@ -719,8 +730,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
/**
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not alread there)
* and its status is set to DOWN. The node is alo removed from the 'seen' table.
* The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there)
* and its status is set to DOWN. The node is also removed from the 'seen' table.
*
* The node will reside as DOWN in the 'unreachable' set until an explicit command JOIN command is sent directly
* to this node and it will then go through the normal JOINING procedure.
@ -735,42 +746,34 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localUnreachableMembers = localOverview.unreachable
// 1. check if the node to DOWN is in the 'members' set
var downedMember: Option[Member] = None
val newMembers =
localMembers
.map { member
if (member.address == address) {
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, member.address)
val newMember = member copy (status = MemberStatus.Down)
downedMember = Some(newMember)
newMember
} else member
}
.filter(_.status != MemberStatus.Down)
val downedMember: Option[Member] = localMembers.find(_.address == address).map(m m.copy(status = MemberStatus.Down))
val newMembers = downedMember match {
case Some(m)
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", selfAddress, m.address)
localMembers - m
case None localMembers
}
// 2. check if the node to DOWN is in the 'unreachable' set
val newUnreachableMembers =
localUnreachableMembers
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
.map { member
if (member.address == address) {
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
member copy (status = MemberStatus.Down)
} else member
}
localUnreachableMembers.map { member
// no need to DOWN members already DOWN
if (member.address == address && member.status != MemberStatus.Down) {
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", selfAddress, member.address)
member copy (status = MemberStatus.Down)
} else member
}
// 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set.
val newUnreachablePlusNewlyDownedMembers = downedMember match {
case Some(member) newUnreachableMembers + member
case None newUnreachableMembers
}
val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember
// 4. remove nodes marked as DOWN from the 'seen' table
val newSeen = newUnreachablePlusNewlyDownedMembers.foldLeft(localSeen) { (currentSeen, member)
currentSeen - member.address
val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect {
case m if m.status == MemberStatus.Down m.address
}
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) // update gossip overview
// update gossip overview
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip
val versionedGossip = newGossip + vclockNode
val newState = localState copy (latestGossip = versionedGossip seen selfAddress)
@ -831,36 +834,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
*/
private def autoJoin(): Unit = nodeToJoin foreach join
/**
* Switches the member status.
*
* @param newStatus the new member status
* @param oldState the state to change the member status in
* @return the updated new state with the new member status
*/
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
log.info("Cluster Node [{}] - Switching membership status to [{}]", selfAddress, newStatus)
val localSelf = self
val localGossip = state.latestGossip
val localMembers = localGossip.members
// change my state into a "new" self
val newSelf = localSelf copy (status = newStatus)
// change my state in 'gossip.members'
val newMembers = localMembers map { member if (member.address == selfAddress) newSelf else member }
val newGossip = localGossip copy (members = newMembers)
// version my changes
val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen selfAddress
state copy (latestGossip = seenVersionedGossip)
}
/**
* INTERNAL API
*
@ -985,8 +958,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (newlyDetectedUnreachableMembers.nonEmpty) { // we have newly detected members marked as unavailable
val newMembers = localMembers diff newlyDetectedUnreachableMembers
val newUnreachableMembers: Set[Member] = localUnreachableMembers ++ newlyDetectedUnreachableMembers
val newMembers = localMembers -- newlyDetectedUnreachableMembers
val newUnreachableMembers = localUnreachableMembers ++ newlyDetectedUnreachableMembers
val newOverview = localOverview copy (unreachable = newUnreachableMembers)
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
@ -1090,16 +1063,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// 4. Move UNREACHABLE => DOWN (auto-downing by leader)
// ----------------------
val newUnreachableMembers =
localUnreachableMembers
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
.map { member
localUnreachableMembers.map { member
// no need to DOWN members already DOWN
if (member.status == MemberStatus.Down) member
else {
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
hasChangedState = true
member copy (status = MemberStatus.Down)
}
}
// removing nodes marked as DOWN from the 'seen' table
val newSeen = localUnreachableMembers.foldLeft(localSeen)((currentSeen, member) currentSeen - member.address)
val newSeen = localSeen -- newUnreachableMembers.collect {
case m if m.status == MemberStatus.Down m.address
}
val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview
localGossip copy (overview = newOverview) // update gossip