From a8361eb22f93e8973fb988717d820175783034f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 21 Mar 2013 15:41:45 +0100 Subject: [PATCH] Ignore members that can't be leader during convergence check. See #3150 --- .../src/main/scala/akka/cluster/Gossip.scala | 19 +++++++++++-------- .../akka/cluster/ClusterDomainEventSpec.scala | 2 +- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index a5eac87bfb..70d79e5c22 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -17,6 +17,8 @@ private[cluster] object Gossip { def apply(members: immutable.SortedSet[Member]) = if (members.isEmpty) empty else empty.copy(members = members) + + private val leaderMemberStatus = Set[MemberStatus](Up, Leaving) } /** @@ -68,11 +70,11 @@ private[cluster] case class Gossip( throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" format unreachableAndLive.mkString(", ")) - val allowedLiveMemberStatuses: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting) - def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status) + val allowedLiveMemberStatus: Set[MemberStatus] = Set(Joining, Up, Leaving, Exiting) + def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatus(m.status) if (members exists hasNotAllowedLiveMemberStatus) throw new IllegalArgumentException("Live members must have status [%s], got [%s]" - format (allowedLiveMemberStatuses.mkString(", "), + format (allowedLiveMemberStatus.mkString(", "), (members filter hasNotAllowedLiveMemberStatus).mkString(", "))) val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address) @@ -177,10 +179,11 @@ private[cluster] case class Gossip( // 1. we don't have any members that are unreachable, or // 2. all unreachable members in the set have status DOWN // Else we can't continue to check for convergence - // When that is done we check that all members exists in the seen table and - // have the latest vector clock version - - overview.unreachable.forall(_.status == Down) && members.forall(m ⇒ seenByAddress(m.address)) + // When that is done we check that all memebers with a leader + // status is in the seen table and has the latest vector clock + // version + overview.unreachable.forall(_.status == Down) && + !members.exists(m ⇒ Gossip.leaderMemberStatus(m.status) && !seenByAddress(m.address)) } def isLeader(address: Address): Boolean = leader == Some(address) @@ -191,7 +194,7 @@ private[cluster] case class Gossip( private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[Address] = { if (mbrs.isEmpty) None - else mbrs.find(m ⇒ m.status != Joining && m.status != Exiting && m.status != Down). + else mbrs.find(m ⇒ Gossip.leaderMemberStatus(m.status)). orElse(Some(mbrs.min(Member.leaderStatusOrdering))).map(_.address) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index f624012307..fc16b503df 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -87,7 +87,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { diffMemberEvents(g1, g2) must be(Seq.empty) diffUnreachable(g1, g2) must be(Seq.empty) - diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = false, seenBy = Set(aUp.address, bUp.address)))) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address)))) diffMemberEvents(g2, g1) must be(Seq.empty) diffUnreachable(g2, g1) must be(Seq.empty) diffSeen(g2, g1) must be(Seq(SeenChanged(convergence = true, seenBy = Set(aUp.address, bUp.address, eJoining.address))))