Correction of gossip merge when joining, see #2204
The problem: * Node that is Up joins a cluster and becomes Joining in that cluster * The joining node receives gossip, which results in conflict, merge results in Up * It became Up in the new cluster without passing the ordinary leader action to move it to Up The solution: * Change priority order of Up and Joining so that Joining is used when merging
This commit is contained in:
parent
5f77590eb8
commit
f7a01505ba
3 changed files with 28 additions and 28 deletions
|
|
@ -118,6 +118,15 @@ object Member {
|
|||
case _ ⇒ None
|
||||
}
|
||||
|
||||
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
|
||||
// group all members by Address => Seq[Member]
|
||||
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
|
||||
// pick highest MemberStatus
|
||||
(Set.empty[Member] /: groupedByAddress) {
|
||||
case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Picks the Member with the highest "priority" MemberStatus.
|
||||
*/
|
||||
|
|
@ -130,8 +139,8 @@ object Member {
|
|||
case (_, Exiting) ⇒ m2
|
||||
case (Leaving, _) ⇒ m1
|
||||
case (_, Leaving) ⇒ m2
|
||||
case (Up, Joining) ⇒ m1
|
||||
case (Joining, Up) ⇒ m2
|
||||
case (Up, Joining) ⇒ m2
|
||||
case (Joining, Up) ⇒ m1
|
||||
case (Joining, Joining) ⇒ m1
|
||||
case (Up, Up) ⇒ m1
|
||||
}
|
||||
|
|
@ -268,21 +277,12 @@ case class Gossip(
|
|||
// 2. merge meta-data
|
||||
val mergedMeta = this.meta ++ that.meta
|
||||
|
||||
def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = {
|
||||
// group all members by Address => Seq[Member]
|
||||
val groupedByAddress = (a ++ b).groupBy(_.address)
|
||||
// pick highest MemberStatus
|
||||
(Set.empty[Member] /: groupedByAddress) {
|
||||
case (acc, (_, members)) ⇒ acc + members.reduceLeft(Member.highestPriorityOf)
|
||||
}
|
||||
}
|
||||
|
||||
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
|
||||
val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq)
|
||||
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
|
||||
|
||||
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
|
||||
// and exclude unreachable
|
||||
val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq).
|
||||
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).
|
||||
filterNot(mergedUnreachable.contains)
|
||||
|
||||
// 5. fresh seen table
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ abstract class ConvergenceSpec
|
|||
|
||||
"A cluster of 3 members" must {
|
||||
|
||||
"reach initial convergence" taggedAs LongRunningTest ignore {
|
||||
"reach initial convergence" taggedAs LongRunningTest in {
|
||||
awaitClusterUp(first, second, third)
|
||||
|
||||
runOn(fourth) {
|
||||
|
|
@ -49,7 +49,7 @@ abstract class ConvergenceSpec
|
|||
testConductor.enter("after-1")
|
||||
}
|
||||
|
||||
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore {
|
||||
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in {
|
||||
val thirdAddress = node(third).address
|
||||
testConductor.enter("before-shutdown")
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ abstract class ConvergenceSpec
|
|||
testConductor.enter("after-2")
|
||||
}
|
||||
|
||||
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore {
|
||||
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in {
|
||||
runOn(fourth) {
|
||||
// try to join
|
||||
cluster.join(node(first).address)
|
||||
|
|
|
|||
|
|
@ -33,12 +33,12 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
val g2 = Gossip(members = SortedSet(a2, c2, e2))
|
||||
|
||||
val merged1 = g1 merge g2
|
||||
merged1.members must be(SortedSet(a1, c1, e2))
|
||||
merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up))
|
||||
merged1.members must be(SortedSet(a2, c1, e1))
|
||||
merged1.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining))
|
||||
|
||||
val merged2 = g2 merge g1
|
||||
merged2.members must be(SortedSet(a1, c1, e2))
|
||||
merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up))
|
||||
merged2.members must be(SortedSet(a2, c1, e1))
|
||||
merged2.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining))
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -48,12 +48,12 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2)))
|
||||
|
||||
val merged1 = g1 merge g2
|
||||
merged1.overview.unreachable must be(Set(a1, b2, c1, d2))
|
||||
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
|
||||
merged1.overview.unreachable must be(Set(a2, b2, c1, d2))
|
||||
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed))
|
||||
|
||||
val merged2 = g2 merge g1
|
||||
merged2.overview.unreachable must be(Set(a1, b2, c1, d2))
|
||||
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed))
|
||||
merged2.overview.unreachable must be(Set(a2, b2, c1, d2))
|
||||
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed))
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -62,14 +62,14 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2)))
|
||||
|
||||
val merged1 = g1 merge g2
|
||||
merged1.members must be(SortedSet(a1))
|
||||
merged1.members.toSeq.map(_.status) must be(Seq(Up))
|
||||
merged1.members must be(SortedSet(a2))
|
||||
merged1.members.toSeq.map(_.status) must be(Seq(Joining))
|
||||
merged1.overview.unreachable must be(Set(b2, c1, d2))
|
||||
merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))
|
||||
|
||||
val merged2 = g2 merge g1
|
||||
merged2.members must be(SortedSet(a1))
|
||||
merged2.members.toSeq.map(_.status) must be(Seq(Up))
|
||||
merged2.members must be(SortedSet(a2))
|
||||
merged2.members.toSeq.map(_.status) must be(Seq(Joining))
|
||||
merged2.overview.unreachable must be(Set(b2, c1, d2))
|
||||
merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue