diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 3eddb5bf60..357d610ed5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -27,6 +27,7 @@ import javax.management._ import MemberStatus._ import scala.annotation.tailrec import scala.collection.immutable.{ Map, SortedSet } +import scala.collection.GenTraversableOnce /** * Interface for membership change listener. @@ -179,6 +180,20 @@ object Member { case (Joining, Joining) ⇒ m1 case (Up, Up) ⇒ m1 } + + // FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986 + // SortedSet + and ++ operators replaces existing element + // Use these :+ and :++ operators for the Gossip members + implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet) + class SortedSetWorkaround(sortedSet: SortedSet[Member]) { + implicit def :+(elem: Member): SortedSet[Member] = { + if (sortedSet.contains(elem)) sortedSet + else sortedSet + elem + } + + implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] = + sortedSet ++ (elems.toSet diff sortedSet) + } } /** @@ -226,6 +241,7 @@ case class GossipOverview( object Gossip { val emptyMembers: SortedSet[Member] = SortedSet.empty + } /** @@ -300,7 +316,7 @@ case class Gossip( */ def :+(member: Member): Gossip = { if (members contains member) this - else this copy (members = members + member) + else this copy (members = members :+ member) } /** @@ -329,7 +345,7 @@ case class Gossip( // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) + val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) // 5. fresh seen table val mergedSeen = Map.empty[Address, VectorClock] @@ -803,7 +819,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) - val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) + val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip :+ vclockNode diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 0376545b41..397d824ef4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -99,12 +99,14 @@ abstract class TransitionSpec "start nodes as singleton clusters" taggedAs LongRunningTest in { - startClusterNode() - cluster.isSingletonCluster must be(true) - cluster.status must be(Joining) - cluster.convergence.isDefined must be(true) - cluster.leaderActions() - cluster.status must be(Up) + runOn(first) { + startClusterNode() + cluster.isSingletonCluster must be(true) + cluster.status must be(Joining) + cluster.convergence.isDefined must be(true) + cluster.leaderActions() + cluster.status must be(Up) + } enterBarrier("after-1") } @@ -244,13 +246,20 @@ abstract class TransitionSpec } "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { + runOn(fifth) { + startClusterNode() + cluster.leaderActions() + cluster.status must be(Up) + } + enterBarrier("fifth-started") + runOn(fourth) { cluster.join(fifth) } runOn(fifth) { awaitMembers(fourth, fifth) } - testConductor.enter("fourth-joined") + enterBarrier("fourth-joined") fifth gossipTo fourth fourth gossipTo fifth