From 8026e216aab1c88676241811abaa8705bb47edb4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Sat, 19 Sep 2015 21:46:01 +0200 Subject: [PATCH] =cdd #18328 optimize ORSet.mergeCommonKeys MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AFTER: [info] Benchmark (set1Size) Mode Cnt Score Error Units [info] ORSetMergeBenchmark.mergeAddFromBothNodes 1 thrpt 10 737.646 ± 10.289 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 10 thrpt 10 146.706 ± 6.331 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 20 thrpt 10 95.553 ± 1.801 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 100 thrpt 10 18.321 ± 0.586 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 1 thrpt 10 1274.526 ± 23.732 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 10 thrpt 10 162.426 ± 20.490 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 20 thrpt 10 102.436 ± 2.435 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 100 thrpt 10 18.911 ± 0.659 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 1 thrpt 10 653.358 ± 71.232 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 10 thrpt 10 147.385 ± 2.750 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 20 thrpt 10 94.280 ± 0.894 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 100 thrpt 10 17.922 ± 1.522 ops/ms [info] ORSetMergeBenchmark.mergeComplex 1 thrpt 10 335.060 ± 8.385 ops/ms [info] ORSetMergeBenchmark.mergeComplex 10 thrpt 10 134.438 ± 3.044 ops/ms [info] ORSetMergeBenchmark.mergeComplex 20 thrpt 10 86.015 ± 2.145 ops/ms [info] ORSetMergeBenchmark.mergeComplex 100 thrpt 10 17.611 ± 0.136 ops/ms BEFORE: [info] Benchmark (set1Size) Mode Cnt Score Error Units [info] ORSetMergeBenchmark.mergeAddFromBothNodes 1 thrpt 10 492.291 ± 55.903 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 10 thrpt 10 79.890 ± 4.977 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 20 thrpt 10 44.560 ± 9.081 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 100 thrpt 10 9.405 ± 0.323 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 1 thrpt 10 714.043 ± 14.694 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 10 thrpt 10 88.281 ± 13.858 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 20 thrpt 10 49.001 ± 0.940 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 100 thrpt 10 9.379 ± 0.300 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 1 thrpt 10 487.261 ± 7.457 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 10 thrpt 10 80.073 ± 9.736 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 20 thrpt 10 46.385 ± 1.225 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 100 thrpt 10 9.583 ± 0.234 ops/ms [info] ORSetMergeBenchmark.mergeComplex 1 thrpt 10 313.959 ± 31.419 ops/ms [info] ORSetMergeBenchmark.mergeComplex 10 thrpt 10 79.265 ± 1.126 ops/ms [info] ORSetMergeBenchmark.mergeComplex 20 thrpt 10 44.478 ± 2.077 ops/ms [info] ORSetMergeBenchmark.mergeComplex 100 thrpt 10 9.202 ± 0.564 ops/ms --- .../main/scala/akka/cluster/ddata/ORSet.scala | 40 ++++++++++++------- .../akka/cluster/ddata/VersionVector.scala | 6 ++- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index ad191c58fa..405c0e00e0 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -63,8 +63,12 @@ object ORSet { } } - val newDots = dropDots(dot.versions.toList, Nil) - new VersionVector(versions = TreeMap.empty[UniqueAddress, Long] ++ newDots) + if (dot.versions.isEmpty) + VersionVector.empty + else { + val newDots = dropDots(dot.versions.toList, Nil) + new VersionVector(versions = VersionVector.emptyVersions ++ newDots) + } } /** @@ -74,20 +78,26 @@ object ORSet { private[akka] def mergeCommonKeys[A](commonKeys: Set[A], lhs: ORSet[A], rhs: ORSet[A]): Map[A, ORSet.Dot] = { commonKeys.foldLeft(Map.empty[A, ORSet.Dot]) { case (acc, k) ⇒ - val lhsDots = lhs.elementsMap(k).versions - val rhsDots = rhs.elementsMap(k).versions - val commonDots = lhsDots.filter { - case (thisDotNode, v) ⇒ rhsDots.get(thisDotNode).exists(_ == v) + val lhsDots = lhs.elementsMap(k) + val lhsDotsVersions = lhsDots.versions + val rhsDotsVersions = rhs.elementsMap(k).versions + if (lhsDotsVersions.size == 1 && rhsDotsVersions.size == 1 && lhsDotsVersions.head == rhsDotsVersions.head) { + // one single common dot + acc.updated(k, lhsDots) + } else { + val commonDots = lhsDotsVersions.filter { + case (thisDotNode, v) ⇒ rhsDotsVersions.get(thisDotNode).exists(_ == v) + } + val commonDotsKeys = commonDots.keys + val lhsUniqueDots = lhsDotsVersions -- commonDotsKeys + val rhsUniqueDots = rhsDotsVersions -- commonDotsKeys + val lhsKeep = ORSet.subtractDots(new VersionVector(lhsUniqueDots), rhs.vvector) + val rhsKeep = ORSet.subtractDots(new VersionVector(rhsUniqueDots), lhs.vvector) + val merged = lhsKeep.merge(rhsKeep).merge(new VersionVector(versions = commonDots)) + // Perfectly possible that an item in both sets should be dropped + if (merged.versions.isEmpty) acc + else acc.updated(k, merged) } - val commonDotsKeys = commonDots.keys - val lhsUniqueDots = lhsDots -- commonDotsKeys - val rhsUniqueDots = rhsDots -- commonDotsKeys - val lhsKeep = ORSet.subtractDots(new VersionVector(lhsUniqueDots), rhs.vvector) - val rhsKeep = ORSet.subtractDots(new VersionVector(rhsUniqueDots), lhs.vvector) - val merged = lhsKeep.merge(rhsKeep).merge(new VersionVector(versions = commonDots)) - // Perfectly possible that an item in both sets should be dropped - if (merged.versions.isEmpty) acc - else acc.updated(k, merged) } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala index 7b4e4e3ba9..38cfddb79d 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala @@ -16,7 +16,11 @@ import akka.cluster.UniqueAddress */ object VersionVector { - val empty: VersionVector = new VersionVector(TreeMap.empty[UniqueAddress, Long]) + /** + * INTERNAL API + */ + private[akka] val emptyVersions: TreeMap[UniqueAddress, Long] = TreeMap.empty + val empty: VersionVector = new VersionVector(emptyVersions) def apply(): VersionVector = empty /** * Java API