ORMap and friends have deltas (#22350)
This commit is contained in:
parent
40b883cda7
commit
1f2ef60174
15 changed files with 4371 additions and 140 deletions
|
|
@ -7,6 +7,9 @@ import akka.cluster.Cluster
|
|||
import akka.cluster.UniqueAddress
|
||||
import akka.util.HashCode
|
||||
import akka.annotation.InternalApi
|
||||
import akka.cluster.ddata.ORMap.{ AtomicDeltaOp, ZeroTag }
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
object ORMap {
|
||||
private val _empty: ORMap[Any, ReplicatedData] = new ORMap(ORSet.empty, Map.empty)
|
||||
|
|
@ -22,6 +25,125 @@ object ORMap {
|
|||
*/
|
||||
def unapply[A, B <: ReplicatedData](m: ORMap[A, B]): Option[Map[A, B]] = Some(m.entries)
|
||||
|
||||
sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas {
|
||||
type T = DeltaOp
|
||||
override def zero: DeltaReplicatedData
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def emptyWithPNCounterMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = PNCounterMapTag)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def emptyWithORMultiMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapTag)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def emptyWithORMultiMapWithValueDeltasTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapWithValueDeltasTag)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def emptyWithLWWMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = LWWMapTag)
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* Tags for ORMap.DeltaOp's, so that when the Replicator needs to re-create full value from delta,
|
||||
* the right map type will be used
|
||||
*/
|
||||
@InternalApi private[akka] trait ZeroTag {
|
||||
def zero: DeltaReplicatedData
|
||||
def value: Int
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] case object VanillaORMapTag extends ZeroTag {
|
||||
override def zero: DeltaReplicatedData = ORMap.empty
|
||||
override final val value: Int = 0
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] case object PNCounterMapTag extends ZeroTag {
|
||||
override def zero: DeltaReplicatedData = PNCounterMap.empty
|
||||
override final val value: Int = 1
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] case object ORMultiMapTag extends ZeroTag {
|
||||
override def zero: DeltaReplicatedData = ORMultiMap.empty
|
||||
override final val value: Int = 2
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] case object ORMultiMapWithValueDeltasTag extends ZeroTag {
|
||||
override def zero: DeltaReplicatedData = ORMultiMap.emptyWithValueDeltas
|
||||
override final val value: Int = 3
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] case object LWWMapTag extends ZeroTag {
|
||||
override def zero: DeltaReplicatedData = LWWMap.empty
|
||||
override final val value: Int = 4
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] sealed abstract class AtomicDeltaOp[A, B <: ReplicatedData] extends DeltaOp {
|
||||
def underlying: ORSet.DeltaOp
|
||||
def zeroTag: ZeroTag
|
||||
override def zero: DeltaReplicatedData = zeroTag.zero
|
||||
|
||||
override def merge(that: DeltaOp): DeltaOp = that match {
|
||||
case other: AtomicDeltaOp[A, B] ⇒ DeltaGroup(Vector(this, other))
|
||||
case DeltaGroup(ops) ⇒ DeltaGroup(this +: ops)
|
||||
}
|
||||
}
|
||||
|
||||
// PutDeltaOp contains ORSet delta and full value
|
||||
/** INTERNAL API */
|
||||
@InternalApi private[akka] final case class PutDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, value: (A, B), zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
|
||||
}
|
||||
|
||||
// UpdateDeltaOp contains ORSet delta and either delta of value (in case where underlying type supports deltas) or full value
|
||||
/** INTERNAL API */
|
||||
@InternalApi private[akka] final case class UpdateDeltaOp[A, X <: ReplicatedDelta](underlying: ORSet.DeltaOp, values: Map[A, X], zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, X] {
|
||||
}
|
||||
|
||||
// RemoveDeltaOp does not contain any value at all - the propagated 'value' map is empty
|
||||
/** INTERNAL API */
|
||||
@InternalApi private[akka] final case class RemoveDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
|
||||
}
|
||||
|
||||
// RemoveKeyDeltaOp contains a single value - to provide the recipient with the removed key for value map
|
||||
/** INTERNAL API */
|
||||
@InternalApi private[akka] final case class RemoveKeyDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, removedKey: A, zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
|
||||
}
|
||||
|
||||
// DeltaGroup is effectively a causally ordered list of individual deltas
|
||||
/** INTERNAL API */
|
||||
@InternalApi private[akka] final case class DeltaGroup[A, B <: ReplicatedData](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp {
|
||||
override def merge(that: DeltaOp): DeltaOp = that match {
|
||||
case DeltaGroup(thatOps) ⇒ DeltaGroup(ops ++ thatOps)
|
||||
case that: AtomicDeltaOp[A, B] ⇒ DeltaGroup(ops :+ that)
|
||||
}
|
||||
|
||||
override def zero: DeltaReplicatedData = ops.headOption.fold(ORMap.empty[A, B].asInstanceOf[DeltaReplicatedData])(_.zero)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -34,11 +156,16 @@ object ORMap {
|
|||
*/
|
||||
@SerialVersionUID(1L)
|
||||
final class ORMap[A, B <: ReplicatedData] private[akka] (
|
||||
private[akka] val keys: ORSet[A],
|
||||
private[akka] val values: Map[A, B])
|
||||
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
|
||||
private[akka] val keys: ORSet[A],
|
||||
private[akka] val values: Map[A, B],
|
||||
private[akka] val zeroTag: ZeroTag = ORMap.VanillaORMapTag,
|
||||
override val delta: Option[ORMap.DeltaOp] = None)
|
||||
extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
|
||||
|
||||
import ORMap.{ PutDeltaOp, UpdateDeltaOp, RemoveDeltaOp, RemoveKeyDeltaOp }
|
||||
|
||||
type T = ORMap[A, B]
|
||||
type D = ORMap.DeltaOp
|
||||
|
||||
/**
|
||||
* Scala API: All entries of the map.
|
||||
|
|
@ -100,8 +227,11 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
"`ORMap.put` must not be used to replace an existing `ORSet` " +
|
||||
"value, because important history can be lost when replacing the `ORSet` and " +
|
||||
"undesired effects of merging will occur. Use `ORMultiMap` or `ORMap.updated` instead.")
|
||||
else
|
||||
new ORMap(keys.add(node, key), values.updated(key, value))
|
||||
else {
|
||||
val putDeltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → value, zeroTag)
|
||||
// put forcibly damages history, so we propagate full value that will overwrite previous values
|
||||
new ORMap(keys.add(node, key), values.updated(key, value), zeroTag, Some(newDelta(putDeltaOp)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Scala API: Replace a value by applying the `modify` function on the existing value.
|
||||
|
|
@ -124,12 +254,30 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def updated(node: UniqueAddress, key: A, initial: B)(modify: B ⇒ B): ORMap[A, B] = {
|
||||
val newValue = values.get(key) match {
|
||||
case Some(old) ⇒ modify(old)
|
||||
case _ ⇒ modify(initial)
|
||||
@InternalApi private[akka] def updated(node: UniqueAddress, key: A, initial: B, valueDeltas: Boolean = false)(modify: B ⇒ B): ORMap[A, B] = {
|
||||
val (oldValue, hasOldValue) = values.get(key) match {
|
||||
case Some(old) ⇒ (old, true)
|
||||
case _ ⇒ (initial, false)
|
||||
}
|
||||
// Optimization: for some types - like GSet, GCounter, PNCounter and ORSet - that are delta based
|
||||
// we can emit (and later merge) their deltas instead of full updates.
|
||||
// However to avoid necessity of tombstones, the derived map type needs to support this
|
||||
// with clearing the value (e.g. removing all elements if value is a set)
|
||||
// before removing the key - like e.g. ORMultiMap does
|
||||
oldValue match {
|
||||
case _: DeltaReplicatedData if valueDeltas ⇒
|
||||
val newValue = modify(oldValue.asInstanceOf[DeltaReplicatedData].resetDelta.asInstanceOf[B])
|
||||
val newValueDelta = newValue.asInstanceOf[DeltaReplicatedData].delta
|
||||
val deltaOp = newValueDelta match {
|
||||
case Some(d) if hasOldValue ⇒ UpdateDeltaOp(keys.resetDelta.add(node, key).delta.get, Map(key → d), zeroTag)
|
||||
case _ ⇒ PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → newValue, zeroTag)
|
||||
}
|
||||
new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
|
||||
case _ ⇒
|
||||
val newValue = modify(oldValue)
|
||||
val deltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key → newValue, zeroTag)
|
||||
new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
|
||||
}
|
||||
new ORMap(keys.add(node, key), values.updated(key, newValue))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -150,13 +298,24 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = {
|
||||
new ORMap(keys.remove(node, key), values - key)
|
||||
// for removals the delta values map emitted will be empty
|
||||
val removeDeltaOp = RemoveDeltaOp(keys.resetDelta.remove(node, key).delta.get, zeroTag)
|
||||
new ORMap(keys.remove(node, key), values - key, zeroTag, Some(newDelta(removeDeltaOp)))
|
||||
}
|
||||
|
||||
override def merge(that: ORMap[A, B]): ORMap[A, B] = {
|
||||
val mergedKeys = keys.merge(that.keys)
|
||||
/**
|
||||
* INTERNAL API
|
||||
* This function is only to be used by derived maps that avoid remove anomalies
|
||||
* by keeping the vvector (in form of key -> value pair) for deleted keys
|
||||
*/
|
||||
@InternalApi private[akka] def removeKey(node: UniqueAddress, key: A): ORMap[A, B] = {
|
||||
val removeKeyDeltaOp = RemoveKeyDeltaOp(keys.resetDelta.remove(node, key).delta.get, key, zeroTag)
|
||||
new ORMap(keys.remove(node, key), values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
|
||||
}
|
||||
|
||||
private def dryMerge(that: ORMap[A, B], mergedKeys: ORSet[A], valueKeysIterator: Iterator[A]): ORMap[A, B] = {
|
||||
var mergedValues = Map.empty[A, B]
|
||||
mergedKeys.elementsMap.keysIterator.foreach { key ⇒
|
||||
valueKeysIterator.foreach { key ⇒
|
||||
(this.values.get(key), that.values.get(key)) match {
|
||||
case (Some(thisValue), Some(thatValue)) ⇒
|
||||
if (thisValue.getClass != thatValue.getClass) {
|
||||
|
|
@ -174,8 +333,110 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
case (None, None) ⇒ throw new IllegalStateException(s"missing value for $key")
|
||||
}
|
||||
}
|
||||
new ORMap(mergedKeys, mergedValues, zeroTag = zeroTag)
|
||||
}
|
||||
|
||||
new ORMap(mergedKeys, mergedValues)
|
||||
override def merge(that: ORMap[A, B]): ORMap[A, B] = {
|
||||
val mergedKeys = keys.merge(that.keys)
|
||||
dryMerge(that, mergedKeys, mergedKeys.elementsMap.keysIterator)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* This function is only to be used by derived maps that avoid remove anomalies
|
||||
* by keeping the vvector (in form of key -> value pair) for deleted keys
|
||||
*/
|
||||
@InternalApi private[akka] def mergeRetainingDeletedValues(that: ORMap[A, B]): ORMap[A, B] = {
|
||||
val mergedKeys = keys.merge(that.keys)
|
||||
dryMerge(that, mergedKeys, (this.values.keySet ++ that.values.keySet).iterator)
|
||||
}
|
||||
|
||||
override def resetDelta: ORMap[A, B] =
|
||||
if (delta.isEmpty) this
|
||||
else new ORMap[A, B](keys, values, zeroTag = zeroTag)
|
||||
|
||||
override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
|
||||
// helper function to simplify folds below
|
||||
def foldValues(values: List[(A, ReplicatedData)], initial: B) =
|
||||
values.foldLeft(initial) {
|
||||
case (acc: DeltaReplicatedData, (_, value: ReplicatedDelta)) ⇒
|
||||
acc.mergeDelta(value.asInstanceOf[acc.D]).asInstanceOf[B]
|
||||
case (acc, (_, value)) ⇒
|
||||
acc.merge(value.asInstanceOf[acc.T]).asInstanceOf[B]
|
||||
}
|
||||
|
||||
val mergedKeys: ORSet[A] = thatDelta match {
|
||||
case d: AtomicDeltaOp[A, B] ⇒ keys.mergeDelta(d.underlying)
|
||||
case ORMap.DeltaGroup(ops) ⇒
|
||||
ops.foldLeft(keys)((acc, op) ⇒ acc.mergeDelta(op.asInstanceOf[AtomicDeltaOp[A, B]].underlying))
|
||||
}
|
||||
|
||||
var mergedValues = Map.empty[A, B]
|
||||
var tombstonedVals = Set.empty[A]
|
||||
var thatValueDeltas: Map[A, List[(A, ReplicatedData)]] = Map.empty
|
||||
|
||||
val processDelta: PartialFunction[ORMap.DeltaOp, Unit] = {
|
||||
case putOp: PutDeltaOp[A, B] ⇒
|
||||
val key = putOp.value._1
|
||||
thatValueDeltas += (key → (putOp.value :: Nil)) // put is destructive!
|
||||
case _: RemoveDeltaOp[A, B] ⇒
|
||||
// remove delta is only for the side effect of key being removed
|
||||
// please note that if it is not preceded by update clearing the value
|
||||
// anomalies will result
|
||||
case removeKeyOp: RemoveKeyDeltaOp[A, B] ⇒
|
||||
tombstonedVals = tombstonedVals + removeKeyOp.removedKey
|
||||
case updateOp: UpdateDeltaOp[A, _] ⇒
|
||||
val key = updateOp.values.head._1
|
||||
val value = (key, updateOp.values.head._2)
|
||||
if (thatValueDeltas.contains(key))
|
||||
thatValueDeltas = thatValueDeltas + (key → (thatValueDeltas(key) :+ value))
|
||||
else
|
||||
thatValueDeltas += (key → (value :: Nil))
|
||||
}
|
||||
|
||||
val processNestedDelta: PartialFunction[ORMap.DeltaOp, Unit] = {
|
||||
case ORMap.DeltaGroup(ops) ⇒
|
||||
ops.foreach {
|
||||
processDelta.orElse {
|
||||
case ORMap.DeltaGroup(args) ⇒
|
||||
throw new IllegalStateException("Cannot nest DeltaGroups")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(processDelta orElse processNestedDelta)(thatDelta)
|
||||
|
||||
val aggregateValuesForKey: (A ⇒ Unit) = { key ⇒
|
||||
(this.values.get(key), thatValueDeltas.get(key)) match {
|
||||
case (Some(thisValue), Some(thatValues)) ⇒
|
||||
val mergedValue = foldValues(thatValues, thisValue)
|
||||
mergedValues = mergedValues.updated(key, mergedValue)
|
||||
case (Some(thisValue), None) ⇒
|
||||
mergedValues = mergedValues.updated(key, thisValue)
|
||||
case (None, Some(thatValues)) ⇒
|
||||
val (_, initialValue) = thatValues.head
|
||||
val mergedValue = initialValue match {
|
||||
case _: ReplicatedDelta ⇒
|
||||
foldValues(thatValues, initialValue.asInstanceOf[ReplicatedDelta].zero.asInstanceOf[B])
|
||||
case _ ⇒
|
||||
foldValues(thatValues.tail, initialValue.asInstanceOf[B])
|
||||
}
|
||||
mergedValues = mergedValues.updated(key, mergedValue)
|
||||
case (None, None) ⇒ throw new IllegalStateException(s"missing value for $key")
|
||||
}
|
||||
}
|
||||
|
||||
mergedKeys.elementsMap.keysIterator.foreach { aggregateValuesForKey }
|
||||
tombstonedVals.foreach { aggregateValuesForKey }
|
||||
|
||||
new ORMap[A, B](mergedKeys, mergedValues, zeroTag = zeroTag)
|
||||
}
|
||||
|
||||
private def newDelta(deltaOp: ORMap.DeltaOp) = delta match {
|
||||
case Some(d) ⇒
|
||||
d.merge(deltaOp)
|
||||
case None ⇒
|
||||
deltaOp
|
||||
}
|
||||
|
||||
override def modifiedByNodes: Set[UniqueAddress] = {
|
||||
|
|
@ -199,7 +460,7 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
acc.updated(key, data.prune(removedNode, collapseInto).asInstanceOf[B])
|
||||
case (acc, _) ⇒ acc
|
||||
}
|
||||
new ORMap(prunedKeys, prunedValues)
|
||||
new ORMap(prunedKeys, prunedValues, zeroTag = zeroTag)
|
||||
}
|
||||
|
||||
override def pruningCleanup(removedNode: UniqueAddress): ORMap[A, B] = {
|
||||
|
|
@ -209,7 +470,7 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
acc.updated(key, data.pruningCleanup(removedNode).asInstanceOf[B])
|
||||
case (acc, _) ⇒ acc
|
||||
}
|
||||
new ORMap(pruningCleanupedKeys, pruningCleanupedValues)
|
||||
new ORMap(pruningCleanupedKeys, pruningCleanupedValues, zeroTag = zeroTag)
|
||||
}
|
||||
|
||||
// this class cannot be a `case class` because we need different `unapply`
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue