From c11b600cc1c8f7b28da58ba1ed96d040631cc992 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Mon, 21 Sep 2015 13:09:19 +0200
Subject: [PATCH] !cdd #18328 optimize VersionVector for size 1 (typical dots)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

AFTER:

[info] Benchmark                                  (set1Size)   Mode  Cnt     Score     Error   Units
[info] ORSetMergeBenchmark.mergeAddFromBothNodes           1  thrpt   10  2007.939 ±  74.673  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromBothNodes          10  thrpt   10   337.110 ±  15.055  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromBothNodes          20  thrpt   10   223.600 ±   8.403  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromBothNodes         100  thrpt   10    46.697 ±   2.136  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromOtherNode           1  thrpt   10  2542.537 ± 120.697  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromOtherNode          10  thrpt   10   365.694 ±  17.571  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromOtherNode          20  thrpt   10   216.323 ±   9.446  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromOtherNode         100  thrpt   10    49.563 ±   2.725  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromSameNode            1  thrpt   10  9883.186 ± 725.672  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromSameNode           10  thrpt   10  3266.528 ± 189.993  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromSameNode           20  thrpt   10  3206.017 ± 124.623  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromSameNode          100  thrpt   10  2709.031 ± 162.182  ops/ms
[info] ORSetMergeBenchmark.mergeComplex                    1  thrpt   10   572.704 ±  21.504  ops/ms
[info] ORSetMergeBenchmark.mergeComplex                   10  thrpt   10   249.226 ±  12.324  ops/ms
[info] ORSetMergeBenchmark.mergeComplex                   20  thrpt   10   170.560 ±  10.320  ops/ms
[info] ORSetMergeBenchmark.mergeComplex                  100  thrpt   10    46.373 ±   1.800  ops/ms

BEFORE:

[info] Benchmark                                  (set1Size)   Mode  Cnt     Score     Error   Units
[info] ORSetMergeBenchmark.mergeAddFromBothNodes           1  thrpt   10   885.664 ±  99.718  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromBothNodes          10  thrpt   10   304.617 ±   4.755  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromBothNodes          20  thrpt   10   200.977 ±   3.708  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromBothNodes         100  thrpt   10    47.879 ±   4.352  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromOtherNode           1  thrpt   10  1586.848 ±  27.476  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromOtherNode          10  thrpt   10   354.408 ±   4.772  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromOtherNode          20  thrpt   10   210.563 ±  32.914  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromOtherNode         100  thrpt   10    52.750 ±   0.698  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromSameNode            1  thrpt   10  3915.817 ± 420.643  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromSameNode           10  thrpt   10  2369.476 ± 250.336  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromSameNode           20  thrpt   10  2378.924 ±  47.160  ops/ms
[info] ORSetMergeBenchmark.mergeAddFromSameNode          100  thrpt   10  2167.841 ±  20.339  ops/ms
[info] ORSetMergeBenchmark.mergeComplex                    1  thrpt   10   387.261 ±   8.820  ops/ms
[info] ORSetMergeBenchmark.mergeComplex                   10  thrpt   10   212.661 ±   4.802  ops/ms
[info] ORSetMergeBenchmark.mergeComplex                   20  thrpt   10   151.512 ±   2.627  ops/ms
[info] ORSetMergeBenchmark.mergeComplex                  100  thrpt   10    40.976 ±   2.014  ops/ms

* use subtype polymorphism for VersionVector
---
 .../ddata/VersionVectorBenchmark.scala        |   2 +-
 .../main/scala/akka/cluster/ddata/ORSet.scala | 102 +++++++---
 .../akka/cluster/ddata/VersionVector.scala    | 186 +++++++++++++++---
 .../protobuf/ReplicatedDataSerializer.scala   |  16 +-
 .../scala/akka/cluster/ddata/ORMapSpec.scala  |   2 +-
 .../scala/akka/cluster/ddata/ORSetSpec.scala  |  37 ++--
 .../cluster/ddata/VersionVectorSpec.scala     |  50 +++--
 .../ReplicatedDataSerializerSpec.scala        |   1 +
 project/MiMa.scala                            |   5 +-
 9 files changed, 305 insertions(+), 96 deletions(-)

diff --git a/akka-bench-jmh/src/main/scala/akka/cluster/ddata/VersionVectorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/cluster/ddata/VersionVectorBenchmark.scala
index 915962014b..c8c341774f 100644
--- a/akka-bench-jmh/src/main/scala/akka/cluster/ddata/VersionVectorBenchmark.scala
+++ b/akka-bench-jmh/src/main/scala/akka/cluster/ddata/VersionVectorBenchmark.scala
@@ -51,7 +51,7 @@ class VersionVectorBenchmark {
     vv1 = (1 to size).foldLeft(VersionVector.empty)((vv, n) => vv + nextNode())
     vv2 = vv1 + nextNode()
     vv3 = vv1 + nextNode()
-    dot1 = VersionVector(TreeMap(nodeA -> vv1.versions(nodeA)))
+    dot1 = VersionVector(nodeA, vv1.versionAt(nodeA))
   }
 
   @Benchmark
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala
index 88bf262625..eb6e3d8988 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala
@@ -54,20 +54,28 @@ object ORSet {
       remaining match {
         case Nil ⇒ acc
         case (d @ (node, v1)) :: rest ⇒
-          vvector.versions.get(node) match {
-            case Some(v2) if v2 >= v1 ⇒
-              // dot is dominated by version vector, drop it
-              dropDots(rest, acc)
-            case _ ⇒
-              dropDots(rest, d :: acc)
-          }
+          val v2 = vvector.versionAt(node)
+          if (v2 >= v1)
+            // dot is dominated by version vector, drop it
+            dropDots(rest, acc)
+          else
+            dropDots(rest, d :: acc)
       }
 
-    if (dot.versions.isEmpty)
+    if (dot.isEmpty)
       VersionVector.empty
     else {
-      val newDots = dropDots(dot.versions.toList, Nil)
-      new VersionVector(versions = VersionVector.emptyVersions ++ newDots)
+      dot match {
+        case OneVersionVector(node, v1) ⇒
+          // if dot is dominated by version vector, drop it
+          if (vvector.versionAt(node) >= v1) VersionVector.empty
+          else dot
+
+        case ManyVersionVector(vs) ⇒
+          val remaining = vs.toList
+          val newDots = dropDots(remaining, Nil)
+          VersionVector(newDots)
+      }
     }
   }
 
@@ -82,24 +90,60 @@ object ORSet {
     commonKeys.foldLeft(Map.empty[A, ORSet.Dot]) {
       case (acc, k) ⇒
         val lhsDots = lhs.elementsMap(k)
-        val lhsDotsVersions = lhsDots.versions
-        val rhsDotsVersions = rhs.elementsMap(k).versions
-        if (lhsDotsVersions.size == 1 && rhsDotsVersions.size == 1 && lhsDotsVersions.head == rhsDotsVersions.head) {
-          // one single common dot
-          acc.updated(k, lhsDots)
-        } else {
-          val commonDots = lhsDotsVersions.filter {
-            case (thisDotNode, v) ⇒ rhsDotsVersions.get(thisDotNode).exists(_ == v)
-          }
-          val commonDotsKeys = commonDots.keys
-          val lhsUniqueDots = lhsDotsVersions -- commonDotsKeys
-          val rhsUniqueDots = rhsDotsVersions -- commonDotsKeys
-          val lhsKeep = ORSet.subtractDots(new VersionVector(lhsUniqueDots), rhs.vvector)
-          val rhsKeep = ORSet.subtractDots(new VersionVector(rhsUniqueDots), lhs.vvector)
-          val merged = lhsKeep.merge(rhsKeep).merge(new VersionVector(versions = commonDots))
-          // Perfectly possible that an item in both sets should be dropped
-          if (merged.versions.isEmpty) acc
-          else acc.updated(k, merged)
+        val rhsDots = rhs.elementsMap(k)
+        (lhsDots, rhsDots) match {
+          case (OneVersionVector(n1, v1), OneVersionVector(n2, v2)) ⇒
+            if (n1 == n2 && v1 == v2)
+              // one single common dot
+              acc.updated(k, lhsDots)
+            else {
+              // no common, lhsUniqueDots == lhsDots, rhsUniqueDots == rhsDots
+              val lhsKeep = ORSet.subtractDots(lhsDots, rhs.vvector)
+              val rhsKeep = ORSet.subtractDots(rhsDots, lhs.vvector)
+              val merged = lhsKeep.merge(rhsKeep)
+              // Perfectly possible that an item in both sets should be dropped
+              if (merged.isEmpty) acc
+              else acc.updated(k, merged)
+            }
+          case (ManyVersionVector(lhsVs), ManyVersionVector(rhsVs)) ⇒
+            val commonDots = lhsVs.filter {
+              case (thisDotNode, v) ⇒ rhsVs.get(thisDotNode).exists(_ == v)
+            }
+            val commonDotsKeys = commonDots.keys
+            val lhsUniqueDots = lhsVs -- commonDotsKeys
+            val rhsUniqueDots = rhsVs -- commonDotsKeys
+            val lhsKeep = ORSet.subtractDots(VersionVector(lhsUniqueDots), rhs.vvector)
+            val rhsKeep = ORSet.subtractDots(VersionVector(rhsUniqueDots), lhs.vvector)
+            val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
+            // Perfectly possible that an item in both sets should be dropped
+            if (merged.isEmpty) acc
+            else acc.updated(k, merged)
+          case (ManyVersionVector(lhsVs), OneVersionVector(n2, v2)) ⇒
+            val commonDots = lhsVs.filter {
+              case (n1, v1) ⇒ v1 == v2 && n1 == n2
+            }
+            val commonDotsKeys = commonDots.keys
+            val lhsUniqueDots = lhsVs -- commonDotsKeys
+            val rhsUnique = if (commonDotsKeys.isEmpty) rhsDots else VersionVector.empty
+            val lhsKeep = ORSet.subtractDots(VersionVector(lhsUniqueDots), rhs.vvector)
+            val rhsKeep = ORSet.subtractDots(rhsUnique, lhs.vvector)
+            val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
+            // Perfectly possible that an item in both sets should be dropped
+            if (merged.isEmpty) acc
+            else acc.updated(k, merged)
+          case (OneVersionVector(n1, v1), ManyVersionVector(rhsVs)) ⇒
+            val commonDots = rhsVs.filter {
+              case (n2, v2) ⇒ v1 == v2 && n1 == n2
+            }
+            val commonDotsKeys = commonDots.keys
+            val lhsUnique = if (commonDotsKeys.isEmpty) lhsDots else VersionVector.empty
+            val rhsUniqueDots = rhsVs -- commonDotsKeys
+            val lhsKeep = ORSet.subtractDots(lhsUnique, rhs.vvector)
+            val rhsKeep = ORSet.subtractDots(VersionVector(rhsUniqueDots), lhs.vvector)
+            val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
+            // Perfectly possible that an item in both sets should be dropped
+            if (merged.isEmpty) acc
+            else acc.updated(k, merged)
         }
     }
   }
@@ -199,7 +243,7 @@ final class ORSet[A] private[akka] (
    */
   private[akka] def add(node: UniqueAddress, element: A): ORSet[A] = {
     val newVvector = vvector + node
-    val newDot = new VersionVector(versions = TreeMap(node -> newVvector.versions(node)))
+    val newDot = VersionVector(node, newVvector.versionAt(node))
     assignAncestor(new ORSet(elementsMap = elementsMap.updated(element, newDot), vvector = newVvector))
   }
 
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
index 38cfddb79d..c3a21cfd19 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala
@@ -3,25 +3,37 @@
  */
 package akka.cluster.ddata
 
+import scala.collection.immutable
 import java.util.concurrent.atomic.AtomicLong
-
 import scala.annotation.tailrec
 import scala.collection.immutable.TreeMap
-
 import akka.cluster.Cluster
 import akka.cluster.UniqueAddress
+import akka.cluster.UniqueAddress
 
 /**
  * VersionVector module with helper classes and methods.
  */
 object VersionVector {
 
-  /**
-   * INTERNAL API
-   */
-  private[akka] val emptyVersions: TreeMap[UniqueAddress, Long] = TreeMap.empty
-  val empty: VersionVector = new VersionVector(emptyVersions)
+  private val emptyVersions: TreeMap[UniqueAddress, Long] = TreeMap.empty
+  val empty: VersionVector = ManyVersionVector(emptyVersions)
+  def apply(): VersionVector = empty
+
+  def apply(versions: TreeMap[UniqueAddress, Long]): VersionVector =
+    if (versions.isEmpty) empty
+    else if (versions.size == 1) apply(versions.head._1, versions.head._2)
+    else ManyVersionVector(versions)
+
+  def apply(node: UniqueAddress, version: Long): VersionVector = OneVersionVector(node, version)
+
+  /** INTERNAL API */
+  private[akka] def apply(versions: List[(UniqueAddress, Long)]): VersionVector =
+    if (versions.isEmpty) empty
+    else if (versions.tail.isEmpty) apply(versions.head._1, versions.head._2)
+    else apply(emptyVersions ++ versions)
+
   /**
    * Java API
    */
@@ -57,7 +69,8 @@
    */
   def ConcurrentInstance = Concurrent
 
-  private object Timestamp {
+  /** INTERNAL API */
+  private[akka] object Timestamp {
     final val Zero = 0L
     final val EndMarker = Long.MinValue
     val counter = new AtomicLong(1L)
@@ -83,8 +96,7 @@
  * This class is immutable, i.e. "modifying" methods return a new instance.
  */
 @SerialVersionUID(1L)
-final case class VersionVector private[akka] (
-  private[akka] val versions: TreeMap[UniqueAddress, Long])
+sealed abstract class VersionVector
   extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
 
   type T = VersionVector
@@ -107,12 +119,28 @@
    */
   def increment(node: Cluster): VersionVector = increment(node.selfUniqueAddress)
 
+  def isEmpty: Boolean
+
+  /**
+   * INTERNAL API
+   */
+  private[akka] def size: Int
+
   /**
    * INTERNAL API
    * Increment the version for the node passed as argument. Returns a new VersionVector.
    */
-  private[akka] def increment(node: UniqueAddress): VersionVector =
-    copy(versions = versions.updated(node, Timestamp.counter.getAndIncrement()))
+  private[akka] def increment(node: UniqueAddress): VersionVector
+
+  /**
+   * INTERNAL API
+   */
+  private[akka] def versionAt(node: UniqueAddress): Long
+
+  /**
+   * INTERNAL API
+   */
+  private[akka] def contains(node: UniqueAddress): Boolean
 
   /**
    * Returns true if this and that are concurrent else false.
@@ -187,10 +215,15 @@
       compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), Same)
     }
 
-    if ((this eq that) || (this.versions eq that.versions)) Same
-    else compare(this.versions.iterator, that.versions.iterator, if (order eq Concurrent) FullOrder else order)
+    if (this eq that) Same
+    else compare(this.versionsIterator, that.versionsIterator, if (order eq Concurrent) FullOrder else order)
   }
 
+  /**
+   * INTERNAL API
+   */
+  private[akka] def versionsIterator: Iterator[(UniqueAddress, Long)]
+
   /**
    * Compare two version vectors. The outcome will be one of the following:
    *

@@ -208,23 +241,128 @@
   /**
    * Merges this VersionVector with another VersionVector. E.g. merges its versioned history.
    */
-  def merge(that: VersionVector): VersionVector = {
-    var mergedVersions = that.versions
-    for ((node, time) ← versions) {
-      val mergedVersionsCurrentTime = mergedVersions.getOrElse(node, Timestamp.Zero)
-      if (time > mergedVersionsCurrentTime)
-        mergedVersions = mergedVersions.updated(node, time)
+  def merge(that: VersionVector): VersionVector
+
+  override def needPruningFrom(removedNode: UniqueAddress): Boolean
+
+  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): VersionVector
+
+  override def pruningCleanup(removedNode: UniqueAddress): VersionVector
+
+}
+
+final case class OneVersionVector private[akka] (node: UniqueAddress, version: Long) extends VersionVector {
+  import VersionVector.Timestamp
+
+  override def isEmpty: Boolean = false
+
+  /** INTERNAL API */
+  private[akka] override def size: Int = 1
+
+  /** INTERNAL API */
+  private[akka] override def increment(n: UniqueAddress): VersionVector = {
+    val v = Timestamp.counter.getAndIncrement()
+    if (n == node) copy(version = v)
+    else ManyVersionVector(TreeMap(node -> version, n -> v))
+  }
+
+  /** INTERNAL API */
+  private[akka] override def versionAt(n: UniqueAddress): Long =
+    if (n == node) version
+    else Timestamp.Zero
+
+  /** INTERNAL API */
+  private[akka] override def contains(n: UniqueAddress): Boolean =
+    n == node
+
+  /** INTERNAL API */
+  private[akka] override def versionsIterator: Iterator[(UniqueAddress, Long)] =
+    Iterator.single((node, version))
+
+  override def merge(that: VersionVector): VersionVector = {
+    that match {
+      case OneVersionVector(n2, v2) ⇒
+        if (node == n2) if (version >= v2) this else OneVersionVector(n2, v2)
+        else ManyVersionVector(TreeMap(node -> version, n2 -> v2))
+      case ManyVersionVector(vs2) ⇒
+        val v2 = vs2.getOrElse(node, Timestamp.Zero)
+        val mergedVersions =
+          if (v2 >= version) vs2
+          else vs2.updated(node, version)
+        VersionVector(mergedVersions)
+    }
+  }
+
+  override def needPruningFrom(removedNode: UniqueAddress): Boolean =
+    node == removedNode
+
+  override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): VersionVector =
+    (if (node == removedNode) VersionVector.empty else this) + collapseInto
+
+  override def pruningCleanup(removedNode: UniqueAddress): VersionVector =
+    if (node == removedNode) VersionVector.empty else this
+
+  override def toString: String =
+    s"VersionVector($node -> $version)"
+
+}
+
+final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) extends VersionVector {
+  import VersionVector.Timestamp
+
+  override def isEmpty: Boolean = versions.isEmpty
+
+  /** INTERNAL API */
+  private[akka] override def size: Int = versions.size
+
+  /** INTERNAL API */
+  private[akka] override def increment(node: UniqueAddress): VersionVector = {
+    val v = Timestamp.counter.getAndIncrement()
+    VersionVector(versions.updated(node, v))
+  }
+
+  /** INTERNAL API */
+  private[akka] override def versionAt(node: UniqueAddress): Long = versions.get(node) match {
+    case Some(v) ⇒ v
+    case None ⇒ Timestamp.Zero
+  }
+
+  /** INTERNAL API */
+  private[akka] override def contains(node: UniqueAddress): Boolean =
+    versions.contains(node)
+
+  /** INTERNAL API */
+  private[akka] override def versionsIterator: Iterator[(UniqueAddress, Long)] =
+    versions.iterator
+
+  override def merge(that: VersionVector): VersionVector = {
+    that match {
+      case ManyVersionVector(vs2) ⇒
+        var mergedVersions = vs2
+        for ((node, time) ← versions) {
+          val mergedVersionsCurrentTime = mergedVersions.getOrElse(node, Timestamp.Zero)
+          if (time > mergedVersionsCurrentTime)
+            mergedVersions = mergedVersions.updated(node, time)
+        }
+        VersionVector(mergedVersions)
+      case OneVersionVector(n2, v2) ⇒
+        val v1 = versions.getOrElse(n2, Timestamp.Zero)
+        val mergedVersions =
+          if (v1 >= v2) versions
+          else versions.updated(n2, v2)
+        VersionVector(mergedVersions)
     }
-    VersionVector(mergedVersions)
   }
 
   override def needPruningFrom(removedNode: UniqueAddress): Boolean = versions.contains(removedNode)
 
   override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): VersionVector =
-    copy(versions = versions - removedNode) + collapseInto
+    VersionVector(versions = versions - removedNode) + collapseInto
 
-  override def pruningCleanup(removedNode: UniqueAddress): VersionVector = copy(versions = versions - removedNode)
+  override def pruningCleanup(removedNode: UniqueAddress): VersionVector =
+    VersionVector(versions = versions - removedNode)
 
-  override def toString = versions.map { case ((n, t)) ⇒ n + " -> " + t }.mkString("VersionVector(", ", ", ")")
+  override def toString: String =
+    versions.map { case ((n, v)) ⇒ n + " -> " + v }.mkString("VersionVector(", ", ", ")")
 }
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
index 18ae04cdbc..17a8928cfa 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
@@ -20,6 +20,8 @@ import akka.serialization.SerializerWithStringManifest
 import akka.serialization.BaseSerializer
 import akka.protobuf.ByteString
 import akka.util.ByteString.UTF_8
+import scala.collection.immutable.TreeMap
+import akka.cluster.UniqueAddress
 
 /**
  * Protobuf serializer of ReplicatedData.
@@ -303,7 +305,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
 
   def versionVectorToProto(versionVector: VersionVector): rd.VersionVector = {
     val b = rd.VersionVector.newBuilder()
-    versionVector.versions.foreach {
+    versionVector.versionsIterator.foreach {
       case (node, value) ⇒ b.addEntries(rd.VersionVector.Entry.newBuilder().
         setNode(uniqueAddressToProto(node)).setVersion(value))
     }
@@ -314,8 +316,16 @@
     versionVectorFromProto(rd.VersionVector.parseFrom(bytes))
 
   def versionVectorFromProto(versionVector: rd.VersionVector): VersionVector = {
-    VersionVector(versions = versionVector.getEntriesList.asScala.map(entry ⇒
-      uniqueAddressFromProto(entry.getNode) -> entry.getVersion)(breakOut))
+    val entries = versionVector.getEntriesList
+    if (entries.isEmpty)
+      VersionVector.empty
+    else if (entries.size == 1)
+      VersionVector(uniqueAddressFromProto(entries.get(0).getNode), entries.get(0).getVersion)
+    else {
+      val versions: TreeMap[UniqueAddress, Long] = versionVector.getEntriesList.asScala.map(entry ⇒
+        uniqueAddressFromProto(entry.getNode) -> entry.getVersion)(breakOut)
+      VersionVector(versions)
+    }
   }
 
   def ormapToProto(ormap: ORMap[_]): rd.ORMap = {
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala
index 2a6b46ee93..00127efc94 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala
@@ -101,7 +101,7 @@ class ORMapSpec extends WordSpec with Matchers {
       val m3 = merged1.remove(node1, "b").put(node1, "b", GSet.empty + "B2")
       // same thing if only put is used
-      // val m3 = merged1.put(node1, "b", GSet() + "B2")
+      // val m3 = merged1.put(node1, "b", GSet.empty + "B2")
 
       val merged2 = merged1 merge m3
 
       merged2.entries("a").elements should be(Set("A"))
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala
index dabb736602..620b90f7e7 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala
@@ -12,6 +12,7 @@ import akka.cluster.ddata.Replicator.Changed
 import org.scalatest.Matchers
 import org.scalatest.WordSpec
 
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
 class ORSetSpec extends WordSpec with Matchers {
 
   val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1)
@@ -228,30 +229,30 @@ class ORSetSpec extends WordSpec with Matchers {
   "ORSet unit test" must {
 
     "verify subtractDots" in {
-      val dot = new VersionVector(TreeMap(nodeA -> 3, nodeB -> 2, nodeD -> 14, nodeG -> 22))
-      val vvector = new VersionVector(TreeMap(nodeA -> 4, nodeB -> 1, nodeC -> 1, nodeD -> 14, nodeE -> 5, nodeF -> 2))
-      val expected = new VersionVector(TreeMap(nodeB -> 2, nodeG -> 22))
+      val dot = VersionVector(TreeMap(nodeA -> 3L, nodeB -> 2L, nodeD -> 14L, nodeG -> 22L))
+      val vvector = VersionVector(TreeMap(nodeA -> 4L, nodeB -> 1L, nodeC -> 1L, nodeD -> 14L, nodeE -> 5L, nodeF -> 2L))
+      val expected = VersionVector(TreeMap(nodeB -> 2L, nodeG -> 22L))
       ORSet.subtractDots(dot, vvector) should be(expected)
     }
 
     "verify mergeCommonKeys" in {
       val commonKeys: Set[String] = Set("K1", "K2")
-      val thisDot1 = new VersionVector(TreeMap(nodeA -> 3, nodeD -> 7))
-      val thisDot2 = new VersionVector(TreeMap(nodeB -> 5, nodeC -> 2))
-      val thisVvector = new VersionVector(TreeMap(nodeA -> 3, nodeB -> 5, nodeC -> 2, nodeD -> 7))
+      val thisDot1 = VersionVector(TreeMap(nodeA -> 3L, nodeD -> 7L))
+      val thisDot2 = VersionVector(TreeMap(nodeB -> 5L, nodeC -> 2L))
+      val thisVvector = VersionVector(TreeMap(nodeA -> 3L, nodeB -> 5L, nodeC -> 2L, nodeD -> 7L))
       val thisSet = new ORSet(
         elementsMap = Map("K1" -> thisDot1, "K2" -> thisDot2),
         vvector = thisVvector)
 
-      val thatDot1 = new VersionVector(TreeMap(nodeA -> 3))
-      val thatDot2 = new VersionVector(TreeMap(nodeB -> 6))
-      val thatVvector = new VersionVector(TreeMap(nodeA -> 3, nodeB -> 6, nodeC -> 1, nodeD -> 8))
+      val thatDot1 = VersionVector(nodeA, 3L)
+      val thatDot2 = VersionVector(nodeB, 6L)
+      val thatVvector = VersionVector(TreeMap(nodeA -> 3L, nodeB -> 6L, nodeC -> 1L, nodeD -> 8L))
       val thatSet = new ORSet(
         elementsMap = Map("K1" -> thatDot1, "K2" -> thatDot2),
         vvector = thatVvector)
 
       val expectedDots = Map(
-        "K1" -> new VersionVector(TreeMap(nodeA -> 3)),
-        "K2" -> new VersionVector(TreeMap(nodeB -> 6, nodeC -> 2)))
+        "K1" -> VersionVector(nodeA, 3L),
+        "K2" -> VersionVector(TreeMap(nodeB -> 6L, nodeC -> 2L)))
 
       ORSet.mergeCommonKeys(commonKeys, thisSet, thatSet) should be(expectedDots)
     }
@@ -259,14 +260,14 @@ class ORSetSpec extends WordSpec with Matchers {
     "verify mergeDisjointKeys" in {
       val keys: Set[Any] = Set("K3", "K4", "K5")
       val elements: Map[Any, VersionVector] = Map(
-        "K3" -> new VersionVector(TreeMap(nodeA -> 4)),
-        "K4" -> new VersionVector(TreeMap(nodeA -> 3, nodeD -> 8)),
-        "K5" -> new VersionVector(TreeMap(nodeA -> 2)))
-      val vvector = new VersionVector(TreeMap(nodeA -> 3, nodeD -> 7))
-      val acc: Map[Any, VersionVector] = Map("K1" -> new VersionVector(TreeMap(nodeA -> 3)))
+        "K3" -> VersionVector(nodeA, 4L),
+        "K4" -> VersionVector(TreeMap(nodeA -> 3L, nodeD -> 8L)),
+        "K5" -> VersionVector(nodeA, 2L))
+      val vvector = VersionVector(TreeMap(nodeA -> 3L, nodeD -> 7L))
+      val acc: Map[Any, VersionVector] = Map("K1" -> VersionVector(nodeA, 3L))
       val expectedDots = acc ++ Map(
-        "K3" -> new VersionVector(TreeMap(nodeA -> 4)),
-        "K4" -> new VersionVector(TreeMap(nodeD -> 8))) // "a" -> 3 removed, optimized to include only those unseen
+        "K3" -> VersionVector(nodeA, 4L),
+        "K4" -> VersionVector(nodeD, 8L)) // "a" -> 3 removed, optimized to include only those unseen
 
       ORSet.mergeDisjointKeys(keys, elements, vvector, acc) should be(expectedDots)
     }
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/VersionVectorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/VersionVectorSpec.scala
index 0f83fef8c5..918a8e5019 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/VersionVectorSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/VersionVectorSpec.scala
@@ -28,7 +28,7 @@ class VersionVectorSpec extends TestKit(ActorSystem("VersionVectorSpec"))
 
     "have zero versions when created" in {
       val vv = VersionVector()
-      vv.versions should be(Map())
+      vv.size should be(0)
     }
 
     "not happen before itself" in {
@@ -38,6 +38,18 @@
       vv1 <> vv2 should be(false)
     }
 
+    "increment correctly" in {
+      val vv1 = VersionVector()
+      val vv2 = vv1 + node1
+      vv2.versionAt(node1) should be > vv1.versionAt(node1)
+      val vv3 = vv2 + node1
+      vv3.versionAt(node1) should be > vv2.versionAt(node1)
+
+      val vv4 = vv3 + node2
+      vv4.versionAt(node1) should be(vv3.versionAt(node1))
+      vv4.versionAt(node2) should be > vv3.versionAt(node2)
+    }
+
     "pass misc comparison test 1" in {
       val vv1_1 = VersionVector()
       val vv2_1 = vv1_1 + node1
@@ -159,16 +171,16 @@
       val vv3_2 = vv2_2 + node2
 
       val merged1 = vv3_2 merge vv5_1
-      merged1.versions.size should be(3)
-      merged1.versions.contains(node1) should be(true)
-      merged1.versions.contains(node2) should be(true)
-      merged1.versions.contains(node3) should be(true)
+      merged1.size should be(3)
+      merged1.contains(node1) should be(true)
+      merged1.contains(node2) should be(true)
+      merged1.contains(node3) should be(true)
 
       val merged2 = vv5_1 merge vv3_2
-      merged2.versions.size should be(3)
-      merged2.versions.contains(node1) should be(true)
-      merged2.versions.contains(node2) should be(true)
-      merged2.versions.contains(node3) should be(true)
+      merged2.size should be(3)
+      merged2.contains(node1) should be(true)
+      merged2.contains(node2) should be(true)
+      merged2.contains(node3) should be(true)
 
       vv3_2 < merged1 should be(true)
       vv5_1 < merged1 should be(true)
@@ -192,18 +204,18 @@
       val vv3_2 = vv2_2 + node4
 
       val merged1 = vv3_2 merge vv5_1
-      merged1.versions.size should be(4)
-      merged1.versions.contains(node1) should be(true)
-      merged1.versions.contains(node2) should be(true)
-      merged1.versions.contains(node3) should be(true)
-      merged1.versions.contains(node4) should be(true)
+      merged1.size should be(4)
+      merged1.contains(node1) should be(true)
+      merged1.contains(node2) should be(true)
+      merged1.contains(node3) should be(true)
+      merged1.contains(node4) should be(true)
 
       val merged2 = vv5_1 merge vv3_2
-      merged2.versions.size should be(4)
-      merged2.versions.contains(node1) should be(true)
-      merged2.versions.contains(node2) should be(true)
-      merged2.versions.contains(node3) should be(true)
-      merged2.versions.contains(node4) should be(true)
+      merged2.size should be(4)
+      merged2.contains(node1) should be(true)
+      merged2.contains(node2) should be(true)
+      merged2.contains(node3) should be(true)
+      merged2.contains(node4) should be(true)
 
       vv3_2 < merged1 should be(true)
       vv5_1 < merged1 should be(true)
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
index 6fbbd4f288..c5d667b5aa 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
@@ -86,6 +86,7 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem("ReplicatedDataSe
 
     val s1 = ORSet().add(address1, "a").add(address2, "b")
     val s2 = ORSet().add(address2, "b").add(address1, "a")
+    checkSameContent(s1.merge(s2), s2.merge(s1))
 
     val s3 = ORSet().add(address1, "a").add(address2, 17).remove(address3, 17)
diff --git a/project/MiMa.scala b/project/MiMa.scala
index a552a6a433..8bd2449604 100644
--- a/project/MiMa.scala
+++ b/project/MiMa.scala
@@ -577,7 +577,10 @@ object MiMa extends AutoPlugin {
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.EndpointManager.pruneTimerCancellable"),
 
       // #18722 internal changes to actor
-      FilterAnyProblem("akka.cluster.sharding.DDataShardCoordinator")
+      FilterAnyProblem("akka.cluster.sharding.DDataShardCoordinator"),
+
+      // #18328 optimize VersionVector for size 1
+      FilterAnyProblem("akka.cluster.ddata.VersionVector")
     )
   )
 }
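
For reference, the idea the commit message describes (give the single-entry "dot" case its own subtype so it avoids allocating a TreeMap) can be illustrated with a small standalone Scala sketch. Everything below is invented for illustration and is not Akka code: the names MiniVersionVector, One and Many do not exist in the codebase, and merge is simplified to a plain pairwise maximum, whereas the real OneVersionVector/ManyVersionVector in the diff above also carry increment, pruning and serialization behaviour.

// Illustrative sketch only: a tiny version vector with the same "size 1"
// specialization. `One` holds a single (node, version) pair and so avoids
// the map allocation that dominates when a vector is a typical single dot.
object MiniVersionVector {
  sealed trait VV {
    def versionAt(node: String): Long
    def merge(that: VV): VV
  }

  final case class One(node: String, version: Long) extends VV {
    def versionAt(n: String): Long = if (n == node) version else 0L
    def merge(that: VV): VV = that match {
      // same node: keep whichever side has seen more
      case One(n2, v2) if n2 == node => if (version >= v2) this else that
      // different nodes: only now do we pay for a map
      case One(n2, v2)               => Many(Map(node -> version, n2 -> v2))
      // keep the pairwise maximum for our node, leave the rest untouched
      case Many(vs)                  =>
        Many(vs.updated(node, math.max(version, vs.getOrElse(node, 0L))))
    }
  }

  final case class Many(versions: Map[String, Long]) extends VV {
    def versionAt(n: String): Long = versions.getOrElse(n, 0L)
    def merge(that: VV): VV = that match {
      case One(n2, v2) => Many(versions.updated(n2, math.max(v2, versionAt(n2))))
      case Many(vs2)   =>
        // pairwise maximum over the union of keys
        Many((versions.keySet ++ vs2.keySet).map { k =>
          k -> math.max(versions.getOrElse(k, 0L), vs2.getOrElse(k, 0L))
        }.toMap)
    }
  }

  def main(args: Array[String]): Unit = {
    val a = One("nodeA", 3L)
    val b = One("nodeB", 2L)
    println(a.merge(b))                // Many(Map(nodeA -> 3, nodeB -> 2))
    println(a.merge(One("nodeA", 5L))) // One(nodeA,5)
  }
}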