From 5b89d25c37fc7836e4082f581c87feedb6f89410 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 13 Jun 2012 15:23:45 +0200 Subject: [PATCH] Add invariant assertions to Gossip, see #2077 * Add doc about how members are "moved" --- .../src/main/scala/akka/cluster/Cluster.scala | 71 +++++++++++++++---- .../test/scala/akka/cluster/GossipSpec.scala | 39 ++++++---- 2 files changed, 85 insertions(+), 25 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 07712d8ed9..3fecd7524b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -147,7 +147,13 @@ case class GossipEnvelope(from: Address, gossip: Gossip) extends ClusterMessage * * Can be one of: Joining, Up, Leaving, Exiting and Down. */ -sealed trait MemberStatus extends ClusterMessage +sealed trait MemberStatus extends ClusterMessage { + /** + * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED. + */ + def isUnavailable: Boolean = this == MemberStatus.Down || this == MemberStatus.Removed +} + object MemberStatus { case object Joining extends MemberStatus case object Up extends MemberStatus @@ -155,11 +161,6 @@ object MemberStatus { case object Exiting extends MemberStatus case object Down extends MemberStatus case object Removed extends MemberStatus - - /** - * Using the same notion for 'unavailable' as 'non-convergence': DOWN and REMOVED. - */ - def isUnavailable(status: MemberStatus): Boolean = status == MemberStatus.Down || status == MemberStatus.Removed } /** @@ -169,8 +170,6 @@ case class GossipOverview( seen: Map[Address, VectorClock] = Map.empty[Address, VectorClock], unreachable: Set[Member] = Set.empty[Member]) { - // FIXME document when nodes are put in 'unreachable' set and removed from 'members' - override def toString = "GossipOverview(seen = [" + seen.mkString(", ") + "], unreachable = [" + unreachable.mkString(", ") + @@ -182,7 +181,31 @@ object Gossip { } /** - * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock. + * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - + * all versioned by a vector clock. + * + * When a node is joining the Member, with status Joining, is added to `members`. + * If the joining node was downed it is moved from `overview.unreachable` (status Down) + * to `members` (status Joining). It cannot rejoin if not first downed. + * + * When convergence is reached the leader change status of `members` from Joining + * to Up. + * + * When failure detector consider a node as unavailble it will be moved from + * `members` to `overview.unreachable`. + * + * When a node is downed, either manually or automatically, it is moved from `members` + * to `overview.unreachable` (status Down). It is also removed from `overview.seen` + * table. The node will reside as Down in the `overview.unreachable` set until joining + * again and it will then go through the normal joining procedure. + * + * When a Gossip is received the version (vector clock) is used to determine if the + * received Gossip is newer or older than the current local Gossip. The received Gossip + * and local Gossip is merged in case of concurrent vector clocks, i.e. not same history. + * When merged the seen table is cleared. + * + * TODO document leaving, exiting and removed when that is implemented + * */ case class Gossip( overview: GossipOverview = GossipOverview(), @@ -192,6 +215,28 @@ case class Gossip( extends ClusterMessage // is a serializable cluster message with Versioned[Gossip] { + // FIXME can be disabled as optimization + assertInvariants + private def assertInvariants: Unit = { + val unreachableAndLive = members.intersect(overview.unreachable) + if (unreachableAndLive.nonEmpty) + throw new IllegalArgumentException("Same nodes in both members and unreachable is not allowed, got [%s]" + format unreachableAndLive.mkString(", ")) + + val allowedLiveMemberStatuses: Set[MemberStatus] = Set(MemberStatus.Joining, MemberStatus.Up, MemberStatus.Leaving, MemberStatus.Exiting) + def hasNotAllowedLiveMemberStatus(m: Member) = !allowedLiveMemberStatuses.contains(m.status) + if (members exists hasNotAllowedLiveMemberStatus) + throw new IllegalArgumentException("Live members must have status [%s], got [%s]" + format (allowedLiveMemberStatuses.mkString(", "), + (members filter hasNotAllowedLiveMemberStatus).mkString(", "))) + + val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address) + if (seenButNotMember.nonEmpty) + throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" + format seenButNotMember.mkString(", ")) + + } + /** * Increments the version for this 'Node'. */ @@ -223,7 +268,7 @@ case class Gossip( // 2. merge meta-data val mergedMeta = this.meta ++ that.meta - def reduceHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { + def pickHighestPriority(a: Seq[Member], b: Seq[Member]): Set[Member] = { // group all members by Address => Seq[Member] val groupedByAddress = (a ++ b).groupBy(_.address) // pick highest MemberStatus @@ -233,11 +278,11 @@ case class Gossip( } // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - val mergedUnreachable = reduceHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) + val mergedUnreachable = pickHighestPriority(this.overview.unreachable.toSeq, that.overview.unreachable.toSeq) // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ reduceHighestPriority(this.members.toSeq, that.members.toSeq). + val mergedMembers = Gossip.emptyMembers ++ pickHighestPriority(this.members.toSeq, that.members.toSeq). filterNot(mergedUnreachable.contains) // 5. fresh seen table @@ -1145,7 +1190,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localMembers = localGossip.members val localUnreachableMembers = localOverview.unreachable val isUnreachable = localUnreachableMembers exists { _.address == selfAddress } - val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && MemberStatus.isUnavailable(m.status) } + val hasUnavailableMemberStatus = localMembers exists { m ⇒ (m == self) && m.status.isUnavailable } isUnreachable || hasUnavailableMemberStatus } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 8c790cf159..449ebf7bff 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -22,28 +22,30 @@ class GossipSpec extends WordSpec with MustMatchers { val c2 = Member(Address("akka", "sys", "c", 2552), Up) val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) val d2 = Member(Address("akka", "sys", "d", 2552), Removed) + val e1 = Member(Address("akka", "sys", "e", 2552), Joining) + val e2 = Member(Address("akka", "sys", "e", 2552), Up) "A Gossip" must { "merge members by status priority" in { - val g1 = Gossip(members = SortedSet(a1, b1, c1, d1)) - val g2 = Gossip(members = SortedSet(a2, b2, c2, d2)) + val g1 = Gossip(members = SortedSet(a1, c1, e1)) + val g2 = Gossip(members = SortedSet(a2, c2, e2)) val merged1 = g1 merge g2 - merged1.members must be(SortedSet(a1, b2, c1, d2)) - merged1.members.toSeq.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged1.members must be(SortedSet(a1, c1, e2)) + merged1.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) val merged2 = g2 merge g1 - merged2.members must be(SortedSet(a1, b2, c1, d2)) - merged2.members.toSeq.map(_.status) must be(Seq(Up, Removed, Leaving, Removed)) + merged2.members must be(SortedSet(a1, c1, e2)) + merged2.members.toSeq.map(_.status) must be(Seq(Up, Leaving, Up)) } "merge unreachable by status priority" in { - val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = SortedSet(a1, b1, c1, d1))) - val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = SortedSet(a2, b2, c2, d2))) + val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a1, b1, c1, d1))) + val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2))) val merged1 = g1 merge g2 merged1.overview.unreachable must be(Set(a1, b2, c1, d2)) @@ -56,8 +58,8 @@ class GossipSpec extends WordSpec with MustMatchers { } "merge by excluding unreachable from members" in { - val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = SortedSet(c1, d1))) - val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = SortedSet(b2, d2))) + val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c1, d1))) + val g2 = Gossip(members = SortedSet(a2, c2), overview = GossipOverview(unreachable = Set(b2, d2))) val merged1 = g1 merge g2 merged1.members must be(SortedSet(a1)) @@ -74,8 +76,8 @@ class GossipSpec extends WordSpec with MustMatchers { } "start with fresh seen table after merge" in { - val g1 = Gossip(members = SortedSet(a1, b1, c1, d1)).seen(a1.address).seen(b1.address) - val g2 = Gossip(members = SortedSet(a2, b2, c2, d2)).seen(b2.address).seen(c2.address) + val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address) + val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address) val merged1 = g1 merge g2 merged1.overview.seen.isEmpty must be(true) @@ -85,5 +87,18 @@ class GossipSpec extends WordSpec with MustMatchers { } + "not have node in both members and unreachable" in intercept[IllegalArgumentException] { + Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(b2))) + } + + "not have live members with wrong status" in intercept[IllegalArgumentException] { + // b2 is Removed + Gossip(members = SortedSet(a2, b2)) + } + + "not have non cluster members in seen table" in intercept[IllegalArgumentException] { + Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address).seen(b1.address) + } + } }