diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index a86bc0148c..67ea0c4cd0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -118,6 +118,15 @@ object Member { case _ ⇒ None } + def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { + // group all members by Address => Seq[Member] + val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) + // pick highest MemberStatus + (Set.empty[Member] /: groupedByAddress) { + case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) + } + } + /** * Picks the Member with the highest "priority" MemberStatus. */ @@ -130,8 +139,8 @@ object Member { case (_, Exiting) ⇒ m2 case (Leaving, _) ⇒ m1 case (_, Leaving) ⇒ m2 - case (Up, Joining) ⇒ m1 - case (Joining, Up) ⇒ m2 + case (Up, Joining) ⇒ m2 + case (Joining, Up) ⇒ m1 case (Joining, Joining) ⇒ m1 case (Up, Up) ⇒ m1 } @@ -268,21 +277,12 @@ case class Gossip( // 2. merge meta-data val mergedMeta = this.meta ++ that.meta - def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { - // group all members by Address => Seq[Member] - val groupedByAddress = (a ++ b).groupBy(_.address) - // pick highest MemberStatus - (Set.empty[Member] /: groupedByAddress) { - case (acc, (_, members)) ⇒ acc + members.reduceLeft(Member.highestPriorityOf) - } - } - // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) + val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq). + val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members). filterNot(mergedUnreachable.contains) // 5. fresh seen table diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index bdc0a1ae8b..52206f1b8c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -39,7 +39,7 @@ abstract class ConvergenceSpec "A cluster of 3 members" must { - "reach initial convergence" taggedAs LongRunningTest ignore { + "reach initial convergence" taggedAs LongRunningTest in { awaitClusterUp(first, second, third) runOn(fourth) { @@ -49,7 +49,7 @@ abstract class ConvergenceSpec testConductor.enter("after-1") } - "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore { + "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in { val thirdAddress = node(third).address testConductor.enter("before-shutdown") @@ -81,7 +81,7 @@ abstract class ConvergenceSpec testConductor.enter("after-2") } - "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore { + "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in { runOn(fourth) { // try to join cluster.join(node(first).address) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 449ebf7bff..8020010655 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -33,12 +33,12 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(a2, c2, e2)) val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a1, c1, e2)) - merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) + merged1.members must be(SortedSet(a2, c1, e1)) + merged1.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining)) val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1, c1, e2)) - merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) + merged2.members must be(SortedSet(a2, c1, e1)) + merged2.members.toSeq.map(_.status) must be(Seq(Joining, Leaving, Joining)) } @@ -48,12 +48,12 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2))) val merged1 = g1 merge g2 - merged1.overview.unreachable must be(Set(a1, b2, c1, d2)) - merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged1.overview.unreachable must be(Set(a2, b2, c1, d2)) + merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed)) val merged2 = g2 merge g1 - merged2.overview.unreachable must be(Set(a1, b2, c1, d2)) - merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged2.overview.unreachable must be(Set(a2, b2, c1, d2)) + merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Joining, Removed, Leaving, Removed)) } @@ -62,14 +62,14 @@ class GossipSpec extends WordSpec with MustMatchers { val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2))) val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a1)) - merged1.members.toSeq.map(_.status) must be(Seq(Up)) + merged1.members must be(SortedSet(a2)) + merged1.members.toSeq.map(_.status) must be(Seq(Joining)) merged1.overview.unreachable must be(Set(b2, c1, d2)) merged1.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed)) val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1)) - merged2.members.toSeq.map(_.status) must be(Seq(Up)) + merged2.members must be(SortedSet(a2)) + merged2.members.toSeq.map(_.status) must be(Seq(Joining)) merged2.overview.unreachable must be(Set(b2, c1, d2)) merged2.overview.unreachable.toSeq.sorted.map(_.status) must be(Seq(Removed, Leaving, Removed))