Members ordered by address only, see #2405
* The special ordering of status Exiting makes ordering and equals inconsistent * Take the Exiting status into account when looking for leader
This commit is contained in:
parent
7f1a4d3ab6
commit
6d1631aa8c
6 changed files with 37 additions and 38 deletions
|
|
@ -572,7 +572,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
val localGossip = latestGossip
|
||||
val localMembers = localGossip.members
|
||||
|
||||
val isLeader = localMembers.nonEmpty && (selfAddress == localMembers.head.address)
|
||||
val isLeader = localGossip.isLeader(selfAddress)
|
||||
|
||||
if (isLeader && isAvailable) {
|
||||
// only run the leader actions if we are the LEADER and available
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
|||
def isRunning: Boolean = cluster.isRunning
|
||||
|
||||
/**
|
||||
* Current cluster members, sorted with leader first.
|
||||
* Current cluster members, sorted by address.
|
||||
*/
|
||||
def members: SortedSet[Member] = state.members
|
||||
|
||||
|
|
|
|||
|
|
@ -163,7 +163,7 @@ private[cluster] case class Gossip(
|
|||
|
||||
def isLeader(address: Address): Boolean = leader == Some(address)
|
||||
|
||||
def leader: Option[Address] = members.headOption.map(_.address)
|
||||
def leader: Option[Address] = members.find(_.status != Exiting).orElse(members.headOption).map(_.address)
|
||||
|
||||
def isSingletonCluster: Boolean = members.size == 1
|
||||
|
||||
|
|
|
|||
|
|
@ -40,13 +40,10 @@ object Member {
|
|||
}
|
||||
|
||||
/**
|
||||
* `Member` ordering type class, sorts members by host and port with the exception that
|
||||
* it puts all members that are in MemberStatus.EXITING last.
|
||||
* `Member` ordering type class, sorts members by host and port.
|
||||
*/
|
||||
implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒
|
||||
if (a.status == Exiting && b.status != Exiting) false
|
||||
else if (a.status != Exiting && b.status == Exiting) true
|
||||
else addressOrdering.compare(a.address, b.address) < 0
|
||||
implicit val ordering: Ordering[Member] = new Ordering[Member] {
|
||||
def compare(a: Member, b: Member): Int = addressOrdering.compare(a.address, b.address)
|
||||
}
|
||||
|
||||
def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
val b2 = Member(Address("akka", "sys", "b", 2552), Removed)
|
||||
val c1 = Member(Address("akka", "sys", "c", 2552), Leaving)
|
||||
val c2 = Member(Address("akka", "sys", "c", 2552), Up)
|
||||
val c3 = Member(Address("akka", "sys", "c", 2552), Exiting)
|
||||
val d1 = Member(Address("akka", "sys", "d", 2552), Leaving)
|
||||
val d2 = Member(Address("akka", "sys", "d", 2552), Removed)
|
||||
val e1 = Member(Address("akka", "sys", "e", 2552), Joining)
|
||||
|
|
@ -100,5 +101,11 @@ class GossipSpec extends WordSpec with MustMatchers {
|
|||
Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address).seen(b1.address)
|
||||
}
|
||||
|
||||
"have leader as first member based on ordering, except Exiting status" in {
|
||||
Gossip(members = SortedSet(c2, e2)).leader must be(Some(c2.address))
|
||||
Gossip(members = SortedSet(c3, e2)).leader must be(Some(e2.address))
|
||||
Gossip(members = SortedSet(c3)).leader must be(Some(c3.address))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
|
|||
|
||||
"An Ordering[Member]" must {
|
||||
|
||||
"order non-exiting members by host:port" in {
|
||||
"order members by host:port" in {
|
||||
val members = SortedSet.empty[Member] +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1112"), Up) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining) +
|
||||
|
|
@ -32,34 +32,6 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
|
|||
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Joining))
|
||||
}
|
||||
|
||||
"order exiting members by last" in {
|
||||
val members = SortedSet.empty[Member] +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1113"), Up) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining)
|
||||
|
||||
val seq = members.toSeq
|
||||
seq.size must equal(3)
|
||||
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Joining))
|
||||
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Up))
|
||||
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting))
|
||||
}
|
||||
|
||||
"order multiple exiting members by last but internally by host:port" in {
|
||||
val members = SortedSet.empty[Member] +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1111"), Up) +
|
||||
Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting)
|
||||
|
||||
val seq = members.toSeq
|
||||
seq.size must equal(4)
|
||||
seq(0) must equal(Member(AddressFromURIString("akka://sys@darkstar:1111"), Up))
|
||||
seq(1) must equal(Member(AddressFromURIString("akka://sys@darkstar:1113"), Leaving))
|
||||
seq(2) must equal(Member(AddressFromURIString("akka://sys@darkstar:1110"), Exiting))
|
||||
seq(3) must equal(Member(AddressFromURIString("akka://sys@darkstar:1112"), Exiting))
|
||||
}
|
||||
|
||||
"be sorted by address correctly" in {
|
||||
import Member.ordering
|
||||
// sorting should be done on host and port, only
|
||||
|
|
@ -86,6 +58,29 @@ class MemberOrderingSpec extends WordSpec with MustMatchers {
|
|||
m3 must not be (m2)
|
||||
m3 must not be (m1)
|
||||
}
|
||||
|
||||
"have consistent ordering and equals" in {
|
||||
val address1 = Address("akka", "sys1", "host1", 9001)
|
||||
val address2 = Address("akka", "sys1", "host1", 9002)
|
||||
|
||||
val x = Member(address1, Exiting)
|
||||
val y = Member(address1, Removed)
|
||||
val z = Member(address2, Up)
|
||||
Member.ordering.compare(x, y) must be(0)
|
||||
Member.ordering.compare(x, z) must be(Member.ordering.compare(y, z))
|
||||
}
|
||||
|
||||
"work with SortedSet" in {
|
||||
val address1 = Address("akka", "sys1", "host1", 9001)
|
||||
val address2 = Address("akka", "sys1", "host1", 9002)
|
||||
val address3 = Address("akka", "sys1", "host1", 9003)
|
||||
|
||||
(SortedSet(Member(address1, MemberStatus.Joining)) - Member(address1, MemberStatus.Up)) must be(SortedSet.empty[Member])
|
||||
(SortedSet(Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(SortedSet.empty[Member])
|
||||
(SortedSet(Member(address1, MemberStatus.Up)) - Member(address1, MemberStatus.Exiting)) must be(SortedSet.empty[Member])
|
||||
(SortedSet(Member(address2, Up), Member(address3, Joining), Member(address1, MemberStatus.Exiting)) - Member(address1, MemberStatus.Removed)) must be(
|
||||
SortedSet(Member(address2, Up), Member(address3, Joining)))
|
||||
}
|
||||
}
|
||||
|
||||
"An Ordering[Address]" must {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue