fix ORMap duplicated delta propagation (#22606)
This commit is contained in:
parent
2d006558ab
commit
2caef783dc
2 changed files with 43 additions and 14 deletions
|
|
@ -228,9 +228,10 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
"value, because important history can be lost when replacing the `ORSet` and " +
|
||||
"undesired effects of merging will occur. Use `ORMultiMap` or `ORMap.updated` instead.")
|
||||
else {
|
||||
val putDeltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → value, zeroTag)
|
||||
// put forcibly damages history, so we propagate full value that will overwrite previous values
|
||||
new ORMap(keys.add(node, key), values.updated(key, value), zeroTag, Some(newDelta(putDeltaOp)))
|
||||
val newKeys = keys.resetDelta.add(node, key)
|
||||
val putDeltaOp = PutDeltaOp(newKeys.delta.get, key → value, zeroTag)
|
||||
// put forcibly damages history, so we consciously propagate full value that will overwrite previous value
|
||||
new ORMap(newKeys, values.updated(key, value), zeroTag, Some(newDelta(putDeltaOp)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -263,20 +264,21 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
// we can emit (and later merge) their deltas instead of full updates.
|
||||
// However to avoid necessity of tombstones, the derived map type needs to support this
|
||||
// with clearing the value (e.g. removing all elements if value is a set)
|
||||
// before removing the key - like e.g. ORMultiMap does
|
||||
// before removing the key - like e.g. ORMultiMap.emptyWithValueDeltas does
|
||||
val newKeys = keys.resetDelta.add(node, key)
|
||||
oldValue match {
|
||||
case _: DeltaReplicatedData if valueDeltas ⇒
|
||||
val newValue = modify(oldValue.asInstanceOf[DeltaReplicatedData].resetDelta.asInstanceOf[B])
|
||||
val newValueDelta = newValue.asInstanceOf[DeltaReplicatedData].delta
|
||||
val deltaOp = newValueDelta match {
|
||||
case Some(d) if hasOldValue ⇒ UpdateDeltaOp(keys.resetDelta.add(node, key).delta.get, Map(key → d), zeroTag)
|
||||
case _ ⇒ PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → newValue, zeroTag)
|
||||
case Some(d) if hasOldValue ⇒ UpdateDeltaOp(newKeys.delta.get, Map(key → d), zeroTag)
|
||||
case _ ⇒ PutDeltaOp(newKeys.delta.get, key → newValue, zeroTag)
|
||||
}
|
||||
new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
|
||||
new ORMap(newKeys, values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
|
||||
case _ ⇒
|
||||
val newValue = modify(oldValue)
|
||||
val deltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → newValue, zeroTag)
|
||||
new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
|
||||
val deltaOp = PutDeltaOp(newKeys.delta.get, key → newValue, zeroTag)
|
||||
new ORMap(newKeys, values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -299,8 +301,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
*/
|
||||
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = {
|
||||
// for removals the delta values map emitted will be empty
|
||||
val removeDeltaOp = RemoveDeltaOp(keys.resetDelta.remove(node, key).delta.get, zeroTag)
|
||||
new ORMap(keys.remove(node, key), values - key, zeroTag, Some(newDelta(removeDeltaOp)))
|
||||
val newKeys = keys.resetDelta.remove(node, key)
|
||||
val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag)
|
||||
new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -309,8 +312,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
* by keeping the vvector (in form of key -> value pair) for deleted keys
|
||||
*/
|
||||
@InternalApi private[akka] def removeKey(node: UniqueAddress, key: A): ORMap[A, B] = {
|
||||
val removeKeyDeltaOp = RemoveKeyDeltaOp(keys.resetDelta.remove(node, key).delta.get, key, zeroTag)
|
||||
new ORMap(keys.remove(node, key), values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
|
||||
val newKeys = keys.resetDelta.remove(node, key)
|
||||
val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag)
|
||||
new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
|
||||
}
|
||||
|
||||
private def dryMerge(that: ORMap[A, B], mergedKeys: ORSet[A], valueKeysIterator: Iterator[A]): ORMap[A, B] = {
|
||||
|
|
@ -353,7 +357,7 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
|
||||
override def resetDelta: ORMap[A, B] =
|
||||
if (delta.isEmpty) this
|
||||
else new ORMap[A, B](keys, values, zeroTag = zeroTag)
|
||||
else new ORMap[A, B](keys.resetDelta, values, zeroTag = zeroTag)
|
||||
|
||||
override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
|
||||
// helper function to simplify folds below
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.cluster.ddata
|
|||
|
||||
import akka.actor.Address
|
||||
import akka.cluster.UniqueAddress
|
||||
import akka.cluster.ddata.ORSet.AddDeltaOp
|
||||
import akka.cluster.ddata.Replicator.Changed
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpec
|
||||
|
|
@ -141,6 +142,30 @@ class ORMapSpec extends WordSpec with Matchers {
|
|||
merged2.entries("c").elements should be(Set("C"))
|
||||
}
|
||||
|
||||
"do not have divergence in dot versions between the underlying map and ormap delta" in {
|
||||
val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A")
|
||||
|
||||
val deltaVersion = m1.delta.get match {
|
||||
case ORMap.PutDeltaOp(delta, v, dt) ⇒
|
||||
delta match {
|
||||
case AddDeltaOp(u) ⇒
|
||||
if (u.elementsMap.contains("a"))
|
||||
Some(u.elementsMap("a").versionAt(node1))
|
||||
else
|
||||
None
|
||||
case _ ⇒ None
|
||||
}
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
val fullVersion =
|
||||
if (m1.keys.elementsMap.contains("a"))
|
||||
Some(m1.keys.elementsMap("a").versionAt(node1))
|
||||
else
|
||||
None
|
||||
deltaVersion should ===(fullVersion)
|
||||
}
|
||||
|
||||
"not have anomalies for remove+updated scenario and deltas" in {
|
||||
val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node1, "b", GSet.empty + "B")
|
||||
val m2 = ORMap.empty.put(node2, "c", GSet.empty + "C")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue