diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index e079b83bbc..fd32479dc5 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -104,8 +104,8 @@ class BoundedBlockingQueue[E <: AnyRef]( @tailrec def pollElement(remainingNanos: Long): E = { backing.poll() match { case null if remainingNanos <= 0 ⇒ null.asInstanceOf[E] - case null ⇒ pollElement(notEmpty.awaitNanos(remainingNanos)) - case e ⇒ { + case null ⇒ pollElement(notEmpty.awaitNanos(remainingNanos)) + case e ⇒ { notFull.signal() e } @@ -120,7 +120,7 @@ class BoundedBlockingQueue[E <: AnyRef]( try { backing.poll() match { case null ⇒ null.asInstanceOf[E] - case e ⇒ + case e ⇒ notFull.signal() e } diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index c32cf4b5e7..0facef2125 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -127,7 +127,13 @@ object Member { val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.uniqueAddress) // pick highest MemberStatus (Member.none /: groupedByAddress) { - case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) + case (acc, (_, members)) ⇒ + if (members.size == 2) acc + members.reduceLeft(highestPriorityOf) + else { + val m = members.head + if (Gossip.removeUnreachableWithMemberStatus(m.status)) acc // removed + else acc + m + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala index 8de73861b2..dd9f61e666 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala @@ -158,38 +158,43 @@ private[cluster] class Reachability private ( (this.observerRows(observer), other.observerRows(observer)) match { case (None, None) ⇒ case (Some(rows1), Some(rows2)) ⇒ - mergeObserverRows(rows1, rows2, observerVersion1, observerVersion2, recordBuilder) + mergeObserverRows(allowed, rows1, rows2, observerVersion1, observerVersion2, recordBuilder) case (Some(rows1), None) ⇒ - recordBuilder ++= rows1.collect { case (_, r) if r.version > observerVersion2 ⇒ r } + recordBuilder ++= rows1.collect { case (_, r) if r.version > observerVersion2 && allowed(r.subject) ⇒ r } case (None, Some(rows2)) ⇒ - recordBuilder ++= rows2.collect { case (_, r) if r.version > observerVersion1 ⇒ r } + recordBuilder ++= rows2.collect { case (_, r) if r.version > observerVersion1 && allowed(r.subject) ⇒ r } } if (observerVersion2 > observerVersion1) newVersions += (observer -> observerVersion2) } + newVersions = newVersions.filterNot { case (k, _) ⇒ !allowed(k) } + new Reachability(recordBuilder.result(), newVersions) } private def mergeObserverRows( + allowed: immutable.Set[UniqueAddress], rows1: Map[UniqueAddress, Reachability.Record], rows2: Map[UniqueAddress, Reachability.Record], observerVersion1: Long, observerVersion2: Long, recordBuilder: immutable.VectorBuilder[Record]): Unit = { val allSubjects = rows1.keySet ++ rows2.keySet allSubjects foreach { subject ⇒ - (rows1.get(subject), rows2.get(subject)) match { - case (Some(r1), Some(r2)) ⇒ - recordBuilder += (if (r1.version > r2.version) r1 else r2) - case (Some(r1), None) ⇒ - if (r1.version > observerVersion2) - recordBuilder += r1 - case (None, Some(r2)) ⇒ - if (r2.version > observerVersion1) - recordBuilder += r2 - case (None, None) ⇒ - throw new IllegalStateException(s"Unexpected [$subject]") + if (allowed(subject)) { + (rows1.get(subject), rows2.get(subject)) match { + case (Some(r1), Some(r2)) ⇒ + recordBuilder += (if (r1.version > r2.version) r1 else r2) + case (Some(r1), None) ⇒ + if (r1.version > observerVersion2) + recordBuilder += r1 + case (None, Some(r2)) ⇒ + if (r2.version > observerVersion1) + recordBuilder += r2 + case (None, None) ⇒ + throw new IllegalStateException(s"Unexpected [$subject]") + } } } } @@ -284,7 +289,6 @@ private[cluster] class Reachability private ( val record = rows(subject) val aggregated = status(subject) s"${observer.address} -> ${subject.address}: ${record.status} [$aggregated] (${record.version})" - "" } rows.mkString(", ") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 1dbf973521..3408819a39 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -287,7 +287,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro def awaitMembersUp( numberOfMembers: Int, canNotBePartOfMemberRing: Set[Address] = Set.empty, - timeout: FiniteDuration = 20.seconds): Unit = { + timeout: FiniteDuration = 25.seconds): Unit = { within(timeout) { if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set awaitAssert(canNotBePartOfMemberRing foreach (a ⇒ clusterView.members.map(_.address) must not contain (a))) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index ea2efb51e0..c989b245e6 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -22,7 +22,6 @@ class GossipSpec extends WordSpec with MustMatchers { val c2 = TestMember(c1.address, Up) val c3 = TestMember(c1.address, Exiting) val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Leaving) - val d2 = TestMember(d1.address, Removed) val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Joining) val e2 = TestMember(e1.address, Up) val e3 = TestMember(e1.address, Down) @@ -60,6 +59,22 @@ class GossipSpec extends WordSpec with MustMatchers { merged2.overview.reachability.allUnreachable must be(merged1.overview.reachability.allUnreachable) } + "merge members by removing removed members" in { + // c3 removed + val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress) + val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1)) + val r2 = r1.unreachable(b1.uniqueAddress, c3.uniqueAddress) + val g2 = Gossip(members = SortedSet(a1, b1, c3), overview = GossipOverview(reachability = r2)) + + val merged1 = g1 merge g2 + merged1.members must be(SortedSet(a1, b1)) + merged1.overview.reachability.allUnreachable must be(Set(a1.uniqueAddress)) + + val merged2 = g2 merge g1 + merged2.overview.reachability.allUnreachable must be(merged1.overview.reachability.allUnreachable) + merged2.members must be(merged1.members) + } + "have leader as first member based on ordering, except Exiting status" in { Gossip(members = SortedSet(c2, e2)).leader must be(Some(c2.uniqueAddress)) Gossip(members = SortedSet(c3, e2)).leader must be(Some(e2.uniqueAddress)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala index fba7104118..0f6967b74b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala @@ -148,6 +148,30 @@ class ReachabilitySpec extends WordSpec with MustMatchers { merged2.records.toSet must be(merged.records.toSet) } + "merge by taking allowed set into account" in { + val r1 = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeD) + val r2 = r1.reachable(nodeB, nodeA).unreachable(nodeD, nodeE).unreachable(nodeC, nodeA) + // nodeD not in allowed set + val allowed = Set(nodeA, nodeB, nodeC, nodeE) + val merged = r1.merge(allowed, r2) + + merged.status(nodeB, nodeA) must be(Reachable) + merged.status(nodeC, nodeA) must be(Unreachable) + merged.status(nodeC, nodeD) must be(Reachable) + merged.status(nodeD, nodeE) must be(Reachable) + merged.status(nodeE, nodeA) must be(Reachable) + + merged.isReachable(nodeA) must be(false) + merged.isReachable(nodeD) must be(true) + merged.isReachable(nodeE) must be(true) + + merged.versions.keySet must be(Set(nodeB, nodeC)) + + val merged2 = r2.merge(allowed, r1) + merged2.records.toSet must be(merged.records.toSet) + merged2.versions must be(merged.versions) + } + "merge correctly after pruning" in { val r1 = Reachability.empty.unreachable(nodeB, nodeA).unreachable(nodeC, nodeD) val r2 = r1.unreachable(nodeA, nodeE)