From 3a8eef450691fc1217085ddcc79d48b2ec340270 Mon Sep 17 00:00:00 2001 From: gosubpl Date: Fri, 31 Mar 2017 13:29:27 +0200 Subject: [PATCH] fixes to ORSet mergeRemoveDelta and ORMap deltaMerge (#22648) --- .../main/scala/akka/cluster/ddata/ORMap.scala | 123 ++++++------- .../scala/akka/cluster/ddata/ORMultiMap.scala | 5 +- .../main/scala/akka/cluster/ddata/ORSet.scala | 34 ++-- .../cluster/ddata/ReplicatorDeltaSpec.scala | 11 +- .../ddata/ReplicatorMapDeltaSpec.scala | 167 +++++++++++++++--- .../scala/akka/cluster/ddata/LWWMapSpec.scala | 3 - .../scala/akka/cluster/ddata/ORMapSpec.scala | 23 ++- .../akka/cluster/ddata/ORMultiMapSpec.scala | 140 +++++++++++++-- .../scala/akka/cluster/ddata/ORSetSpec.scala | 49 +++-- .../akka/cluster/ddata/PNCounterMapSpec.scala | 3 - .../ReplicatedDataSerializerSpec.scala | 13 +- 11 files changed, 421 insertions(+), 150 deletions(-) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index 798916efa4..40d01e1d89 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -298,11 +298,8 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( @InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = { // for removals the delta values map emitted will be empty val newKeys = keys.resetDelta.remove(node, key) - // FIXME use full state for removals, until issue #22648 is fixed - // val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag) - // new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp))) - new ORMap(newKeys, values - key, zeroTag, delta = None) - + val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag) + new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp))) } /** @@ -312,10 +309,8 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( */ @InternalApi private[akka] def removeKey(node: UniqueAddress, key: A): ORMap[A, B] = { val newKeys = keys.resetDelta.remove(node, key) - // FIXME use full state for removals, until issue #22648 is fixed - // val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag) - // new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp))) - new ORMap(newKeys, values, zeroTag, delta = None) + val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag) + new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp))) } private def dryMerge(that: ORMap[A, B], mergedKeys: ORSet[A], valueKeysIterator: Iterator[A]): ORMap[A, B] = { @@ -360,43 +355,58 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( if (delta.isEmpty) this else new ORMap[A, B](keys.resetDelta, values, zeroTag = zeroTag) - override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = { - // helper function to simplify folds below - def foldValues(values: List[(A, ReplicatedData)], initial: B) = - values.foldLeft(initial) { - case (acc: DeltaReplicatedData, (_, value: ReplicatedDelta)) ⇒ - acc.mergeDelta(value.asInstanceOf[acc.D]).asInstanceOf[B] - case (acc, (_, value)) ⇒ - acc.merge(value.asInstanceOf[acc.T]).asInstanceOf[B] + private def dryMergeDelta(thatDelta: ORMap.DeltaOp, withValueDeltas: Boolean = false): ORMap[A, B] = { + def mergeValue(lvalue: ReplicatedData, rvalue: ReplicatedData): B = + (lvalue, rvalue) match { + case (v: DeltaReplicatedData, delta: ReplicatedDelta) ⇒ + v.mergeDelta(delta.asInstanceOf[v.D]).asInstanceOf[B] + case _ ⇒ + lvalue.merge(rvalue.asInstanceOf[lvalue.T]).asInstanceOf[B] } - val mergedKeys: ORSet[A] = thatDelta match { - case d: AtomicDeltaOp[A, B] ⇒ keys.mergeDelta(d.underlying) - case ORMap.DeltaGroup(ops) ⇒ - ops.foldLeft(keys)((acc, op) ⇒ acc.mergeDelta(op.asInstanceOf[AtomicDeltaOp[A, B]].underlying)) - } - - var mergedValues = Map.empty[A, B] - var tombstonedVals = Set.empty[A] - var thatValueDeltas: Map[A, List[(A, ReplicatedData)]] = Map.empty + var mergedKeys: ORSet[A] = this.keys + var (mergedValues, tombstonedVals): (Map[A, B], Map[A, B]) = this.values.partition { case (k, _) ⇒ this.keys.contains(k) } val processDelta: PartialFunction[ORMap.DeltaOp, Unit] = { case putOp: PutDeltaOp[A, B] ⇒ - val key = putOp.value._1 - thatValueDeltas += (key → (putOp.value :: Nil)) // put is destructive! - case _: RemoveDeltaOp[A, B] ⇒ - // remove delta is only for the side effect of key being removed - // please note that if it is not preceded by update clearing the value - // anomalies will result + val keyDelta = putOp.underlying + mergedKeys = mergedKeys.mergeDelta(keyDelta) + mergedValues = mergedValues + putOp.value // put is destructive and propagates only full values of B! + case removeOp: RemoveDeltaOp[A, B] ⇒ + val removedKey = removeOp.underlying match { + // if op is RemoveDeltaOp then it must have exactly one element in the elements + case op: ORSet.RemoveDeltaOp[_] ⇒ op.underlying.elements.head.asInstanceOf[A] + case _ ⇒ throw new IllegalArgumentException("ORMap.RemoveDeltaOp must contain ORSet.RemoveDeltaOp inside") + } + mergedValues = mergedValues - removedKey + mergedKeys = mergedKeys.mergeDelta(removeOp.underlying) + // please note that if RemoveDeltaOp is not preceded by update clearing the value + // anomalies may result case removeKeyOp: RemoveKeyDeltaOp[A, B] ⇒ - tombstonedVals = tombstonedVals + removeKeyOp.removedKey + // removeKeyOp tombstones values for later use + if (mergedValues.contains(removeKeyOp.removedKey)) { + tombstonedVals = tombstonedVals + (removeKeyOp.removedKey → mergedValues(removeKeyOp.removedKey)) + } + mergedValues = mergedValues - removeKeyOp.removedKey + mergedKeys = mergedKeys.mergeDelta(removeKeyOp.underlying) case updateOp: UpdateDeltaOp[A, _] ⇒ + mergedKeys = mergedKeys.mergeDelta(updateOp.underlying) updateOp.values.foreach { case (key, value) ⇒ - if (thatValueDeltas.contains(key)) - thatValueDeltas = thatValueDeltas + (key → (thatValueDeltas(key) :+ (key → value))) - else - thatValueDeltas += (key → ((key, value) :: Nil)) + if (mergedKeys.contains(key)) { + if (mergedValues.contains(key)) { + mergedValues = mergedValues + (key → mergeValue(mergedValues(key), value)) + } else if (tombstonedVals.contains(key)) { + mergedValues = mergedValues + (key → mergeValue(tombstonedVals(key), value)) + } else { + value match { + case _: ReplicatedDelta ⇒ + mergedValues = mergedValues + (key → mergeValue(value.asInstanceOf[ReplicatedDelta].zero, value)) + case _ ⇒ + mergedValues = mergedValues + (key → value.asInstanceOf[B]) + } + } + } } } @@ -412,30 +422,25 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( (processDelta orElse processNestedDelta)(thatDelta) - val aggregateValuesForKey: (A ⇒ Unit) = { key ⇒ - (this.values.get(key), thatValueDeltas.get(key)) match { - case (Some(thisValue), Some(thatValues)) ⇒ - val mergedValue = foldValues(thatValues, thisValue) - mergedValues = mergedValues.updated(key, mergedValue) - case (Some(thisValue), None) ⇒ - mergedValues = mergedValues.updated(key, thisValue) - case (None, Some(thatValues)) ⇒ - val (_, initialValue) = thatValues.head - val mergedValue = initialValue match { - case _: ReplicatedDelta ⇒ - foldValues(thatValues, initialValue.asInstanceOf[ReplicatedDelta].zero.asInstanceOf[B]) - case _ ⇒ - foldValues(thatValues.tail, initialValue.asInstanceOf[B]) - } - mergedValues = mergedValues.updated(key, mergedValue) - case (None, None) ⇒ throw new IllegalStateException(s"missing value for $key") - } - } + if (withValueDeltas) + new ORMap[A, B](mergedKeys, tombstonedVals ++ mergedValues, zeroTag = zeroTag) + else + new ORMap[A, B](mergedKeys, mergedValues, zeroTag = zeroTag) + } - mergedKeys.elementsMap.keysIterator.foreach { aggregateValuesForKey } - tombstonedVals.foreach { aggregateValuesForKey } + override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = { + val thisWithDeltas = dryMergeDelta(thatDelta) + this.merge(thisWithDeltas) + } - new ORMap[A, B](mergedKeys, mergedValues, zeroTag = zeroTag) + /** + * INTERNAL API + * This function is only to be used by derived maps that avoid remove anomalies + * by keeping the vvector (in form of key -> value pair) for deleted keys + */ + @InternalApi private[akka] def mergeDeltaRetainingDeletedValues(thatDelta: ORMap.DeltaOp): ORMap[A, B] = { + val thisWithDeltas = dryMergeDelta(thatDelta, true) + this.mergeRetainingDeletedValues(thisWithDeltas) } private def newDelta(deltaOp: ORMap.DeltaOp) = delta match { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala index 25651954a2..871ddaf097 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala @@ -253,7 +253,10 @@ final class ORMultiMap[A, B] private[akka] ( override def delta: Option[D] = underlying.delta override def mergeDelta(thatDelta: D): ORMultiMap[A, B] = - new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas) + if (withValueDeltas) + new ORMultiMap(underlying.mergeDeltaRetainingDeletedValues(thatDelta), withValueDeltas) + else + new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas) override def modifiedByNodes: Set[UniqueAddress] = underlying.modifiedByNodes diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index ee8dd6ff3e..93a73bebd2 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -345,15 +345,13 @@ final class ORSet[A] private[akka] ( * INTERNAL API */ @InternalApi private[akka] def remove(node: UniqueAddress, element: A): ORSet[A] = { - // FIXME use full state for removals, until issue #22648 is fixed - // val deltaDot = VersionVector(node, vvector.versionAt(node)) - // val rmOp = ORSet.RemoveDeltaOp(new ORSet(Map(element → deltaDot), vvector)) - // val newDelta = delta match { - // case None ⇒ rmOp - // case Some(d) ⇒ d.merge(rmOp) - // } - // assignAncestor(copy(elementsMap = elementsMap - element, delta = Some(newDelta))) - assignAncestor(copy(elementsMap = elementsMap - element, delta = None)) + val deltaDot = VersionVector(node, vvector.versionAt(node)) + val rmOp = ORSet.RemoveDeltaOp(new ORSet(Map(element → deltaDot), vvector)) + val newDelta = delta match { + case None ⇒ rmOp + case Some(d) ⇒ d.merge(rmOp) + } + assignAncestor(copy(elementsMap = elementsMap - element, delta = Some(newDelta))) } /** @@ -439,9 +437,17 @@ final class ORSet[A] private[akka] ( val (elem, thatDot) = that.elementsMap.head def deleteDots = that.vvector.versionsIterator def deleteDotsNodes = deleteDots.map { case (dotNode, _) ⇒ dotNode } - val newElementsMap = - if (deleteDots.forall { case (dotNode, dotV) ⇒ this.vvector.versionAt(dotNode) <= dotV }) { - elementsMap.get(elem) match { + val newElementsMap = { + val thisDotOption = this.elementsMap.get(elem) + val deleteDotsAreGreater = deleteDots.forall { + case (dotNode, dotV) ⇒ + thisDotOption match { + case Some(thisDot) ⇒ thisDot.versionAt(dotNode) <= dotV + case None ⇒ false + } + } + if (deleteDotsAreGreater) { + thisDotOption match { case Some(thisDot) ⇒ if (thisDot.versionsIterator.forall { case (thisDotNode, _) ⇒ deleteDotsNodes.contains(thisDotNode) }) elementsMap - elem @@ -449,9 +455,9 @@ final class ORSet[A] private[akka] ( case None ⇒ elementsMap } - } else { + } else elementsMap - } + } clearAncestor() val newVvector = vvector.merge(that.vvector) new ORSet(newElementsMap, newVvector) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala index 3df79d27ff..7b1f70a26f 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorDeltaSpec.scala @@ -118,12 +118,11 @@ object ReplicatorDeltaSpec extends MultiNodeConfig { case 3 ⇒ // ORSet val key = rndOrSetkey() - // FIXME use full state for removals, until issue #22648 is fixed - // // only removals for KeyF on node first - // if (key == KeyF && onNode == first && rnd.nextBoolean()) - // Remove(key, rndRemoveElement(), consistency()) - // else - Add(key, rndAddElement(), consistency()) + // only removals for KeyF on node first + if (key == KeyF && onNode == first && rnd.nextBoolean()) + Remove(key, rndRemoveElement(), consistency()) + else + Add(key, rndAddElement(), consistency()) } }.toVector } diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala index f4297facd7..e094182936 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala @@ -6,8 +6,7 @@ package akka.cluster.ddata import java.util.concurrent.ThreadLocalRandom import scala.concurrent.duration._ - -import akka.cluster.Cluster +import akka.cluster.{ Cluster, ddata } import akka.cluster.ddata.Replicator._ import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig @@ -38,21 +37,38 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig { final case class Delay(n: Int) extends Op final case class Incr(ki: (PNCounterMapKey[String], String), n: Int, consistency: WriteConsistency) extends Op final case class Decr(ki: (PNCounterMapKey[String], String), n: Int, consistency: WriteConsistency) extends Op - final case class Add(ki: (ORMultiMapKey[String, String], String), elem: String, consistency: WriteConsistency) extends Op - final case class Remove(ki: (ORMultiMapKey[String, String], String), elem: String, consistency: WriteConsistency) extends Op + // AddVD and RemoveVD for variant of ORMultiMap with Value Deltas, NoVD - for the vanilla ORMultiMap + final case class AddVD(ki: (ORMultiMapKey[String, String], String), elem: String, consistency: WriteConsistency) extends Op + final case class RemoveVD(ki: (ORMultiMapKey[String, String], String), elem: String, consistency: WriteConsistency) extends Op + final case class AddNoVD(ki: (ORMultiMapKey[String, String], String), elem: String, consistency: WriteConsistency) extends Op + final case class RemoveNoVD(ki: (ORMultiMapKey[String, String], String), elem: String, consistency: WriteConsistency) extends Op + // AddOM and RemoveOM for Vanilla ORMap holding ORSet inside + final case class AddOM(ki: (ORMapKey[String, ORSet[String]], String), elem: String, consistency: WriteConsistency) extends Op + final case class RemoveOM(ki: (ORMapKey[String, ORSet[String]], String), elem: String, consistency: WriteConsistency) extends Op val timeout = 5.seconds val writeTwo = WriteTo(2, timeout) val writeMajority = WriteMajority(timeout) val KeyPN = PNCounterMapKey[String]("A") - val KeyMM = ORMultiMapKey[String, String]("D") + // VD and NoVD as above + val KeyMMVD = ORMultiMapKey[String, String]("D") + val KeyMMNoVD = ORMultiMapKey[String, String]("G") + // OM as above + val KeyOM = ORMapKey[String, ORSet[String]]("J") + val KeyA: (PNCounterMapKey[String], String) = (KeyPN, "a") val KeyB: (PNCounterMapKey[String], String) = (KeyPN, "b") val KeyC: (PNCounterMapKey[String], String) = (KeyPN, "c") - val KeyD: (ORMultiMapKey[String, String], String) = (KeyMM, "d") - val KeyE: (ORMultiMapKey[String, String], String) = (KeyMM, "e") - val KeyF: (ORMultiMapKey[String, String], String) = (KeyMM, "f") + val KeyD: (ORMultiMapKey[String, String], String) = (KeyMMVD, "d") + val KeyE: (ORMultiMapKey[String, String], String) = (KeyMMVD, "e") + val KeyF: (ORMultiMapKey[String, String], String) = (KeyMMVD, "f") + val KeyG: (ORMultiMapKey[String, String], String) = (KeyMMNoVD, "g") + val KeyH: (ORMultiMapKey[String, String], String) = (KeyMMNoVD, "h") + val KeyI: (ORMultiMapKey[String, String], String) = (KeyMMNoVD, "i") + val KeyJ: (ORMapKey[String, ORSet[String]], String) = (KeyOM, "j") + val KeyK: (ORMapKey[String, ORSet[String]], String) = (KeyOM, "k") + val KeyL: (ORMapKey[String, ORSet[String]], String) = (KeyOM, "l") def generateOperations(onNode: RoleName): Vector[Op] = { val rnd = ThreadLocalRandom.current() @@ -73,7 +89,7 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig { } } - def rndOrSetkey(): (ORMultiMapKey[String, String], String) = { + def rndOrSetkeyVD(): (ORMultiMapKey[String, String], String) = { rnd.nextInt(3) match { case 0 ⇒ KeyD case 1 ⇒ KeyE @@ -81,6 +97,22 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig { } } + def rndOrSetkeyNoVD(): (ORMultiMapKey[String, String], String) = { + rnd.nextInt(3) match { + case 0 ⇒ KeyG + case 1 ⇒ KeyH + case 2 ⇒ KeyI + } + } + + def rndOrSetkeyOM(): (ORMapKey[String, ORSet[String]], String) = { + rnd.nextInt(3) match { + case 0 ⇒ KeyJ + case 1 ⇒ KeyK + case 2 ⇒ KeyL + } + } + var availableForRemove = Set.empty[String] def rndAddElement(): String = { @@ -97,24 +129,44 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig { availableForRemove.toVector(rnd.nextInt(availableForRemove.size)) } - (0 to (30 + rnd.nextInt(10))).map { _ ⇒ - rnd.nextInt(4) match { + (0 to (50 + rnd.nextInt(10))).map { _ ⇒ + rnd.nextInt(6) match { case 0 ⇒ Delay(rnd.nextInt(500)) case 1 ⇒ Incr(rndPnCounterkey(), rnd.nextInt(100), consistency()) case 2 ⇒ Decr(rndPnCounterkey(), rnd.nextInt(10), consistency()) case 3 ⇒ - // ORSet - val key = rndOrSetkey() - // FIXME use full state for removals, until issue #22648 is fixed - // // only removals for KeyF on node first - // if (key == KeyF && onNode == first && rnd.nextBoolean()) - // Remove(key, rndRemoveElement(), consistency()) - // else - Add(key, rndAddElement(), consistency()) + // ORMultiMap.withValueDeltas + val key = rndOrSetkeyVD() + // only removals for KeyF on node first + if (key == KeyF && onNode == first && rnd.nextBoolean()) + RemoveVD(key, rndRemoveElement(), consistency()) + else + AddVD(key, rndAddElement(), consistency()) + case 4 ⇒ + // ORMultiMap - vanilla variant - without Value Deltas + val key = rndOrSetkeyNoVD() + // only removals for KeyI on node first + if (key == KeyI && onNode == first && rnd.nextBoolean()) + RemoveNoVD(key, rndRemoveElement(), consistency()) + else + AddNoVD(key, rndAddElement(), consistency()) + case 5 ⇒ + // Vanilla ORMap - with ORSet inside + val key = rndOrSetkeyOM() + // only removals for KeyL on node first + if (key == KeyL && onNode == first && rnd.nextBoolean()) + RemoveOM(key, rndRemoveElement(), consistency()) + else + AddOM(key, rndAddElement(), consistency()) } }.toVector } + def addElementToORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: Cluster) = + om.updated(node, key, ORSet.empty[String])(_.add(node, element)) + + def removeElementFromORMap(om: ORMap[String, ORSet[String]], key: String, element: String)(implicit node: Cluster) = + om.updated(node, key, ORSet.empty[String])(_.remove(node, element)) } class ReplicatorMapDeltaSpecMultiJvmNode1 extends ReplicatorMapDeltaSpec @@ -191,6 +243,14 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 → Set("a"))) deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 → Set("a"))) } + List(KeyG, KeyH, KeyI).foreach { key ⇒ + fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ + (key._2 → Set("a"))) + deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ + (key._2 → Set("a"))) + } + List(KeyJ, KeyK, KeyL).foreach { key ⇒ + fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ + (key._2 → (ORSet.empty + "a"))) + deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(_ + (key._2 → (ORSet.empty + "a"))) + } } enterBarrier("updated-1") @@ -205,10 +265,25 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with awaitAssert { val p = TestProbe() List(KeyD, KeyE, KeyF).foreach { key ⇒ + fullStateReplicator.tell(Get(key._1, ReadLocal), p.ref) + val res = p.expectMsgType[GetSuccess[ORMultiMap[String, String]]].dataValue.get(key._2) should ===(Some(Set("a"))) + } + } + awaitAssert { + val p = TestProbe() + List(KeyG, KeyH, KeyI).foreach { key ⇒ fullStateReplicator.tell(Get(key._1, ReadLocal), p.ref) p.expectMsgType[GetSuccess[ORMultiMap[String, String]]].dataValue.get(key._2) should ===(Some(Set("a"))) } } + awaitAssert { + val p = TestProbe() + List(KeyJ, KeyK, KeyL).foreach { key ⇒ + fullStateReplicator.tell(Get(key._1, ReadLocal), p.ref) + val res = p.expectMsgType[GetSuccess[ORMap[String, ORSet[String]]]].dataValue.get(key._2) + res.map(_.elements) should ===(Some(Set("a"))) + } + } } enterBarrierAfterTestStep() @@ -231,18 +306,42 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with case Decr(key, n, consistency) ⇒ fullStateReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrement (key._2, n)) deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ decrement (key._2, n)) - case Add(key, elem, consistency) ⇒ + case AddVD(key, elem, consistency) ⇒ // to have an deterministic result when mixing add/remove we can only perform // the ORSet operations from one node runOn((if (key == KeyF) List(first) else List(first, second, third)): _*) { fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBinding (key._2, elem)) deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ addBinding (key._2, elem)) } - case Remove(key, elem, consistency) ⇒ + case RemoveVD(key, elem, consistency) ⇒ runOn(first) { fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBinding (key._2, elem)) deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ removeBinding (key._2, elem)) } + case AddNoVD(key, elem, consistency) ⇒ + // to have an deterministic result when mixing add/remove we can only perform + // the ORSet operations from one node + runOn((if (key == KeyI) List(first) else List(first, second, third)): _*) { + fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBinding (key._2, elem)) + deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ addBinding (key._2, elem)) + } + case RemoveNoVD(key, elem, consistency) ⇒ + runOn(first) { + fullStateReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBinding (key._2, elem)) + deltaReplicator ! Update(key._1, ORMultiMap.empty[String, String], WriteLocal)(_ removeBinding (key._2, elem)) + } + case AddOM(key, elem, consistency) ⇒ + // to have an deterministic result when mixing add/remove we can only perform + // the ORSet operations from one node + runOn((if (key == KeyL) List(first) else List(first, second, third)): _*) { + fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ addElementToORMap(om, key._2, elem)) + deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ addElementToORMap(om, key._2, elem)) + } + case RemoveOM(key, elem, consistency) ⇒ + runOn(first) { + fullStateReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ removeElementFromORMap(om, key._2, elem)) + deltaReplicator ! Update(key._1, ORMap.empty[String, ORSet[String]], WriteLocal)(om ⇒ removeElementFromORMap(om, key._2, elem)) + } } } @@ -274,6 +373,32 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with } } + List(KeyG, KeyH, KeyI).foreach { key ⇒ + within(5.seconds) { + awaitAssert { + val p = TestProbe() + fullStateReplicator.tell(Get(key._1, ReadLocal), p.ref) + val fullStateValue = p.expectMsgType[GetSuccess[ORMultiMap[String, String]]].dataValue.get(key._2) + deltaReplicator.tell(Get(key._1, ReadLocal), p.ref) + val deltaValue = p.expectMsgType[GetSuccess[ORMultiMap[String, String]]].dataValue.get(key._2) + deltaValue should ===(fullStateValue) + } + } + } + + List(KeyJ, KeyK, KeyL).foreach { key ⇒ + within(5.seconds) { + awaitAssert { + val p = TestProbe() + fullStateReplicator.tell(Get(key._1, ReadLocal), p.ref) + val fullStateValue = p.expectMsgType[GetSuccess[ORMap[String, ORSet[String]]]].dataValue.get(key._2) + deltaReplicator.tell(Get(key._1, ReadLocal), p.ref) + val deltaValue = p.expectMsgType[GetSuccess[ORMap[String, ORSet[String]]]].dataValue.get(key._2) + deltaValue.map(_.elements) should ===(fullStateValue.map(_.elements)) + } + } + } + enterBarrierAfterTestStep() } catch { case e: Throwable ⇒ diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWMapSpec.scala index 67a2a5ad3a..37f364d393 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/LWWMapSpec.scala @@ -60,9 +60,6 @@ class LWWMapSpec extends WordSpec with Matchers { val merged1 = m1 merge m2 - // FIXME use full state for removals, until issue #22648 is fixed - pending - val m3 = merged1.resetDelta.remove(node1, "b") (merged1 mergeDelta m3.delta.get).entries should be(Map("a" → 1, "c" → 3)) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala index 65286b1c6c..716557702f 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala @@ -55,9 +55,6 @@ class ORMapSpec extends WordSpec with Matchers { } "be able to remove entry using a delta" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - val m = ORMap().put(node1, "a", GSet() + "A").put(node1, "b", GSet() + "B") val addDelta = m.delta.get @@ -328,9 +325,6 @@ class ORMapSpec extends WordSpec with Matchers { } "not have anomalies for remove+updated scenario and deltas 8" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A") .put(node1, "b", GSet.empty + "B").put(node2, "b", GSet.empty + "B") val m2 = ORMap.empty.put(node2, "c", GSet.empty + "C") @@ -354,9 +348,6 @@ class ORMapSpec extends WordSpec with Matchers { } "not have anomalies for remove+updated scenario and deltas 9" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A") .put(node1, "b", GSet.empty + "B").put(node2, "b", GSet.empty + "B") val m2 = ORMap.empty.put(node2, "c", GSet.empty + "C") @@ -398,6 +389,20 @@ class ORMapSpec extends WordSpec with Matchers { merged3.entries("b").elements should be(Set("B2", "B3")) } + "not have anomalies for remove+updated scenario and deltas 11" in { + val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A") + + val m2 = ORMap.empty.put(node2, "a", GSet.empty[String]).remove(node2, "a") + + val merged1 = m1 merge m2 + + merged1.entries("a").elements should be(Set("A")) + + val merged2 = m1 mergeDelta m2.delta.get + + merged2.entries("a").elements should be(Set("A")) + } + "have the usual anomalies for remove+updated scenario" in { // please note that the current ORMultiMap has the same anomaly // because the condition of keeping global vvector is violated diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala index 830b70da2e..180821b7de 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala @@ -78,9 +78,6 @@ class ORMultiMapSpec extends WordSpec with Matchers { val merged2 = m2 merge m1 merged2.entries should be(expectedMerged) - // FIXME use full state for removals, until issue #22648 is fixed - pending - val merged3 = m1 mergeDelta m2.delta.get merged3.entries should be(expectedMerged) @@ -89,6 +86,47 @@ class ORMultiMapSpec extends WordSpec with Matchers { } } + "be able to have its entries correctly merged with another ORMultiMap with overlapping entries 2" in { + val m1 = ORMultiMap() + .addBinding(node1, "b", "B1") + val m2 = ORMultiMap() + .addBinding(node2, "b", "B2") + .remove(node2, "b") + + // merge both ways + + val expectedMerged = Map( + "b" → Set("B1")) + + val merged1 = m1 merge m2 + merged1.entries should be(expectedMerged) + + val merged2 = m2 merge m1 + merged2.entries should be(expectedMerged) + + val merged3 = m1 mergeDelta m2.delta.get + merged3.entries should be(expectedMerged) + + val merged4 = m2 mergeDelta m1.delta.get + merged4.entries should be(expectedMerged) + } + + "not have anomalies for remove+updated scenario and deltas" in { + val m2a = ORMultiMap.empty[String, String].addBinding(node1, "q", "Q").removeBinding(node1, "q", "Q") + val m1 = ORMultiMap.empty[String, String].addBinding(node1, "z", "Z").addBinding(node2, "x", "X") + .removeBinding(node1, "z", "Z") + + val m2 = m2a.resetDelta.removeBinding(node2, "a", "A") + + val merged1 = m1 merge m2 + + merged1.contains("a") should be(false) + + val merged2 = m1 mergeDelta m2.delta.get + + merged2.contains("a") should be(false) + } + "be able to get all bindings for an entry and then reduce them upon putting them back" in { val m = ORMultiMap().addBinding(node1, "a", "A1").addBinding(node1, "a", "A2").addBinding(node1, "b", "B1") val Some(a) = m.get("a") @@ -117,16 +155,13 @@ class ORMultiMapSpec extends WordSpec with Matchers { } "not have usual anomalies for remove+addBinding scenario and delta-deltas" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - val m1 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) val m2 = ORMultiMap.emptyWithValueDeltas[String, String].put(node2, "c", Set("C")) val merged1 = m1 merge m2 val m3 = merged1.resetDelta.remove(node1, "b") - val m4 = merged1.resetDelta.addBinding(node1, "b", "B2") + val m4 = m3.resetDelta.addBinding(node1, "b", "B2") val merged2 = m3 merge m4 @@ -140,17 +175,26 @@ class ORMultiMapSpec extends WordSpec with Matchers { merged3.entries("b") should be(Set("B2")) merged3.entries("c") should be(Set("C")) - val merged4 = merged1 mergeDelta m3.delta.get.merge(m4.delta.get) + val merged4 = merged1 merge m3 merge m4 merged4.entries("a") should be(Set("A")) merged4.entries("b") should be(Set("B2")) merged4.entries("c") should be(Set("C")) + + val merged5 = merged1 mergeDelta m3.delta.get mergeDelta m4.delta.get + + merged5.entries("a") should be(Set("A")) + merged5.entries("b") should be(Set("B2")) + merged5.entries("c") should be(Set("C")) + + val merged6 = merged1 mergeDelta m3.delta.get.merge(m4.delta.get) + + merged6.entries("a") should be(Set("A")) + merged6.entries("b") should be(Set("B2")) + merged6.entries("c") should be(Set("C")) } "not have usual anomalies for remove+addBinding scenario and delta-deltas 2" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - // the new delta-delta ORMultiMap is free from this anomaly val m1 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).put(node1, "b", Set("B")) val m2 = ORMultiMap.emptyWithValueDeltas[String, String].put(node2, "c", Set("C")) @@ -407,6 +451,80 @@ class ORMultiMapSpec extends WordSpec with Matchers { merged12.entries("b") should be(Set("B2", "B3")) } + "work with tombstones for ORMultiMap.withValueDeltas and its delta-delta operations" in { + // ORMultiMap.withValueDeltas has the following (public) interface: + // put - place (or replace) a value in a destructive way - no tombstone is created + // this can be seen in the relevant delta: PutDeltaOp(AddDeltaOp(ORSet(a)),(a,ORSet()),ORMultiMapWithValueDeltasTag) + // remove - to avoid anomalies that ORMultiMap has, value for the key being removed is being cleared + // before key removal, this can be seen in the following deltas created by the remove op (depending on situation): + // DeltaGroup(Vector(PutDeltaOp(AddDeltaOp(ORSet(a)),(a,ORSet()),ORMultiMapWithValueDeltasTag), RemoveKeyDeltaOp(RemoveDeltaOp(ORSet(a)),a,ORMultiMapWithValueDeltasTag))) + // DeltaGroup(Vector(UpdateDeltaOp(AddDeltaOp(ORSet(c)),Map(c -> FullStateDeltaOp(ORSet())),ORMultiMapWithValueDeltasTag), RemoveKeyDeltaOp(RemoveDeltaOp(ORSet(c)),c,ORMultiMapWithValueDeltasTag))) + // after applying the remove operation the tombstone for the given map looks as follows: Map(a -> ORSet()) (or Map(c -> ORSet()) ) + + val m1 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")) + val m2 = m1.resetDelta.remove(node1, "a") + + val m3 = m1.mergeDelta(m2.delta.get) + val m4 = m1.merge(m2) + + m3.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on + m4.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on + + val m5 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A1")) + (m3 mergeDelta m5.delta.get).entries("a") should ===(Set("A1")) + (m4 mergeDelta m5.delta.get).entries("a") should ===(Set("A1")) + (m4 merge m5).entries("a") should ===(Set("A1")) + + // addBinding - add a binding for a certain value - no tombstone is created + // this operation works through "updated" call of the underlying ORMap, that is not exposed + // in the ORMultiMap interface + // the side-effect of addBinding is that it can lead to anomalies with the standard "ORMultiMap" + + // removeBinding - remove binding for a certain value, and if there are no more remaining elements, remove + // the now superfluous key, please note that for .withValueDeltas variant tombstone will be created + + val um1 = ORMultiMap.emptyWithValueDeltas[String, String].addBinding(node1, "a", "A") + val um2 = um1.resetDelta.removeBinding(node1, "a", "A") + + val um3 = um1.mergeDelta(um2.delta.get) + val um4 = um1.merge(um2) + + um3.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on + um4.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on + + val um5 = ORMultiMap.emptyWithValueDeltas[String, String].addBinding(node1, "a", "A1") + (um3 mergeDelta um5.delta.get).entries("a") should ===(Set("A1")) + (um4 mergeDelta um5.delta.get).entries("a") should ===(Set("A1")) + (um4 merge um5).entries("a") should ===(Set("A1")) + + // replaceBinding - that would first addBinding for new binding and then removeBinding for old binding + // so no tombstone would be created + + // so the only option to create a tombstone with non-zero (!= Set() ) contents would be to call removeKey (not remove!) + // for the underlying ORMap (or have a removeKeyOp delta that does exactly that) + // but this is not possible in applications, as both remove and removeKey operations are API of internal ORMap + // and are not externally exposed in the ORMultiMap, and deltas are causal, so removeKeyOp delta cannot arise + // without previous delta containing 'clear' or 'put' operation setting the tombstone at Set() + // the example shown below cannot happen in practice + + val tm1 = new ORMultiMap(ORMultiMap.emptyWithValueDeltas[String, String].addBinding(node1, "a", "A").underlying.removeKey(node1, "a"), true) + tm1.underlying.values("a").elements should ===(Set("A")) // tombstone + tm1.addBinding(node1, "a", "A1").entries("a") should be(Set("A", "A1")) + val tm2 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A")).resetDelta.addBinding(node1, "a", "A1") + tm1.mergeDelta(tm2.delta.get).entries("a") should be(Set("A", "A1")) + tm1.merge(tm2).entries("a") should be(Set("A", "A1")) + val tm3 = new ORMultiMap(ORMultiMap.emptyWithValueDeltas[String, String].addBinding(node1, "a", "A").underlying.remove(node1, "a"), true) + tm3.underlying.contains("a") should ===(false) // no tombstone, because remove not removeKey + tm3.mergeDelta(tm2.delta.get).entries should ===(Map.empty[String, String]) // no tombstone - update delta could not be applied + tm3.merge(tm2).entries should ===(Map.empty[String, String]) + + // This situation gives us possibility of removing the impact of tombstones altogether, as the only valid value for tombstone + // created by means of either API call or application of delta propagation would be Set() + // then the tombstones being only empty sets can be entirely cleared up + // because the merge delta operation will use in that case the natural zero from the delta. + // Thus in case of valid API usage and normal operation of delta propagation no tombstones will be created. + } + "have unapply extractor" in { val m1 = ORMultiMap.empty.put(node1, "a", Set(1L, 2L)).put(node2, "b", Set(3L)) val m2: ORMultiMap[String, Long] = m1 diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala index 7a2b68c72b..32dcaa9f7f 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORSetSpec.scala @@ -294,9 +294,6 @@ class ORSetSpec extends WordSpec with Matchers { s1.mergeDelta(d4) should ===(s3) s2.mergeDelta(d4) should ===(s3) - // FIXME use full state for removals, until issue #22648 is fixed - pending - val s5 = s3.resetDelta.remove(node1, "b") val d5 = s5.delta.get val d6 = (d4 merge d5).asInstanceOf[ORSet.DeltaGroup[String]] @@ -315,9 +312,6 @@ class ORSetSpec extends WordSpec with Matchers { } "work for removals" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - val s1 = ORSet.empty[String] val s2 = s1.add(node1, "a").add(node1, "b").resetDelta val s3 = s2.remove(node1, "b") @@ -363,9 +357,6 @@ class ORSetSpec extends WordSpec with Matchers { } "handle a mixed add/remove scenario" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - val s1 = ORSet.empty[String] val s2 = s1.resetDelta.remove(node1, "e") val s3 = s2.resetDelta.add(node1, "b") @@ -385,9 +376,6 @@ class ORSetSpec extends WordSpec with Matchers { } "handle a mixed add/remove scenario 2" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - val s1 = ORSet.empty[String] val s2 = s1.resetDelta.add(node1, "a") val s3 = s2.resetDelta.add(node1, "b") @@ -410,9 +398,6 @@ class ORSetSpec extends WordSpec with Matchers { } "handle a mixed add/remove scenario 3" in { - // FIXME use full state for removals, until issue #22648 is fixed - pending - val s1 = ORSet.empty[String] val s2 = s1.resetDelta.add(node1, "a") val s3 = s2.resetDelta.add(node1, "b") @@ -434,6 +419,40 @@ class ORSetSpec extends WordSpec with Matchers { t4.elements should ===(Set("b", "a")) } + "not have anomalies for ORSet in complex but realistic scenario" in { + val node1_1 = ORSet.empty[String].add(node1, "q").remove(node1, "q") + val delta1_1 = node1_1.delta.get + val node1_2 = node1_1.resetDelta.resetDelta.add(node1, "z").remove(node1, "z") + val delta1_2 = node1_2.delta.get + // we finished doing stuff on node1 - there are two separate deltas that will be propagated + // node2 is created, then gets first delta from node1 and then adds an element "x" + val node2_1 = ORSet.empty[String].mergeDelta(delta1_1).resetDelta.add(node2, "x") + val delta2_1 = node2_1.delta.get + // node2 continues its existence adding and later removing the element + // it still didn't get the second update from node1 (that is fully legit :) ) + val node2_2 = node2_1.resetDelta.add(node2, "a").remove(node2, "a") + val delta2_2 = node2_2.delta.get + + // in the meantime there is some node3 + // there is not much activity on it, it just gets the first delta from node1 then it gets + // first delta from node2 + // then it gets the second delta from node1 (that node2 still didn't get, but, hey!, this is fine) + val node3_1 = ORSet.empty[String].mergeDelta(delta1_1).mergeDelta(delta2_1).mergeDelta(delta1_2) + + // and node3_1 receives full update from node2 via gossip + val merged1 = node3_1 merge node2_2 + + merged1.contains("a") should be(false) + + // and node3_1 receives delta update from node2 (it just needs to get the second delta, + // as it already got the first delta just a second ago) + + val merged2 = node3_1 mergeDelta delta2_2 + + val ORSet(mg2) = merged2 + mg2 should be(Set("x")) // !!! + } + "require causal delivery of deltas" in { // This test illustrates why we need causal delivery of deltas. // Otherwise the following could happen. diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala index b82e767952..5ac4915a33 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/PNCounterMapSpec.scala @@ -56,9 +56,6 @@ class PNCounterMapSpec extends WordSpec with Matchers { val merged1 = m1 merge m2 - // FIXME use full state for removals, until issue #22648 is fixed - pending - val m3 = merged1.resetDelta.remove(node1, "b") (merged1 mergeDelta m3.delta.get).entries should be(Map("a" → 1, "c" → 7)) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala index 35f90c8e9e..5a63607339 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala @@ -113,9 +113,8 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( "serialize ORSet delta" in { checkSerialization(ORSet().add(address1, "a").delta.get) - // FIXME use full state for removals, until issue #22648 is fixed - //checkSerialization(ORSet().add(address1, "a").resetDelta.remove(address2, "a").delta.get) - // checkSerialization(ORSet().add(address1, "a").remove(address2, "a").delta.get) + checkSerialization(ORSet().add(address1, "a").resetDelta.remove(address2, "a").delta.get) + checkSerialization(ORSet().add(address1, "a").remove(address2, "a").delta.get) checkSerialization(ORSet().add(address1, "a").resetDelta.clear(address2).delta.get) checkSerialization(ORSet().add(address1, "a").clear(address2).delta.get) } @@ -197,9 +196,8 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( "serialize ORMap delta" in { checkSerialization(ORMap().put(address1, "a", GSet() + "A").put(address2, "b", GSet() + "B").delta.get) - // FIXME use full state for removals, until issue #22648 is fixed - // checkSerialization(ORMap().put(address1, "a", GSet() + "A").resetDelta.remove(address2, "a").delta.get) - // checkSerialization(ORMap().put(address1, "a", GSet() + "A").remove(address2, "a").delta.get) + checkSerialization(ORMap().put(address1, "a", GSet() + "A").resetDelta.remove(address2, "a").delta.get) + checkSerialization(ORMap().put(address1, "a", GSet() + "A").remove(address2, "a").delta.get) checkSerialization(ORMap().put(address1, 1, GSet() + "A").delta.get) checkSerialization(ORMap().put(address1, 1L, GSet() + "A").delta.get) checkSerialization(ORMap.empty[String, ORSet[String]] @@ -283,8 +281,7 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, 1, "A")) checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, 1L, "A")) checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, Flag(), "A")) - // FIXME use full state for removals, until issue #22648 is fixed - // checkSerialization(ORMultiMap.emptyWithValueDeltas[String, String].addBinding(address1, "a", "A").remove(address1, "a").delta.get) + checkSerialization(ORMultiMap.emptyWithValueDeltas[String, String].addBinding(address1, "a", "A").remove(address1, "a").delta.get) checkSerialization(ORMultiMap.emptyWithValueDeltas[String, String] .addBinding(address1, "a", "A1") .put(address2, "b", Set("B1", "B2", "B3"))