diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index ad191c58fa..405c0e00e0 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -63,8 +63,12 @@ object ORSet { } } - val newDots = dropDots(dot.versions.toList, Nil) - new VersionVector(versions = TreeMap.empty[UniqueAddress, Long] ++ newDots) + if (dot.versions.isEmpty) + VersionVector.empty + else { + val newDots = dropDots(dot.versions.toList, Nil) + new VersionVector(versions = VersionVector.emptyVersions ++ newDots) + } } /** @@ -74,20 +78,26 @@ object ORSet { private[akka] def mergeCommonKeys[A](commonKeys: Set[A], lhs: ORSet[A], rhs: ORSet[A]): Map[A, ORSet.Dot] = { commonKeys.foldLeft(Map.empty[A, ORSet.Dot]) { case (acc, k) ⇒ - val lhsDots = lhs.elementsMap(k).versions - val rhsDots = rhs.elementsMap(k).versions - val commonDots = lhsDots.filter { - case (thisDotNode, v) ⇒ rhsDots.get(thisDotNode).exists(_ == v) + val lhsDots = lhs.elementsMap(k) + val lhsDotsVersions = lhsDots.versions + val rhsDotsVersions = rhs.elementsMap(k).versions + if (lhsDotsVersions.size == 1 && rhsDotsVersions.size == 1 && lhsDotsVersions.head == rhsDotsVersions.head) { + // one single common dot + acc.updated(k, lhsDots) + } else { + val commonDots = lhsDotsVersions.filter { + case (thisDotNode, v) ⇒ rhsDotsVersions.get(thisDotNode).exists(_ == v) + } + val commonDotsKeys = commonDots.keys + val lhsUniqueDots = lhsDotsVersions -- commonDotsKeys + val rhsUniqueDots = rhsDotsVersions -- commonDotsKeys + val lhsKeep = ORSet.subtractDots(new VersionVector(lhsUniqueDots), rhs.vvector) + val rhsKeep = ORSet.subtractDots(new VersionVector(rhsUniqueDots), lhs.vvector) + val merged = lhsKeep.merge(rhsKeep).merge(new VersionVector(versions = commonDots)) + // Perfectly possible that an item in both sets should be dropped + if (merged.versions.isEmpty) acc + else acc.updated(k, merged) } - val commonDotsKeys = commonDots.keys - val lhsUniqueDots = lhsDots -- commonDotsKeys - val rhsUniqueDots = rhsDots -- commonDotsKeys - val lhsKeep = ORSet.subtractDots(new VersionVector(lhsUniqueDots), rhs.vvector) - val rhsKeep = ORSet.subtractDots(new VersionVector(rhsUniqueDots), lhs.vvector) - val merged = lhsKeep.merge(rhsKeep).merge(new VersionVector(versions = commonDots)) - // Perfectly possible that an item in both sets should be dropped - if (merged.versions.isEmpty) acc - else acc.updated(k, merged) } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala index 7b4e4e3ba9..38cfddb79d 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/VersionVector.scala @@ -16,7 +16,11 @@ import akka.cluster.UniqueAddress */ object VersionVector { - val empty: VersionVector = new VersionVector(TreeMap.empty[UniqueAddress, Long]) + /** + * INTERNAL API + */ + private[akka] val emptyVersions: TreeMap[UniqueAddress, Long] = TreeMap.empty + val empty: VersionVector = new VersionVector(emptyVersions) def apply(): VersionVector = empty /** * Java API