Merge pull request #22508 from gosubpl/wip/22350-delta-crdt-orset-friends

ORMap and friends are delta-CRDTs (#22350)
This commit is contained in:
Patrik Nordwall 2017-03-17 10:27:52 +01:00 committed by GitHub
commit edee6ae544
15 changed files with 4371 additions and 140 deletions

View file

@ -72,7 +72,32 @@ message ORMap {
}
required ORSet keys = 1;
repeated Entry entries = 2;
repeated Entry entries = 2;
}
message ORMapDeltaGroup {
message MapEntry {
optional string stringKey = 1;
required OtherMessage value = 2;
optional sint32 intKey = 3;
optional sint64 longKey = 4;
optional OtherMessage otherKey = 5;
}
message Entry {
required ORMapDeltaOp operation = 1;
required ORSet underlying = 2;
required sint32 zeroTag = 3;
optional MapEntry entryData = 4;
}
repeated Entry entries = 1;
}
enum ORMapDeltaOp {
ORMapPut = 0;
ORMapRemove = 1;
ORMapRemoveKey = 2;
ORMapUpdate = 3;
}
message LWWMap {
@ -85,7 +110,7 @@ message LWWMap {
}
required ORSet keys = 1;
repeated Entry entries = 2;
repeated Entry entries = 2;
}
message PNCounterMap {
@ -98,21 +123,22 @@ message PNCounterMap {
}
required ORSet keys = 1;
repeated Entry entries = 2;
repeated Entry entries = 2;
}
message ORMultiMap {
message Entry {
optional string stringKey = 1;
required ORSet value = 2;
optional sint32 intKey = 3;
optional sint64 longKey = 4;
optional OtherMessage otherKey = 5;
}
required ORSet keys = 1;
repeated Entry entries = 2;
}
message Entry {
optional string stringKey = 1;
required ORSet value = 2;
optional sint32 intKey = 3;
optional sint64 longKey = 4;
optional OtherMessage otherKey = 5;
}
required ORSet keys = 1;
repeated Entry entries = 2;
optional bool withValueDeltas = 3;
}

View file

@ -6,9 +6,10 @@ package akka.cluster.ddata
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import akka.annotation.InternalApi
import akka.cluster.ddata.ORMap.LWWMapTag
object LWWMap {
private val _empty: LWWMap[Any, Any] = new LWWMap(ORMap.empty)
private val _empty: LWWMap[Any, Any] = new LWWMap(ORMap.emptyWithLWWMapTag)
def empty[A, B]: LWWMap[A, B] = _empty.asInstanceOf[LWWMap[A, B]]
def apply(): LWWMap[Any, Any] = _empty
/**
@ -47,10 +48,11 @@ object LWWMap {
@SerialVersionUID(1L)
final class LWWMap[A, B] private[akka] (
private[akka] val underlying: ORMap[A, LWWRegister[B]])
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
import LWWRegister.{ Clock, defaultClock }
type T = LWWMap[A, B]
type D = ORMap.DeltaOp
/**
* Scala API: All entries of the map.
@ -141,6 +143,14 @@ final class LWWMap[A, B] private[akka] (
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): LWWMap[A, B] =
new LWWMap(underlying.remove(node, key))
override def resetDelta: LWWMap[A, B] =
new LWWMap(underlying.resetDelta)
override def delta: Option[D] = underlying.delta
override def mergeDelta(thatDelta: D): LWWMap[A, B] =
new LWWMap(underlying.mergeDelta(thatDelta))
override def merge(that: LWWMap[A, B]): LWWMap[A, B] =
new LWWMap(underlying.merge(that.underlying))

View file

@ -7,6 +7,9 @@ import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import akka.util.HashCode
import akka.annotation.InternalApi
import akka.cluster.ddata.ORMap.{ AtomicDeltaOp, ZeroTag }
import scala.collection.immutable
object ORMap {
private val _empty: ORMap[Any, ReplicatedData] = new ORMap(ORSet.empty, Map.empty)
@ -22,6 +25,125 @@ object ORMap {
*/
def unapply[A, B <: ReplicatedData](m: ORMap[A, B]): Option[Map[A, B]] = Some(m.entries)
sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas {
type T = DeltaOp
override def zero: DeltaReplicatedData
}
/**
* INTERNAL API
*/
@InternalApi private[akka] def emptyWithPNCounterMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = PNCounterMapTag)
/**
* INTERNAL API
*/
@InternalApi private[akka] def emptyWithORMultiMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapTag)
/**
* INTERNAL API
*/
@InternalApi private[akka] def emptyWithORMultiMapWithValueDeltasTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = ORMultiMapWithValueDeltasTag)
/**
* INTERNAL API
*/
@InternalApi private[akka] def emptyWithLWWMapTag[A, B <: ReplicatedData]: ORMap[A, B] = new ORMap(ORSet.empty, Map.empty, zeroTag = LWWMapTag)
/**
* INTERNAL API
* Tags for ORMap.DeltaOp's, so that when the Replicator needs to re-create full value from delta,
* the right map type will be used
*/
@InternalApi private[akka] trait ZeroTag {
def zero: DeltaReplicatedData
def value: Int
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case object VanillaORMapTag extends ZeroTag {
override def zero: DeltaReplicatedData = ORMap.empty
override final val value: Int = 0
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case object PNCounterMapTag extends ZeroTag {
override def zero: DeltaReplicatedData = PNCounterMap.empty
override final val value: Int = 1
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case object ORMultiMapTag extends ZeroTag {
override def zero: DeltaReplicatedData = ORMultiMap.empty
override final val value: Int = 2
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case object ORMultiMapWithValueDeltasTag extends ZeroTag {
override def zero: DeltaReplicatedData = ORMultiMap.emptyWithValueDeltas
override final val value: Int = 3
}
/**
* INTERNAL API
*/
@InternalApi private[akka] case object LWWMapTag extends ZeroTag {
override def zero: DeltaReplicatedData = LWWMap.empty
override final val value: Int = 4
}
/**
* INTERNAL API
*/
@InternalApi private[akka] sealed abstract class AtomicDeltaOp[A, B <: ReplicatedData] extends DeltaOp {
def underlying: ORSet.DeltaOp
def zeroTag: ZeroTag
override def zero: DeltaReplicatedData = zeroTag.zero
override def merge(that: DeltaOp): DeltaOp = that match {
case other: AtomicDeltaOp[A, B] DeltaGroup(Vector(this, other))
case DeltaGroup(ops) DeltaGroup(this +: ops)
}
}
// PutDeltaOp contains ORSet delta and full value
/** INTERNAL API */
@InternalApi private[akka] final case class PutDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, value: (A, B), zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
}
// UpdateDeltaOp contains ORSet delta and either delta of value (in case where underlying type supports deltas) or full value
/** INTERNAL API */
@InternalApi private[akka] final case class UpdateDeltaOp[A, X <: ReplicatedDelta](underlying: ORSet.DeltaOp, values: Map[A, X], zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, X] {
}
// RemoveDeltaOp does not contain any value at all - the propagated 'value' map is empty
/** INTERNAL API */
@InternalApi private[akka] final case class RemoveDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
}
// RemoveKeyDeltaOp contains a single value - to provide the recipient with the removed key for value map
/** INTERNAL API */
@InternalApi private[akka] final case class RemoveKeyDeltaOp[A, B <: ReplicatedData](underlying: ORSet.DeltaOp, removedKey: A, zeroTag: ZeroTag = VanillaORMapTag) extends AtomicDeltaOp[A, B] {
}
// DeltaGroup is effectively a causally ordered list of individual deltas
/** INTERNAL API */
@InternalApi private[akka] final case class DeltaGroup[A, B <: ReplicatedData](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp {
override def merge(that: DeltaOp): DeltaOp = that match {
case DeltaGroup(thatOps) DeltaGroup(ops ++ thatOps)
case that: AtomicDeltaOp[A, B] DeltaGroup(ops :+ that)
}
override def zero: DeltaReplicatedData = ops.headOption.fold(ORMap.empty[A, B].asInstanceOf[DeltaReplicatedData])(_.zero)
}
}
/**
@ -34,11 +156,16 @@ object ORMap {
*/
@SerialVersionUID(1L)
final class ORMap[A, B <: ReplicatedData] private[akka] (
private[akka] val keys: ORSet[A],
private[akka] val values: Map[A, B])
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
private[akka] val keys: ORSet[A],
private[akka] val values: Map[A, B],
private[akka] val zeroTag: ZeroTag = ORMap.VanillaORMapTag,
override val delta: Option[ORMap.DeltaOp] = None)
extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
import ORMap.{ PutDeltaOp, UpdateDeltaOp, RemoveDeltaOp, RemoveKeyDeltaOp }
type T = ORMap[A, B]
type D = ORMap.DeltaOp
/**
* Scala API: All entries of the map.
@ -100,8 +227,11 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
"`ORMap.put` must not be used to replace an existing `ORSet` " +
"value, because important history can be lost when replacing the `ORSet` and " +
"undesired effects of merging will occur. Use `ORMultiMap` or `ORMap.updated` instead.")
else
new ORMap(keys.add(node, key), values.updated(key, value))
else {
val putDeltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key value, zeroTag)
// put forcibly damages history, so we propagate full value that will overwrite previous values
new ORMap(keys.add(node, key), values.updated(key, value), zeroTag, Some(newDelta(putDeltaOp)))
}
/**
* Scala API: Replace a value by applying the `modify` function on the existing value.
@ -124,12 +254,30 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
/**
* INTERNAL API
*/
@InternalApi private[akka] def updated(node: UniqueAddress, key: A, initial: B)(modify: B B): ORMap[A, B] = {
val newValue = values.get(key) match {
case Some(old) modify(old)
case _ modify(initial)
@InternalApi private[akka] def updated(node: UniqueAddress, key: A, initial: B, valueDeltas: Boolean = false)(modify: B B): ORMap[A, B] = {
val (oldValue, hasOldValue) = values.get(key) match {
case Some(old) (old, true)
case _ (initial, false)
}
// Optimization: for some types - like GSet, GCounter, PNCounter and ORSet - that are delta based
// we can emit (and later merge) their deltas instead of full updates.
// However to avoid necessity of tombstones, the derived map type needs to support this
// with clearing the value (e.g. removing all elements if value is a set)
// before removing the key - like e.g. ORMultiMap does
oldValue match {
case _: DeltaReplicatedData if valueDeltas
val newValue = modify(oldValue.asInstanceOf[DeltaReplicatedData].resetDelta.asInstanceOf[B])
val newValueDelta = newValue.asInstanceOf[DeltaReplicatedData].delta
val deltaOp = newValueDelta match {
case Some(d) if hasOldValue UpdateDeltaOp(keys.resetDelta.add(node, key).delta.get, Map(key d), zeroTag)
case _ PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key newValue, zeroTag)
}
new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
case _
val newValue = modify(oldValue)
val deltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key newValue, zeroTag)
new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
}
new ORMap(keys.add(node, key), values.updated(key, newValue))
}
/**
@ -150,13 +298,24 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
* INTERNAL API
*/
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = {
new ORMap(keys.remove(node, key), values - key)
// for removals the delta values map emitted will be empty
val removeDeltaOp = RemoveDeltaOp(keys.resetDelta.remove(node, key).delta.get, zeroTag)
new ORMap(keys.remove(node, key), values - key, zeroTag, Some(newDelta(removeDeltaOp)))
}
override def merge(that: ORMap[A, B]): ORMap[A, B] = {
val mergedKeys = keys.merge(that.keys)
/**
* INTERNAL API
* This function is only to be used by derived maps that avoid remove anomalies
* by keeping the vvector (in form of key -> value pair) for deleted keys
*/
@InternalApi private[akka] def removeKey(node: UniqueAddress, key: A): ORMap[A, B] = {
val removeKeyDeltaOp = RemoveKeyDeltaOp(keys.resetDelta.remove(node, key).delta.get, key, zeroTag)
new ORMap(keys.remove(node, key), values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
}
private def dryMerge(that: ORMap[A, B], mergedKeys: ORSet[A], valueKeysIterator: Iterator[A]): ORMap[A, B] = {
var mergedValues = Map.empty[A, B]
mergedKeys.elementsMap.keysIterator.foreach { key
valueKeysIterator.foreach { key
(this.values.get(key), that.values.get(key)) match {
case (Some(thisValue), Some(thatValue))
if (thisValue.getClass != thatValue.getClass) {
@ -174,8 +333,110 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
case (None, None) throw new IllegalStateException(s"missing value for $key")
}
}
new ORMap(mergedKeys, mergedValues, zeroTag = zeroTag)
}
new ORMap(mergedKeys, mergedValues)
override def merge(that: ORMap[A, B]): ORMap[A, B] = {
val mergedKeys = keys.merge(that.keys)
dryMerge(that, mergedKeys, mergedKeys.elementsMap.keysIterator)
}
/**
* INTERNAL API
* This function is only to be used by derived maps that avoid remove anomalies
* by keeping the vvector (in form of key -> value pair) for deleted keys
*/
@InternalApi private[akka] def mergeRetainingDeletedValues(that: ORMap[A, B]): ORMap[A, B] = {
val mergedKeys = keys.merge(that.keys)
dryMerge(that, mergedKeys, (this.values.keySet ++ that.values.keySet).iterator)
}
override def resetDelta: ORMap[A, B] =
if (delta.isEmpty) this
else new ORMap[A, B](keys, values, zeroTag = zeroTag)
override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
// helper function to simplify folds below
def foldValues(values: List[(A, ReplicatedData)], initial: B) =
values.foldLeft(initial) {
case (acc: DeltaReplicatedData, (_, value: ReplicatedDelta))
acc.mergeDelta(value.asInstanceOf[acc.D]).asInstanceOf[B]
case (acc, (_, value))
acc.merge(value.asInstanceOf[acc.T]).asInstanceOf[B]
}
val mergedKeys: ORSet[A] = thatDelta match {
case d: AtomicDeltaOp[A, B] keys.mergeDelta(d.underlying)
case ORMap.DeltaGroup(ops)
ops.foldLeft(keys)((acc, op) acc.mergeDelta(op.asInstanceOf[AtomicDeltaOp[A, B]].underlying))
}
var mergedValues = Map.empty[A, B]
var tombstonedVals = Set.empty[A]
var thatValueDeltas: Map[A, List[(A, ReplicatedData)]] = Map.empty
val processDelta: PartialFunction[ORMap.DeltaOp, Unit] = {
case putOp: PutDeltaOp[A, B]
val key = putOp.value._1
thatValueDeltas += (key (putOp.value :: Nil)) // put is destructive!
case _: RemoveDeltaOp[A, B]
// remove delta is only for the side effect of key being removed
// please note that if it is not preceded by update clearing the value
// anomalies will result
case removeKeyOp: RemoveKeyDeltaOp[A, B]
tombstonedVals = tombstonedVals + removeKeyOp.removedKey
case updateOp: UpdateDeltaOp[A, _]
val key = updateOp.values.head._1
val value = (key, updateOp.values.head._2)
if (thatValueDeltas.contains(key))
thatValueDeltas = thatValueDeltas + (key (thatValueDeltas(key) :+ value))
else
thatValueDeltas += (key (value :: Nil))
}
val processNestedDelta: PartialFunction[ORMap.DeltaOp, Unit] = {
case ORMap.DeltaGroup(ops)
ops.foreach {
processDelta.orElse {
case ORMap.DeltaGroup(args)
throw new IllegalStateException("Cannot nest DeltaGroups")
}
}
}
(processDelta orElse processNestedDelta)(thatDelta)
val aggregateValuesForKey: (A Unit) = { key
(this.values.get(key), thatValueDeltas.get(key)) match {
case (Some(thisValue), Some(thatValues))
val mergedValue = foldValues(thatValues, thisValue)
mergedValues = mergedValues.updated(key, mergedValue)
case (Some(thisValue), None)
mergedValues = mergedValues.updated(key, thisValue)
case (None, Some(thatValues))
val (_, initialValue) = thatValues.head
val mergedValue = initialValue match {
case _: ReplicatedDelta
foldValues(thatValues, initialValue.asInstanceOf[ReplicatedDelta].zero.asInstanceOf[B])
case _
foldValues(thatValues.tail, initialValue.asInstanceOf[B])
}
mergedValues = mergedValues.updated(key, mergedValue)
case (None, None) throw new IllegalStateException(s"missing value for $key")
}
}
mergedKeys.elementsMap.keysIterator.foreach { aggregateValuesForKey }
tombstonedVals.foreach { aggregateValuesForKey }
new ORMap[A, B](mergedKeys, mergedValues, zeroTag = zeroTag)
}
private def newDelta(deltaOp: ORMap.DeltaOp) = delta match {
case Some(d)
d.merge(deltaOp)
case None
deltaOp
}
override def modifiedByNodes: Set[UniqueAddress] = {
@ -199,7 +460,7 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
acc.updated(key, data.prune(removedNode, collapseInto).asInstanceOf[B])
case (acc, _) acc
}
new ORMap(prunedKeys, prunedValues)
new ORMap(prunedKeys, prunedValues, zeroTag = zeroTag)
}
override def pruningCleanup(removedNode: UniqueAddress): ORMap[A, B] = {
@ -209,7 +470,7 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
acc.updated(key, data.pruningCleanup(removedNode).asInstanceOf[B])
case (acc, _) acc
}
new ORMap(pruningCleanupedKeys, pruningCleanupedValues)
new ORMap(pruningCleanupedKeys, pruningCleanupedValues, zeroTag = zeroTag)
}
// this class cannot be a `case class` because we need different `unapply`

View file

@ -3,22 +3,26 @@
*/
package akka.cluster.ddata
import akka.cluster.{ UniqueAddress, Cluster }
import akka.cluster.{ Cluster, UniqueAddress }
import akka.annotation.InternalApi
import akka.cluster.ddata.ORMap._
object ORMultiMap {
val _empty: ORMultiMap[Any, Any] = new ORMultiMap(ORMap.empty)
val _empty: ORMultiMap[Any, Any] = new ORMultiMap(ORMap.emptyWithORMultiMapTag, false)
val _emptyWithValueDeltas: ORMultiMap[Any, Any] = new ORMultiMap(ORMap.emptyWithORMultiMapTag, true)
/**
* Provides an empty multimap.
*/
def empty[A, B]: ORMultiMap[A, B] = _empty.asInstanceOf[ORMultiMap[A, B]]
def emptyWithValueDeltas[A, B]: ORMultiMap[A, B] = _emptyWithValueDeltas.asInstanceOf[ORMultiMap[A, B]]
def apply(): ORMultiMap[Any, Any] = _empty
/**
* Java API
*/
def create[A, B](): ORMultiMap[A, B] = empty[A, B]
def createWithDeltaDelta[A, B](): ORMultiMap[A, B] = emptyWithValueDeltas[A, B]
/**
* Extract the [[ORMultiMap#entries]].
@ -41,18 +45,28 @@ object ORMultiMap {
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
@SerialVersionUID(1L)
final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[A, ORSet[B]])
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
final class ORMultiMap[A, B] private[akka] (
private[akka] val underlying: ORMap[A, ORSet[B]],
private[akka] val withValueDeltas: Boolean)
extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
override type T = ORMultiMap[A, B]
override type D = ORMap.DeltaOp
override def merge(that: T): T =
new ORMultiMap(underlying.merge(that.underlying))
if (withValueDeltas == that.withValueDeltas) {
if (withValueDeltas)
new ORMultiMap(underlying.mergeRetainingDeletedValues(that.underlying), withValueDeltas)
else
new ORMultiMap(underlying.merge(that.underlying), withValueDeltas)
} else throw new IllegalArgumentException("Trying to merge two ORMultiMaps of different map sub-type")
/**
* Scala API: All entries of a multimap where keys are strings and values are sets.
*/
def entries: Map[A, Set[B]] =
def entries: Map[A, Set[B]] = if (withValueDeltas)
underlying.entries.collect { case (k, v) if underlying.keys.elements.contains(k) k v.elements }
else
underlying.entries.map { case (k, v) k v.elements }
/**
@ -61,9 +75,10 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
def getEntries(): java.util.Map[A, java.util.Set[B]] = {
import scala.collection.JavaConverters._
val result = new java.util.HashMap[A, java.util.Set[B]]
underlying.entries.foreach {
case (k, v) result.put(k, v.elements.asJava)
}
if (withValueDeltas)
underlying.entries.foreach { case (k, v) if (underlying.keys.elements.contains(k)) result.put(k, v.elements.asJava) }
else
underlying.entries.foreach { case (k, v) result.put(k, v.elements.asJava) }
result
}
@ -71,7 +86,10 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
* Get the set associated with the key if there is one.
*/
def get(key: A): Option[Set[B]] =
underlying.get(key).map(_.elements)
if (withValueDeltas && !underlying.keys.elements.contains(key))
None
else
underlying.get(key).map(_.elements)
/**
* Scala API: Get the set associated with the key if there is one,
@ -80,11 +98,11 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
def getOrElse(key: A, default: Set[B]): Set[B] =
get(key).getOrElse(default)
def contains(key: A): Boolean = underlying.contains(key)
def contains(key: A): Boolean = underlying.keys.elements.contains(key)
def isEmpty: Boolean = underlying.isEmpty
def isEmpty: Boolean = underlying.keys.elements.isEmpty
def size: Int = underlying.size
def size: Int = underlying.keys.elements.size
/**
* Convenience for put. Requires an implicit Cluster.
@ -115,10 +133,10 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
* INTERNAL API
*/
@InternalApi private[akka] def put(node: UniqueAddress, key: A, value: Set[B]): ORMultiMap[A, B] = {
val newUnderlying = underlying.updated(node, key, ORSet.empty[B]) { existing
val newUnderlying = underlying.updated(node, key, ORSet.empty[B], valueDeltas = withValueDeltas) { existing
value.foldLeft(existing.clear(node)) { (s, element) s.add(node, element) }
}
new ORMultiMap(newUnderlying)
new ORMultiMap(newUnderlying, withValueDeltas)
}
/**
@ -137,8 +155,14 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
/**
* INTERNAL API
*/
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMultiMap[A, B] =
new ORMultiMap(underlying.remove(node, key))
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMultiMap[A, B] = {
if (withValueDeltas) {
val u = underlying.updated(node, key, ORSet.empty[B], valueDeltas = true) { existing existing.clear(node) }
new ORMultiMap(u.removeKey(node, key), withValueDeltas)
} else {
new ORMultiMap(underlying.remove(node, key), withValueDeltas)
}
}
/**
* Scala API: Add an element to a set associated with a key. If there is no existing set then one will be initialised.
@ -156,8 +180,8 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
* INTERNAL API
*/
@InternalApi private[akka] def addBinding(node: UniqueAddress, key: A, element: B): ORMultiMap[A, B] = {
val newUnderlying = underlying.updated(node, key, ORSet.empty[B])(_.add(node, element))
new ORMultiMap(newUnderlying)
val newUnderlying = underlying.updated(node, key, ORSet.empty[B], valueDeltas = withValueDeltas)(_.add(node, element))
new ORMultiMap(newUnderlying, withValueDeltas)
}
/**
@ -179,13 +203,17 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
*/
@InternalApi private[akka] def removeBinding(node: UniqueAddress, key: A, element: B): ORMultiMap[A, B] = {
val newUnderlying = {
val u = underlying.updated(node, key, ORSet.empty[B])(_.remove(node, element))
val u = underlying.updated(node, key, ORSet.empty[B], valueDeltas = withValueDeltas)(_.remove(node, element))
u.get(key) match {
case Some(s) if s.isEmpty u.remove(node, key)
case _ u
case Some(s) if s.isEmpty
if (withValueDeltas)
u.removeKey(node, key)
else
u.remove(node, key)
case _ u
}
}
new ORMultiMap(newUnderlying)
new ORMultiMap(newUnderlying, withValueDeltas)
}
/**
@ -205,6 +233,14 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
else
this
override def resetDelta: ORMultiMap[A, B] =
new ORMultiMap(underlying.resetDelta, withValueDeltas)
override def delta: Option[D] = underlying.delta
override def mergeDelta(thatDelta: D): ORMultiMap[A, B] =
new ORMultiMap(underlying.mergeDelta(thatDelta), withValueDeltas)
override def modifiedByNodes: Set[UniqueAddress] =
underlying.modifiedByNodes
@ -212,10 +248,10 @@ final class ORMultiMap[A, B] private[akka] (private[akka] val underlying: ORMap[
underlying.needPruningFrom(removedNode)
override def pruningCleanup(removedNode: UniqueAddress): T =
new ORMultiMap(underlying.pruningCleanup(removedNode))
new ORMultiMap(underlying.pruningCleanup(removedNode), withValueDeltas)
override def prune(removedNode: UniqueAddress, collapseInto: UniqueAddress): T =
new ORMultiMap(underlying.prune(removedNode, collapseInto))
new ORMultiMap(underlying.prune(removedNode, collapseInto), withValueDeltas)
// this class cannot be a `case class` because we need different `unapply`

View file

@ -91,7 +91,10 @@ object ORSet {
}
}
final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp {
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp {
override def merge(that: DeltaOp): DeltaOp = that match {
case thatAdd: AddDeltaOp[A]
// merge AddDeltaOp into last AddDeltaOp in the group, if possible

View file

@ -6,10 +6,12 @@ package akka.cluster.ddata
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
import java.math.BigInteger
import akka.annotation.InternalApi
import akka.cluster.ddata.ORMap._
object PNCounterMap {
def empty[A]: PNCounterMap[A] = new PNCounterMap(ORMap.empty)
def empty[A]: PNCounterMap[A] = new PNCounterMap(ORMap.emptyWithPNCounterMapTag)
def apply[A](): PNCounterMap[A] = empty
/**
* Java API
@ -30,9 +32,10 @@ object PNCounterMap {
@SerialVersionUID(1L)
final class PNCounterMap[A] private[akka] (
private[akka] val underlying: ORMap[A, PNCounter])
extends ReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
extends DeltaReplicatedData with ReplicatedDataSerialization with RemovedNodePruning {
type T = PNCounterMap[A]
type D = ORMap.DeltaOp
/** Scala API */
def entries: Map[A, BigInt] = underlying.entries.map { case (k, c) k c.value }
@ -124,6 +127,14 @@ final class PNCounterMap[A] private[akka] (
override def merge(that: PNCounterMap[A]): PNCounterMap[A] =
new PNCounterMap(underlying.merge(that.underlying))
override def resetDelta: PNCounterMap[A] =
new PNCounterMap(underlying.resetDelta)
override def delta: Option[D] = underlying.delta
override def mergeDelta(thatDelta: D): PNCounterMap[A] =
new PNCounterMap(underlying.mergeDelta(thatDelta))
override def modifiedByNodes: Set[UniqueAddress] =
underlying.modifiedByNodes

View file

@ -149,6 +149,22 @@ private object ReplicatedDataSerializer {
override def getValue(entry: rd.ORMultiMap.Entry): rd.ORSet = entry.getValue
}
implicit object ORMapDeltaGroupEntry extends ProtoMapEntryWriter[rd.ORMapDeltaGroup.MapEntry, rd.ORMapDeltaGroup.MapEntry.Builder, dm.OtherMessage] with ProtoMapEntryReader[rd.ORMapDeltaGroup.MapEntry, dm.OtherMessage] {
override def setStringKey(builder: rd.ORMapDeltaGroup.MapEntry.Builder, key: String, value: dm.OtherMessage): rd.ORMapDeltaGroup.MapEntry = builder.setStringKey(key).setValue(value).build()
override def setLongKey(builder: rd.ORMapDeltaGroup.MapEntry.Builder, key: Long, value: dm.OtherMessage): rd.ORMapDeltaGroup.MapEntry = builder.setLongKey(key).setValue(value).build()
override def setIntKey(builder: rd.ORMapDeltaGroup.MapEntry.Builder, key: Int, value: dm.OtherMessage): rd.ORMapDeltaGroup.MapEntry = builder.setIntKey(key).setValue(value).build()
override def setOtherKey(builder: rd.ORMapDeltaGroup.MapEntry.Builder, key: dm.OtherMessage, value: dm.OtherMessage): rd.ORMapDeltaGroup.MapEntry = builder.setOtherKey(key).setValue(value).build()
override def hasStringKey(entry: rd.ORMapDeltaGroup.MapEntry): Boolean = entry.hasStringKey
override def getStringKey(entry: rd.ORMapDeltaGroup.MapEntry): String = entry.getStringKey
override def hasIntKey(entry: rd.ORMapDeltaGroup.MapEntry): Boolean = entry.hasIntKey
override def getIntKey(entry: rd.ORMapDeltaGroup.MapEntry): Int = entry.getIntKey
override def hasLongKey(entry: rd.ORMapDeltaGroup.MapEntry): Boolean = entry.hasLongKey
override def getLongKey(entry: rd.ORMapDeltaGroup.MapEntry): Long = entry.getLongKey
override def hasOtherKey(entry: rd.ORMapDeltaGroup.MapEntry): Boolean = entry.hasOtherKey
override def getOtherKey(entry: rd.ORMapDeltaGroup.MapEntry): OtherMessage = entry.getOtherKey
override def getValue(entry: rd.ORMapDeltaGroup.MapEntry): dm.OtherMessage = entry.getValue
}
}
/**
@ -178,6 +194,11 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
private val PNCounterKeyManifest = "g"
private val ORMapManifest = "H"
private val ORMapKeyManifest = "h"
private val ORMapPutManifest = "Ha"
private val ORMapRemoveManifest = "Hr"
private val ORMapRemoveKeyManifest = "Hk"
private val ORMapUpdateManifest = "Hu"
private val ORMapDeltaGroupManifest = "Hg"
private val LWWMapManifest = "I"
private val LWWMapKeyManifest = "i"
private val PNCounterMapManifest = "J"
@ -198,6 +219,11 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
GCounterManifest gcounterFromBinary,
PNCounterManifest pncounterFromBinary,
ORMapManifest ormapFromBinary,
ORMapPutManifest ormapPutFromBinary,
ORMapRemoveManifest ormapRemoveFromBinary,
ORMapRemoveKeyManifest ormapRemoveKeyFromBinary,
ORMapUpdateManifest ormapUpdateFromBinary,
ORMapDeltaGroupManifest ormapDeltaGroupFromBinary,
LWWMapManifest lwwmapFromBinary,
PNCounterMapManifest pncountermapFromBinary,
ORMultiMapManifest multimapFromBinary,
@ -216,57 +242,67 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
ORMultiMapKeyManifest (bytes ORMultiMapKey(keyIdFromBinary(bytes))))
override def manifest(obj: AnyRef): String = obj match {
case _: ORSet[_] ORSetManifest
case _: ORSet.AddDeltaOp[_] ORSetAddManifest
case _: ORSet.RemoveDeltaOp[_] ORSetRemoveManifest
case _: GSet[_] GSetManifest
case _: GCounter GCounterManifest
case _: PNCounter PNCounterManifest
case _: Flag FlagManifest
case _: LWWRegister[_] LWWRegisterManifest
case _: ORMap[_, _] ORMapManifest
case _: LWWMap[_, _] LWWMapManifest
case _: PNCounterMap[_] PNCounterMapManifest
case _: ORMultiMap[_, _] ORMultiMapManifest
case DeletedData DeletedDataManifest
case _: VersionVector VersionVectorManifest
case _: ORSet[_] ORSetManifest
case _: ORSet.AddDeltaOp[_] ORSetAddManifest
case _: ORSet.RemoveDeltaOp[_] ORSetRemoveManifest
case _: GSet[_] GSetManifest
case _: GCounter GCounterManifest
case _: PNCounter PNCounterManifest
case _: Flag FlagManifest
case _: LWWRegister[_] LWWRegisterManifest
case _: ORMap[_, _] ORMapManifest
case _: ORMap.PutDeltaOp[_, _] ORMapPutManifest
case _: ORMap.RemoveDeltaOp[_, _] ORMapRemoveManifest
case _: ORMap.RemoveKeyDeltaOp[_, _] ORMapRemoveKeyManifest
case _: ORMap.UpdateDeltaOp[_, _] ORMapUpdateManifest
case _: LWWMap[_, _] LWWMapManifest
case _: PNCounterMap[_] PNCounterMapManifest
case _: ORMultiMap[_, _] ORMultiMapManifest
case DeletedData DeletedDataManifest
case _: VersionVector VersionVectorManifest
case _: ORSetKey[_] ORSetKeyManifest
case _: GSetKey[_] GSetKeyManifest
case _: GCounterKey GCounterKeyManifest
case _: PNCounterKey PNCounterKeyManifest
case _: FlagKey FlagKeyManifest
case _: LWWRegisterKey[_] LWWRegisterKeyManifest
case _: ORMapKey[_, _] ORMapKeyManifest
case _: LWWMapKey[_, _] LWWMapKeyManifest
case _: PNCounterMapKey[_] PNCounterMapKeyManifest
case _: ORMultiMapKey[_, _] ORMultiMapKeyManifest
case _: ORSetKey[_] ORSetKeyManifest
case _: GSetKey[_] GSetKeyManifest
case _: GCounterKey GCounterKeyManifest
case _: PNCounterKey PNCounterKeyManifest
case _: FlagKey FlagKeyManifest
case _: LWWRegisterKey[_] LWWRegisterKeyManifest
case _: ORMapKey[_, _] ORMapKeyManifest
case _: LWWMapKey[_, _] LWWMapKeyManifest
case _: PNCounterMapKey[_] PNCounterMapKeyManifest
case _: ORMultiMapKey[_, _] ORMultiMapKeyManifest
case _: ORSet.DeltaGroup[_] ORSetDeltaGroupManifest
case _: ORSet.FullStateDeltaOp[_] ORSetFullManifest
case _: ORSet.DeltaGroup[_] ORSetDeltaGroupManifest
case _: ORMap.DeltaGroup[_, _] ORMapDeltaGroupManifest
case _: ORSet.FullStateDeltaOp[_] ORSetFullManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: ORSet[_] compress(orsetToProto(m))
case m: ORSet.AddDeltaOp[_] orsetToProto(m.underlying).toByteArray
case m: ORSet.RemoveDeltaOp[_] orsetToProto(m.underlying).toByteArray
case m: GSet[_] gsetToProto(m).toByteArray
case m: GCounter gcounterToProto(m).toByteArray
case m: PNCounter pncounterToProto(m).toByteArray
case m: Flag flagToProto(m).toByteArray
case m: LWWRegister[_] lwwRegisterToProto(m).toByteArray
case m: ORMap[_, _] compress(ormapToProto(m))
case m: LWWMap[_, _] compress(lwwmapToProto(m))
case m: PNCounterMap[_] compress(pncountermapToProto(m))
case m: ORMultiMap[_, _] compress(multimapToProto(m))
case DeletedData dm.Empty.getDefaultInstance.toByteArray
case m: VersionVector versionVectorToProto(m).toByteArray
case Key(id) keyIdToBinary(id)
case m: ORSet.DeltaGroup[_] orsetDeltaGroupToProto(m).toByteArray
case m: ORSet.FullStateDeltaOp[_] orsetToProto(m.underlying).toByteArray
case m: ORSet[_] compress(orsetToProto(m))
case m: ORSet.AddDeltaOp[_] orsetToProto(m.underlying).toByteArray
case m: ORSet.RemoveDeltaOp[_] orsetToProto(m.underlying).toByteArray
case m: GSet[_] gsetToProto(m).toByteArray
case m: GCounter gcounterToProto(m).toByteArray
case m: PNCounter pncounterToProto(m).toByteArray
case m: Flag flagToProto(m).toByteArray
case m: LWWRegister[_] lwwRegisterToProto(m).toByteArray
case m: ORMap[_, _] compress(ormapToProto(m))
case m: ORMap.PutDeltaOp[_, _] ormapPutToProto(m).toByteArray
case m: ORMap.RemoveDeltaOp[_, _] ormapRemoveToProto(m).toByteArray
case m: ORMap.RemoveKeyDeltaOp[_, _] ormapRemoveKeyToProto(m).toByteArray
case m: ORMap.UpdateDeltaOp[_, _] ormapUpdateToProto(m).toByteArray
case m: LWWMap[_, _] compress(lwwmapToProto(m))
case m: PNCounterMap[_] compress(pncountermapToProto(m))
case m: ORMultiMap[_, _] compress(multimapToProto(m))
case DeletedData dm.Empty.getDefaultInstance.toByteArray
case m: VersionVector versionVectorToProto(m).toByteArray
case Key(id) keyIdToBinary(id)
case m: ORSet.DeltaGroup[_] orsetDeltaGroupToProto(m).toByteArray
case m: ORMap.DeltaGroup[_, _] ormapDeltaGroupToProto(m).toByteArray
case m: ORSet.FullStateDeltaOp[_] orsetToProto(m.underlying).toByteArray
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
@ -512,8 +548,9 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
}
def ormapToProto(ormap: ORMap[_, _]): rd.ORMap = {
val ormapBuilder = rd.ORMap.newBuilder()
val entries: jl.Iterable[rd.ORMap.Entry] = getEntries(ormap.values, rd.ORMap.Entry.newBuilder, otherMessageToProto)
rd.ORMap.newBuilder().setKeys(orsetToProto(ormap.keys)).addAllEntries(entries).build()
ormapBuilder.setKeys(orsetToProto(ormap.keys)).addAllEntries(entries).build()
}
def ormapFromBinary(bytes: Array[Byte]): ORMap[Any, ReplicatedData] =
@ -536,9 +573,144 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
entries)
}
def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](entry: PEntry, valueCreator: A B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
val elem = if (eh.hasStringKey(entry)) Some(eh.getStringKey(entry) valueCreator(eh.getValue(entry)))
else if (eh.hasIntKey(entry)) Some(eh.getIntKey(entry) valueCreator(eh.getValue(entry)))
else if (eh.hasLongKey(entry)) Some(eh.getLongKey(entry) valueCreator(eh.getValue(entry)))
else if (eh.hasOtherKey(entry)) Some(otherMessageFromProto(eh.getOtherKey(entry)) valueCreator(eh.getValue(entry)))
else None
elem match {
case Some(e) Map(e)
case _ Map.empty[Any, B]
}
}
// wire protocol is always DeltaGroup
private def ormapPutFromBinary(bytes: Array[Byte]): ORMap.PutDeltaOp[Any, ReplicatedData] = {
val group = ormapDeltaGroupFromBinary(bytes)
if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.PutDeltaOp[_, _]])
group.ops.head.asInstanceOf[ORMap.PutDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta put operation size or kind")
}
// wire protocol is always delta group
private def ormapRemoveFromBinary(bytes: Array[Byte]): ORMap.RemoveDeltaOp[Any, ReplicatedData] = {
val group = ormapDeltaGroupFromBinary(bytes)
if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.RemoveDeltaOp[_, _]])
group.ops.head.asInstanceOf[ORMap.RemoveDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta remove operation size or kind")
}
// wire protocol is always delta group
private def ormapRemoveKeyFromBinary(bytes: Array[Byte]): ORMap.RemoveKeyDeltaOp[Any, ReplicatedData] = {
val group = ormapDeltaGroupFromBinary(bytes)
if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.RemoveKeyDeltaOp[_, _]])
group.ops.head.asInstanceOf[ORMap.RemoveKeyDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta remove key operation size or kind")
}
// wire protocol is always delta group
private def ormapUpdateFromBinary(bytes: Array[Byte]): ORMap.UpdateDeltaOp[Any, ReplicatedDelta] = {
val group = ormapDeltaGroupFromBinary(bytes)
if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.UpdateDeltaOp[_, _]])
group.ops.head.asInstanceOf[ORMap.UpdateDeltaOp[Any, ReplicatedDelta]]
else
throw new NotSerializableException("Improper ORMap delta update operation size or kind")
}
// this can be made client-extendable in the same way as Http codes in Spray are
private def zeroTagFromCode(code: Int) = code match {
case ORMap.VanillaORMapTag.value ORMap.VanillaORMapTag
case ORMap.PNCounterMapTag.value ORMap.PNCounterMapTag
case ORMap.ORMultiMapTag.value ORMap.ORMultiMapTag
case ORMap.ORMultiMapWithValueDeltasTag.value ORMap.ORMultiMapWithValueDeltasTag
case ORMap.LWWMapTag.value ORMap.LWWMapTag
case _ throw new IllegalArgumentException("Invalid ZeroTag code")
}
private def ormapDeltaGroupFromBinary(bytes: Array[Byte]): ORMap.DeltaGroup[Any, ReplicatedData] = {
val deltaGroup = rd.ORMapDeltaGroup.parseFrom(bytes)
val ops: Vector[ORMap.DeltaOp] =
deltaGroup.getEntriesList.asScala.map { entry
if (entry.getOperation == rd.ORMapDeltaOp.ORMapPut) {
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
ORMap.PutDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map.head, zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemove) {
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
ORMap.RemoveDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemoveKey) {
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
ORMap.RemoveKeyDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), map.keySet.head, zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapUpdate) {
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedDelta])
ORMap.UpdateDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map, zeroTagFromCode(entry.getZeroTag))
} else
throw new NotSerializableException(s"Unknown ORMap delta operation ${entry.getOperation}")
}(collection.breakOut)
ORMap.DeltaGroup(ops)
}
private def ormapPutToProto(addDelta: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp])))
}
private def ormapRemoveToProto(addDelta: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp])))
}
private def ormapRemoveKeyToProto(addDelta: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp])))
}
private def ormapUpdateToProto(addDelta: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp])))
}
private def ormapDeltaGroupToProto(deltaGroup: ORMap.DeltaGroup[_, _]): rd.ORMapDeltaGroup = {
def createEntry(opType: rd.ORMapDeltaOp, u: ORSet[_], m: Map[_, _], zt: Int) = {
if (m.size > 1)
throw new IllegalArgumentException("Invalid size of ORMap delta map")
else {
val entryDataBuilder = rd.ORMapDeltaGroup.MapEntry.newBuilder()
m.headOption.map {
case (key: String, value) entryDataBuilder.setStringKey(key).setValue(otherMessageToProto(value))
case (key: Int, value) entryDataBuilder.setIntKey(key).setValue(otherMessageToProto(value))
case (key: Long, value) entryDataBuilder.setLongKey(key).setValue(otherMessageToProto(value))
case (key, value) entryDataBuilder.setOtherKey(otherMessageToProto(key)).setValue(otherMessageToProto(value))
}
val builder = rd.ORMapDeltaGroup.Entry.newBuilder()
.setOperation(opType)
.setUnderlying(orsetToProto(u))
.setZeroTag(zt)
if (m.size > 0)
builder.setEntryData(entryDataBuilder.build())
builder
}
}
val b = rd.ORMapDeltaGroup.newBuilder()
deltaGroup.ops.foreach {
case ORMap.PutDeltaOp(op, pair, zt)
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapPut, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, Map(pair), zt.value))
case ORMap.RemoveDeltaOp(op, zt)
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemove, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map.empty, zt.value))
case ORMap.RemoveKeyDeltaOp(op, k, zt)
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemove, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map(k k), zt.value))
case ORMap.UpdateDeltaOp(op, m, zt)
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapUpdate, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, m, zt.value))
case ORMap.DeltaGroup(u)
throw new IllegalArgumentException("ORMap.DeltaGroup should not be nested")
}
b.build()
}
def lwwmapToProto(lwwmap: LWWMap[_, _]): rd.LWWMap = {
val lwwmapBuilder = rd.LWWMap.newBuilder()
val entries: jl.Iterable[rd.LWWMap.Entry] = getEntries(lwwmap.underlying.entries, rd.LWWMap.Entry.newBuilder, lwwRegisterToProto)
rd.LWWMap.newBuilder().setKeys(orsetToProto(lwwmap.underlying.keys)).addAllEntries(entries).build()
lwwmapBuilder.setKeys(orsetToProto(lwwmap.underlying.keys)).addAllEntries(entries).build()
}
def lwwmapFromBinary(bytes: Array[Byte]): LWWMap[Any, Any] =
@ -548,12 +720,13 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val entries = mapTypeFromProto(lwwmap.getEntriesList, lwwRegisterFromProto)
new LWWMap(new ORMap(
keys = orsetFromProto(lwwmap.getKeys),
entries))
entries, ORMap.LWWMapTag))
}
def pncountermapToProto(pncountermap: PNCounterMap[_]): rd.PNCounterMap = {
val pncountermapBuilder = rd.PNCounterMap.newBuilder()
val entries: jl.Iterable[rd.PNCounterMap.Entry] = getEntries(pncountermap.underlying.entries, rd.PNCounterMap.Entry.newBuilder, pncounterToProto)
rd.PNCounterMap.newBuilder().setKeys(orsetToProto(pncountermap.underlying.keys)).addAllEntries(entries).build()
pncountermapBuilder.setKeys(orsetToProto(pncountermap.underlying.keys)).addAllEntries(entries).build()
}
def pncountermapFromBinary(bytes: Array[Byte]): PNCounterMap[_] =
@ -563,12 +736,16 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val entries = mapTypeFromProto(pncountermap.getEntriesList, pncounterFromProto)
new PNCounterMap(new ORMap(
keys = orsetFromProto(pncountermap.getKeys),
entries))
entries, ORMap.PNCounterMapTag))
}
def multimapToProto(multimap: ORMultiMap[_, _]): rd.ORMultiMap = {
val ormultimapBuilder = rd.ORMultiMap.newBuilder()
val entries: jl.Iterable[rd.ORMultiMap.Entry] = getEntries(multimap.underlying.entries, rd.ORMultiMap.Entry.newBuilder, orsetToProto)
rd.ORMultiMap.newBuilder().setKeys(orsetToProto(multimap.underlying.keys)).addAllEntries(entries).build()
ormultimapBuilder.setKeys(orsetToProto(multimap.underlying.keys)).addAllEntries(entries)
if (multimap.withValueDeltas)
ormultimapBuilder.setWithValueDeltas(true)
ormultimapBuilder.build()
}
def multimapFromBinary(bytes: Array[Byte]): ORMultiMap[Any, Any] =
@ -576,9 +753,18 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
def multimapFromProto(multimap: rd.ORMultiMap): ORMultiMap[Any, Any] = {
val entries = mapTypeFromProto(multimap.getEntriesList, orsetFromProto)
new ORMultiMap(new ORMap(
keys = orsetFromProto(multimap.getKeys),
entries))
val withValueDeltas = if (multimap.hasWithValueDeltas)
multimap.getWithValueDeltas
else false
new ORMultiMap(
new ORMap(
keys = orsetFromProto(multimap.getKeys),
entries,
if (withValueDeltas)
ORMap.ORMultiMapWithValueDeltasTag
else
ORMap.ORMultiMapTag),
withValueDeltas)
}
def keyIdToBinary(id: String): Array[Byte] =