!cdd #18328 optimize VersionVector for size 1 (typical dots)
AFTER: [info] Benchmark (set1Size) Mode Cnt Score Error Units [info] ORSetMergeBenchmark.mergeAddFromBothNodes 1 thrpt 10 2007.939 ± 74.673 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 10 thrpt 10 337.110 ± 15.055 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 20 thrpt 10 223.600 ± 8.403 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 100 thrpt 10 46.697 ± 2.136 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 1 thrpt 10 2542.537 ± 120.697 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 10 thrpt 10 365.694 ± 17.571 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 20 thrpt 10 216.323 ± 9.446 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 100 thrpt 10 49.563 ± 2.725 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 1 thrpt 10 9883.186 ± 725.672 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 10 thrpt 10 3266.528 ± 189.993 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 20 thrpt 10 3206.017 ± 124.623 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 100 thrpt 10 2709.031 ± 162.182 ops/ms [info] ORSetMergeBenchmark.mergeComplex 1 thrpt 10 572.704 ± 21.504 ops/ms [info] ORSetMergeBenchmark.mergeComplex 10 thrpt 10 249.226 ± 12.324 ops/ms [info] ORSetMergeBenchmark.mergeComplex 20 thrpt 10 170.560 ± 10.320 ops/ms [info] ORSetMergeBenchmark.mergeComplex 100 thrpt 10 46.373 ± 1.800 ops/ms BEFORE: [info] Benchmark (set1Size) Mode Cnt Score Error Units [info] ORSetMergeBenchmark.mergeAddFromBothNodes 1 thrpt 10 885.664 ± 99.718 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 10 thrpt 10 304.617 ± 4.755 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 20 thrpt 10 200.977 ± 3.708 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 100 thrpt 10 47.879 ± 4.352 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 1 thrpt 10 1586.848 ± 27.476 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 10 thrpt 10 354.408 ± 4.772 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 20 thrpt 10 210.563 ± 32.914 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 100 thrpt 10 52.750 ± 0.698 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 1 thrpt 10 3915.817 ± 420.643 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 10 thrpt 10 2369.476 ± 250.336 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 20 thrpt 10 2378.924 ± 47.160 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 100 thrpt 10 2167.841 ± 20.339 ops/ms [info] ORSetMergeBenchmark.mergeComplex 1 thrpt 10 387.261 ± 8.820 ops/ms [info] ORSetMergeBenchmark.mergeComplex 10 thrpt 10 212.661 ± 4.802 ops/ms [info] ORSetMergeBenchmark.mergeComplex 20 thrpt 10 151.512 ± 2.627 ops/ms [info] ORSetMergeBenchmark.mergeComplex 100 thrpt 10 40.976 ± 2.014 ops/ms * use subtype polymorphism for VersionVector tmp
This commit is contained in:
parent
94294c74a7
commit
c11b600cc1
9 changed files with 305 additions and 96 deletions
|
|
@ -54,20 +54,28 @@ object ORSet {
|
|||
remaining match {
|
||||
case Nil ⇒ acc
|
||||
case (d @ (node, v1)) :: rest ⇒
|
||||
vvector.versions.get(node) match {
|
||||
case Some(v2) if v2 >= v1 ⇒
|
||||
// dot is dominated by version vector, drop it
|
||||
dropDots(rest, acc)
|
||||
case _ ⇒
|
||||
dropDots(rest, d :: acc)
|
||||
}
|
||||
val v2 = vvector.versionAt(node)
|
||||
if (v2 >= v1)
|
||||
// dot is dominated by version vector, drop it
|
||||
dropDots(rest, acc)
|
||||
else
|
||||
dropDots(rest, d :: acc)
|
||||
}
|
||||
|
||||
if (dot.versions.isEmpty)
|
||||
if (dot.isEmpty)
|
||||
VersionVector.empty
|
||||
else {
|
||||
val newDots = dropDots(dot.versions.toList, Nil)
|
||||
new VersionVector(versions = VersionVector.emptyVersions ++ newDots)
|
||||
dot match {
|
||||
case OneVersionVector(node, v1) ⇒
|
||||
// if dot is dominated by version vector, drop it
|
||||
if (vvector.versionAt(node) >= v1) VersionVector.empty
|
||||
else dot
|
||||
|
||||
case ManyVersionVector(vs) ⇒
|
||||
val remaining = vs.toList
|
||||
val newDots = dropDots(remaining, Nil)
|
||||
VersionVector(newDots)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -82,24 +90,60 @@ object ORSet {
|
|||
commonKeys.foldLeft(Map.empty[A, ORSet.Dot]) {
|
||||
case (acc, k) ⇒
|
||||
val lhsDots = lhs.elementsMap(k)
|
||||
val lhsDotsVersions = lhsDots.versions
|
||||
val rhsDotsVersions = rhs.elementsMap(k).versions
|
||||
if (lhsDotsVersions.size == 1 && rhsDotsVersions.size == 1 && lhsDotsVersions.head == rhsDotsVersions.head) {
|
||||
// one single common dot
|
||||
acc.updated(k, lhsDots)
|
||||
} else {
|
||||
val commonDots = lhsDotsVersions.filter {
|
||||
case (thisDotNode, v) ⇒ rhsDotsVersions.get(thisDotNode).exists(_ == v)
|
||||
}
|
||||
val commonDotsKeys = commonDots.keys
|
||||
val lhsUniqueDots = lhsDotsVersions -- commonDotsKeys
|
||||
val rhsUniqueDots = rhsDotsVersions -- commonDotsKeys
|
||||
val lhsKeep = ORSet.subtractDots(new VersionVector(lhsUniqueDots), rhs.vvector)
|
||||
val rhsKeep = ORSet.subtractDots(new VersionVector(rhsUniqueDots), lhs.vvector)
|
||||
val merged = lhsKeep.merge(rhsKeep).merge(new VersionVector(versions = commonDots))
|
||||
// Perfectly possible that an item in both sets should be dropped
|
||||
if (merged.versions.isEmpty) acc
|
||||
else acc.updated(k, merged)
|
||||
val rhsDots = rhs.elementsMap(k)
|
||||
(lhsDots, rhsDots) match {
|
||||
case (OneVersionVector(n1, v1), OneVersionVector(n2, v2)) ⇒
|
||||
if (n1 == n2 && v1 == v2)
|
||||
// one single common dot
|
||||
acc.updated(k, lhsDots)
|
||||
else {
|
||||
// no common, lhsUniqueDots == lhsDots, rhsUniqueDots == rhsDots
|
||||
val lhsKeep = ORSet.subtractDots(lhsDots, rhs.vvector)
|
||||
val rhsKeep = ORSet.subtractDots(rhsDots, lhs.vvector)
|
||||
val merged = lhsKeep.merge(rhsKeep)
|
||||
// Perfectly possible that an item in both sets should be dropped
|
||||
if (merged.isEmpty) acc
|
||||
else acc.updated(k, merged)
|
||||
}
|
||||
case (ManyVersionVector(lhsVs), ManyVersionVector(rhsVs)) ⇒
|
||||
val commonDots = lhsVs.filter {
|
||||
case (thisDotNode, v) ⇒ rhsVs.get(thisDotNode).exists(_ == v)
|
||||
}
|
||||
val commonDotsKeys = commonDots.keys
|
||||
val lhsUniqueDots = lhsVs -- commonDotsKeys
|
||||
val rhsUniqueDots = rhsVs -- commonDotsKeys
|
||||
val lhsKeep = ORSet.subtractDots(VersionVector(lhsUniqueDots), rhs.vvector)
|
||||
val rhsKeep = ORSet.subtractDots(VersionVector(rhsUniqueDots), lhs.vvector)
|
||||
val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
|
||||
// Perfectly possible that an item in both sets should be dropped
|
||||
if (merged.isEmpty) acc
|
||||
else acc.updated(k, merged)
|
||||
case (ManyVersionVector(lhsVs), OneVersionVector(n2, v2)) ⇒
|
||||
val commonDots = lhsVs.filter {
|
||||
case (n1, v1) ⇒ v1 == v2 && n1 == n2
|
||||
}
|
||||
val commonDotsKeys = commonDots.keys
|
||||
val lhsUniqueDots = lhsVs -- commonDotsKeys
|
||||
val rhsUnique = if (commonDotsKeys.isEmpty) rhsDots else VersionVector.empty
|
||||
val lhsKeep = ORSet.subtractDots(VersionVector(lhsUniqueDots), rhs.vvector)
|
||||
val rhsKeep = ORSet.subtractDots(rhsUnique, lhs.vvector)
|
||||
val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
|
||||
// Perfectly possible that an item in both sets should be dropped
|
||||
if (merged.isEmpty) acc
|
||||
else acc.updated(k, merged)
|
||||
case (OneVersionVector(n1, v1), ManyVersionVector(rhsVs)) ⇒
|
||||
val commonDots = rhsVs.filter {
|
||||
case (n2, v2) ⇒ v1 == v2 && n1 == n2
|
||||
}
|
||||
val commonDotsKeys = commonDots.keys
|
||||
val lhsUnique = if (commonDotsKeys.isEmpty) lhsDots else VersionVector.empty
|
||||
val rhsUniqueDots = rhsVs -- commonDotsKeys
|
||||
val lhsKeep = ORSet.subtractDots(lhsUnique, rhs.vvector)
|
||||
val rhsKeep = ORSet.subtractDots(VersionVector(rhsUniqueDots), lhs.vvector)
|
||||
val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
|
||||
// Perfectly possible that an item in both sets should be dropped
|
||||
if (merged.isEmpty) acc
|
||||
else acc.updated(k, merged)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -199,7 +243,7 @@ final class ORSet[A] private[akka] (
|
|||
*/
|
||||
private[akka] def add(node: UniqueAddress, element: A): ORSet[A] = {
|
||||
val newVvector = vvector + node
|
||||
val newDot = new VersionVector(versions = TreeMap(node -> newVvector.versions(node)))
|
||||
val newDot = VersionVector(node, newVvector.versionAt(node))
|
||||
assignAncestor(new ORSet(elementsMap = elementsMap.updated(element, newDot), vvector = newVvector))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,25 +3,37 @@
|
|||
*/
|
||||
package akka.cluster.ddata
|
||||
|
||||
import scala.collection.immutable
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable.TreeMap
|
||||
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.UniqueAddress
|
||||
import akka.cluster.UniqueAddress
|
||||
|
||||
/**
|
||||
* VersionVector module with helper classes and methods.
|
||||
*/
|
||||
object VersionVector {
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] val emptyVersions: TreeMap[UniqueAddress, Long] = TreeMap.empty
|
||||
val empty: VersionVector = new VersionVector(emptyVersions)
|
||||
private val emptyVersions: TreeMap[UniqueAddress, Long] = TreeMap.empty
|
||||
val empty: VersionVector = ManyVersionVector(emptyVersions)
|
||||
|
||||
def apply(): VersionVector = empty
|
||||
|
||||
def apply(versions: TreeMap[UniqueAddress, Long]): VersionVector =
|
||||
if (versions.isEmpty) empty
|
||||
else if (versions.size == 1) apply(versions.head._1, versions.head._2)
|
||||
else ManyVersionVector(versions)
|
||||
|
||||
def apply(node: UniqueAddress, version: Long): VersionVector = OneVersionVector(node, version)
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] def apply(versions: List[(UniqueAddress, Long)]): VersionVector =
|
||||
if (versions.isEmpty) empty
|
||||
else if (versions.tail.isEmpty) apply(versions.head._1, versions.head._2)
|
||||
else apply(emptyVersions ++ versions)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
|
|
@ -57,7 +69,8 @@ object VersionVector {
|
|||
*/
|
||||
def ConcurrentInstance = Concurrent
|
||||
|
||||
private object Timestamp {
|
||||
/** INTERNAL API */
|
||||
private[akka] object Timestamp {
|
||||
final val Zero = 0L
|
||||
final val EndMarker = Long.MinValue
|
||||
val counter = new AtomicLong(1L)
|
||||
|
|
@ -83,8 +96,7 @@ object VersionVector {
|
|||
* This class is immutable, i.e. "modifying" methods return a new instance.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
final case class VersionVector private[akka] (
|
||||
private[akka] val versions: TreeMap[UniqueAddress, Long])
|
||||
sealed abstract class VersionVector
|
||||
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
|
||||
|
||||
type T = VersionVector
|
||||
|
|
@ -107,12 +119,28 @@ final case class VersionVector private[akka] (
|
|||
*/
|
||||
def increment(node: Cluster): VersionVector = increment(node.selfUniqueAddress)
|
||||
|
||||
def isEmpty: Boolean
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def size: Int
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* Increment the version for the node passed as argument. Returns a new VersionVector.
|
||||
*/
|
||||
private[akka] def increment(node: UniqueAddress): VersionVector =
|
||||
copy(versions = versions.updated(node, Timestamp.counter.getAndIncrement()))
|
||||
private[akka] def increment(node: UniqueAddress): VersionVector
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def versionAt(node: UniqueAddress): Long
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def contains(node: UniqueAddress): Boolean
|
||||
|
||||
/**
|
||||
* Returns true if <code>this</code> and <code>that</code> are concurrent else false.
|
||||
|
|
@ -187,10 +215,15 @@ final case class VersionVector private[akka] (
|
|||
compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), Same)
|
||||
}
|
||||
|
||||
if ((this eq that) || (this.versions eq that.versions)) Same
|
||||
else compare(this.versions.iterator, that.versions.iterator, if (order eq Concurrent) FullOrder else order)
|
||||
if (this eq that) Same
|
||||
else compare(this.versionsIterator, that.versionsIterator, if (order eq Concurrent) FullOrder else order)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def versionsIterator: Iterator[(UniqueAddress, Long)]
|
||||
|
||||
/**
|
||||
* Compare two version vectors. The outcome will be one of the following:
|
||||
* <p/>
|
||||
|
|
@ -208,23 +241,128 @@ final case class VersionVector private[akka] (
|
|||
/**
|
||||
* Merges this VersionVector with another VersionVector. E.g. merges its versioned history.
|
||||
*/
|
||||
def merge(that: VersionVector): VersionVector = {
|
||||
var mergedVersions = that.versions
|
||||
for ((node, time) ← versions) {
|
||||
val mergedVersionsCurrentTime = mergedVersions.getOrElse(node, Timestamp.Zero)
|
||||
if (time > mergedVersionsCurrentTime)
|
||||
mergedVersions = mergedVersions.updated(node, time)
|
||||
def merge(that: VersionVector): VersionVector
|
||||
|
||||
override def needPruningFrom(removedNode: UniqueAddress): Boolean
|
||||
|
||||
override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): VersionVector
|
||||
|
||||
override def pruningCleanup(removedNode: UniqueAddress): VersionVector
|
||||
|
||||
}
|
||||
|
||||
final case class OneVersionVector private[akka] (node: UniqueAddress, version: Long) extends VersionVector {
|
||||
import VersionVector.Timestamp
|
||||
|
||||
override def isEmpty: Boolean = false
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def size: Int = 1
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def increment(n: UniqueAddress): VersionVector = {
|
||||
val v = Timestamp.counter.getAndIncrement()
|
||||
if (n == node) copy(version = v)
|
||||
else ManyVersionVector(TreeMap(node -> version, n -> v))
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def versionAt(n: UniqueAddress): Long =
|
||||
if (n == node) version
|
||||
else Timestamp.Zero
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def contains(n: UniqueAddress): Boolean =
|
||||
n == node
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def versionsIterator: Iterator[(UniqueAddress, Long)] =
|
||||
Iterator.single((node, version))
|
||||
|
||||
override def merge(that: VersionVector): VersionVector = {
|
||||
that match {
|
||||
case OneVersionVector(n2, v2) ⇒
|
||||
if (node == n2) if (version >= v2) this else OneVersionVector(n2, v2)
|
||||
else ManyVersionVector(TreeMap(node -> version, n2 -> v2))
|
||||
case ManyVersionVector(vs2) ⇒
|
||||
val v2 = vs2.getOrElse(node, Timestamp.Zero)
|
||||
val mergedVersions =
|
||||
if (v2 >= version) vs2
|
||||
else vs2.updated(node, version)
|
||||
VersionVector(mergedVersions)
|
||||
}
|
||||
}
|
||||
|
||||
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
|
||||
node == removedNode
|
||||
|
||||
override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): VersionVector =
|
||||
(if (node == removedNode) VersionVector.empty else this) + collapseInto
|
||||
|
||||
override def pruningCleanup(removedNode: UniqueAddress): VersionVector =
|
||||
if (node == removedNode) VersionVector.empty else this
|
||||
|
||||
override def toString: String =
|
||||
s"VersionVector($node -> $version)"
|
||||
|
||||
}
|
||||
|
||||
final case class ManyVersionVector(versions: TreeMap[UniqueAddress, Long]) extends VersionVector {
|
||||
import VersionVector.Timestamp
|
||||
|
||||
override def isEmpty: Boolean = versions.isEmpty
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def size: Int = versions.size
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def increment(node: UniqueAddress): VersionVector = {
|
||||
val v = Timestamp.counter.getAndIncrement()
|
||||
VersionVector(versions.updated(node, v))
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def versionAt(node: UniqueAddress): Long = versions.get(node) match {
|
||||
case Some(v) ⇒ v
|
||||
case None ⇒ Timestamp.Zero
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def contains(node: UniqueAddress): Boolean =
|
||||
versions.contains(node)
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] override def versionsIterator: Iterator[(UniqueAddress, Long)] =
|
||||
versions.iterator
|
||||
|
||||
override def merge(that: VersionVector): VersionVector = {
|
||||
that match {
|
||||
case ManyVersionVector(vs2) ⇒
|
||||
var mergedVersions = vs2
|
||||
for ((node, time) ← versions) {
|
||||
val mergedVersionsCurrentTime = mergedVersions.getOrElse(node, Timestamp.Zero)
|
||||
if (time > mergedVersionsCurrentTime)
|
||||
mergedVersions = mergedVersions.updated(node, time)
|
||||
}
|
||||
VersionVector(mergedVersions)
|
||||
case OneVersionVector(n2, v2) ⇒
|
||||
val v1 = versions.getOrElse(n2, Timestamp.Zero)
|
||||
val mergedVersions =
|
||||
if (v1 >= v2) versions
|
||||
else versions.updated(n2, v2)
|
||||
VersionVector(mergedVersions)
|
||||
}
|
||||
VersionVector(mergedVersions)
|
||||
}
|
||||
|
||||
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
|
||||
versions.contains(removedNode)
|
||||
|
||||
override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): VersionVector =
|
||||
copy(versions = versions - removedNode) + collapseInto
|
||||
VersionVector(versions = versions - removedNode) + collapseInto
|
||||
|
||||
override def pruningCleanup(removedNode: UniqueAddress): VersionVector = copy(versions = versions - removedNode)
|
||||
override def pruningCleanup(removedNode: UniqueAddress): VersionVector =
|
||||
VersionVector(versions = versions - removedNode)
|
||||
|
||||
override def toString = versions.map { case ((n, t)) ⇒ n + " -> " + t }.mkString("VersionVector(", ", ", ")")
|
||||
override def toString: String =
|
||||
versions.map { case ((n, v)) ⇒ n + " -> " + v }.mkString("VersionVector(", ", ", ")")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,8 @@ import akka.serialization.SerializerWithStringManifest
|
|||
import akka.serialization.BaseSerializer
|
||||
import akka.protobuf.ByteString
|
||||
import akka.util.ByteString.UTF_8
|
||||
import scala.collection.immutable.TreeMap
|
||||
import akka.cluster.UniqueAddress
|
||||
|
||||
/**
|
||||
* Protobuf serializer of ReplicatedData.
|
||||
|
|
@ -303,7 +305,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
|
|||
|
||||
def versionVectorToProto(versionVector: VersionVector): rd.VersionVector = {
|
||||
val b = rd.VersionVector.newBuilder()
|
||||
versionVector.versions.foreach {
|
||||
versionVector.versionsIterator.foreach {
|
||||
case (node, value) ⇒ b.addEntries(rd.VersionVector.Entry.newBuilder().
|
||||
setNode(uniqueAddressToProto(node)).setVersion(value))
|
||||
}
|
||||
|
|
@ -314,8 +316,16 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
|
|||
versionVectorFromProto(rd.VersionVector.parseFrom(bytes))
|
||||
|
||||
def versionVectorFromProto(versionVector: rd.VersionVector): VersionVector = {
|
||||
VersionVector(versions = versionVector.getEntriesList.asScala.map(entry ⇒
|
||||
uniqueAddressFromProto(entry.getNode) -> entry.getVersion)(breakOut))
|
||||
val entries = versionVector.getEntriesList
|
||||
if (entries.isEmpty)
|
||||
VersionVector.empty
|
||||
else if (entries.size == 1)
|
||||
VersionVector(uniqueAddressFromProto(entries.get(0).getNode), entries.get(0).getVersion)
|
||||
else {
|
||||
val versions: TreeMap[UniqueAddress, Long] = versionVector.getEntriesList.asScala.map(entry ⇒
|
||||
uniqueAddressFromProto(entry.getNode) -> entry.getVersion)(breakOut)
|
||||
VersionVector(versions)
|
||||
}
|
||||
}
|
||||
|
||||
def ormapToProto(ormap: ORMap[_]): rd.ORMap = {
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ class ORMapSpec extends WordSpec with Matchers {
|
|||
|
||||
val m3 = merged1.remove(node1, "b").put(node1, "b", GSet.empty + "B2")
|
||||
// same thing if only put is used
|
||||
// val m3 = merged1.put(node1, "b", GSet() + "B2")
|
||||
// val m3 = merged1.put(node1, "b", GSet.empty + "B2")
|
||||
val merged2 = merged1 merge m3
|
||||
|
||||
merged2.entries("a").elements should be(Set("A"))
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import akka.cluster.ddata.Replicator.Changed
|
|||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpec
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ORSetSpec extends WordSpec with Matchers {
|
||||
|
||||
val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1)
|
||||
|
|
@ -228,30 +229,30 @@ class ORSetSpec extends WordSpec with Matchers {
|
|||
|
||||
"ORSet unit test" must {
|
||||
"verify subtractDots" in {
|
||||
val dot = new VersionVector(TreeMap(nodeA -> 3, nodeB -> 2, nodeD -> 14, nodeG -> 22))
|
||||
val vvector = new VersionVector(TreeMap(nodeA -> 4, nodeB -> 1, nodeC -> 1, nodeD -> 14, nodeE -> 5, nodeF -> 2))
|
||||
val expected = new VersionVector(TreeMap(nodeB -> 2, nodeG -> 22))
|
||||
val dot = VersionVector(TreeMap(nodeA -> 3L, nodeB -> 2L, nodeD -> 14L, nodeG -> 22L))
|
||||
val vvector = VersionVector(TreeMap(nodeA -> 4L, nodeB -> 1L, nodeC -> 1L, nodeD -> 14L, nodeE -> 5L, nodeF -> 2L))
|
||||
val expected = VersionVector(TreeMap(nodeB -> 2L, nodeG -> 22L))
|
||||
ORSet.subtractDots(dot, vvector) should be(expected)
|
||||
}
|
||||
|
||||
"verify mergeCommonKeys" in {
|
||||
val commonKeys: Set[String] = Set("K1", "K2")
|
||||
val thisDot1 = new VersionVector(TreeMap(nodeA -> 3, nodeD -> 7))
|
||||
val thisDot2 = new VersionVector(TreeMap(nodeB -> 5, nodeC -> 2))
|
||||
val thisVvector = new VersionVector(TreeMap(nodeA -> 3, nodeB -> 5, nodeC -> 2, nodeD -> 7))
|
||||
val thisDot1 = VersionVector(TreeMap(nodeA -> 3L, nodeD -> 7L))
|
||||
val thisDot2 = VersionVector(TreeMap(nodeB -> 5L, nodeC -> 2L))
|
||||
val thisVvector = VersionVector(TreeMap(nodeA -> 3L, nodeB -> 5L, nodeC -> 2L, nodeD -> 7L))
|
||||
val thisSet = new ORSet(
|
||||
elementsMap = Map("K1" -> thisDot1, "K2" -> thisDot2),
|
||||
vvector = thisVvector)
|
||||
val thatDot1 = new VersionVector(TreeMap(nodeA -> 3))
|
||||
val thatDot2 = new VersionVector(TreeMap(nodeB -> 6))
|
||||
val thatVvector = new VersionVector(TreeMap(nodeA -> 3, nodeB -> 6, nodeC -> 1, nodeD -> 8))
|
||||
val thatDot1 = VersionVector(nodeA, 3L)
|
||||
val thatDot2 = VersionVector(nodeB, 6L)
|
||||
val thatVvector = VersionVector(TreeMap(nodeA -> 3L, nodeB -> 6L, nodeC -> 1L, nodeD -> 8L))
|
||||
val thatSet = new ORSet(
|
||||
elementsMap = Map("K1" -> thatDot1, "K2" -> thatDot2),
|
||||
vvector = thatVvector)
|
||||
|
||||
val expectedDots = Map(
|
||||
"K1" -> new VersionVector(TreeMap(nodeA -> 3)),
|
||||
"K2" -> new VersionVector(TreeMap(nodeB -> 6, nodeC -> 2)))
|
||||
"K1" -> VersionVector(nodeA, 3L),
|
||||
"K2" -> VersionVector(TreeMap(nodeB -> 6L, nodeC -> 2L)))
|
||||
|
||||
ORSet.mergeCommonKeys(commonKeys, thisSet, thatSet) should be(expectedDots)
|
||||
}
|
||||
|
|
@ -259,14 +260,14 @@ class ORSetSpec extends WordSpec with Matchers {
|
|||
"verify mergeDisjointKeys" in {
|
||||
val keys: Set[Any] = Set("K3", "K4", "K5")
|
||||
val elements: Map[Any, VersionVector] = Map(
|
||||
"K3" -> new VersionVector(TreeMap(nodeA -> 4)),
|
||||
"K4" -> new VersionVector(TreeMap(nodeA -> 3, nodeD -> 8)),
|
||||
"K5" -> new VersionVector(TreeMap(nodeA -> 2)))
|
||||
val vvector = new VersionVector(TreeMap(nodeA -> 3, nodeD -> 7))
|
||||
val acc: Map[Any, VersionVector] = Map("K1" -> new VersionVector(TreeMap(nodeA -> 3)))
|
||||
"K3" -> VersionVector(nodeA, 4L),
|
||||
"K4" -> VersionVector(TreeMap(nodeA -> 3L, nodeD -> 8L)),
|
||||
"K5" -> VersionVector(nodeA, 2L))
|
||||
val vvector = VersionVector(TreeMap(nodeA -> 3L, nodeD -> 7L))
|
||||
val acc: Map[Any, VersionVector] = Map("K1" -> VersionVector(nodeA, 3L))
|
||||
val expectedDots = acc ++ Map(
|
||||
"K3" -> new VersionVector(TreeMap(nodeA -> 4)),
|
||||
"K4" -> new VersionVector(TreeMap(nodeD -> 8))) // "a" -> 3 removed, optimized to include only those unseen
|
||||
"K3" -> VersionVector(nodeA, 4L),
|
||||
"K4" -> VersionVector(nodeD, 8L)) // "a" -> 3 removed, optimized to include only those unseen
|
||||
|
||||
ORSet.mergeDisjointKeys(keys, elements, vvector, acc) should be(expectedDots)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class VersionVectorSpec extends TestKit(ActorSystem("VersionVectorSpec"))
|
|||
|
||||
"have zero versions when created" in {
|
||||
val vv = VersionVector()
|
||||
vv.versions should be(Map())
|
||||
vv.size should be(0)
|
||||
}
|
||||
|
||||
"not happen before itself" in {
|
||||
|
|
@ -38,6 +38,18 @@ class VersionVectorSpec extends TestKit(ActorSystem("VersionVectorSpec"))
|
|||
vv1 <> vv2 should be(false)
|
||||
}
|
||||
|
||||
"increment correctly" in {
|
||||
val vv1 = VersionVector()
|
||||
val vv2 = vv1 + node1
|
||||
vv2.versionAt(node1) should be > vv1.versionAt(node1)
|
||||
val vv3 = vv2 + node1
|
||||
vv3.versionAt(node1) should be > vv2.versionAt(node1)
|
||||
|
||||
val vv4 = vv3 + node2
|
||||
vv4.versionAt(node1) should be(vv3.versionAt(node1))
|
||||
vv4.versionAt(node2) should be > vv3.versionAt(node2)
|
||||
}
|
||||
|
||||
"pass misc comparison test 1" in {
|
||||
val vv1_1 = VersionVector()
|
||||
val vv2_1 = vv1_1 + node1
|
||||
|
|
@ -159,16 +171,16 @@ class VersionVectorSpec extends TestKit(ActorSystem("VersionVectorSpec"))
|
|||
val vv3_2 = vv2_2 + node2
|
||||
|
||||
val merged1 = vv3_2 merge vv5_1
|
||||
merged1.versions.size should be(3)
|
||||
merged1.versions.contains(node1) should be(true)
|
||||
merged1.versions.contains(node2) should be(true)
|
||||
merged1.versions.contains(node3) should be(true)
|
||||
merged1.size should be(3)
|
||||
merged1.contains(node1) should be(true)
|
||||
merged1.contains(node2) should be(true)
|
||||
merged1.contains(node3) should be(true)
|
||||
|
||||
val merged2 = vv5_1 merge vv3_2
|
||||
merged2.versions.size should be(3)
|
||||
merged2.versions.contains(node1) should be(true)
|
||||
merged2.versions.contains(node2) should be(true)
|
||||
merged2.versions.contains(node3) should be(true)
|
||||
merged2.size should be(3)
|
||||
merged2.contains(node1) should be(true)
|
||||
merged2.contains(node2) should be(true)
|
||||
merged2.contains(node3) should be(true)
|
||||
|
||||
vv3_2 < merged1 should be(true)
|
||||
vv5_1 < merged1 should be(true)
|
||||
|
|
@ -192,18 +204,18 @@ class VersionVectorSpec extends TestKit(ActorSystem("VersionVectorSpec"))
|
|||
val vv3_2 = vv2_2 + node4
|
||||
|
||||
val merged1 = vv3_2 merge vv5_1
|
||||
merged1.versions.size should be(4)
|
||||
merged1.versions.contains(node1) should be(true)
|
||||
merged1.versions.contains(node2) should be(true)
|
||||
merged1.versions.contains(node3) should be(true)
|
||||
merged1.versions.contains(node4) should be(true)
|
||||
merged1.size should be(4)
|
||||
merged1.contains(node1) should be(true)
|
||||
merged1.contains(node2) should be(true)
|
||||
merged1.contains(node3) should be(true)
|
||||
merged1.contains(node4) should be(true)
|
||||
|
||||
val merged2 = vv5_1 merge vv3_2
|
||||
merged2.versions.size should be(4)
|
||||
merged2.versions.contains(node1) should be(true)
|
||||
merged2.versions.contains(node2) should be(true)
|
||||
merged2.versions.contains(node3) should be(true)
|
||||
merged2.versions.contains(node4) should be(true)
|
||||
merged2.size should be(4)
|
||||
merged2.contains(node1) should be(true)
|
||||
merged2.contains(node2) should be(true)
|
||||
merged2.contains(node3) should be(true)
|
||||
merged2.contains(node4) should be(true)
|
||||
|
||||
vv3_2 < merged1 should be(true)
|
||||
vv5_1 < merged1 should be(true)
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem("ReplicatedDataSe
|
|||
|
||||
val s1 = ORSet().add(address1, "a").add(address2, "b")
|
||||
val s2 = ORSet().add(address2, "b").add(address1, "a")
|
||||
|
||||
checkSameContent(s1.merge(s2), s2.merge(s1))
|
||||
|
||||
val s3 = ORSet().add(address1, "a").add(address2, 17).remove(address3, 17)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue