diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index f3a6c1a730..92801313ec 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -572,7 +572,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val localGossip = latestGossip val localMembers = localGossip.members - val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address) + val isLeader = localGossip.isLeader(selfAddress) if (isLeader && isAvailable) { // only run the leader actions if we are the LEADER and available diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 4d3904cbb8..39ee3888d5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -61,7 +61,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { def isRunning: Boolean = cluster.isRunning /** - * Current cluster members, sorted with leader first. + * Current cluster members, sorted by address. */ def members: SortedSet[Member] = state.members diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index b975034c66..be734703be 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -163,7 +163,7 @@ private[cluster] case class Gossip( def isLeader(address: Address): Boolean = leader == Some(address) - def leader: Option[Address] = members.headOption.map(_.address) + def leader: Option[Address] = members.find(_.status != Exiting).orElse(members.headOption).map(_.address) def isSingletonCluster: Boolean = members.size == 1 diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index c7909b1834..e806b0a933 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -40,13 +40,10 @@ object Member { } /** - * `Member` ordering type class, sorts members by host and port with the exception that - * it puts all members that are in MemberStatus.EXITING last. + * `Member` ordering type class, sorts members by host and port. */ - implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒ - if (a.status == Exiting && b.status != Exiting) false - else if (a.status != Exiting && b.status == Exiting) true - else addressOrdering.compare(a.address, b.address) < 0 + implicit val ordering: Ordering[Member] = new Ordering[Member] { + def compare(a: Member, b: Member): Int = addressOrdering.compare(a.address, b.address) } def apply(address: Address, status: MemberStatus): Member = new Member(address, status) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 8020010655..f873f5f252 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -20,6 +20,7 @@ class GossipSpec extends WordSpec with MustMatchers { val b2 = Member(Address("akka", "sys", "b", 2552), Removed) val c1 = Member(Address("akka", "sys", "c", 2552), Leaving) val c2 = Member(Address("akka", "sys", "c", 2552), Up) + val c3 = Member(Address("akka", "sys", "c", 2552), Exiting) val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) val d2 = Member(Address("akka", "sys", "d", 2552), Removed) val e1 = Member(Address("akka", "sys", "e", 2552), Joining) @@ -100,5 +101,11 @@ class GossipSpec extends WordSpec with MustMatchers { Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address).seen(b1.address) } + "have leader as first member based on ordering, except Exiting status" in { + Gossip(members = SortedSet(c2, e2)).leader must be(Some(c2.address)) + Gossip(members = SortedSet(c3, e2)).leader must be(Some(e2.address)) + Gossip(members = SortedSet(c3)).leader must be(Some(c3.address)) + } + } } diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala index d8687312da..280ed3f3e0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala @@ -19,7 +19,7 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { "An Ordering[Member]" must { - "order non-exiting members by host:port" in { + "order members by host:port" in { val members = SortedSet.empty[Member] + Member(AddressFromURIString("akka://sys@darkstar:1112"), Up) + Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining) + @@ -32,34 +32,6 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining)) } - "order exiting members by last" in { - val members = SortedSet.empty[Member] + - Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) + - Member(AddressFromURIString("akka://sys@darkstar:1113"), Up) + - Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining) - - val seq = members.toSeq - seq.size must equal(3) - seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining)) - seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Up)) - seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting)) - } - - "order multiple exiting members by last but internally by host:port" in { - val members = SortedSet.empty[Member] + - Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) + - Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving) + - Member(AddressFromURIString("akka://sys@darkstar:1111"), Up) + - Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting) - - val seq = members.toSeq - seq.size must equal(4) - seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up)) - seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving)) - seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting)) - seq(3) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting)) - } - "be sorted by address correctly" in { import Member.ordering // sorting should be done on host and port, only @@ -86,6 +58,29 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { m3 must not be (m2) m3 must not be (m1) } + + "have consistent ordering and equals" in { + val address1 = Address("akka", "sys1", "host1", 9001) + val address2 = Address("akka", "sys1", "host1", 9002) + + val x = Member(address1, Exiting) + val y = Member(address1, Removed) + val z = Member(address2, Up) + Member.ordering.compare(x, y) must be(0) + Member.ordering.compare(x, z) must be(Member.ordering.compare(y, z)) + } + + "work with SortedSet" in { + val address1 = Address("akka", "sys1", "host1", 9001) + val address2 = Address("akka", "sys1", "host1", 9002) + val address3 = Address("akka", "sys1", "host1", 9003) + + (SortedSet(Member(address1, MemberStatus.Joining)) - Member(address1, MemberStatus.Up)) must be(SortedSet.empty[Member]) + (SortedSet(Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(SortedSet.empty[Member]) + (SortedSet(Member(address1, MemberStatus.Up)) - Member(address1, MemberStatus.Exiting)) must be(SortedSet.empty[Member]) + (SortedSet(Member(address2, Up), Member(address3, Joining), Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be( + SortedSet(Member(address2, Up), Member(address3, Joining))) + } } "An Ordering[Address]" must {