diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala index 5813bcba07..a566937da0 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala @@ -4,9 +4,11 @@ package akka.cluster.ddata object GSet { - private val _empty: GSet[Any] = new GSet(Set.empty) + private val _empty: GSet[Any] = new GSet(Set.empty)(None) def empty[A]: GSet[A] = _empty.asInstanceOf[GSet[A]] def apply(): GSet[Any] = _empty + private[akka] def apply[A](set: Set[A]): GSet[A] = new GSet(set)(None) + /** * Java API */ @@ -27,7 +29,8 @@ object GSet { * This class is immutable, i.e. "modifying" methods return a new instance. */ @SerialVersionUID(1L) -final case class GSet[A](elements: Set[A]) extends ReplicatedData with ReplicatedDataSerialization with FastMerge { +final case class GSet[A] private (elements: Set[A])(_delta: Option[GSet[A]]) + extends DeltaReplicatedData with ReplicatedDataSerialization with FastMerge { type T = GSet[A] @@ -53,15 +56,32 @@ final case class GSet[A](elements: Set[A]) extends ReplicatedData with Replicate /** * Adds an element to the set */ - def add(element: A): GSet[A] = assignAncestor(copy(elements + element)) + def add(element: A): GSet[A] = { + val newDelta = _delta match { + case Some(e) ⇒ Some(new GSet(e.elements + element)(None)) + case None ⇒ Some(new GSet[A](Set.apply[A](element))(None)) + } + assignAncestor(new GSet[A](elements + element)(newDelta)) + } override def merge(that: GSet[A]): GSet[A] = if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor() else if (this.isAncestorOf(that)) that.clearAncestor() else { clearAncestor() - copy(elements union that.elements) + new GSet[A](elements union that.elements)(None) } + + override def delta: GSet[A] = _delta match { + case Some(d) ⇒ d + case None ⇒ GSet.empty[A] + } + + override def resetDelta: GSet[A] = new GSet[A](elements)(None) + + override def toString: String = s"G$elements" + + def copy(e: Set[A] = elements) = new GSet[A](e)(_delta) } object GSetKey { diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GSetSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GSetSpec.scala index eeb96aedcf..6387bf463f 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/GSetSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/GSetSpec.scala @@ -39,8 +39,7 @@ class GSetSpec extends WordSpec with Matchers { val c12 = c11 + user1 val c13 = c12 + user2 - c13.elements should contain(user1) - c13.elements should contain(user2) + c13.elements should ===(Set(user1, user2)) // set 2 val c21 = GSet.empty[String] @@ -48,21 +47,54 @@ class GSetSpec extends WordSpec with Matchers { val c22 = c21 + user3 val c23 = c22 + user4 - c23.elements should contain(user3) - c23.elements should contain(user4) + c23.elements should ===(Set(user3, user4)) // merge both ways val merged1 = c13 merge c23 - merged1.elements should contain(user1) - merged1.elements should contain(user2) - merged1.elements should contain(user3) - merged1.elements should contain(user4) + merged1.elements should ===(Set(user1, user2, user3, user4)) val merged2 = c23 merge c13 - merged2.elements should contain(user1) - merged2.elements should contain(user2) - merged2.elements should contain(user3) - merged2.elements should contain(user4) + merged2.elements should ===(Set(user1, user2, user3, user4)) + } + + "be able to work with deltas" in { + // set 1 + val c11 = GSet.empty[String] + + val c12 = c11 + user1 + val c13 = c12 + user2 + + c12.delta.elements should ===(Set(user1)) + c13.delta.elements should ===(Set(user1, user2)) + + // deltas build state + (c12 merge c13.delta) should ===(c13) + + // own deltas are idempotent + (c13 merge c13.delta) should ===(c13) + + // set 2 + val c21 = GSet.empty[String] + + val c22 = c21 + user3 + val c23 = c22.resetDelta + user4 + + c22.delta.elements should ===(Set(user3)) + c23.delta.elements should ===(Set(user4)) + + c23.elements should ===(Set(user3, user4)) + + val c33 = c13 merge c23 + + // merge both ways + val merged1 = GSet.empty[String] merge c12.delta merge c13.delta merge c22.delta merge c23.delta + merged1.elements should ===(Set(user1, user2, user3, user4)) + + val merged2 = GSet.empty[String] merge c23.delta merge c13.delta merge c22.delta + merged2.elements should ===(Set(user1, user2, user3, user4)) + + merged1 should ===(c33) + merged2 should ===(c33) } "be able to have its user set correctly merged with another GSet with overlapping user sets" in { diff --git a/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java b/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java index a5311f7b70..524e058144 100644 --- a/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java +++ b/akka-docs/rst/java/code/docs/ddata/japi/protobuf/TwoPhaseSetSerializer.java @@ -79,7 +79,9 @@ public class TwoPhaseSetSerializer extends AbstractSerializationSupport { for (String elem : msg.getRemovalsList()) { removals = removals.add(elem); } - return new TwoPhaseSet(adds, removals); + // GSet will accumulate deltas when adding elements, + // but those are not of interest in the result of the deserialization + return new TwoPhaseSet(adds.resetDelta(), removals.resetDelta()); } catch (Exception e) { throw new RuntimeException(e.getMessage(), e); } diff --git a/akka-docs/rst/scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer.scala b/akka-docs/rst/scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer.scala index 09318ebf13..1b85b384c9 100644 --- a/akka-docs/rst/scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer.scala +++ b/akka-docs/rst/scala/code/docs/ddata/protobuf/TwoPhaseSetSerializer.scala @@ -51,9 +51,13 @@ class TwoPhaseSetSerializer(val system: ExtendedActorSystem) def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = { val msg = TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes) - TwoPhaseSet( - adds = GSet(msg.getAddsList.iterator.asScala.toSet), - removals = GSet(msg.getRemovalsList.iterator.asScala.toSet)) + val addsSet = msg.getAddsList.iterator.asScala.toSet + val removalsSet = msg.getRemovalsList.iterator.asScala.toSet + val adds = addsSet.foldLeft(GSet.empty[String])((acc, el) => acc.add(el)) + val removals = removalsSet.foldLeft(GSet.empty[String])((acc, el) => acc.add(el)) + // GSet will accumulate deltas when adding elements, + // but those are not of interest in the result of the deserialization + TwoPhaseSet(adds.resetDelta, removals.resetDelta) } } //#serializer diff --git a/project/MiMa.scala b/project/MiMa.scala index 0e4587383a..f006c94699 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -93,6 +93,8 @@ object MiMa extends AutoPlugin { import com.typesafe.tools.mima.core._ val bcIssuesBetween24and25 = Seq( + // ##22269 GSet as delta-CRDT + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GSet.this"), // constructor supplied by companion object // #21875 delta-CRDT ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GCounter.this"),