diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/FastMerge.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/FastMerge.scala new file mode 100644 index 0000000000..385dcf16e9 --- /dev/null +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/FastMerge.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.cluster.ddata + +/** + * INTERNAL API + * + * Optimization so that an add/remove followed by a merge can just fast forward to + * the new instance. + * + * It's like a cache between calls of the same thread, you can think of it as a thread local. + * The Replicator actor invokes the user's modify function, which returns a new ReplicatedData instance, + * with the ancestor field set (see for example the add method in ORSet). Then (in the same thread) the + * Replicator calls merge, which makes use of the ancestor field to perform a quick merge + * (see for example the merge method in ORSet). + * + * It's not thread safe if the modifying function and merge are called from different threads, + * i.e. if used outside the Replicator infrastructure, but the worst thing that can happen is that + * a full merge is performed instead of the fast forward merge. 
+ */ +private[akka] trait FastMerge { self: ReplicatedData ⇒ + + private var ancestor: FastMerge = null + + /** INTERNAL API: should be called from "updating" methods */ + private[akka] def assignAncestor(newData: T with FastMerge): T = { + newData.ancestor = if (this.ancestor eq null) this else this.ancestor + this.ancestor = null // only one level, for GC + newData + } + + /** INTERNAL API: should be used from merge */ + private[akka] def isAncestorOf(that: T with FastMerge): Boolean = + that.ancestor eq this + + /** INTERNAL API: should be called from merge */ + private[akka] def clearAncestor(): self.type = { + ancestor = null + this + } + +} diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala index e6ca8c7b3c..b4d679edc2 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GCounter.scala @@ -40,7 +40,7 @@ object GCounter { @SerialVersionUID(1L) final class GCounter private[akka] ( private[akka] val state: Map[UniqueAddress, BigInt] = Map.empty) - extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning { + extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning with FastMerge { import GCounter.Zero @@ -83,20 +83,24 @@ final class GCounter private[akka] ( else state.get(key) match { case Some(v) ⇒ val tot = v + delta - new GCounter(state + (key -> tot)) - case None ⇒ new GCounter(state + (key -> delta)) + assignAncestor(new GCounter(state + (key -> tot))) + case None ⇒ assignAncestor(new GCounter(state + (key -> delta))) } } - override def merge(that: GCounter): GCounter = { - var merged = that.state - for ((key, thisValue) ← state) { - val thatValue = merged.getOrElse(key, Zero) - if (thisValue > thatValue) - merged = merged.updated(key, thisValue) + override def merge(that: GCounter): GCounter = + if ((this eq that) || 
that.isAncestorOf(this)) this.clearAncestor() + else if (this.isAncestorOf(that)) that.clearAncestor() + else { + var merged = that.state + for ((key, thisValue) ← state) { + val thatValue = merged.getOrElse(key, Zero) + if (thisValue > thatValue) + merged = merged.updated(key, thisValue) + } + clearAncestor() + new GCounter(merged) } - new GCounter(merged) - } override def needPruningFrom(removedNode: UniqueAddress): Boolean = state.contains(removedNode) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala index 29e5e646da..589b9c91d9 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala @@ -27,7 +27,7 @@ object GSet { * This class is immutable, i.e. "modifying" methods return a new instance. */ @SerialVersionUID(1L) -final case class GSet[A](elements: Set[A]) extends ReplicatedData with ReplicatedDataSerialization { +final case class GSet[A](elements: Set[A]) extends ReplicatedData with ReplicatedDataSerialization with FastMerge { type T = GSet[A] @@ -53,9 +53,15 @@ final case class GSet[A](elements: Set[A]) extends ReplicatedData with Replicate /** * Adds an element to the set */ - def add(element: A): GSet[A] = copy(elements + element) + def add(element: A): GSet[A] = assignAncestor(copy(elements + element)) - override def merge(that: GSet[A]): GSet[A] = copy(elements ++ that.elements) + override def merge(that: GSet[A]): GSet[A] = + if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor() + else if (this.isAncestorOf(that)) that.clearAncestor() + else { + clearAncestor() + copy(elements ++ that.elements) + } } object GSetKey { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index 405c0e00e0..2e7909cfae 100644 --- 
a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -154,7 +154,7 @@ object ORSet { final class ORSet[A] private[akka] ( private[akka] val elementsMap: Map[A, ORSet.Dot], private[akka] val vvector: VersionVector) - extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning { + extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning with FastMerge { type T = ORSet[A] @@ -193,7 +193,7 @@ final class ORSet[A] private[akka] ( private[akka] def add(node: UniqueAddress, element: A): ORSet[A] = { val newVvector = vvector + node val newDot = new VersionVector(versions = TreeMap(node -> newVvector.versions(node))) - new ORSet(elementsMap = elementsMap.updated(element, newDot), vvector = newVvector) + assignAncestor(new ORSet(elementsMap = elementsMap.updated(element, newDot), vvector = newVvector)) } /** @@ -210,7 +210,7 @@ final class ORSet[A] private[akka] ( * INTERNAL API */ private[akka] def remove(node: UniqueAddress, element: A): ORSet[A] = - copy(elementsMap = elementsMap - element) + assignAncestor(copy(elementsMap = elementsMap - element)) /** * Removes all elements from the set, but keeps the history. 
@@ -222,7 +222,8 @@ final class ORSet[A] private[akka] ( /** * INTERNAL API */ - private[akka] def clear(node: UniqueAddress): ORSet[A] = copy(elementsMap = Map.empty) + private[akka] def clear(node: UniqueAddress): ORSet[A] = + assignAncestor(copy(elementsMap = Map.empty)) /** * When element is in this Set but not in that Set: @@ -238,18 +239,23 @@ final class ORSet[A] private[akka] ( * Keep only common dots, and dots that are not dominated by the other sides version vector */ override def merge(that: ORSet[A]): ORSet[A] = { - val thisKeys = elementsMap.keySet - val thatKeys = that.elementsMap.keySet - val commonKeys = thisKeys.intersect(thatKeys) - val thisUniqueKeys = thisKeys -- commonKeys - val thatUniqueKeys = thatKeys -- commonKeys + if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor() + else if (this.isAncestorOf(that)) that.clearAncestor() + else { + val thisKeys = elementsMap.keySet + val thatKeys = that.elementsMap.keySet + val commonKeys = thisKeys.intersect(thatKeys) + val thisUniqueKeys = thisKeys -- commonKeys + val thatUniqueKeys = thatKeys -- commonKeys - val entries00 = ORSet.mergeCommonKeys(commonKeys, this, that) - val entries0 = ORSet.mergeDisjointKeys(thisUniqueKeys, this.elementsMap, that.vvector, entries00) - val entries = ORSet.mergeDisjointKeys(thatUniqueKeys, that.elementsMap, this.vvector, entries0) - val mergedVvector = this.vvector.merge(that.vvector) + val entries00 = ORSet.mergeCommonKeys(commonKeys, this, that) + val entries0 = ORSet.mergeDisjointKeys(thisUniqueKeys, this.elementsMap, that.vvector, entries00) + val entries = ORSet.mergeDisjointKeys(thatUniqueKeys, that.elementsMap, this.vvector, entries0) + val mergedVvector = this.vvector.merge(that.vvector) - new ORSet(entries, mergedVvector) + clearAncestor() + new ORSet(entries, mergedVvector) + } } override def needPruningFrom(removedNode: UniqueAddress): Boolean = diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala 
b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala index 30e189b304..dabb736602 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala @@ -59,6 +59,14 @@ class ORSetSpec extends WordSpec with Matchers { c5.elements should not contain (user1) c5.elements should not contain (user2) + + val c6 = c3.merge(c5) + c6.elements should not contain (user1) + c6.elements should not contain (user2) + + val c7 = c5.merge(c3) + c7.elements should not contain (user1) + c7.elements should not contain (user2) } "be able to add removed" in {