=cdd #18328 use ancestor field, for fast forward merge
AFTER: [info] Benchmark (set1Size) Mode Cnt Score Error Units [info] ORSetMergeBenchmark.mergeAddFromBothNodes 1 thrpt 10 717.362 ± 15.770 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 10 thrpt 10 144.862 ± 8.313 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 20 thrpt 10 96.004 ± 0.972 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 100 thrpt 10 18.735 ± 0.368 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 1 thrpt 10 1261.825 ± 51.717 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 10 thrpt 10 162.367 ± 21.443 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 20 thrpt 10 103.423 ± 1.569 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 100 thrpt 10 18.690 ± 0.642 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 1 thrpt 10 3666.086 ± 330.087 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 10 thrpt 10 2404.863 ± 136.244 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 20 thrpt 10 2423.596 ± 142.533 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 100 thrpt 10 2094.802 ± 161.307 ops/ms [info] ORSetMergeBenchmark.mergeComplex 1 thrpt 10 326.784 ± 6.665 ops/ms [info] ORSetMergeBenchmark.mergeComplex 10 thrpt 10 133.394 ± 4.749 ops/ms [info] ORSetMergeBenchmark.mergeComplex 20 thrpt 10 88.241 ± 1.733 ops/ms [info] ORSetMergeBenchmark.mergeComplex 100 thrpt 10 18.117 ± 0.543 ops/ms BEFORE: [info] Benchmark (set1Size) Mode Cnt Score Error Units [info] ORSetMergeBenchmark.mergeAddFromBothNodes 1 thrpt 10 737.646 ± 10.289 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 10 thrpt 10 146.706 ± 6.331 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 20 thrpt 10 95.553 ± 1.801 ops/ms [info] ORSetMergeBenchmark.mergeAddFromBothNodes 100 thrpt 10 18.321 ± 0.586 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 1 thrpt 10 1274.526 ± 23.732 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 10 thrpt 10 162.426 ± 20.490 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 20 thrpt 10 102.436 ± 2.435 ops/ms [info] ORSetMergeBenchmark.mergeAddFromOtherNode 100 thrpt 10 18.911 ± 0.659 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 1 thrpt 10 653.358 ± 71.232 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 10 thrpt 10 147.385 ± 2.750 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 20 thrpt 10 94.280 ± 0.894 ops/ms [info] ORSetMergeBenchmark.mergeAddFromSameNode 100 thrpt 10 17.922 ± 1.522 ops/ms [info] ORSetMergeBenchmark.mergeComplex 1 thrpt 10 335.060 ± 8.385 ops/ms [info] ORSetMergeBenchmark.mergeComplex 10 thrpt 10 134.438 ± 3.044 ops/ms [info] ORSetMergeBenchmark.mergeComplex 20 thrpt 10 86.015 ± 2.145 ops/ms [info] ORSetMergeBenchmark.mergeComplex 100 thrpt 10 17.611 ± 0.136 ops/ms
This commit is contained in:
parent
8026e216aa
commit
e10593ec31
5 changed files with 95 additions and 28 deletions
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.cluster.ddata
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*
|
||||||
|
* Optimization for add/remove followed by merge and merge should just fast forward to
|
||||||
|
* the new instance.
|
||||||
|
*
|
||||||
|
* It's like a cache between calls of the same thread, you can think of it as a thread local.
|
||||||
|
* The Replicator actor invokes the user's modify function, which returns a new ReplicatedData instance,
|
||||||
|
* with the ancestor field set (see for example the add method in ORSet). Then (in same thread) the
|
||||||
|
* Replication calls merge, which makes use of the ancestor field to perform quick merge
|
||||||
|
* (see for example merge method in ORSet).
|
||||||
|
*
|
||||||
|
* It's not thread safe if the modifying function and merge are called from different threads,
|
||||||
|
* i.e. if used outside the Replicator infrastructure, but the worst thing that can happen is that
|
||||||
|
* a full merge is performed instead of the fast forward merge.
|
||||||
|
*/
|
||||||
|
private[akka] trait FastMerge { self: ReplicatedData ⇒
|
||||||
|
|
||||||
|
private var ancestor: FastMerge = null
|
||||||
|
|
||||||
|
/** INTERNAL API: should be called from "updating" methods */
|
||||||
|
private[akka] def assignAncestor(newData: T with FastMerge): T = {
|
||||||
|
newData.ancestor = if (this.ancestor eq null) this else this.ancestor
|
||||||
|
this.ancestor = null // only one level, for GC
|
||||||
|
newData
|
||||||
|
}
|
||||||
|
|
||||||
|
/** INTERNAL API: should be used from merge */
|
||||||
|
private[akka] def isAncestorOf(that: T with FastMerge): Boolean =
|
||||||
|
that.ancestor eq this
|
||||||
|
|
||||||
|
/** INTERNAL API: should be called from merge */
|
||||||
|
private[akka] def clearAncestor(): self.type = {
|
||||||
|
ancestor = null
|
||||||
|
this
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -40,7 +40,7 @@ object GCounter {
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
final class GCounter private[akka] (
|
final class GCounter private[akka] (
|
||||||
private[akka] val state: Map[UniqueAddress, BigInt] = Map.empty)
|
private[akka] val state: Map[UniqueAddress, BigInt] = Map.empty)
|
||||||
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
|
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning with FastMerge {
|
||||||
|
|
||||||
import GCounter.Zero
|
import GCounter.Zero
|
||||||
|
|
||||||
|
|
@ -83,18 +83,22 @@ final class GCounter private[akka] (
|
||||||
else state.get(key) match {
|
else state.get(key) match {
|
||||||
case Some(v) ⇒
|
case Some(v) ⇒
|
||||||
val tot = v + delta
|
val tot = v + delta
|
||||||
new GCounter(state + (key -> tot))
|
assignAncestor(new GCounter(state + (key -> tot)))
|
||||||
case None ⇒ new GCounter(state + (key -> delta))
|
case None ⇒ assignAncestor(new GCounter(state + (key -> delta)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def merge(that: GCounter): GCounter = {
|
override def merge(that: GCounter): GCounter =
|
||||||
|
if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor()
|
||||||
|
else if (this.isAncestorOf(that)) that.clearAncestor()
|
||||||
|
else {
|
||||||
var merged = that.state
|
var merged = that.state
|
||||||
for ((key, thisValue) ← state) {
|
for ((key, thisValue) ← state) {
|
||||||
val thatValue = merged.getOrElse(key, Zero)
|
val thatValue = merged.getOrElse(key, Zero)
|
||||||
if (thisValue > thatValue)
|
if (thisValue > thatValue)
|
||||||
merged = merged.updated(key, thisValue)
|
merged = merged.updated(key, thisValue)
|
||||||
}
|
}
|
||||||
|
clearAncestor()
|
||||||
new GCounter(merged)
|
new GCounter(merged)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ object GSet {
|
||||||
* This class is immutable, i.e. "modifying" methods return a new instance.
|
* This class is immutable, i.e. "modifying" methods return a new instance.
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
final case class GSet[A](elements: Set[A]) extends ReplicatedData with ReplicatedDataSerialization {
|
final case class GSet[A](elements: Set[A]) extends ReplicatedData with ReplicatedDataSerialization with FastMerge {
|
||||||
|
|
||||||
type T = GSet[A]
|
type T = GSet[A]
|
||||||
|
|
||||||
|
|
@ -53,9 +53,15 @@ final case class GSet[A](elements: Set[A]) extends ReplicatedData with Replicate
|
||||||
/**
|
/**
|
||||||
* Adds an element to the set
|
* Adds an element to the set
|
||||||
*/
|
*/
|
||||||
def add(element: A): GSet[A] = copy(elements + element)
|
def add(element: A): GSet[A] = assignAncestor(copy(elements + element))
|
||||||
|
|
||||||
override def merge(that: GSet[A]): GSet[A] = copy(elements ++ that.elements)
|
override def merge(that: GSet[A]): GSet[A] =
|
||||||
|
if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor()
|
||||||
|
else if (this.isAncestorOf(that)) that.clearAncestor()
|
||||||
|
else {
|
||||||
|
clearAncestor()
|
||||||
|
copy(elements ++ that.elements)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object GSetKey {
|
object GSetKey {
|
||||||
|
|
|
||||||
|
|
@ -154,7 +154,7 @@ object ORSet {
|
||||||
final class ORSet[A] private[akka] (
|
final class ORSet[A] private[akka] (
|
||||||
private[akka] val elementsMap: Map[A, ORSet.Dot],
|
private[akka] val elementsMap: Map[A, ORSet.Dot],
|
||||||
private[akka] val vvector: VersionVector)
|
private[akka] val vvector: VersionVector)
|
||||||
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
|
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning with FastMerge {
|
||||||
|
|
||||||
type T = ORSet[A]
|
type T = ORSet[A]
|
||||||
|
|
||||||
|
|
@ -193,7 +193,7 @@ final class ORSet[A] private[akka] (
|
||||||
private[akka] def add(node: UniqueAddress, element: A): ORSet[A] = {
|
private[akka] def add(node: UniqueAddress, element: A): ORSet[A] = {
|
||||||
val newVvector = vvector + node
|
val newVvector = vvector + node
|
||||||
val newDot = new VersionVector(versions = TreeMap(node -> newVvector.versions(node)))
|
val newDot = new VersionVector(versions = TreeMap(node -> newVvector.versions(node)))
|
||||||
new ORSet(elementsMap = elementsMap.updated(element, newDot), vvector = newVvector)
|
assignAncestor(new ORSet(elementsMap = elementsMap.updated(element, newDot), vvector = newVvector))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -210,7 +210,7 @@ final class ORSet[A] private[akka] (
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] def remove(node: UniqueAddress, element: A): ORSet[A] =
|
private[akka] def remove(node: UniqueAddress, element: A): ORSet[A] =
|
||||||
copy(elementsMap = elementsMap - element)
|
assignAncestor(copy(elementsMap = elementsMap - element))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Removes all elements from the set, but keeps the history.
|
* Removes all elements from the set, but keeps the history.
|
||||||
|
|
@ -222,7 +222,8 @@ final class ORSet[A] private[akka] (
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] def clear(node: UniqueAddress): ORSet[A] = copy(elementsMap = Map.empty)
|
private[akka] def clear(node: UniqueAddress): ORSet[A] =
|
||||||
|
assignAncestor(copy(elementsMap = Map.empty))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When element is in this Set but not in that Set:
|
* When element is in this Set but not in that Set:
|
||||||
|
|
@ -238,6 +239,9 @@ final class ORSet[A] private[akka] (
|
||||||
* Keep only common dots, and dots that are not dominated by the other sides version vector
|
* Keep only common dots, and dots that are not dominated by the other sides version vector
|
||||||
*/
|
*/
|
||||||
override def merge(that: ORSet[A]): ORSet[A] = {
|
override def merge(that: ORSet[A]): ORSet[A] = {
|
||||||
|
if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor()
|
||||||
|
else if (this.isAncestorOf(that)) that.clearAncestor()
|
||||||
|
else {
|
||||||
val thisKeys = elementsMap.keySet
|
val thisKeys = elementsMap.keySet
|
||||||
val thatKeys = that.elementsMap.keySet
|
val thatKeys = that.elementsMap.keySet
|
||||||
val commonKeys = thisKeys.intersect(thatKeys)
|
val commonKeys = thisKeys.intersect(thatKeys)
|
||||||
|
|
@ -249,8 +253,10 @@ final class ORSet[A] private[akka] (
|
||||||
val entries = ORSet.mergeDisjointKeys(thatUniqueKeys, that.elementsMap, this.vvector, entries0)
|
val entries = ORSet.mergeDisjointKeys(thatUniqueKeys, that.elementsMap, this.vvector, entries0)
|
||||||
val mergedVvector = this.vvector.merge(that.vvector)
|
val mergedVvector = this.vvector.merge(that.vvector)
|
||||||
|
|
||||||
|
clearAncestor()
|
||||||
new ORSet(entries, mergedVvector)
|
new ORSet(entries, mergedVvector)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
|
override def needPruningFrom(removedNode: UniqueAddress): Boolean =
|
||||||
vvector.needPruningFrom(removedNode)
|
vvector.needPruningFrom(removedNode)
|
||||||
|
|
|
||||||
|
|
@ -59,6 +59,14 @@ class ORSetSpec extends WordSpec with Matchers {
|
||||||
|
|
||||||
c5.elements should not contain (user1)
|
c5.elements should not contain (user1)
|
||||||
c5.elements should not contain (user2)
|
c5.elements should not contain (user2)
|
||||||
|
|
||||||
|
val c6 = c3.merge(c5)
|
||||||
|
c6.elements should not contain (user1)
|
||||||
|
c6.elements should not contain (user2)
|
||||||
|
|
||||||
|
val c7 = c5.merge(c3)
|
||||||
|
c7.elements should not contain (user1)
|
||||||
|
c7.elements should not contain (user2)
|
||||||
}
|
}
|
||||||
|
|
||||||
"be able to add removed" in {
|
"be able to add removed" in {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue