diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala index dd9f61e666..b895ee1fb7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala @@ -158,11 +158,14 @@ private[cluster] class Reachability private ( (this.observerRows(observer), other.observerRows(observer)) match { case (None, None) ⇒ case (Some(rows1), Some(rows2)) ⇒ - mergeObserverRows(allowed, rows1, rows2, observerVersion1, observerVersion2, recordBuilder) + val rows = if (observerVersion1 > observerVersion2) rows1 else rows2 + recordBuilder ++= rows.collect { case (_, r) if allowed(r.subject) ⇒ r } case (Some(rows1), None) ⇒ - recordBuilder ++= rows1.collect { case (_, r) if r.version > observerVersion2 && allowed(r.subject) ⇒ r } + if (observerVersion1 > observerVersion2) + recordBuilder ++= rows1.collect { case (_, r) if allowed(r.subject) ⇒ r } case (None, Some(rows2)) ⇒ - recordBuilder ++= rows2.collect { case (_, r) if r.version > observerVersion1 && allowed(r.subject) ⇒ r } + if (observerVersion2 > observerVersion1) + recordBuilder ++= rows2.collect { case (_, r) if allowed(r.subject) ⇒ r } } if (observerVersion2 > observerVersion1) @@ -174,31 +177,6 @@ private[cluster] class Reachability private ( new Reachability(recordBuilder.result(), newVersions) } - private def mergeObserverRows( - allowed: immutable.Set[UniqueAddress], - rows1: Map[UniqueAddress, Reachability.Record], rows2: Map[UniqueAddress, Reachability.Record], - observerVersion1: Long, observerVersion2: Long, - recordBuilder: immutable.VectorBuilder[Record]): Unit = { - - val allSubjects = rows1.keySet ++ rows2.keySet - allSubjects foreach { subject ⇒ - if (allowed(subject)) { - (rows1.get(subject), rows2.get(subject)) match { - case (Some(r1), Some(r2)) ⇒ - recordBuilder += (if (r1.version > r2.version) r1 else r2) - case (Some(r1), None) ⇒ - if (r1.version > observerVersion2) - recordBuilder += r1 - case (None, Some(r2)) ⇒ - if (r2.version > observerVersion1) - recordBuilder += r2 - case (None, None) ⇒ - throw new IllegalStateException(s"Unexpected [$subject]") - } - } - } - } - def remove(nodes: Iterable[UniqueAddress]): Reachability = { val nodesSet = nodes.to[immutable.HashSet] val newRecords = records.filterNot(r ⇒ nodesSet(r.observer) || nodesSet(r.subject))