Merge pull request #23916 from gosubpl/wip/22974-gc-tombstones
=ddata Garbage collect valueDeltas tombstones at merge/mergeDelta (#22974)
This commit is contained in:
commit
74b5866f60
4 changed files with 43 additions and 30 deletions
|
|
@ -331,9 +331,13 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
val mergedValue = thisValue.merge(thatValue.asInstanceOf[thisValue.T]).asInstanceOf[B]
|
||||
mergedValues = mergedValues.updated(key, mergedValue)
|
||||
case (Some(thisValue), None) ⇒
|
||||
mergedValues = mergedValues.updated(key, thisValue)
|
||||
if (mergedKeys.contains(key))
|
||||
mergedValues = mergedValues.updated(key, thisValue)
|
||||
// else thisValue is a tombstone, but we don't want to carry it forward, as the other side does not have the element at all
|
||||
case (None, Some(thatValue)) ⇒
|
||||
mergedValues = mergedValues.updated(key, thatValue)
|
||||
if (mergedKeys.contains(key))
|
||||
mergedValues = mergedValues.updated(key, thatValue)
|
||||
// else thatValue is a tombstone, but we don't want to carry it forward, as the other side does not have the element at all
|
||||
case (None, None) ⇒ throw new IllegalStateException(s"missing value for $key")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,9 +3,9 @@
|
|||
*/
|
||||
package akka.cluster.ddata
|
||||
|
||||
import akka.cluster.{ Cluster, UniqueAddress }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.cluster.ddata.ORMap._
|
||||
import akka.cluster.{ Cluster, UniqueAddress }
|
||||
|
||||
object ORMultiMap {
|
||||
/**
|
||||
|
|
@ -69,9 +69,12 @@ final class ORMultiMap[A, B] private[akka] (
|
|||
|
||||
override def merge(that: T): T =
|
||||
if (withValueDeltas == that.withValueDeltas) {
|
||||
if (withValueDeltas)
|
||||
new ORMultiMap(underlying.mergeRetainingDeletedValues(that.underlying), withValueDeltas)
|
||||
else
|
||||
if (withValueDeltas) {
|
||||
val newUnderlying = underlying.mergeRetainingDeletedValues(that.underlying)
|
||||
// Garbage collect the tombstones we no longer need, i.e. those that have Set() as a value.
|
||||
val newValues = newUnderlying.values.filterNot { case (key, value) ⇒ !newUnderlying.keys.contains(key) && value.isEmpty }
|
||||
new ORMultiMap[A, B](new ORMap(newUnderlying.keys, newValues, newUnderlying.zeroTag, newUnderlying.delta), withValueDeltas)
|
||||
} else
|
||||
new ORMultiMap(underlying.merge(that.underlying), withValueDeltas)
|
||||
} else throw new IllegalArgumentException("Trying to merge two ORMultiMaps of different map sub-type")
|
||||
|
||||
|
|
@ -253,9 +256,12 @@ final class ORMultiMap[A, B] private[akka] (
|
|||
override def delta: Option[D] = underlying.delta
|
||||
|
||||
override def mergeDelta(thatDelta: D): ORMultiMap[A, B] =
|
||||
if (withValueDeltas)
|
||||
new ORMultiMap(underlying.mergeDeltaRetainingDeletedValues(thatDelta), withValueDeltas)
|
||||
else
|
||||
if (withValueDeltas) {
|
||||
val newUnderlying = underlying.mergeDeltaRetainingDeletedValues(thatDelta)
|
||||
// Garbage collect the tombstones we no longer need, i.e. those that have Set() as a value.
|
||||
val newValues = newUnderlying.values.filterNot { case (key, value) ⇒ !newUnderlying.keys.contains(key) && value.isEmpty }
|
||||
new ORMultiMap[A, B](new ORMap(newUnderlying.keys, newValues, newUnderlying.zeroTag, newUnderlying.delta), withValueDeltas)
|
||||
} else
|
||||
new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas)
|
||||
|
||||
override def modifiedByNodes: Set[UniqueAddress] =
|
||||
|
|
|
|||
|
|
@ -467,8 +467,8 @@ class ORMultiMapSpec extends WordSpec with Matchers {
|
|||
val m3 = m1.mergeDelta(m2.delta.get)
|
||||
val m4 = m1.merge(m2)
|
||||
|
||||
m3.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on
|
||||
m4.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on
|
||||
m3.underlying.values.contains("a") should be(false) // tombstone for 'a' has been optimized away at the end of the mergeDelta
|
||||
m4.underlying.values.contains("a") should be(false) // tombstone for 'a' has been optimized away at the end of the merge
|
||||
|
||||
val m5 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A1"))
|
||||
(m3 mergeDelta m5.delta.get).entries("a") should ===(Set("A1"))
|
||||
|
|
@ -489,8 +489,8 @@ class ORMultiMapSpec extends WordSpec with Matchers {
|
|||
val um3 = um1.mergeDelta(um2.delta.get)
|
||||
val um4 = um1.merge(um2)
|
||||
|
||||
um3.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on
|
||||
um4.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on
|
||||
um3.underlying.values.contains("a") should be(false) // tombstone for 'a' has been optimized away at the end of the mergeDelta
|
||||
um4.underlying.values.contains("a") should be(false) // tombstone for 'a' has been optimized away at the end of the merge
|
||||
|
||||
val um5 = ORMultiMap.emptyWithValueDeltas[String, String].addBinding(node1, "a", "A1")
|
||||
(um3 mergeDelta um5.delta.get).entries("a") should ===(Set("A1"))
|
||||
|
|
@ -518,11 +518,9 @@ class ORMultiMapSpec extends WordSpec with Matchers {
|
|||
tm3.mergeDelta(tm2.delta.get).entries should ===(Map.empty[String, String]) // no tombstone - update delta could not be applied
|
||||
tm3.merge(tm2).entries should ===(Map.empty[String, String])
|
||||
|
||||
// This situation gives us possibility of removing the impact of tombstones altogether, as the only valid value for tombstone
|
||||
// created by means of either API call or application of delta propagation would be Set()
|
||||
// then the tombstones being only empty sets can be entirely cleared up
|
||||
// because the merge delta operation will use in that case the natural zero from the delta.
|
||||
// Thus in case of valid API usage and normal operation of delta propagation no tombstones will be created.
|
||||
// The only valid value for tombstone created by means of either API call or application of delta propagation is Set()
|
||||
// which is then garbage collected at every `merge` and `mergeDelta` operation.
|
||||
// Hence in the case of valid API usage and normal operation of delta propagation no tombstones will be permanently created.
|
||||
}
|
||||
|
||||
"have unapply extractor" in {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue