From 0826689c4778efbcb1dc1e6d2f7d1247e3d97ed0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 12 Sep 2013 17:19:53 +0200 Subject: [PATCH] =clu #3603 Handle removed member in Gossip and Reachability merge * It was a regression introduced in dc9fe4f * Two problems: 1) Gossip merge could pop back removed member (was previously covered by the filter of unreachable) 2) Reachability merge didn't handle all cases for removed member, i.e. when node not in allowed set --- .../akka/util/BoundedBlockingQueue.scala | 6 ++-- .../src/main/scala/akka/cluster/Member.scala | 8 ++++- .../scala/akka/cluster/Reachability.scala | 34 +++++++++++-------- .../akka/cluster/MultiNodeClusterSpec.scala | 2 +- .../test/scala/akka/cluster/GossipSpec.scala | 17 +++++++++- .../scala/akka/cluster/ReachabilitySpec.scala | 24 +++++++++++++ 6 files changed, 70 insertions(+), 21 deletions(-) diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index e079b83bbc..fd32479dc5 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -104,8 +104,8 @@ class BoundedBlockingQueue[E <: AnyRef]( @tailrec def pollElement(remainingNanos: Long): E = { backing.poll() match { case null if remainingNanos <= 0 ⇒ null.asInstanceOf[E] - case null ⇒ pollElement(notEmpty.awaitNanos(remainingNanos)) - case e ⇒ { + case null ⇒ pollElement(notEmpty.awaitNanos(remainingNanos)) + case e ⇒ { notFull.signal() e } @@ -120,7 +120,7 @@ class BoundedBlockingQueue[E <: AnyRef]( try { backing.poll() match { case null ⇒ null.asInstanceOf[E] - case e ⇒ + case e ⇒ notFull.signal() e } diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index c32cf4b5e7..0facef2125 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -127,7 +127,13 @@ object Member { val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.uniqueAddress) // pick highest MemberStatus (Member.none /: groupedByAddress) { - case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) + case (acc, (_, members)) ⇒ + if (members.size == 2) acc + members.reduceLeft(highestPriorityOf) + else { + val m = members.head + if (Gossip.removeUnreachableWithMemberStatus(m.status)) acc // removed + else acc + m + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala index 8de73861b2..dd9f61e666 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala @@ -158,38 +158,43 @@ private[cluster] class Reachability private ( (this.observerRows(observer), other.observerRows(observer)) match { case (None, None) ⇒ case (Some(rows1), Some(rows2)) ⇒ - mergeObserverRows(rows1, rows2, observerVersion1, observerVersion2, recordBuilder) + mergeObserverRows(allowed, rows1, rows2, observerVersion1, observerVersion2, recordBuilder) case (Some(rows1), None) ⇒ - recordBuilder ++= rows1.collect { case (_, r) if r.version > observerVersion2 ⇒ r } + recordBuilder ++= rows1.collect { case (_, r) if r.version > observerVersion2 && allowed(r.subject) ⇒ r } case (None, Some(rows2)) ⇒ - recordBuilder ++= rows2.collect { case (_, r) if r.version > observerVersion1 ⇒ r } + recordBuilder ++= rows2.collect { case (_, r) if r.version > observerVersion1 && allowed(r.subject) ⇒ r } } if (observerVersion2 > observerVersion1) newVersions += (observer -> observerVersion2) } + newVersions = newVersions.filterNot { case (k, _) ⇒ !allowed(k) } + new Reachability(recordBuilder.result(), newVersions) } private def mergeObserverRows( + allowed: immutable.Set[UniqueAddress], rows1: Map[UniqueAddress, Reachability.Record], rows2: Map[UniqueAddress, Reachability.Record], observerVersion1: Long, observerVersion2: Long, recordBuilder: immutable.VectorBuilder[Record]): Unit = { val allSubjects = rows1.keySet ++ rows2.keySet allSubjects foreach { subject ⇒ - (rows1.get(subject), rows2.get(subject)) match { - case (Some(r1), Some(r2)) ⇒ - recordBuilder += (if (r1.version > r2.version) r1 else r2) - case (Some(r1), None) ⇒ - if (r1.version > observerVersion2) - recordBuilder += r1 - case (None, Some(r2)) ⇒ - if (r2.version > observerVersion1) - recordBuilder += r2 - case (None, None) ⇒ - throw new IllegalStateException(s"Unexpected [$subject]") + if (allowed(subject)) { + (rows1.get(subject), rows2.get(subject)) match { + case (Some(r1), Some(r2)) ⇒ + recordBuilder += (if (r1.version > r2.version) r1 else r2) + case (Some(r1), None) ⇒ + if (r1.version > observerVersion2) + recordBuilder += r1 + case (None, Some(r2)) ⇒ + if (r2.version > observerVersion1) + recordBuilder += r2 + case (None, None) ⇒ + throw new IllegalStateException(s"Unexpected [$subject]") + } } } } @@ -284,7 +289,6 @@ private[cluster] class Reachability private ( val record = rows(subject) val aggregated = status(subject) s"${observer.address} -> ${subject.address}: ${record.status} [$aggregated] (${record.version})" - "" } rows.mkString(", ") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 1dbf973521..3408819a39 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -287,7 +287,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro def awaitMembersUp( numberOfMembers: Int, canNotBePartOfMemberRing: Set[Address] = Set.empty, - timeout: FiniteDuration = 20.seconds): Unit = { + timeout: FiniteDuration = 25.seconds): Unit = { within(timeout) { if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set awaitAssert(canNotBePartOfMemberRing foreach (a ⇒ clusterView.members.map(_.address) must not contain (a))) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index ea2efb51e0..c989b245e6 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -22,7 +22,6 @@ class GossipSpec extends WordSpec with MustMatchers { val c2 = TestMember(c1.address, Up) val c3 = TestMember(c1.address, Exiting) val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Leaving) - val d2 = TestMember(d1.address, Removed) val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Joining) val e2 = TestMember(e1.address, Up) val e3 = TestMember(e1.address, Down) @@ -60,6 +59,22 @@ class GossipSpec extends WordSpec with MustMatchers { merged2.overview.reachability.allUnreachable must be(merged1.overview.reachability.allUnreachable) } + "merge members by removing removed members" in { + // c3 removed + val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress) + val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1)) + val r2 = r1.unreachable(b1.uniqueAddress, c3.uniqueAddress) + val g2 = Gossip(members = SortedSet(a1, b1, c3), overview = GossipOverview(reachability = r2)) + + val merged1 = g1 merge g2 + merged1.members must be(SortedSet(a1, b1)) + merged1.overview.reachability.allUnreachable must be(Set(a1.uniqueAddress)) + + val merged2 = g2 merge g1 + merged2.overview.reachability.allUnreachable must be(merged1.overview.reachability.allUnreachable) + merged2.members must be(merged1.members) + } + "have leader as first member based on ordering, except Exiting status" in { Gossip(members = SortedSet(c2, e2)).leader must be(Some(c2.uniqueAddress)) Gossip(members = SortedSet(c3, e2)).leader must be(Some(e2.uniqueAddress)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala index fba7104118..0f6967b74b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala @@ -148,6 +148,30 @@ class ReachabilitySpec extends WordSpec with MustMatchers { merged2.records.toSet must be(merged.records.toSet) } + "merge by taking allowed set into account" in { + val r1 = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeD) + val r2 = r1.reachable(nodeB, nodeA).unreachable(nodeD, nodeE).unreachable(nodeC, nodeA) + // nodeD not in allowed set + val allowed = Set(nodeA, nodeB, nodeC, nodeE) + val merged = r1.merge(allowed, r2) + + merged.status(nodeB, nodeA) must be(Reachable) + merged.status(nodeC, nodeA) must be(Unreachable) + merged.status(nodeC, nodeD) must be(Reachable) + merged.status(nodeD, nodeE) must be(Reachable) + merged.status(nodeE, nodeA) must be(Reachable) + + merged.isReachable(nodeA) must be(false) + merged.isReachable(nodeD) must be(true) + merged.isReachable(nodeE) must be(true) + + merged.versions.keySet must be(Set(nodeB, nodeC)) + + val merged2 = r2.merge(allowed, r1) + merged2.records.toSet must be(merged.records.toSet) + merged2.versions must be(merged.versions) + } + "merge correctly after pruning" in { val r1 = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeD) val r2 = r1.unreachable(nodeA, nodeE)