From fe98dae6500c5bd0591a102e3bb789a160dbb131 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 6 Mar 2015 14:29:26 +0100 Subject: [PATCH] =clu #13875 Fix regression in leader selection * The leader is selected by picking the first reachable member, but in #13875 we had to let the self member be unreachable in the Reachability table and that was not considered in the logic of the leader selection. * That means changed behavior that is unwanted, especially when there is only one node left the leader could be evaluated to None instead of Some(selfUniqueAddress). * Note that #13875 has not been released yet. --- .../scala/akka/cluster/ClusterDaemon.scala | 2 +- .../scala/akka/cluster/ClusterEvent.scala | 21 ++++++++++--------- .../src/main/scala/akka/cluster/Gossip.scala | 13 +++++++----- .../akka/cluster/ClusterDomainEventSpec.scala | 6 +++--- .../test/scala/akka/cluster/GossipSpec.scala | 14 ++++++++++--- 5 files changed, 34 insertions(+), 22 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index ccc6530135..f5d3157f95 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -760,7 +760,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with * Runs periodic leader actions, such as member status transitions, assigning partitions etc. */ def leaderActions(): Unit = - if (latestGossip.isLeader(selfUniqueAddress)) { + if (latestGossip.isLeader(selfUniqueAddress, selfUniqueAddress)) { // only run the leader actions if we are the LEADER val firstNotice = 20 val periodicNotice = 60 diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 222234eb50..d478a4a3f5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -271,20 +271,20 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = { - val newLeader = newGossip.leader - if (newLeader != oldGossip.leader) List(LeaderChanged(newLeader.map(_.address))) + private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[LeaderChanged] = { + val newLeader = newGossip.leader(selfUniqueAddress) + if (newLeader != oldGossip.leader(selfUniqueAddress)) List(LeaderChanged(newLeader.map(_.address))) else Nil } /** * INTERNAL API */ - private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip): Set[RoleLeaderChanged] = { + private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = { for { role ← (oldGossip.allRoles ++ newGossip.allRoles) - newLeader = newGossip.roleLeader(role) - if newLeader != oldGossip.roleLeader(role) + newLeader = newGossip.roleLeader(role, selfUniqueAddress) + if newLeader != oldGossip.roleLeader(role, selfUniqueAddress) } yield RoleLeaderChanged(role, newLeader.map(_.address)) } @@ -354,8 +354,9 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto members = latestGossip.members, unreachable = unreachable, seenBy = latestGossip.seenBy.map(_.address), - leader = latestGossip.leader.map(_.address), - roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut)) + leader = latestGossip.leader(selfUniqueAddress).map(_.address), + roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r, selfUniqueAddress) + .map(_.address))(collection.breakOut)) receiver ! state } @@ -390,8 +391,8 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto diffMemberEvents(oldGossip, newGossip) foreach pub diffUnreachable(oldGossip, newGossip, selfUniqueAddress) foreach pub diffReachable(oldGossip, newGossip, selfUniqueAddress) foreach pub - diffLeader(oldGossip, newGossip) foreach pub - diffRolesLeader(oldGossip, newGossip) foreach pub + diffLeader(oldGossip, newGossip, selfUniqueAddress) foreach pub + diffRolesLeader(oldGossip, newGossip, selfUniqueAddress) foreach pub // publish internal SeenState for testing purposes diffSeen(oldGossip, newGossip, selfUniqueAddress) foreach pub diffReachability(oldGossip, newGossip) foreach pub diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 32f7bb87d7..d7deef059a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -179,16 +179,19 @@ private[cluster] final case class Gossip( overview.reachability.removeObservers(downed.map(_.uniqueAddress)) } - def isLeader(node: UniqueAddress): Boolean = leader == Some(node) + def isLeader(node: UniqueAddress, selfUniqueAddress: UniqueAddress): Boolean = + leader(selfUniqueAddress) == Some(node) - def leader: Option[UniqueAddress] = leaderOf(members) + def leader(selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = + leaderOf(members, selfUniqueAddress) - def roleLeader(role: String): Option[UniqueAddress] = leaderOf(members.filter(_.hasRole(role))) + def roleLeader(role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = + leaderOf(members.filter(_.hasRole(role)), selfUniqueAddress) - private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[UniqueAddress] = { + private def leaderOf(mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = { val reachableMembers = if (overview.reachability.isAllReachable) mbrs - else mbrs.filter(m ⇒ overview.reachability.isReachable(m.uniqueAddress)) + else mbrs.filter(m ⇒ overview.reachability.isReachable(m.uniqueAddress) || m.uniqueAddress == selfUniqueAddress) if (reachableMembers.isEmpty) None else reachableMembers.find(m ⇒ Gossip.leaderMemberStatus(m.status)). orElse(Some(reachableMembers.min(Member.leaderStatusOrdering))).map(_.uniqueAddress) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index c2decca4af..db5210873d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -127,21 +127,21 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { diffMemberEvents(g1, g2) should ===(Seq(MemberRemoved(aRemoved, Up))) diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty) diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) - diffLeader(g1, g2) should ===(Seq(LeaderChanged(Some(bUp.address)))) + diffLeader(g1, g2, selfDummyAddress) should ===(Seq(LeaderChanged(Some(bUp.address)))) } "be produced for role leader changes" in { val g0 = Gossip.empty val g1 = Gossip(members = SortedSet(aUp, bUp, cUp, dLeaving, eJoining)) val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining)) - diffRolesLeader(g0, g1) should ===( + diffRolesLeader(g0, g1, selfDummyAddress) should ===( Set(RoleLeaderChanged("AA", Some(aUp.address)), RoleLeaderChanged("AB", Some(aUp.address)), RoleLeaderChanged("BB", Some(bUp.address)), RoleLeaderChanged("DD", Some(dLeaving.address)), RoleLeaderChanged("DE", Some(dLeaving.address)), RoleLeaderChanged("EE", Some(eUp.address)))) - diffRolesLeader(g1, g2) should ===( + diffRolesLeader(g1, g2, selfDummyAddress) should ===( Set(RoleLeaderChanged("AA", None), RoleLeaderChanged("AB", Some(bUp.address)), RoleLeaderChanged("DE", Some(eJoining.address)))) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 8818f636df..9f40b802b9 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -120,9 +120,17 @@ class GossipSpec extends WordSpec with Matchers { } "have leader as first member based on ordering, except Exiting status" in { - Gossip(members = SortedSet(c2, e2)).leader should ===(Some(c2.uniqueAddress)) - Gossip(members = SortedSet(c3, e2)).leader should ===(Some(e2.uniqueAddress)) - Gossip(members = SortedSet(c3)).leader should ===(Some(c3.uniqueAddress)) + Gossip(members = SortedSet(c2, e2)).leader(c2.uniqueAddress) should ===(Some(c2.uniqueAddress)) + Gossip(members = SortedSet(c3, e2)).leader(c3.uniqueAddress) should ===(Some(e2.uniqueAddress)) + Gossip(members = SortedSet(c3)).leader(c3.uniqueAddress) should ===(Some(c3.uniqueAddress)) + } + + "have leader as first reachable member based on ordering" in { + val r1 = Reachability.empty.unreachable(e2.uniqueAddress, c2.uniqueAddress) + val g1 = Gossip(members = SortedSet(c2, e2), overview = GossipOverview(reachability = r1)) + g1.leader(e2.uniqueAddress) should ===(Some(e2.uniqueAddress)) + // but when c2 is selfUniqueAddress + g1.leader(c2.uniqueAddress) should ===(Some(c2.uniqueAddress)) } "merge seen table correctly" in {