GSet ported to delta-CRDT (#22187)
This commit is contained in:
parent
5ffb08cd78
commit
d470321051
5 changed files with 80 additions and 20 deletions
|
|
@ -4,9 +4,11 @@
|
||||||
package akka.cluster.ddata
|
package akka.cluster.ddata
|
||||||
|
|
||||||
object GSet {
|
object GSet {
|
||||||
private val _empty: GSet[Any] = new GSet(Set.empty)
|
private val _empty: GSet[Any] = new GSet(Set.empty)(None)
|
||||||
def empty[A]: GSet[A] = _empty.asInstanceOf[GSet[A]]
|
def empty[A]: GSet[A] = _empty.asInstanceOf[GSet[A]]
|
||||||
def apply(): GSet[Any] = _empty
|
def apply(): GSet[Any] = _empty
|
||||||
|
private[akka] def apply[A](set: Set[A]): GSet[A] = new GSet(set)(None)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
*/
|
*/
|
||||||
|
|
@ -27,7 +29,8 @@ object GSet {
|
||||||
* This class is immutable, i.e. "modifying" methods return a new instance.
|
* This class is immutable, i.e. "modifying" methods return a new instance.
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
final case class GSet[A](elements: Set[A]) extends ReplicatedData with ReplicatedDataSerialization with FastMerge {
|
final case class GSet[A] private (elements: Set[A])(_delta: Option[GSet[A]])
|
||||||
|
extends DeltaReplicatedData with ReplicatedDataSerialization with FastMerge {
|
||||||
|
|
||||||
type T = GSet[A]
|
type T = GSet[A]
|
||||||
|
|
||||||
|
|
@ -53,15 +56,32 @@ final case class GSet[A](elements: Set[A]) extends ReplicatedData with Replicate
|
||||||
/**
|
/**
|
||||||
* Adds an element to the set
|
* Adds an element to the set
|
||||||
*/
|
*/
|
||||||
def add(element: A): GSet[A] = assignAncestor(copy(elements + element))
|
def add(element: A): GSet[A] = {
|
||||||
|
val newDelta = _delta match {
|
||||||
|
case Some(e) ⇒ Some(new GSet(e.elements + element)(None))
|
||||||
|
case None ⇒ Some(new GSet[A](Set.apply[A](element))(None))
|
||||||
|
}
|
||||||
|
assignAncestor(new GSet[A](elements + element)(newDelta))
|
||||||
|
}
|
||||||
|
|
||||||
override def merge(that: GSet[A]): GSet[A] =
|
override def merge(that: GSet[A]): GSet[A] =
|
||||||
if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor()
|
if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor()
|
||||||
else if (this.isAncestorOf(that)) that.clearAncestor()
|
else if (this.isAncestorOf(that)) that.clearAncestor()
|
||||||
else {
|
else {
|
||||||
clearAncestor()
|
clearAncestor()
|
||||||
copy(elements union that.elements)
|
new GSet[A](elements union that.elements)(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def delta: GSet[A] = _delta match {
|
||||||
|
case Some(d) ⇒ d
|
||||||
|
case None ⇒ GSet.empty[A]
|
||||||
|
}
|
||||||
|
|
||||||
|
override def resetDelta: GSet[A] = new GSet[A](elements)(None)
|
||||||
|
|
||||||
|
override def toString: String = s"G$elements"
|
||||||
|
|
||||||
|
def copy(e: Set[A] = elements) = new GSet[A](e)(_delta)
|
||||||
}
|
}
|
||||||
|
|
||||||
object GSetKey {
|
object GSetKey {
|
||||||
|
|
|
||||||
|
|
@ -39,8 +39,7 @@ class GSetSpec extends WordSpec with Matchers {
|
||||||
val c12 = c11 + user1
|
val c12 = c11 + user1
|
||||||
val c13 = c12 + user2
|
val c13 = c12 + user2
|
||||||
|
|
||||||
c13.elements should contain(user1)
|
c13.elements should ===(Set(user1, user2))
|
||||||
c13.elements should contain(user2)
|
|
||||||
|
|
||||||
// set 2
|
// set 2
|
||||||
val c21 = GSet.empty[String]
|
val c21 = GSet.empty[String]
|
||||||
|
|
@ -48,21 +47,54 @@ class GSetSpec extends WordSpec with Matchers {
|
||||||
val c22 = c21 + user3
|
val c22 = c21 + user3
|
||||||
val c23 = c22 + user4
|
val c23 = c22 + user4
|
||||||
|
|
||||||
c23.elements should contain(user3)
|
c23.elements should ===(Set(user3, user4))
|
||||||
c23.elements should contain(user4)
|
|
||||||
|
|
||||||
// merge both ways
|
// merge both ways
|
||||||
val merged1 = c13 merge c23
|
val merged1 = c13 merge c23
|
||||||
merged1.elements should contain(user1)
|
merged1.elements should ===(Set(user1, user2, user3, user4))
|
||||||
merged1.elements should contain(user2)
|
|
||||||
merged1.elements should contain(user3)
|
|
||||||
merged1.elements should contain(user4)
|
|
||||||
|
|
||||||
val merged2 = c23 merge c13
|
val merged2 = c23 merge c13
|
||||||
merged2.elements should contain(user1)
|
merged2.elements should ===(Set(user1, user2, user3, user4))
|
||||||
merged2.elements should contain(user2)
|
}
|
||||||
merged2.elements should contain(user3)
|
|
||||||
merged2.elements should contain(user4)
|
"be able to work with deltas" in {
|
||||||
|
// set 1
|
||||||
|
val c11 = GSet.empty[String]
|
||||||
|
|
||||||
|
val c12 = c11 + user1
|
||||||
|
val c13 = c12 + user2
|
||||||
|
|
||||||
|
c12.delta.elements should ===(Set(user1))
|
||||||
|
c13.delta.elements should ===(Set(user1, user2))
|
||||||
|
|
||||||
|
// deltas build state
|
||||||
|
(c12 merge c13.delta) should ===(c13)
|
||||||
|
|
||||||
|
// own deltas are idempotent
|
||||||
|
(c13 merge c13.delta) should ===(c13)
|
||||||
|
|
||||||
|
// set 2
|
||||||
|
val c21 = GSet.empty[String]
|
||||||
|
|
||||||
|
val c22 = c21 + user3
|
||||||
|
val c23 = c22.resetDelta + user4
|
||||||
|
|
||||||
|
c22.delta.elements should ===(Set(user3))
|
||||||
|
c23.delta.elements should ===(Set(user4))
|
||||||
|
|
||||||
|
c23.elements should ===(Set(user3, user4))
|
||||||
|
|
||||||
|
val c33 = c13 merge c23
|
||||||
|
|
||||||
|
// merge both ways
|
||||||
|
val merged1 = GSet.empty[String] merge c12.delta merge c13.delta merge c22.delta merge c23.delta
|
||||||
|
merged1.elements should ===(Set(user1, user2, user3, user4))
|
||||||
|
|
||||||
|
val merged2 = GSet.empty[String] merge c23.delta merge c13.delta merge c22.delta
|
||||||
|
merged2.elements should ===(Set(user1, user2, user3, user4))
|
||||||
|
|
||||||
|
merged1 should ===(c33)
|
||||||
|
merged2 should ===(c33)
|
||||||
}
|
}
|
||||||
|
|
||||||
"be able to have its user set correctly merged with another GSet with overlapping user sets" in {
|
"be able to have its user set correctly merged with another GSet with overlapping user sets" in {
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,9 @@ public class TwoPhaseSetSerializer extends AbstractSerializationSupport {
|
||||||
for (String elem : msg.getRemovalsList()) {
|
for (String elem : msg.getRemovalsList()) {
|
||||||
removals = removals.add(elem);
|
removals = removals.add(elem);
|
||||||
}
|
}
|
||||||
return new TwoPhaseSet(adds, removals);
|
// GSet will accumulate deltas when adding elements,
|
||||||
|
// but those are not of interest in the result of the deserialization
|
||||||
|
return new TwoPhaseSet(adds.resetDelta(), removals.resetDelta());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e.getMessage(), e);
|
throw new RuntimeException(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,9 +51,13 @@ class TwoPhaseSetSerializer(val system: ExtendedActorSystem)
|
||||||
|
|
||||||
def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = {
|
def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = {
|
||||||
val msg = TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes)
|
val msg = TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes)
|
||||||
TwoPhaseSet(
|
val addsSet = msg.getAddsList.iterator.asScala.toSet
|
||||||
adds = GSet(msg.getAddsList.iterator.asScala.toSet),
|
val removalsSet = msg.getRemovalsList.iterator.asScala.toSet
|
||||||
removals = GSet(msg.getRemovalsList.iterator.asScala.toSet))
|
val adds = addsSet.foldLeft(GSet.empty[String])((acc, el) => acc.add(el))
|
||||||
|
val removals = removalsSet.foldLeft(GSet.empty[String])((acc, el) => acc.add(el))
|
||||||
|
// GSet will accumulate deltas when adding elements,
|
||||||
|
// but those are not of interest in the result of the deserialization
|
||||||
|
TwoPhaseSet(adds.resetDelta, removals.resetDelta)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//#serializer
|
//#serializer
|
||||||
|
|
|
||||||
|
|
@ -93,6 +93,8 @@ object MiMa extends AutoPlugin {
|
||||||
import com.typesafe.tools.mima.core._
|
import com.typesafe.tools.mima.core._
|
||||||
|
|
||||||
val bcIssuesBetween24and25 = Seq(
|
val bcIssuesBetween24and25 = Seq(
|
||||||
|
// ##22269 GSet as delta-CRDT
|
||||||
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GSet.this"), // constructor supplied by companion object
|
||||||
|
|
||||||
// #21875 delta-CRDT
|
// #21875 delta-CRDT
|
||||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GCounter.this"),
|
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GCounter.this"),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue