=clu #3608 Simplify Reachability merge
* No need to merge the individual rows for one observer, simply pick the rows from the observer with highest version
This commit is contained in:
parent
6246099694
commit
d9eda5fbfa
1 changed files with 6 additions and 28 deletions
|
|
@ -158,11 +158,14 @@ private[cluster] class Reachability private (
|
|||
(this.observerRows(observer), other.observerRows(observer)) match {
|
||||
case (None, None) ⇒
|
||||
case (Some(rows1), Some(rows2)) ⇒
|
||||
mergeObserverRows(allowed, rows1, rows2, observerVersion1, observerVersion2, recordBuilder)
|
||||
val rows = if (observerVersion1 > observerVersion2) rows1 else rows2
|
||||
recordBuilder ++= rows.collect { case (_, r) if allowed(r.subject) ⇒ r }
|
||||
case (Some(rows1), None) ⇒
|
||||
recordBuilder ++= rows1.collect { case (_, r) if r.version > observerVersion2 && allowed(r.subject) ⇒ r }
|
||||
if (observerVersion1 > observerVersion2)
|
||||
recordBuilder ++= rows1.collect { case (_, r) if allowed(r.subject) ⇒ r }
|
||||
case (None, Some(rows2)) ⇒
|
||||
recordBuilder ++= rows2.collect { case (_, r) if r.version > observerVersion1 && allowed(r.subject) ⇒ r }
|
||||
if (observerVersion2 > observerVersion1)
|
||||
recordBuilder ++= rows2.collect { case (_, r) if allowed(r.subject) ⇒ r }
|
||||
}
|
||||
|
||||
if (observerVersion2 > observerVersion1)
|
||||
|
|
@ -174,31 +177,6 @@ private[cluster] class Reachability private (
|
|||
new Reachability(recordBuilder.result(), newVersions)
|
||||
}
|
||||
|
||||
private def mergeObserverRows(
|
||||
allowed: immutable.Set[UniqueAddress],
|
||||
rows1: Map[UniqueAddress, Reachability.Record], rows2: Map[UniqueAddress, Reachability.Record],
|
||||
observerVersion1: Long, observerVersion2: Long,
|
||||
recordBuilder: immutable.VectorBuilder[Record]): Unit = {
|
||||
|
||||
val allSubjects = rows1.keySet ++ rows2.keySet
|
||||
allSubjects foreach { subject ⇒
|
||||
if (allowed(subject)) {
|
||||
(rows1.get(subject), rows2.get(subject)) match {
|
||||
case (Some(r1), Some(r2)) ⇒
|
||||
recordBuilder += (if (r1.version > r2.version) r1 else r2)
|
||||
case (Some(r1), None) ⇒
|
||||
if (r1.version > observerVersion2)
|
||||
recordBuilder += r1
|
||||
case (None, Some(r2)) ⇒
|
||||
if (r2.version > observerVersion1)
|
||||
recordBuilder += r2
|
||||
case (None, None) ⇒
|
||||
throw new IllegalStateException(s"Unexpected [$subject]")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def remove(nodes: Iterable[UniqueAddress]): Reachability = {
|
||||
val nodesSet = nodes.to[immutable.HashSet]
|
||||
val newRecords = records.filterNot(r ⇒ nodesSet(r.observer) || nodesSet(r.subject))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue