Workaround for SI-5986, see #2275
* Add new operators :+ and :++ by implicit conversion * Unfortunately this means that we must remember to use these until SI-5986 is fixed. Is there a better way?
This commit is contained in:
parent
a4e27aaea6
commit
aed78f702b
2 changed files with 35 additions and 10 deletions
|
|
@ -27,6 +27,7 @@ import javax.management._
|
||||||
import MemberStatus._
|
import MemberStatus._
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable.{ Map, SortedSet }
|
import scala.collection.immutable.{ Map, SortedSet }
|
||||||
|
import scala.collection.GenTraversableOnce
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for membership change listener.
|
* Interface for membership change listener.
|
||||||
|
|
@ -179,6 +180,20 @@ object Member {
|
||||||
case (Joining, Joining) ⇒ m1
|
case (Joining, Joining) ⇒ m1
|
||||||
case (Up, Up) ⇒ m1
|
case (Up, Up) ⇒ m1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986
|
||||||
|
// SortedSet + and ++ operators replaces existing element
|
||||||
|
// Use these :+ and :++ operators for the Gossip members
|
||||||
|
implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet)
|
||||||
|
class SortedSetWorkaround(sortedSet: SortedSet[Member]) {
|
||||||
|
implicit def :+(elem: Member): SortedSet[Member] = {
|
||||||
|
if (sortedSet.contains(elem)) sortedSet
|
||||||
|
else sortedSet + elem
|
||||||
|
}
|
||||||
|
|
||||||
|
implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] =
|
||||||
|
sortedSet ++ (elems.toSet diff sortedSet)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -226,6 +241,7 @@ case class GossipOverview(
|
||||||
|
|
||||||
object Gossip {
|
object Gossip {
|
||||||
val emptyMembers: SortedSet[Member] = SortedSet.empty
|
val emptyMembers: SortedSet[Member] = SortedSet.empty
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -300,7 +316,7 @@ case class Gossip(
|
||||||
*/
|
*/
|
||||||
def :+(member: Member): Gossip = {
|
def :+(member: Member): Gossip = {
|
||||||
if (members contains member) this
|
if (members contains member) this
|
||||||
else this copy (members = members + member)
|
else this copy (members = members :+ member)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -329,7 +345,7 @@ case class Gossip(
|
||||||
|
|
||||||
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
|
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
|
||||||
// and exclude unreachable
|
// and exclude unreachable
|
||||||
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
|
val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
|
||||||
|
|
||||||
// 5. fresh seen table
|
// 5. fresh seen table
|
||||||
val mergedSeen = Map.empty[Address, VectorClock]
|
val mergedSeen = Map.empty[Address, VectorClock]
|
||||||
|
|
@ -803,7 +819,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
||||||
|
|
||||||
// add joining node as Joining
|
// add joining node as Joining
|
||||||
// add self in case someone else joins before self has joined (Set discards duplicates)
|
// add self in case someone else joins before self has joined (Set discards duplicates)
|
||||||
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
|
val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
|
||||||
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
||||||
|
|
||||||
val versionedGossip = newGossip :+ vclockNode
|
val versionedGossip = newGossip :+ vclockNode
|
||||||
|
|
|
||||||
|
|
@ -99,12 +99,14 @@ abstract class TransitionSpec
|
||||||
|
|
||||||
"start nodes as singleton clusters" taggedAs LongRunningTest in {
|
"start nodes as singleton clusters" taggedAs LongRunningTest in {
|
||||||
|
|
||||||
startClusterNode()
|
runOn(first) {
|
||||||
cluster.isSingletonCluster must be(true)
|
startClusterNode()
|
||||||
cluster.status must be(Joining)
|
cluster.isSingletonCluster must be(true)
|
||||||
cluster.convergence.isDefined must be(true)
|
cluster.status must be(Joining)
|
||||||
cluster.leaderActions()
|
cluster.convergence.isDefined must be(true)
|
||||||
cluster.status must be(Up)
|
cluster.leaderActions()
|
||||||
|
cluster.status must be(Up)
|
||||||
|
}
|
||||||
|
|
||||||
enterBarrier("after-1")
|
enterBarrier("after-1")
|
||||||
}
|
}
|
||||||
|
|
@ -244,13 +246,20 @@ abstract class TransitionSpec
|
||||||
}
|
}
|
||||||
|
|
||||||
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
|
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
|
||||||
|
runOn(fifth) {
|
||||||
|
startClusterNode()
|
||||||
|
cluster.leaderActions()
|
||||||
|
cluster.status must be(Up)
|
||||||
|
}
|
||||||
|
enterBarrier("fifth-started")
|
||||||
|
|
||||||
runOn(fourth) {
|
runOn(fourth) {
|
||||||
cluster.join(fifth)
|
cluster.join(fifth)
|
||||||
}
|
}
|
||||||
runOn(fifth) {
|
runOn(fifth) {
|
||||||
awaitMembers(fourth, fifth)
|
awaitMembers(fourth, fifth)
|
||||||
}
|
}
|
||||||
testConductor.enter("fourth-joined")
|
enterBarrier("fourth-joined")
|
||||||
|
|
||||||
fifth gossipTo fourth
|
fifth gossipTo fourth
|
||||||
fourth gossipTo fifth
|
fourth gossipTo fifth
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue