diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index ef008cb7f7..d29ceec97c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -71,19 +71,31 @@ private[cluster] final case class Gossip( private def assertInvariants(): Unit = { - if (members.exists(_.status == Removed)) - throw new IllegalArgumentException(s"Live members must not have status [${Removed}], " + - s"got [${members.filter(_.status == Removed)}]") + def ifTrueThrow(func: ⇒ Boolean, expected: String, actual: String): Unit = + if (func) throw new IllegalArgumentException(s"$expected, but found [$actual]") + + ifTrueThrow( + members.exists(_.status == Removed), + expected = s"Live members must not have status [$Removed]", + actual = s"${members.filter(_.status == Removed)}") val inReachabilityButNotMember = overview.reachability.allObservers diff members.map(_.uniqueAddress) - if (inReachabilityButNotMember.nonEmpty) - throw new IllegalArgumentException("Nodes not part of cluster in reachability table, got [%s]" - format inReachabilityButNotMember.mkString(", ")) + ifTrueThrow( + inReachabilityButNotMember.nonEmpty, + expected = "Nodes not part of cluster in reachability table", + actual = inReachabilityButNotMember.mkString(", ")) + + val inReachabilityVersionsButNotMember = overview.reachability.versions.keySet diff members.map(_.uniqueAddress) + ifTrueThrow( + inReachabilityVersionsButNotMember.nonEmpty, + expected = "Nodes not part of cluster in reachability versions table", + actual = inReachabilityVersionsButNotMember.mkString(", ")) val seenButNotMember = overview.seen diff members.map(_.uniqueAddress) - if (seenButNotMember.nonEmpty) - throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" - format seenButNotMember.mkString(", ")) + ifTrueThrow( + seenButNotMember.nonEmpty, + expected = "Nodes not part of cluster have marked the Gossip as seen", + actual = seenButNotMember.mkString(", ")) } @transient private lazy val membersMap: Map[UniqueAddress, Member] = diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala index 7fd1806318..1a7848aea5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala @@ -276,7 +276,7 @@ private[cluster] class Reachability private ( } } - def allObservers: Set[UniqueAddress] = versions.keySet + def allObservers: Set[UniqueAddress] = records.iterator.map(_.observer).toSet def recordsFrom(observer: UniqueAddress): immutable.IndexedSeq[Record] = { observerRows(observer) match { diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala index 8c81c791da..8373e7108b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala @@ -27,8 +27,8 @@ class ReachabilityPerfSpec extends WordSpec with Matchers { } private def addUnreachable(base: Reachability, count: Int): Reachability = { - val observers = base.allObservers.take(count) - val subjects = Stream.continually(base.allObservers).flatten.iterator + val observers = base.versions.keySet.take(count) + val subjects = Stream.continually(base.versions.keySet).flatten.iterator observers.foldLeft(base) { case (r, o) ⇒ (1 to 5).foldLeft(r) { case (r, _) ⇒ r.unreachable(o, subjects.next()) } @@ -38,7 +38,7 @@ class ReachabilityPerfSpec extends WordSpec with Matchers { val reachability1 = createReachabilityOfSize(Reachability.empty, nodesSize) val reachability2 = createReachabilityOfSize(reachability1, nodesSize) val reachability3 = addUnreachable(reachability1, nodesSize / 2) - val allowed = reachability1.allObservers + val allowed = reachability1.versions.keySet private def checkThunkFor(r1: Reachability, r2: Reachability, thunk: (Reachability, Reachability) ⇒ Unit, times: Int): Unit = { for (i ← 1 to times) { diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala index b27f6c906a..32bed8689f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala @@ -235,5 +235,17 @@ class ReachabilitySpec extends WordSpec with Matchers { r2.versions.keySet should ===(Set(nodeD)) } + "be able to filter records" in { + val r = Reachability.empty + .unreachable(nodeC, nodeB) + .unreachable(nodeB, nodeA) + .unreachable(nodeB, nodeC) + + val filtered1 = r.filterRecords(record ⇒ record.observer != nodeC) + filtered1.isReachable(nodeB) should ===(true) + filtered1.isReachable(nodeA) should ===(false) + filtered1.allObservers should ===(Set(nodeB)) + } + } }