diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index b8336f2239..4f1fe0f478 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -228,9 +228,10 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( "value, because important history can be lost when replacing the `ORSet` and " + "undesired effects of merging will occur. Use `ORMultiMap` or `ORMap.updated` instead.") else { - val putDeltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → value, zeroTag) - // put forcibly damages history, so we propagate full value that will overwrite previous values - new ORMap(keys.add(node, key), values.updated(key, value), zeroTag, Some(newDelta(putDeltaOp))) + val newKeys = keys.resetDelta.add(node, key) + val putDeltaOp = PutDeltaOp(newKeys.delta.get, key → value, zeroTag) + // put forcibly damages history, so we consciously propagate full value that will overwrite previous value + new ORMap(newKeys, values.updated(key, value), zeroTag, Some(newDelta(putDeltaOp))) } /** @@ -263,20 +264,21 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( // we can emit (and later merge) their deltas instead of full updates. // However to avoid necessity of tombstones, the derived map type needs to support this // with clearing the value (e.g. removing all elements if value is a set) - // before removing the key - like e.g. ORMultiMap does + // before removing the key - like e.g. ORMultiMap.emptyWithValueDeltas does + val newKeys = keys.resetDelta.add(node, key) oldValue match { case _: DeltaReplicatedData if valueDeltas ⇒ val newValue = modify(oldValue.asInstanceOf[DeltaReplicatedData].resetDelta.asInstanceOf[B]) val newValueDelta = newValue.asInstanceOf[DeltaReplicatedData].delta val deltaOp = newValueDelta match { - case Some(d) if hasOldValue ⇒ UpdateDeltaOp(keys.resetDelta.add(node, key).delta.get, Map(key → d), zeroTag) - case _ ⇒ PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → newValue, zeroTag) + case Some(d) if hasOldValue ⇒ UpdateDeltaOp(newKeys.delta.get, Map(key → d), zeroTag) + case _ ⇒ PutDeltaOp(newKeys.delta.get, key → newValue, zeroTag) } - new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp))) + new ORMap(newKeys, values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp))) case _ ⇒ val newValue = modify(oldValue) - val deltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → newValue, zeroTag) - new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp))) + val deltaOp = PutDeltaOp(newKeys.delta.get, key → newValue, zeroTag) + new ORMap(newKeys, values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp))) } } @@ -299,8 +301,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( */ @InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = { // for removals the delta values map emitted will be empty - val removeDeltaOp = RemoveDeltaOp(keys.resetDelta.remove(node, key).delta.get, zeroTag) - new ORMap(keys.remove(node, key), values - key, zeroTag, Some(newDelta(removeDeltaOp))) + val newKeys = keys.resetDelta.remove(node, key) + val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag) + new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp))) } /** @@ -309,8 +312,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( * by keeping the vvector (in form of key -> value pair) for deleted keys */ @InternalApi private[akka] def removeKey(node: UniqueAddress, key: A): ORMap[A, B] = { - val removeKeyDeltaOp = RemoveKeyDeltaOp(keys.resetDelta.remove(node, key).delta.get, key, zeroTag) - new ORMap(keys.remove(node, key), values, zeroTag, Some(newDelta(removeKeyDeltaOp))) + val newKeys = keys.resetDelta.remove(node, key) + val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag) + new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp))) } private def dryMerge(that: ORMap[A, B], mergedKeys: ORSet[A], valueKeysIterator: Iterator[A]): ORMap[A, B] = { @@ -353,7 +357,7 @@ final class ORMap[A, B <: ReplicatedData] private[akka] ( override def resetDelta: ORMap[A, B] = if (delta.isEmpty) this - else new ORMap[A, B](keys, values, zeroTag = zeroTag) + else new ORMap[A, B](keys.resetDelta, values, zeroTag = zeroTag) override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = { // helper function to simplify folds below diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala index a9d245d1c4..262c3da232 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMapSpec.scala @@ -6,6 +6,7 @@ package akka.cluster.ddata import akka.actor.Address import akka.cluster.UniqueAddress +import akka.cluster.ddata.ORSet.AddDeltaOp import akka.cluster.ddata.Replicator.Changed import org.scalatest.Matchers import org.scalatest.WordSpec @@ -141,6 +142,30 @@ class ORMapSpec extends WordSpec with Matchers { merged2.entries("c").elements should be(Set("C")) } + "do not have divergence in dot versions between the underlying map and ormap delta" in { + val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A") + + val deltaVersion = m1.delta.get match { + case ORMap.PutDeltaOp(delta, v, dt) ⇒ + delta match { + case AddDeltaOp(u) ⇒ + if (u.elementsMap.contains("a")) + Some(u.elementsMap("a").versionAt(node1)) + else + None + case _ ⇒ None + } + case _ ⇒ None + } + + val fullVersion = + if (m1.keys.elementsMap.contains("a")) + Some(m1.keys.elementsMap("a").versionAt(node1)) + else + None + deltaVersion should ===(fullVersion) + } + "not have anomalies for remove+updated scenario and deltas" in { val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node1, "b", GSet.empty + "B") val m2 = ORMap.empty.put(node2, "c", GSet.empty + "C")