=ddata Garbage collect valueDeltas tombstones at merge/mergeDelta #22974
This commit is contained in:
parent
c8748e8cf0
commit
cb6e9b1e49
4 changed files with 43 additions and 30 deletions
|
|
@ -331,9 +331,13 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
||||||
val mergedValue = thisValue.merge(thatValue.asInstanceOf[thisValue.T]).asInstanceOf[B]
|
val mergedValue = thisValue.merge(thatValue.asInstanceOf[thisValue.T]).asInstanceOf[B]
|
||||||
mergedValues = mergedValues.updated(key, mergedValue)
|
mergedValues = mergedValues.updated(key, mergedValue)
|
||||||
case (Some(thisValue), None) ⇒
|
case (Some(thisValue), None) ⇒
|
||||||
mergedValues = mergedValues.updated(key, thisValue)
|
if (mergedKeys.contains(key))
|
||||||
|
mergedValues = mergedValues.updated(key, thisValue)
|
||||||
|
// else thisValue is a tombstone, but we don't want to carry it forward, as the other side does not have the element at all
|
||||||
case (None, Some(thatValue)) ⇒
|
case (None, Some(thatValue)) ⇒
|
||||||
mergedValues = mergedValues.updated(key, thatValue)
|
if (mergedKeys.contains(key))
|
||||||
|
mergedValues = mergedValues.updated(key, thatValue)
|
||||||
|
// else thatValue is a tombstone, but we don't want to carry it forward, as the other side does not have the element at all
|
||||||
case (None, None) ⇒ throw new IllegalStateException(s"missing value for $key")
|
case (None, None) ⇒ throw new IllegalStateException(s"missing value for $key")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,9 +3,9 @@
|
||||||
*/
|
*/
|
||||||
package akka.cluster.ddata
|
package akka.cluster.ddata
|
||||||
|
|
||||||
import akka.cluster.{ Cluster, UniqueAddress }
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.cluster.ddata.ORMap._
|
import akka.cluster.ddata.ORMap._
|
||||||
|
import akka.cluster.{ Cluster, UniqueAddress }
|
||||||
|
|
||||||
object ORMultiMap {
|
object ORMultiMap {
|
||||||
/**
|
/**
|
||||||
|
|
@ -69,9 +69,12 @@ final class ORMultiMap[A, B] private[akka] (
|
||||||
|
|
||||||
override def merge(that: T): T =
|
override def merge(that: T): T =
|
||||||
if (withValueDeltas == that.withValueDeltas) {
|
if (withValueDeltas == that.withValueDeltas) {
|
||||||
if (withValueDeltas)
|
if (withValueDeltas) {
|
||||||
new ORMultiMap(underlying.mergeRetainingDeletedValues(that.underlying), withValueDeltas)
|
val newUnderlying = underlying.mergeRetainingDeletedValues(that.underlying)
|
||||||
else
|
// Garbage collect the tombstones we no longer need, i.e. those that have Set() as a value.
|
||||||
|
val newValues = newUnderlying.values.filterNot { case (key, value) ⇒ !newUnderlying.keys.contains(key) && value.isEmpty }
|
||||||
|
new ORMultiMap[A, B](new ORMap(newUnderlying.keys, newValues, newUnderlying.zeroTag, newUnderlying.delta), withValueDeltas)
|
||||||
|
} else
|
||||||
new ORMultiMap(underlying.merge(that.underlying), withValueDeltas)
|
new ORMultiMap(underlying.merge(that.underlying), withValueDeltas)
|
||||||
} else throw new IllegalArgumentException("Trying to merge two ORMultiMaps of different map sub-type")
|
} else throw new IllegalArgumentException("Trying to merge two ORMultiMaps of different map sub-type")
|
||||||
|
|
||||||
|
|
@ -253,9 +256,12 @@ final class ORMultiMap[A, B] private[akka] (
|
||||||
override def delta: Option[D] = underlying.delta
|
override def delta: Option[D] = underlying.delta
|
||||||
|
|
||||||
override def mergeDelta(thatDelta: D): ORMultiMap[A, B] =
|
override def mergeDelta(thatDelta: D): ORMultiMap[A, B] =
|
||||||
if (withValueDeltas)
|
if (withValueDeltas) {
|
||||||
new ORMultiMap(underlying.mergeDeltaRetainingDeletedValues(thatDelta), withValueDeltas)
|
val newUnderlying = underlying.mergeDeltaRetainingDeletedValues(thatDelta)
|
||||||
else
|
// Garbage collect the tombstones we no longer need, i.e. those that have Set() as a value.
|
||||||
|
val newValues = newUnderlying.values.filterNot { case (key, value) ⇒ !newUnderlying.keys.contains(key) && value.isEmpty }
|
||||||
|
new ORMultiMap[A, B](new ORMap(newUnderlying.keys, newValues, newUnderlying.zeroTag, newUnderlying.delta), withValueDeltas)
|
||||||
|
} else
|
||||||
new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas)
|
new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas)
|
||||||
|
|
||||||
override def modifiedByNodes: Set[UniqueAddress] =
|
override def modifiedByNodes: Set[UniqueAddress] =
|
||||||
|
|
|
||||||
|
|
@ -467,8 +467,8 @@ class ORMultiMapSpec extends WordSpec with Matchers {
|
||||||
val m3 = m1.mergeDelta(m2.delta.get)
|
val m3 = m1.mergeDelta(m2.delta.get)
|
||||||
val m4 = m1.merge(m2)
|
val m4 = m1.merge(m2)
|
||||||
|
|
||||||
m3.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on
|
m3.underlying.values.contains("a") should be(false) // tombstone for 'a' has been optimized away at the end of the mergeDelta
|
||||||
m4.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on
|
m4.underlying.values.contains("a") should be(false) // tombstone for 'a' has been optimized away at the end of the merge
|
||||||
|
|
||||||
val m5 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A1"))
|
val m5 = ORMultiMap.emptyWithValueDeltas[String, String].put(node1, "a", Set("A1"))
|
||||||
(m3 mergeDelta m5.delta.get).entries("a") should ===(Set("A1"))
|
(m3 mergeDelta m5.delta.get).entries("a") should ===(Set("A1"))
|
||||||
|
|
@ -489,8 +489,8 @@ class ORMultiMapSpec extends WordSpec with Matchers {
|
||||||
val um3 = um1.mergeDelta(um2.delta.get)
|
val um3 = um1.mergeDelta(um2.delta.get)
|
||||||
val um4 = um1.merge(um2)
|
val um4 = um1.merge(um2)
|
||||||
|
|
||||||
um3.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on
|
um3.underlying.values.contains("a") should be(false) // tombstone for 'a' has been optimized away at the end of the mergeDelta
|
||||||
um4.underlying.values("a").elements should ===(Set()) // tombstone for 'a' - but we can probably optimize that away, read on
|
um4.underlying.values.contains("a") should be(false) // tombstone for 'a' has been optimized away at the end of the merge
|
||||||
|
|
||||||
val um5 = ORMultiMap.emptyWithValueDeltas[String, String].addBinding(node1, "a", "A1")
|
val um5 = ORMultiMap.emptyWithValueDeltas[String, String].addBinding(node1, "a", "A1")
|
||||||
(um3 mergeDelta um5.delta.get).entries("a") should ===(Set("A1"))
|
(um3 mergeDelta um5.delta.get).entries("a") should ===(Set("A1"))
|
||||||
|
|
@ -518,11 +518,9 @@ class ORMultiMapSpec extends WordSpec with Matchers {
|
||||||
tm3.mergeDelta(tm2.delta.get).entries should ===(Map.empty[String, String]) // no tombstone - update delta could not be applied
|
tm3.mergeDelta(tm2.delta.get).entries should ===(Map.empty[String, String]) // no tombstone - update delta could not be applied
|
||||||
tm3.merge(tm2).entries should ===(Map.empty[String, String])
|
tm3.merge(tm2).entries should ===(Map.empty[String, String])
|
||||||
|
|
||||||
// This situation gives us possibility of removing the impact of tombstones altogether, as the only valid value for tombstone
|
// The only valid value for tombstone created by means of either API call or application of delta propagation is Set()
|
||||||
// created by means of either API call or application of delta propagation would be Set()
|
// which is then garbage collected at every `merge` and `mergeDelta` operation.
|
||||||
// then the tombstones being only empty sets can be entirely cleared up
|
// Hence, in the case of valid API usage and normal operation of delta propagation, no tombstones will be permanently created.
|
||||||
// because the merge delta operation will use in that case the natural zero from the delta.
|
|
||||||
// Thus in case of valid API usage and normal operation of delta propagation no tombstones will be created.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"have unapply extractor" in {
|
"have unapply extractor" in {
|
||||||
|
|
|
||||||
|
|
@ -469,17 +469,6 @@ delivery of deltas. Support for deltas here means that the `ORSet` being underly
|
||||||
uses delta propagation to deliver updates. Effectively, the update for map is then a pair, consisting of delta for the `ORSet`
|
uses delta propagation to deliver updates. Effectively, the update for map is then a pair, consisting of delta for the `ORSet`
|
||||||
being the key and full update for the respective value (`ORSet`, `PNCounter` or `LWWRegister`) kept in the map.
|
being the key and full update for the respective value (`ORSet`, `PNCounter` or `LWWRegister`) kept in the map.
|
||||||
|
|
||||||
There is a special version of `ORMultiMap`, created by using separate constructor
|
|
||||||
`ORMultiMap.emptyWithValueDeltas[A, B]`, that also propagates the updates to its values (of `ORSet` type) as deltas.
|
|
||||||
This means that the `ORMultiMap` initiated with `ORMultiMap.emptyWithValueDeltas` propagates its updates as pairs
|
|
||||||
consisting of delta of the key and delta of the value. It is much more efficient in terms of network bandwidth consumed.
|
|
||||||
However, this behaviour has not been made default for `ORMultiMap` because currently the merge process for
|
|
||||||
updates for `ORMultiMap.emptyWithValueDeltas` results in a tombstone (being a form of [CRDT Garbage](#crdt-garbage) )
|
|
||||||
in form of additional `ORSet` entry being created in a situation when a key has been added and then removed.
|
|
||||||
There is ongoing work aimed at removing necessity of creation of the aforementioned tombstone. Please also note
|
|
||||||
that despite having the same Scala type, `ORMultiMap.emptyWithValueDeltas` is not compatible with 'vanilla' `ORMultiMap`,
|
|
||||||
because of different replication mechanism.
|
|
||||||
|
|
||||||
Scala
|
Scala
|
||||||
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #ormultimap }
|
: @@snip [DistributedDataDocSpec.scala]($code$/scala/docs/ddata/DistributedDataDocSpec.scala) { #ormultimap }
|
||||||
|
|
||||||
|
|
@ -487,13 +476,29 @@ Java
|
||||||
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #ormultimap }
|
: @@snip [DistributedDataDocTest.java]($code$/java/jdocs/ddata/DistributedDataDocTest.java) { #ormultimap }
|
||||||
|
|
||||||
When a data entry is changed the full state of that entry is replicated to other nodes, i.e.
|
When a data entry is changed the full state of that entry is replicated to other nodes, i.e.
|
||||||
when you update a map the whole map is replicated. Therefore, instead of using one `ORMap`
|
when you update a map, the whole map is replicated. Therefore, instead of using one `ORMap`
|
||||||
with 1000 elements it is more efficient to split that up in 10 top level `ORMap` entries
|
with 1000 elements it is more efficient to split that up in 10 top level `ORMap` entries
|
||||||
with 100 elements each. Top level entries are replicated individually, which has the
|
with 100 elements each. Top level entries are replicated individually, which has the
|
||||||
trade-off that different entries may not be replicated at the same time and you may see
|
trade-off that different entries may not be replicated at the same time and you may see
|
||||||
inconsistencies between related entries. Separate top level entries cannot be updated atomically
|
inconsistencies between related entries. Separate top level entries cannot be updated atomically
|
||||||
together.
|
together.
|
||||||
|
|
||||||
|
There is a special version of `ORMultiMap`, created by using a separate constructor
|
||||||
|
`ORMultiMap.emptyWithValueDeltas[A, B]`, that also propagates the updates to its values (of `ORSet` type) as deltas.
|
||||||
|
This means that the `ORMultiMap` created with `ORMultiMap.emptyWithValueDeltas` propagates its updates as pairs
|
||||||
|
consisting of the delta of the key and the delta of the value. It is much more efficient in terms of network bandwidth consumed.
|
||||||
|
|
||||||
|
However, this behaviour has not been made default for `ORMultiMap` and if you wish to use it in your code, you
|
||||||
|
need to replace invocations of `ORMultiMap.empty[A, B]` (or `ORMultiMap()`) with `ORMultiMap.emptyWithValueDeltas[A, B]`
|
||||||
|
where `A` and `B` are the types of the keys and values in the map, respectively.
|
||||||
|
|
||||||
|
Please also note that, despite having the same Scala type, `ORMultiMap.emptyWithValueDeltas`
|
||||||
|
is not compatible with 'vanilla' `ORMultiMap`, because of a different replication mechanism.
|
||||||
|
One needs to be extra careful not to mix the two, as they have the same
|
||||||
|
type, so the compiler will not flag the error.
|
||||||
|
Nonetheless `ORMultiMap.emptyWithValueDeltas` uses the same `ORMultiMapKey` type as the
|
||||||
|
'vanilla' `ORMultiMap` for referencing.
|
||||||
|
|
||||||
Note that `LWWRegister` and therefore `LWWMap` relies on synchronized clocks and should only be used
|
Note that `LWWRegister` and therefore `LWWMap` relies on synchronized clocks and should only be used
|
||||||
when the choice of value is not important for concurrent updates occurring within the clock skew. Read more
|
when the choice of value is not important for concurrent updates occurring within the clock skew. Read more
|
||||||
in the below section about `LWWRegister`.
|
in the below section about `LWWRegister`.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue