fixes to ORSet mergeRemoveDelta and ORMap deltaMerge (#22648)
This commit is contained in:
parent
d3de9d40cd
commit
3a8eef4506
11 changed files with 421 additions and 150 deletions
|
|
@ -298,11 +298,8 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = {
|
||||
// for removals the delta values map emitted will be empty
|
||||
val newKeys = keys.resetDelta.remove(node, key)
|
||||
// FIXME use full state for removals, until issue #22648 is fixed
|
||||
// val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag)
|
||||
// new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp)))
|
||||
new ORMap(newKeys, values - key, zeroTag, delta = None)
|
||||
|
||||
val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag)
|
||||
new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -312,10 +309,8 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
*/
|
||||
@InternalApi private[akka] def removeKey(node: UniqueAddress, key: A): ORMap[A, B] = {
|
||||
val newKeys = keys.resetDelta.remove(node, key)
|
||||
// FIXME use full state for removals, until issue #22648 is fixed
|
||||
// val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag)
|
||||
// new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
|
||||
new ORMap(newKeys, values, zeroTag, delta = None)
|
||||
val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag)
|
||||
new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
|
||||
}
|
||||
|
||||
private def dryMerge(that: ORMap[A, B], mergedKeys: ORSet[A], valueKeysIterator: Iterator[A]): ORMap[A, B] = {
|
||||
|
|
@ -360,43 +355,58 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
if (delta.isEmpty) this
|
||||
else new ORMap[A, B](keys.resetDelta, values, zeroTag = zeroTag)
|
||||
|
||||
override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
|
||||
// helper function to simplify folds below
|
||||
def foldValues(values: List[(A, ReplicatedData)], initial: B) =
|
||||
values.foldLeft(initial) {
|
||||
case (acc: DeltaReplicatedData, (_, value: ReplicatedDelta)) ⇒
|
||||
acc.mergeDelta(value.asInstanceOf[acc.D]).asInstanceOf[B]
|
||||
case (acc, (_, value)) ⇒
|
||||
acc.merge(value.asInstanceOf[acc.T]).asInstanceOf[B]
|
||||
private def dryMergeDelta(thatDelta: ORMap.DeltaOp, withValueDeltas: Boolean = false): ORMap[A, B] = {
|
||||
def mergeValue(lvalue: ReplicatedData, rvalue: ReplicatedData): B =
|
||||
(lvalue, rvalue) match {
|
||||
case (v: DeltaReplicatedData, delta: ReplicatedDelta) ⇒
|
||||
v.mergeDelta(delta.asInstanceOf[v.D]).asInstanceOf[B]
|
||||
case _ ⇒
|
||||
lvalue.merge(rvalue.asInstanceOf[lvalue.T]).asInstanceOf[B]
|
||||
}
|
||||
|
||||
val mergedKeys: ORSet[A] = thatDelta match {
|
||||
case d: AtomicDeltaOp[A, B] ⇒ keys.mergeDelta(d.underlying)
|
||||
case ORMap.DeltaGroup(ops) ⇒
|
||||
ops.foldLeft(keys)((acc, op) ⇒ acc.mergeDelta(op.asInstanceOf[AtomicDeltaOp[A, B]].underlying))
|
||||
}
|
||||
|
||||
var mergedValues = Map.empty[A, B]
|
||||
var tombstonedVals = Set.empty[A]
|
||||
var thatValueDeltas: Map[A, List[(A, ReplicatedData)]] = Map.empty
|
||||
var mergedKeys: ORSet[A] = this.keys
|
||||
var (mergedValues, tombstonedVals): (Map[A, B], Map[A, B]) = this.values.partition { case (k, _) ⇒ this.keys.contains(k) }
|
||||
|
||||
val processDelta: PartialFunction[ORMap.DeltaOp, Unit] = {
|
||||
case putOp: PutDeltaOp[A, B] ⇒
|
||||
val key = putOp.value._1
|
||||
thatValueDeltas += (key → (putOp.value :: Nil)) // put is destructive!
|
||||
case _: RemoveDeltaOp[A, B] ⇒
|
||||
// remove delta is only for the side effect of key being removed
|
||||
// please note that if it is not preceded by update clearing the value
|
||||
// anomalies will result
|
||||
val keyDelta = putOp.underlying
|
||||
mergedKeys = mergedKeys.mergeDelta(keyDelta)
|
||||
mergedValues = mergedValues + putOp.value // put is destructive and propagates only full values of B!
|
||||
case removeOp: RemoveDeltaOp[A, B] ⇒
|
||||
val removedKey = removeOp.underlying match {
|
||||
// if op is RemoveDeltaOp then it must have exactly one element in the elements
|
||||
case op: ORSet.RemoveDeltaOp[_] ⇒ op.underlying.elements.head.asInstanceOf[A]
|
||||
case _ ⇒ throw new IllegalArgumentException("ORMap.RemoveDeltaOp must contain ORSet.RemoveDeltaOp inside")
|
||||
}
|
||||
mergedValues = mergedValues - removedKey
|
||||
mergedKeys = mergedKeys.mergeDelta(removeOp.underlying)
|
||||
// please note that if RemoveDeltaOp is not preceded by update clearing the value
|
||||
// anomalies may result
|
||||
case removeKeyOp: RemoveKeyDeltaOp[A, B] ⇒
|
||||
tombstonedVals = tombstonedVals + removeKeyOp.removedKey
|
||||
// removeKeyOp tombstones values for later use
|
||||
if (mergedValues.contains(removeKeyOp.removedKey)) {
|
||||
tombstonedVals = tombstonedVals + (removeKeyOp.removedKey → mergedValues(removeKeyOp.removedKey))
|
||||
}
|
||||
mergedValues = mergedValues - removeKeyOp.removedKey
|
||||
mergedKeys = mergedKeys.mergeDelta(removeKeyOp.underlying)
|
||||
case updateOp: UpdateDeltaOp[A, _] ⇒
|
||||
mergedKeys = mergedKeys.mergeDelta(updateOp.underlying)
|
||||
updateOp.values.foreach {
|
||||
case (key, value) ⇒
|
||||
if (thatValueDeltas.contains(key))
|
||||
thatValueDeltas = thatValueDeltas + (key → (thatValueDeltas(key) :+ (key → value)))
|
||||
else
|
||||
thatValueDeltas += (key → ((key, value) :: Nil))
|
||||
if (mergedKeys.contains(key)) {
|
||||
if (mergedValues.contains(key)) {
|
||||
mergedValues = mergedValues + (key → mergeValue(mergedValues(key), value))
|
||||
} else if (tombstonedVals.contains(key)) {
|
||||
mergedValues = mergedValues + (key → mergeValue(tombstonedVals(key), value))
|
||||
} else {
|
||||
value match {
|
||||
case _: ReplicatedDelta ⇒
|
||||
mergedValues = mergedValues + (key → mergeValue(value.asInstanceOf[ReplicatedDelta].zero, value))
|
||||
case _ ⇒
|
||||
mergedValues = mergedValues + (key → value.asInstanceOf[B])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -412,30 +422,25 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
|
|||
|
||||
(processDelta orElse processNestedDelta)(thatDelta)
|
||||
|
||||
val aggregateValuesForKey: (A ⇒ Unit) = { key ⇒
|
||||
(this.values.get(key), thatValueDeltas.get(key)) match {
|
||||
case (Some(thisValue), Some(thatValues)) ⇒
|
||||
val mergedValue = foldValues(thatValues, thisValue)
|
||||
mergedValues = mergedValues.updated(key, mergedValue)
|
||||
case (Some(thisValue), None) ⇒
|
||||
mergedValues = mergedValues.updated(key, thisValue)
|
||||
case (None, Some(thatValues)) ⇒
|
||||
val (_, initialValue) = thatValues.head
|
||||
val mergedValue = initialValue match {
|
||||
case _: ReplicatedDelta ⇒
|
||||
foldValues(thatValues, initialValue.asInstanceOf[ReplicatedDelta].zero.asInstanceOf[B])
|
||||
case _ ⇒
|
||||
foldValues(thatValues.tail, initialValue.asInstanceOf[B])
|
||||
}
|
||||
mergedValues = mergedValues.updated(key, mergedValue)
|
||||
case (None, None) ⇒ throw new IllegalStateException(s"missing value for $key")
|
||||
}
|
||||
}
|
||||
if (withValueDeltas)
|
||||
new ORMap[A, B](mergedKeys, tombstonedVals ++ mergedValues, zeroTag = zeroTag)
|
||||
else
|
||||
new ORMap[A, B](mergedKeys, mergedValues, zeroTag = zeroTag)
|
||||
}
|
||||
|
||||
mergedKeys.elementsMap.keysIterator.foreach { aggregateValuesForKey }
|
||||
tombstonedVals.foreach { aggregateValuesForKey }
|
||||
override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
|
||||
val thisWithDeltas = dryMergeDelta(thatDelta)
|
||||
this.merge(thisWithDeltas)
|
||||
}
|
||||
|
||||
new ORMap[A, B](mergedKeys, mergedValues, zeroTag = zeroTag)
|
||||
/**
|
||||
* INTERNAL API
|
||||
* This function is only to be used by derived maps that avoid remove anomalies
|
||||
* by keeping the vvector (in form of key -> value pair) for deleted keys
|
||||
*/
|
||||
@InternalApi private[akka] def mergeDeltaRetainingDeletedValues(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
|
||||
val thisWithDeltas = dryMergeDelta(thatDelta, true)
|
||||
this.mergeRetainingDeletedValues(thisWithDeltas)
|
||||
}
|
||||
|
||||
private def newDelta(deltaOp: ORMap.DeltaOp) = delta match {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue