Merge pull request #17022 from akka/wip-13875-leader-reachability-patriknw
=clu #13875 Fix regression in leader selection
This commit is contained in:
commit
39e7f05db8
5 changed files with 34 additions and 22 deletions
|
|
@ -760,7 +760,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
* Runs periodic leader actions, such as member status transitions, assigning partitions etc.
|
||||
*/
|
||||
def leaderActions(): Unit =
|
||||
if (latestGossip.isLeader(selfUniqueAddress)) {
|
||||
if (latestGossip.isLeader(selfUniqueAddress, selfUniqueAddress)) {
|
||||
// only run the leader actions if we are the LEADER
|
||||
val firstNotice = 20
|
||||
val periodicNotice = 60
|
||||
|
|
|
|||
|
|
@ -271,20 +271,20 @@ object ClusterEvent {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = {
|
||||
val newLeader = newGossip.leader
|
||||
if (newLeader != oldGossip.leader) List(LeaderChanged(newLeader.map(_.address)))
|
||||
private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = {
|
||||
val newLeader = newGossip.leader(selfUniqueAddress)
|
||||
if (newLeader != oldGossip.leader(selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address)))
|
||||
else Nil
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip): Set[RoleLeaderChanged] = {
|
||||
private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = {
|
||||
for {
|
||||
role ← (oldGossip.allRoles ++ newGossip.allRoles)
|
||||
newLeader = newGossip.roleLeader(role)
|
||||
if newLeader != oldGossip.roleLeader(role)
|
||||
newLeader = newGossip.roleLeader(role, selfUniqueAddress)
|
||||
if newLeader != oldGossip.roleLeader(role, selfUniqueAddress)
|
||||
} yield RoleLeaderChanged(role, newLeader.map(_.address))
|
||||
}
|
||||
|
||||
|
|
@ -354,8 +354,9 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
members = latestGossip.members,
|
||||
unreachable = unreachable,
|
||||
seenBy = latestGossip.seenBy.map(_.address),
|
||||
leader = latestGossip.leader.map(_.address),
|
||||
roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut))
|
||||
leader = latestGossip.leader(selfUniqueAddress).map(_.address),
|
||||
roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r, selfUniqueAddress)
|
||||
.map(_.address))(collection.breakOut))
|
||||
receiver ! state
|
||||
}
|
||||
|
||||
|
|
@ -390,8 +391,8 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
diffMemberEvents(oldGossip, newGossip) foreach pub
|
||||
diffUnreachable(oldGossip, newGossip, selfUniqueAddress) foreach pub
|
||||
diffReachable(oldGossip, newGossip, selfUniqueAddress) foreach pub
|
||||
diffLeader(oldGossip, newGossip) foreach pub
|
||||
diffRolesLeader(oldGossip, newGossip) foreach pub
|
||||
diffLeader(oldGossip, newGossip, selfUniqueAddress) foreach pub
|
||||
diffRolesLeader(oldGossip, newGossip, selfUniqueAddress) foreach pub
|
||||
// publish internal SeenState for testing purposes
|
||||
diffSeen(oldGossip, newGossip, selfUniqueAddress) foreach pub
|
||||
diffReachability(oldGossip, newGossip) foreach pub
|
||||
|
|
|
|||
|
|
@ -179,16 +179,19 @@ private[cluster] final case class Gossip(
|
|||
overview.reachability.removeObservers(downed.map(_.uniqueAddress))
|
||||
}
|
||||
|
||||
def isLeader(node: UniqueAddress): Boolean = leader == Some(node)
|
||||
def isLeader(node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean =
|
||||
leader(selfUniqueAddress) == Some(node)
|
||||
|
||||
def leader: Option[UniqueAddress] = leaderOf(members)
|
||||
def leader(selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
|
||||
leaderOf(members, selfUniqueAddress)
|
||||
|
||||
def roleLeader(role: String): Option[UniqueAddress] = leaderOf(members.filter(_.hasRole(role)))
|
||||
def roleLeader(role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
|
||||
leaderOf(members.filter(_.hasRole(role)), selfUniqueAddress)
|
||||
|
||||
private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[UniqueAddress] = {
|
||||
private def leaderOf(mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = {
|
||||
val reachableMembers =
|
||||
if (overview.reachability.isAllReachable) mbrs
|
||||
else mbrs.filter(m ⇒ overview.reachability.isReachable(m.uniqueAddress))
|
||||
else mbrs.filter(m ⇒ overview.reachability.isReachable(m.uniqueAddress) || m.uniqueAddress == selfUniqueAddress)
|
||||
if (reachableMembers.isEmpty) None
|
||||
else reachableMembers.find(m ⇒ Gossip.leaderMemberStatus(m.status)).
|
||||
orElse(Some(reachableMembers.min(Member.leaderStatusOrdering))).map(_.uniqueAddress)
|
||||
|
|
|
|||
|
|
@ -127,21 +127,21 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
|
|||
diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(aRemoved, Up)))
|
||||
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
|
||||
diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
|
||||
diffLeader(g1, g2) should ===(Seq(LeaderChanged(Some(bUp.address))))
|
||||
diffLeader(g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address))))
|
||||
}
|
||||
|
||||
"be produced for role leader changes" in {
|
||||
val g0 = Gossip.empty
|
||||
val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining))
|
||||
val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining))
|
||||
diffRolesLeader(g0, g1) should ===(
|
||||
diffRolesLeader(g0, g1, selfDummyAddress) should ===(
|
||||
Set(RoleLeaderChanged("AA", Some(aUp.address)),
|
||||
RoleLeaderChanged("AB", Some(aUp.address)),
|
||||
RoleLeaderChanged("BB", Some(bUp.address)),
|
||||
RoleLeaderChanged("DD", Some(dLeaving.address)),
|
||||
RoleLeaderChanged("DE", Some(dLeaving.address)),
|
||||
RoleLeaderChanged("EE", Some(eUp.address))))
|
||||
diffRolesLeader(g1, g2) should ===(
|
||||
diffRolesLeader(g1, g2, selfDummyAddress) should ===(
|
||||
Set(RoleLeaderChanged("AA", None),
|
||||
RoleLeaderChanged("AB", Some(bUp.address)),
|
||||
RoleLeaderChanged("DE", Some(eJoining.address))))
|
||||
|
|
|
|||
|
|
@ -120,9 +120,17 @@ class GossipSpec extends WordSpec with Matchers {
|
|||
}
|
||||
|
||||
"have leader as first member based on ordering, except Exiting status" in {
|
||||
Gossip(members = SortedSet(c2, e2)).leader should ===(Some(c2.uniqueAddress))
|
||||
Gossip(members = SortedSet(c3, e2)).leader should ===(Some(e2.uniqueAddress))
|
||||
Gossip(members = SortedSet(c3)).leader should ===(Some(c3.uniqueAddress))
|
||||
Gossip(members = SortedSet(c2, e2)).leader(c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
|
||||
Gossip(members = SortedSet(c3, e2)).leader(c3.uniqueAddress) should ===(Some(e2.uniqueAddress))
|
||||
Gossip(members = SortedSet(c3)).leader(c3.uniqueAddress) should ===(Some(c3.uniqueAddress))
|
||||
}
|
||||
|
||||
"have leader as first reachable member based on ordering" in {
|
||||
val r1 = Reachability.empty.unreachable(e2.uniqueAddress, c2.uniqueAddress)
|
||||
val g1 = Gossip(members = SortedSet(c2, e2), overview = GossipOverview(reachability = r1))
|
||||
g1.leader(e2.uniqueAddress) should ===(Some(e2.uniqueAddress))
|
||||
// but when c2 is selfUniqueAddress
|
||||
g1.leader(c2.uniqueAddress) should ===(Some(c2.uniqueAddress))
|
||||
}
|
||||
|
||||
"merge seen table correctly" in {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue