852 lines
44 KiB
Scala
852 lines
44 KiB
Scala
/**
|
|
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
|
|
*/
|
|
|
|
package akka.cluster.ddata.protobuf
|
|
|
|
import java.{ util, lang ⇒ jl }
|
|
import java.util.ArrayList
|
|
import java.util.Collections
|
|
import java.util.Comparator
|
|
import java.util.TreeSet
|
|
|
|
import scala.annotation.tailrec
|
|
import scala.collection.JavaConverters._
|
|
import scala.collection.breakOut
|
|
|
|
import akka.actor.ExtendedActorSystem
|
|
import akka.cluster.ddata._
|
|
import akka.cluster.ddata.Replicator.Internal._
|
|
import akka.cluster.ddata.protobuf.msg.{ ReplicatedDataMessages ⇒ rd }
|
|
import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages ⇒ dm }
|
|
import akka.serialization.SerializerWithStringManifest
|
|
import akka.serialization.BaseSerializer
|
|
import akka.protobuf.{ ByteString, GeneratedMessage }
|
|
import akka.util.ByteString.UTF_8
|
|
import java.io.NotSerializableException
|
|
|
|
import akka.actor.ActorRef
|
|
import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage
|
|
import akka.serialization.Serialization
|
|
|
|
private object ReplicatedDataSerializer {
|
|
/*
|
|
* Generic superclass to allow to compare Entry types used in protobuf.
|
|
*/
|
|
abstract class KeyComparator[A <: GeneratedMessage] extends Comparator[A] {
|
|
/**
|
|
* Get the key from the entry. The key may be a String, Integer, Long, or Any
|
|
* @param entry The protobuf entry used with Map types
|
|
* @return The Key
|
|
*/
|
|
def getKey(entry: A): Any
|
|
final def compare(x: A, y: A): Int = compareKeys(getKey(x), getKey(y))
|
|
private final def compareKeys(t1: Any, t2: Any): Int = (t1, t2) match {
|
|
case (k1: String, k2: String) ⇒ k1.compareTo(k2)
|
|
case (k1: String, k2) ⇒ -1
|
|
case (k1, k2: String) ⇒ 1
|
|
case (k1: Int, k2: Int) ⇒ k1.compareTo(k2)
|
|
case (k1: Int, k2) ⇒ -1
|
|
case (k1, k2: Int) ⇒ 1
|
|
case (k1: Long, k2: Long) ⇒ k1.compareTo(k2)
|
|
case (k1: Long, k2) ⇒ -1
|
|
case (k1, k2: Long) ⇒ 1
|
|
case (k1: OtherMessage, k2: OtherMessage) ⇒ OtherMessageComparator.compare(k1, k2)
|
|
}
|
|
}
|
|
|
|
implicit object ORMapEntryComparator extends KeyComparator[rd.ORMap.Entry] {
|
|
override def getKey(e: rd.ORMap.Entry): Any = if (e.hasStringKey) e.getStringKey else if (e.hasIntKey) e.getIntKey else if (e.hasLongKey) e.getLongKey else e.getOtherKey
|
|
}
|
|
implicit object LWWMapEntryComparator extends KeyComparator[rd.LWWMap.Entry] {
|
|
override def getKey(e: rd.LWWMap.Entry): Any = if (e.hasStringKey) e.getStringKey else if (e.hasIntKey) e.getIntKey else if (e.hasLongKey) e.getLongKey else e.getOtherKey
|
|
}
|
|
implicit object PNCounterMapEntryComparator extends KeyComparator[rd.PNCounterMap.Entry] {
|
|
override def getKey(e: rd.PNCounterMap.Entry): Any = if (e.hasStringKey) e.getStringKey else if (e.hasIntKey) e.getIntKey else if (e.hasLongKey) e.getLongKey else e.getOtherKey
|
|
}
|
|
implicit object ORMultiMapEntryComparator extends KeyComparator[rd.ORMultiMap.Entry] {
|
|
override def getKey(e: rd.ORMultiMap.Entry): Any = if (e.hasStringKey) e.getStringKey else if (e.hasIntKey) e.getIntKey else if (e.hasLongKey) e.getLongKey else e.getOtherKey
|
|
}
|
|
|
|
sealed trait ProtoMapEntryWriter[Entry <: GeneratedMessage, EntryBuilder <: GeneratedMessage.Builder[EntryBuilder], Value <: GeneratedMessage] {
|
|
def setStringKey(builder: EntryBuilder, key: String, value: Value): Entry
|
|
def setLongKey(builder: EntryBuilder, key: Long, value: Value): Entry
|
|
def setIntKey(builder: EntryBuilder, key: Int, value: Value): Entry
|
|
def setOtherKey(builder: EntryBuilder, key: dm.OtherMessage, value: Value): Entry
|
|
}
|
|
|
|
sealed trait ProtoMapEntryReader[Entry <: GeneratedMessage, A <: GeneratedMessage] {
|
|
def hasStringKey(entry: Entry): Boolean
|
|
def getStringKey(entry: Entry): String
|
|
def hasIntKey(entry: Entry): Boolean
|
|
def getIntKey(entry: Entry): Int
|
|
def hasLongKey(entry: Entry): Boolean
|
|
def getLongKey(entry: Entry): Long
|
|
def hasOtherKey(entry: Entry): Boolean
|
|
def getOtherKey(entry: Entry): dm.OtherMessage
|
|
def getValue(entry: Entry): A
|
|
}
|
|
|
|
implicit object ORMapEntry extends ProtoMapEntryWriter[rd.ORMap.Entry, rd.ORMap.Entry.Builder, dm.OtherMessage] with ProtoMapEntryReader[rd.ORMap.Entry, dm.OtherMessage] {
|
|
override def setStringKey(builder: rd.ORMap.Entry.Builder, key: String, value: dm.OtherMessage): rd.ORMap.Entry = builder.setStringKey(key).setValue(value).build()
|
|
override def setLongKey(builder: rd.ORMap.Entry.Builder, key: Long, value: dm.OtherMessage): rd.ORMap.Entry = builder.setLongKey(key).setValue(value).build()
|
|
override def setIntKey(builder: rd.ORMap.Entry.Builder, key: Int, value: dm.OtherMessage): rd.ORMap.Entry = builder.setIntKey(key).setValue(value).build()
|
|
override def setOtherKey(builder: rd.ORMap.Entry.Builder, key: dm.OtherMessage, value: dm.OtherMessage): rd.ORMap.Entry = builder.setOtherKey(key).setValue(value).build()
|
|
override def hasStringKey(entry: rd.ORMap.Entry): Boolean = entry.hasStringKey
|
|
override def getStringKey(entry: rd.ORMap.Entry): String = entry.getStringKey
|
|
override def hasIntKey(entry: rd.ORMap.Entry): Boolean = entry.hasIntKey
|
|
override def getIntKey(entry: rd.ORMap.Entry): Int = entry.getIntKey
|
|
override def hasLongKey(entry: rd.ORMap.Entry): Boolean = entry.hasLongKey
|
|
override def getLongKey(entry: rd.ORMap.Entry): Long = entry.getLongKey
|
|
override def hasOtherKey(entry: rd.ORMap.Entry): Boolean = entry.hasOtherKey
|
|
override def getOtherKey(entry: rd.ORMap.Entry): OtherMessage = entry.getOtherKey
|
|
override def getValue(entry: rd.ORMap.Entry): dm.OtherMessage = entry.getValue
|
|
}
|
|
|
|
implicit object LWWMapEntry extends ProtoMapEntryWriter[rd.LWWMap.Entry, rd.LWWMap.Entry.Builder, rd.LWWRegister] with ProtoMapEntryReader[rd.LWWMap.Entry, rd.LWWRegister] {
|
|
override def setStringKey(builder: rd.LWWMap.Entry.Builder, key: String, value: rd.LWWRegister): rd.LWWMap.Entry = builder.setStringKey(key).setValue(value).build()
|
|
override def setLongKey(builder: rd.LWWMap.Entry.Builder, key: Long, value: rd.LWWRegister): rd.LWWMap.Entry = builder.setLongKey(key).setValue(value).build()
|
|
override def setIntKey(builder: rd.LWWMap.Entry.Builder, key: Int, value: rd.LWWRegister): rd.LWWMap.Entry = builder.setIntKey(key).setValue(value).build()
|
|
override def setOtherKey(builder: rd.LWWMap.Entry.Builder, key: OtherMessage, value: rd.LWWRegister): rd.LWWMap.Entry = builder.setOtherKey(key).setValue(value).build()
|
|
override def hasStringKey(entry: rd.LWWMap.Entry): Boolean = entry.hasStringKey
|
|
override def getStringKey(entry: rd.LWWMap.Entry): String = entry.getStringKey
|
|
override def hasIntKey(entry: rd.LWWMap.Entry): Boolean = entry.hasIntKey
|
|
override def getIntKey(entry: rd.LWWMap.Entry): Int = entry.getIntKey
|
|
override def hasLongKey(entry: rd.LWWMap.Entry): Boolean = entry.hasLongKey
|
|
override def getLongKey(entry: rd.LWWMap.Entry): Long = entry.getLongKey
|
|
override def hasOtherKey(entry: rd.LWWMap.Entry): Boolean = entry.hasOtherKey
|
|
override def getOtherKey(entry: rd.LWWMap.Entry): OtherMessage = entry.getOtherKey
|
|
override def getValue(entry: rd.LWWMap.Entry): rd.LWWRegister = entry.getValue
|
|
}
|
|
|
|
implicit object PNCounterMapEntry extends ProtoMapEntryWriter[rd.PNCounterMap.Entry, rd.PNCounterMap.Entry.Builder, rd.PNCounter] with ProtoMapEntryReader[rd.PNCounterMap.Entry, rd.PNCounter] {
|
|
override def setStringKey(builder: rd.PNCounterMap.Entry.Builder, key: String, value: rd.PNCounter): rd.PNCounterMap.Entry = builder.setStringKey(key).setValue(value).build()
|
|
override def setLongKey(builder: rd.PNCounterMap.Entry.Builder, key: Long, value: rd.PNCounter): rd.PNCounterMap.Entry = builder.setLongKey(key).setValue(value).build()
|
|
override def setIntKey(builder: rd.PNCounterMap.Entry.Builder, key: Int, value: rd.PNCounter): rd.PNCounterMap.Entry = builder.setIntKey(key).setValue(value).build()
|
|
override def setOtherKey(builder: rd.PNCounterMap.Entry.Builder, key: OtherMessage, value: rd.PNCounter): rd.PNCounterMap.Entry = builder.setOtherKey(key).setValue(value).build()
|
|
override def hasStringKey(entry: rd.PNCounterMap.Entry): Boolean = entry.hasStringKey
|
|
override def getStringKey(entry: rd.PNCounterMap.Entry): String = entry.getStringKey
|
|
override def hasIntKey(entry: rd.PNCounterMap.Entry): Boolean = entry.hasIntKey
|
|
override def getIntKey(entry: rd.PNCounterMap.Entry): Int = entry.getIntKey
|
|
override def hasLongKey(entry: rd.PNCounterMap.Entry): Boolean = entry.hasLongKey
|
|
override def getLongKey(entry: rd.PNCounterMap.Entry): Long = entry.getLongKey
|
|
override def hasOtherKey(entry: rd.PNCounterMap.Entry): Boolean = entry.hasOtherKey
|
|
override def getOtherKey(entry: rd.PNCounterMap.Entry): OtherMessage = entry.getOtherKey
|
|
override def getValue(entry: rd.PNCounterMap.Entry): rd.PNCounter = entry.getValue
|
|
}
|
|
|
|
implicit object ORMultiMapEntry extends ProtoMapEntryWriter[rd.ORMultiMap.Entry, rd.ORMultiMap.Entry.Builder, rd.ORSet] with ProtoMapEntryReader[rd.ORMultiMap.Entry, rd.ORSet] {
|
|
override def setStringKey(builder: rd.ORMultiMap.Entry.Builder, key: String, value: rd.ORSet): rd.ORMultiMap.Entry = builder.setStringKey(key).setValue(value).build()
|
|
override def setLongKey(builder: rd.ORMultiMap.Entry.Builder, key: Long, value: rd.ORSet): rd.ORMultiMap.Entry = builder.setLongKey(key).setValue(value).build()
|
|
override def setIntKey(builder: rd.ORMultiMap.Entry.Builder, key: Int, value: rd.ORSet): rd.ORMultiMap.Entry = builder.setIntKey(key).setValue(value).build()
|
|
override def setOtherKey(builder: rd.ORMultiMap.Entry.Builder, key: dm.OtherMessage, value: rd.ORSet): rd.ORMultiMap.Entry = builder.setOtherKey(key).setValue(value).build()
|
|
override def hasStringKey(entry: rd.ORMultiMap.Entry): Boolean = entry.hasStringKey
|
|
override def getStringKey(entry: rd.ORMultiMap.Entry): String = entry.getStringKey
|
|
override def hasIntKey(entry: rd.ORMultiMap.Entry): Boolean = entry.hasIntKey
|
|
override def getIntKey(entry: rd.ORMultiMap.Entry): Int = entry.getIntKey
|
|
override def hasLongKey(entry: rd.ORMultiMap.Entry): Boolean = entry.hasLongKey
|
|
override def getLongKey(entry: rd.ORMultiMap.Entry): Long = entry.getLongKey
|
|
override def hasOtherKey(entry: rd.ORMultiMap.Entry): Boolean = entry.hasOtherKey
|
|
override def getOtherKey(entry: rd.ORMultiMap.Entry): OtherMessage = entry.getOtherKey
|
|
override def getValue(entry: rd.ORMultiMap.Entry): rd.ORSet = entry.getValue
|
|
}
|
|
|
|
implicit object ORMapDeltaGroupEntry extends ProtoMapEntryWriter[rd.ORMapDeltaGroup.MapEntry, rd.ORMapDeltaGroup.MapEntry.Builder, dm.OtherMessage] with ProtoMapEntryReader[rd.ORMapDeltaGroup.MapEntry, dm.OtherMessage] {
|
|
override def setStringKey(builder: rd.ORMapDeltaGroup.MapEntry.Builder, key: String, value: dm.OtherMessage): rd.ORMapDeltaGroup.MapEntry = builder.setStringKey(key).setValue(value).build()
|
|
override def setLongKey(builder: rd.ORMapDeltaGroup.MapEntry.Builder, key: Long, value: dm.OtherMessage): rd.ORMapDeltaGroup.MapEntry = builder.setLongKey(key).setValue(value).build()
|
|
override def setIntKey(builder: rd.ORMapDeltaGroup.MapEntry.Builder, key: Int, value: dm.OtherMessage): rd.ORMapDeltaGroup.MapEntry = builder.setIntKey(key).setValue(value).build()
|
|
override def setOtherKey(builder: rd.ORMapDeltaGroup.MapEntry.Builder, key: dm.OtherMessage, value: dm.OtherMessage): rd.ORMapDeltaGroup.MapEntry = builder.setOtherKey(key).setValue(value).build()
|
|
override def hasStringKey(entry: rd.ORMapDeltaGroup.MapEntry): Boolean = entry.hasStringKey
|
|
override def getStringKey(entry: rd.ORMapDeltaGroup.MapEntry): String = entry.getStringKey
|
|
override def hasIntKey(entry: rd.ORMapDeltaGroup.MapEntry): Boolean = entry.hasIntKey
|
|
override def getIntKey(entry: rd.ORMapDeltaGroup.MapEntry): Int = entry.getIntKey
|
|
override def hasLongKey(entry: rd.ORMapDeltaGroup.MapEntry): Boolean = entry.hasLongKey
|
|
override def getLongKey(entry: rd.ORMapDeltaGroup.MapEntry): Long = entry.getLongKey
|
|
override def hasOtherKey(entry: rd.ORMapDeltaGroup.MapEntry): Boolean = entry.hasOtherKey
|
|
override def getOtherKey(entry: rd.ORMapDeltaGroup.MapEntry): OtherMessage = entry.getOtherKey
|
|
override def getValue(entry: rd.ORMapDeltaGroup.MapEntry): dm.OtherMessage = entry.getValue
|
|
}
|
|
|
|
}
|
|
|
|
/**
|
|
* Protobuf serializer of ReplicatedData.
|
|
*/
|
|
class ReplicatedDataSerializer(val system: ExtendedActorSystem)
|
|
extends SerializerWithStringManifest with SerializationSupport with BaseSerializer {
|
|
|
|
import ReplicatedDataSerializer._
|
|
|
|
private val DeletedDataManifest = "A"
|
|
private val GSetManifest = "B"
|
|
private val GSetKeyManifest = "b"
|
|
private val ORSetManifest = "C"
|
|
private val ORSetKeyManifest = "c"
|
|
private val ORSetAddManifest = "Ca"
|
|
private val ORSetRemoveManifest = "Cr"
|
|
private val ORSetFullManifest = "Cf"
|
|
private val ORSetDeltaGroupManifest = "Cg"
|
|
private val FlagManifest = "D"
|
|
private val FlagKeyManifest = "d"
|
|
private val LWWRegisterManifest = "E"
|
|
private val LWWRegisterKeyManifest = "e"
|
|
private val GCounterManifest = "F"
|
|
private val GCounterKeyManifest = "f"
|
|
private val PNCounterManifest = "G"
|
|
private val PNCounterKeyManifest = "g"
|
|
private val ORMapManifest = "H"
|
|
private val ORMapKeyManifest = "h"
|
|
private val ORMapPutManifest = "Ha"
|
|
private val ORMapRemoveManifest = "Hr"
|
|
private val ORMapRemoveKeyManifest = "Hk"
|
|
private val ORMapUpdateManifest = "Hu"
|
|
private val ORMapDeltaGroupManifest = "Hg"
|
|
private val LWWMapManifest = "I"
|
|
private val LWWMapKeyManifest = "i"
|
|
private val PNCounterMapManifest = "J"
|
|
private val PNCounterMapKeyManifest = "j"
|
|
private val ORMultiMapManifest = "K"
|
|
private val ORMultiMapKeyManifest = "k"
|
|
private val VersionVectorManifest = "L"
|
|
|
|
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
|
|
GSetManifest → gsetFromBinary,
|
|
ORSetManifest → orsetFromBinary,
|
|
ORSetAddManifest → orsetAddFromBinary,
|
|
ORSetRemoveManifest → orsetRemoveFromBinary,
|
|
ORSetFullManifest → orsetFullFromBinary,
|
|
ORSetDeltaGroupManifest → orsetDeltaGroupFromBinary,
|
|
FlagManifest → flagFromBinary,
|
|
LWWRegisterManifest → lwwRegisterFromBinary,
|
|
GCounterManifest → gcounterFromBinary,
|
|
PNCounterManifest → pncounterFromBinary,
|
|
ORMapManifest → ormapFromBinary,
|
|
ORMapPutManifest → ormapPutFromBinary,
|
|
ORMapRemoveManifest → ormapRemoveFromBinary,
|
|
ORMapRemoveKeyManifest → ormapRemoveKeyFromBinary,
|
|
ORMapUpdateManifest → ormapUpdateFromBinary,
|
|
ORMapDeltaGroupManifest → ormapDeltaGroupFromBinary,
|
|
LWWMapManifest → lwwmapFromBinary,
|
|
PNCounterMapManifest → pncountermapFromBinary,
|
|
ORMultiMapManifest → multimapFromBinary,
|
|
DeletedDataManifest → (_ ⇒ DeletedData),
|
|
VersionVectorManifest → versionVectorFromBinary,
|
|
|
|
GSetKeyManifest → (bytes ⇒ GSetKey(keyIdFromBinary(bytes))),
|
|
ORSetKeyManifest → (bytes ⇒ ORSetKey(keyIdFromBinary(bytes))),
|
|
FlagKeyManifest → (bytes ⇒ FlagKey(keyIdFromBinary(bytes))),
|
|
LWWRegisterKeyManifest → (bytes ⇒ LWWRegisterKey(keyIdFromBinary(bytes))),
|
|
GCounterKeyManifest → (bytes ⇒ GCounterKey(keyIdFromBinary(bytes))),
|
|
PNCounterKeyManifest → (bytes ⇒ PNCounterKey(keyIdFromBinary(bytes))),
|
|
ORMapKeyManifest → (bytes ⇒ ORMapKey(keyIdFromBinary(bytes))),
|
|
LWWMapKeyManifest → (bytes ⇒ LWWMapKey(keyIdFromBinary(bytes))),
|
|
PNCounterMapKeyManifest → (bytes ⇒ PNCounterMapKey(keyIdFromBinary(bytes))),
|
|
ORMultiMapKeyManifest → (bytes ⇒ ORMultiMapKey(keyIdFromBinary(bytes))))
|
|
|
|
override def manifest(obj: AnyRef): String = obj match {
|
|
case _: ORSet[_] ⇒ ORSetManifest
|
|
case _: ORSet.AddDeltaOp[_] ⇒ ORSetAddManifest
|
|
case _: ORSet.RemoveDeltaOp[_] ⇒ ORSetRemoveManifest
|
|
case _: GSet[_] ⇒ GSetManifest
|
|
case _: GCounter ⇒ GCounterManifest
|
|
case _: PNCounter ⇒ PNCounterManifest
|
|
case _: Flag ⇒ FlagManifest
|
|
case _: LWWRegister[_] ⇒ LWWRegisterManifest
|
|
case _: ORMap[_, _] ⇒ ORMapManifest
|
|
case _: ORMap.PutDeltaOp[_, _] ⇒ ORMapPutManifest
|
|
case _: ORMap.RemoveDeltaOp[_, _] ⇒ ORMapRemoveManifest
|
|
case _: ORMap.RemoveKeyDeltaOp[_, _] ⇒ ORMapRemoveKeyManifest
|
|
case _: ORMap.UpdateDeltaOp[_, _] ⇒ ORMapUpdateManifest
|
|
case _: LWWMap[_, _] ⇒ LWWMapManifest
|
|
case _: PNCounterMap[_] ⇒ PNCounterMapManifest
|
|
case _: ORMultiMap[_, _] ⇒ ORMultiMapManifest
|
|
case DeletedData ⇒ DeletedDataManifest
|
|
case _: VersionVector ⇒ VersionVectorManifest
|
|
|
|
case _: ORSetKey[_] ⇒ ORSetKeyManifest
|
|
case _: GSetKey[_] ⇒ GSetKeyManifest
|
|
case _: GCounterKey ⇒ GCounterKeyManifest
|
|
case _: PNCounterKey ⇒ PNCounterKeyManifest
|
|
case _: FlagKey ⇒ FlagKeyManifest
|
|
case _: LWWRegisterKey[_] ⇒ LWWRegisterKeyManifest
|
|
case _: ORMapKey[_, _] ⇒ ORMapKeyManifest
|
|
case _: LWWMapKey[_, _] ⇒ LWWMapKeyManifest
|
|
case _: PNCounterMapKey[_] ⇒ PNCounterMapKeyManifest
|
|
case _: ORMultiMapKey[_, _] ⇒ ORMultiMapKeyManifest
|
|
|
|
case _: ORSet.DeltaGroup[_] ⇒ ORSetDeltaGroupManifest
|
|
case _: ORMap.DeltaGroup[_, _] ⇒ ORMapDeltaGroupManifest
|
|
case _: ORSet.FullStateDeltaOp[_] ⇒ ORSetFullManifest
|
|
|
|
case _ ⇒
|
|
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
|
|
}
|
|
|
|
def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
|
case m: ORSet[_] ⇒ compress(orsetToProto(m))
|
|
case m: ORSet.AddDeltaOp[_] ⇒ orsetToProto(m.underlying).toByteArray
|
|
case m: ORSet.RemoveDeltaOp[_] ⇒ orsetToProto(m.underlying).toByteArray
|
|
case m: GSet[_] ⇒ gsetToProto(m).toByteArray
|
|
case m: GCounter ⇒ gcounterToProto(m).toByteArray
|
|
case m: PNCounter ⇒ pncounterToProto(m).toByteArray
|
|
case m: Flag ⇒ flagToProto(m).toByteArray
|
|
case m: LWWRegister[_] ⇒ lwwRegisterToProto(m).toByteArray
|
|
case m: ORMap[_, _] ⇒ compress(ormapToProto(m))
|
|
case m: ORMap.PutDeltaOp[_, _] ⇒ ormapPutToProto(m).toByteArray
|
|
case m: ORMap.RemoveDeltaOp[_, _] ⇒ ormapRemoveToProto(m).toByteArray
|
|
case m: ORMap.RemoveKeyDeltaOp[_, _] ⇒ ormapRemoveKeyToProto(m).toByteArray
|
|
case m: ORMap.UpdateDeltaOp[_, _] ⇒ ormapUpdateToProto(m).toByteArray
|
|
case m: LWWMap[_, _] ⇒ compress(lwwmapToProto(m))
|
|
case m: PNCounterMap[_] ⇒ compress(pncountermapToProto(m))
|
|
case m: ORMultiMap[_, _] ⇒ compress(multimapToProto(m))
|
|
case DeletedData ⇒ dm.Empty.getDefaultInstance.toByteArray
|
|
case m: VersionVector ⇒ versionVectorToProto(m).toByteArray
|
|
case Key(id) ⇒ keyIdToBinary(id)
|
|
case m: ORSet.DeltaGroup[_] ⇒ orsetDeltaGroupToProto(m).toByteArray
|
|
case m: ORMap.DeltaGroup[_, _] ⇒ ormapDeltaGroupToProto(m).toByteArray
|
|
case m: ORSet.FullStateDeltaOp[_] ⇒ orsetToProto(m.underlying).toByteArray
|
|
case _ ⇒
|
|
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
|
|
}
|
|
|
|
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
|
|
fromBinaryMap.get(manifest) match {
|
|
case Some(f) ⇒ f(bytes)
|
|
case None ⇒ throw new NotSerializableException(
|
|
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
|
|
}
|
|
|
|
def gsetToProto(gset: GSet[_]): rd.GSet = {
|
|
val b = rd.GSet.newBuilder()
|
|
// using java collections and sorting for performance (avoid conversions)
|
|
val stringElements = new ArrayList[String]
|
|
val intElements = new ArrayList[Integer]
|
|
val longElements = new ArrayList[jl.Long]
|
|
val otherElements = new ArrayList[dm.OtherMessage]
|
|
val actorRefElements = new ArrayList[String]
|
|
gset.elements.foreach {
|
|
case s: String ⇒ stringElements.add(s)
|
|
case i: Int ⇒ intElements.add(i)
|
|
case l: Long ⇒ longElements.add(l)
|
|
case ref: ActorRef ⇒ actorRefElements.add(Serialization.serializedActorPath(ref))
|
|
case other ⇒ otherElements.add(otherMessageToProto(other))
|
|
}
|
|
if (!stringElements.isEmpty) {
|
|
Collections.sort(stringElements)
|
|
b.addAllStringElements(stringElements)
|
|
}
|
|
if (!intElements.isEmpty) {
|
|
Collections.sort(intElements)
|
|
b.addAllIntElements(intElements)
|
|
}
|
|
if (!longElements.isEmpty) {
|
|
Collections.sort(longElements)
|
|
b.addAllLongElements(longElements)
|
|
}
|
|
if (!otherElements.isEmpty) {
|
|
Collections.sort(otherElements, OtherMessageComparator)
|
|
b.addAllOtherElements(otherElements)
|
|
}
|
|
if (!actorRefElements.isEmpty) {
|
|
Collections.sort(actorRefElements)
|
|
b.addAllActorRefElements(actorRefElements)
|
|
}
|
|
b.build()
|
|
}
|
|
|
|
def gsetFromBinary(bytes: Array[Byte]): GSet[_] =
|
|
gsetFromProto(rd.GSet.parseFrom(bytes))
|
|
|
|
def gsetFromProto(gset: rd.GSet): GSet[Any] = {
|
|
val elements: Iterator[Any] = {
|
|
gset.getStringElementsList.iterator.asScala ++
|
|
gset.getIntElementsList.iterator.asScala ++
|
|
gset.getLongElementsList.iterator.asScala ++
|
|
gset.getOtherElementsList.iterator.asScala.map(otherMessageFromProto) ++
|
|
gset.getActorRefElementsList.iterator.asScala.map(resolveActorRef)
|
|
}
|
|
GSet(elements.toSet)
|
|
}
|
|
|
|
def orsetToProto(orset: ORSet[_]): rd.ORSet =
|
|
orsetToProtoImpl(orset.asInstanceOf[ORSet[Any]])
|
|
|
|
private def orsetToProtoImpl(orset: ORSet[Any]): rd.ORSet = {
|
|
val b = rd.ORSet.newBuilder().setVvector(versionVectorToProto(orset.vvector))
|
|
// using java collections and sorting for performance (avoid conversions)
|
|
val stringElements = new ArrayList[String]
|
|
val intElements = new ArrayList[Integer]
|
|
val longElements = new ArrayList[jl.Long]
|
|
val otherElements = new ArrayList[dm.OtherMessage]
|
|
var otherElementsMap = Map.empty[dm.OtherMessage, Any]
|
|
val actorRefElements = new ArrayList[ActorRef]
|
|
orset.elementsMap.keysIterator.foreach {
|
|
case s: String ⇒ stringElements.add(s)
|
|
case i: Int ⇒ intElements.add(i)
|
|
case l: Long ⇒ longElements.add(l)
|
|
case ref: ActorRef ⇒ actorRefElements.add(ref)
|
|
case other ⇒
|
|
val enclosedMsg = otherMessageToProto(other)
|
|
otherElements.add(enclosedMsg)
|
|
// need the mapping back to the `other` when adding dots
|
|
otherElementsMap = otherElementsMap.updated(enclosedMsg, other)
|
|
}
|
|
|
|
def addDots(elements: ArrayList[_]): Unit = {
|
|
// add corresponding dots in same order
|
|
val iter = elements.iterator
|
|
while (iter.hasNext) {
|
|
val element = iter.next() match {
|
|
case enclosedMsg: dm.OtherMessage ⇒ otherElementsMap(enclosedMsg)
|
|
case e ⇒ e
|
|
}
|
|
b.addDots(versionVectorToProto(orset.elementsMap(element)))
|
|
}
|
|
}
|
|
|
|
if (!stringElements.isEmpty) {
|
|
Collections.sort(stringElements)
|
|
b.addAllStringElements(stringElements)
|
|
addDots(stringElements)
|
|
}
|
|
if (!intElements.isEmpty) {
|
|
Collections.sort(intElements)
|
|
b.addAllIntElements(intElements)
|
|
addDots(intElements)
|
|
}
|
|
if (!longElements.isEmpty) {
|
|
Collections.sort(longElements)
|
|
b.addAllLongElements(longElements)
|
|
addDots(longElements)
|
|
}
|
|
if (!otherElements.isEmpty) {
|
|
Collections.sort(otherElements, OtherMessageComparator)
|
|
b.addAllOtherElements(otherElements)
|
|
addDots(otherElements)
|
|
}
|
|
if (!actorRefElements.isEmpty) {
|
|
Collections.sort(actorRefElements)
|
|
val iter = actorRefElements.iterator
|
|
while (iter.hasNext) {
|
|
b.addActorRefElements(Serialization.serializedActorPath(iter.next()))
|
|
}
|
|
addDots(actorRefElements)
|
|
}
|
|
|
|
b.build()
|
|
}
|
|
|
|
def orsetFromBinary(bytes: Array[Byte]): ORSet[Any] =
|
|
orsetFromProto(rd.ORSet.parseFrom(decompress(bytes)))
|
|
|
|
private def orsetAddFromBinary(bytes: Array[Byte]): ORSet.AddDeltaOp[Any] =
|
|
new ORSet.AddDeltaOp(orsetFromProto(rd.ORSet.parseFrom(bytes)))
|
|
|
|
private def orsetRemoveFromBinary(bytes: Array[Byte]): ORSet.RemoveDeltaOp[Any] =
|
|
new ORSet.RemoveDeltaOp(orsetFromProto(rd.ORSet.parseFrom(bytes)))
|
|
|
|
private def orsetFullFromBinary(bytes: Array[Byte]): ORSet.FullStateDeltaOp[Any] =
|
|
new ORSet.FullStateDeltaOp(orsetFromProto(rd.ORSet.parseFrom(bytes)))
|
|
|
|
private def orsetDeltaGroupToProto(deltaGroup: ORSet.DeltaGroup[_]): rd.ORSetDeltaGroup = {
|
|
def createEntry(opType: rd.ORSetDeltaOp, u: ORSet[_]) = {
|
|
rd.ORSetDeltaGroup.Entry.newBuilder()
|
|
.setOperation(opType)
|
|
.setUnderlying(orsetToProto(u))
|
|
}
|
|
|
|
val b = rd.ORSetDeltaGroup.newBuilder()
|
|
deltaGroup.ops.foreach {
|
|
case ORSet.AddDeltaOp(u) ⇒
|
|
b.addEntries(createEntry(rd.ORSetDeltaOp.Add, u))
|
|
case ORSet.RemoveDeltaOp(u) ⇒
|
|
b.addEntries(createEntry(rd.ORSetDeltaOp.Remove, u))
|
|
case ORSet.FullStateDeltaOp(u) ⇒
|
|
b.addEntries(createEntry(rd.ORSetDeltaOp.Full, u))
|
|
case ORSet.DeltaGroup(u) ⇒
|
|
throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested")
|
|
}
|
|
b.build()
|
|
}
|
|
|
|
private def orsetDeltaGroupFromBinary(bytes: Array[Byte]): ORSet.DeltaGroup[Any] = {
|
|
val deltaGroup = rd.ORSetDeltaGroup.parseFrom(bytes)
|
|
val ops: Vector[ORSet.DeltaOp] =
|
|
deltaGroup.getEntriesList.asScala.map { entry ⇒
|
|
if (entry.getOperation == rd.ORSetDeltaOp.Add)
|
|
ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying))
|
|
else if (entry.getOperation == rd.ORSetDeltaOp.Remove)
|
|
ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying))
|
|
else if (entry.getOperation == rd.ORSetDeltaOp.Full)
|
|
ORSet.FullStateDeltaOp(orsetFromProto(entry.getUnderlying))
|
|
else
|
|
throw new NotSerializableException(s"Unknow ORSet delta operation ${entry.getOperation}")
|
|
}(collection.breakOut)
|
|
ORSet.DeltaGroup(ops)
|
|
}
|
|
|
|
def orsetFromProto(orset: rd.ORSet): ORSet[Any] = {
|
|
val elements: Iterator[Any] = {
|
|
orset.getStringElementsList.iterator.asScala ++
|
|
orset.getIntElementsList.iterator.asScala ++
|
|
orset.getLongElementsList.iterator.asScala ++
|
|
orset.getOtherElementsList.iterator.asScala.map(otherMessageFromProto) ++
|
|
orset.getActorRefElementsList.iterator.asScala.map(resolveActorRef)
|
|
}
|
|
|
|
val dots = orset.getDotsList.asScala.map(versionVectorFromProto).iterator
|
|
val elementsMap = elements.zip(dots).toMap
|
|
|
|
new ORSet(elementsMap, vvector = versionVectorFromProto(orset.getVvector))
|
|
}
|
|
|
|
def flagToProto(flag: Flag): rd.Flag =
|
|
rd.Flag.newBuilder().setEnabled(flag.enabled).build()
|
|
|
|
def flagFromBinary(bytes: Array[Byte]): Flag =
|
|
flagFromProto(rd.Flag.parseFrom(bytes))
|
|
|
|
def flagFromProto(flag: rd.Flag): Flag =
|
|
if (flag.getEnabled) Flag.Enabled else Flag.Disabled
|
|
|
|
def lwwRegisterToProto(lwwRegister: LWWRegister[_]): rd.LWWRegister =
|
|
rd.LWWRegister.newBuilder().
|
|
setTimestamp(lwwRegister.timestamp).
|
|
setNode(uniqueAddressToProto(lwwRegister.node)).
|
|
setState(otherMessageToProto(lwwRegister.value)).
|
|
build()
|
|
|
|
def lwwRegisterFromBinary(bytes: Array[Byte]): LWWRegister[Any] =
|
|
lwwRegisterFromProto(rd.LWWRegister.parseFrom(bytes))
|
|
|
|
def lwwRegisterFromProto(lwwRegister: rd.LWWRegister): LWWRegister[Any] =
|
|
new LWWRegister(
|
|
uniqueAddressFromProto(lwwRegister.getNode),
|
|
otherMessageFromProto(lwwRegister.getState),
|
|
lwwRegister.getTimestamp)
|
|
|
|
def gcounterToProto(gcounter: GCounter): rd.GCounter = {
|
|
val b = rd.GCounter.newBuilder()
|
|
gcounter.state.toVector.sortBy { case (address, _) ⇒ address }.foreach {
|
|
case (address, value) ⇒ b.addEntries(rd.GCounter.Entry.newBuilder().
|
|
setNode(uniqueAddressToProto(address)).setValue(ByteString.copyFrom(value.toByteArray)))
|
|
}
|
|
b.build()
|
|
}
|
|
|
|
def gcounterFromBinary(bytes: Array[Byte]): GCounter =
|
|
gcounterFromProto(rd.GCounter.parseFrom(bytes))
|
|
|
|
def gcounterFromProto(gcounter: rd.GCounter): GCounter = {
|
|
new GCounter(state = gcounter.getEntriesList.asScala.map(entry ⇒
|
|
uniqueAddressFromProto(entry.getNode) → BigInt(entry.getValue.toByteArray))(breakOut))
|
|
}
|
|
|
|
def pncounterToProto(pncounter: PNCounter): rd.PNCounter =
|
|
rd.PNCounter.newBuilder().
|
|
setIncrements(gcounterToProto(pncounter.increments)).
|
|
setDecrements(gcounterToProto(pncounter.decrements)).
|
|
build()
|
|
|
|
def pncounterFromBinary(bytes: Array[Byte]): PNCounter =
|
|
pncounterFromProto(rd.PNCounter.parseFrom(bytes))
|
|
|
|
def pncounterFromProto(pncounter: rd.PNCounter): PNCounter = {
|
|
new PNCounter(
|
|
increments = gcounterFromProto(pncounter.getIncrements),
|
|
decrements = gcounterFromProto(pncounter.getDecrements))
|
|
}
|
|
|
|
/*
|
|
* Convert a Map[A, B] to an Iterable[Entry] where Entry is the protobuf map entry.
|
|
*/
|
|
private def getEntries[IKey, IValue, EntryBuilder <: GeneratedMessage.Builder[EntryBuilder], PEntry <: GeneratedMessage, PValue <: GeneratedMessage](input: Map[IKey, IValue], createBuilder: () ⇒ EntryBuilder, valueConverter: IValue ⇒ PValue)(implicit comparator: Comparator[PEntry], eh: ProtoMapEntryWriter[PEntry, EntryBuilder, PValue]): java.lang.Iterable[PEntry] = {
|
|
// The resulting Iterable needs to be ordered deterministically in order to create same signature upon serializing same data
|
|
val protoEntries = new TreeSet[PEntry](comparator)
|
|
input.foreach {
|
|
case (key: String, value) ⇒ protoEntries.add(eh.setStringKey(createBuilder(), key, valueConverter(value)))
|
|
case (key: Int, value) ⇒ protoEntries.add(eh.setIntKey(createBuilder(), key, valueConverter(value)))
|
|
case (key: Long, value) ⇒ protoEntries.add(eh.setLongKey(createBuilder(), key, valueConverter(value)))
|
|
case (key, value) ⇒ protoEntries.add(eh.setOtherKey(createBuilder(), otherMessageToProto(key), valueConverter(value)))
|
|
}
|
|
protoEntries
|
|
}
|
|
|
|
def ormapToProto(ormap: ORMap[_, _]): rd.ORMap = {
|
|
val ormapBuilder = rd.ORMap.newBuilder()
|
|
val entries: jl.Iterable[rd.ORMap.Entry] = getEntries(ormap.values, rd.ORMap.Entry.newBuilder _, otherMessageToProto)
|
|
ormapBuilder.setKeys(orsetToProto(ormap.keys)).addAllEntries(entries).build()
|
|
}
|
|
|
|
def ormapFromBinary(bytes: Array[Byte]): ORMap[Any, ReplicatedData] =
|
|
ormapFromProto(rd.ORMap.parseFrom(decompress(bytes)))
|
|
|
|
def mapTypeFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](input: util.List[PEntry], valueCreator: A ⇒ B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
|
|
input.asScala.map { entry ⇒
|
|
if (eh.hasStringKey(entry)) eh.getStringKey(entry) → valueCreator(eh.getValue(entry))
|
|
else if (eh.hasIntKey(entry)) eh.getIntKey(entry) → valueCreator(eh.getValue(entry))
|
|
else if (eh.hasLongKey(entry)) eh.getLongKey(entry) → valueCreator(eh.getValue(entry))
|
|
else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry)) → valueCreator(eh.getValue(entry))
|
|
else throw new IllegalArgumentException(s"Can't deserialize ${entry.getClass} because it does not have any key in the serialized message.")
|
|
}.toMap
|
|
}
|
|
|
|
def ormapFromProto(ormap: rd.ORMap): ORMap[Any, ReplicatedData] = {
|
|
val entries = mapTypeFromProto(ormap.getEntriesList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData])
|
|
new ORMap(
|
|
keys = orsetFromProto(ormap.getKeys),
|
|
entries,
|
|
ORMap.VanillaORMapTag)
|
|
}
|
|
|
|
def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](input: util.List[PEntry], valueCreator: A ⇒ B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
|
|
val map = mapTypeFromProto(input, valueCreator)
|
|
if (map.size > 1)
|
|
throw new IllegalArgumentException(s"Can't deserialize the key/value pair in the ORMap delta - too many pairs on the wire")
|
|
else
|
|
map
|
|
}
|
|
|
|
def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entryOption: Option[PEntry])(implicit eh: ProtoMapEntryReader[PEntry, A]): Any =
|
|
entryOption match {
|
|
case Some(entry) ⇒ if (eh.hasStringKey(entry)) eh.getStringKey(entry)
|
|
else if (eh.hasIntKey(entry)) eh.getIntKey(entry)
|
|
else if (eh.hasLongKey(entry)) eh.getLongKey(entry)
|
|
else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry))
|
|
else throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta")
|
|
case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta")
|
|
}
|
|
|
|
// wire protocol is always DeltaGroup
|
|
private def ormapPutFromBinary(bytes: Array[Byte]): ORMap.PutDeltaOp[Any, ReplicatedData] = {
|
|
val ops = ormapDeltaGroupOpsFromBinary(bytes)
|
|
if (ops.size == 1 && ops.head.isInstanceOf[ORMap.PutDeltaOp[_, _]])
|
|
ops.head.asInstanceOf[ORMap.PutDeltaOp[Any, ReplicatedData]]
|
|
else
|
|
throw new NotSerializableException("Improper ORMap delta put operation size or kind")
|
|
}
|
|
|
|
// wire protocol is always delta group
|
|
private def ormapRemoveFromBinary(bytes: Array[Byte]): ORMap.RemoveDeltaOp[Any, ReplicatedData] = {
|
|
val ops = ormapDeltaGroupOpsFromBinary(bytes)
|
|
if (ops.size == 1 && ops.head.isInstanceOf[ORMap.RemoveDeltaOp[_, _]])
|
|
ops.head.asInstanceOf[ORMap.RemoveDeltaOp[Any, ReplicatedData]]
|
|
else
|
|
throw new NotSerializableException("Improper ORMap delta remove operation size or kind")
|
|
}
|
|
|
|
// wire protocol is always delta group
|
|
private def ormapRemoveKeyFromBinary(bytes: Array[Byte]): ORMap.RemoveKeyDeltaOp[Any, ReplicatedData] = {
|
|
val ops = ormapDeltaGroupOpsFromBinary(bytes)
|
|
if (ops.size == 1 && ops.head.isInstanceOf[ORMap.RemoveKeyDeltaOp[_, _]])
|
|
ops.head.asInstanceOf[ORMap.RemoveKeyDeltaOp[Any, ReplicatedData]]
|
|
else
|
|
throw new NotSerializableException("Improper ORMap delta remove key operation size or kind")
|
|
}
|
|
|
|
// wire protocol is always delta group
|
|
private def ormapUpdateFromBinary(bytes: Array[Byte]): ORMap.UpdateDeltaOp[Any, ReplicatedDelta] = {
|
|
val ops = ormapDeltaGroupOpsFromBinary(bytes)
|
|
if (ops.size == 1 && ops.head.isInstanceOf[ORMap.UpdateDeltaOp[_, _]])
|
|
ops.head.asInstanceOf[ORMap.UpdateDeltaOp[Any, ReplicatedDelta]]
|
|
else
|
|
throw new NotSerializableException("Improper ORMap delta update operation size or kind")
|
|
}
|
|
|
|
// this can be made client-extendable in the same way as Http codes in Spray are
|
|
private def zeroTagFromCode(code: Int) = code match {
|
|
case ORMap.VanillaORMapTag.value ⇒ ORMap.VanillaORMapTag
|
|
case PNCounterMap.PNCounterMapTag.value ⇒ PNCounterMap.PNCounterMapTag
|
|
case ORMultiMap.ORMultiMapTag.value ⇒ ORMultiMap.ORMultiMapTag
|
|
case ORMultiMap.ORMultiMapWithValueDeltasTag.value ⇒ ORMultiMap.ORMultiMapWithValueDeltasTag
|
|
case LWWMap.LWWMapTag.value ⇒ LWWMap.LWWMapTag
|
|
case _ ⇒ throw new IllegalArgumentException("Invalid ZeroTag code")
|
|
}
|
|
|
|
private def ormapDeltaGroupFromBinary(bytes: Array[Byte]): ORMap.DeltaGroup[Any, ReplicatedData] = {
|
|
ORMap.DeltaGroup(ormapDeltaGroupOpsFromBinary(bytes))
|
|
}
|
|
|
|
private def ormapDeltaGroupOpsFromBinary(bytes: Array[Byte]): scala.collection.immutable.IndexedSeq[ORMap.DeltaOp] = {
|
|
val deltaGroup = rd.ORMapDeltaGroup.parseFrom(bytes)
|
|
val ops: Vector[ORMap.DeltaOp] =
|
|
deltaGroup.getEntriesList.asScala.map { entry ⇒
|
|
if (entry.getOperation == rd.ORMapDeltaOp.ORMapPut) {
|
|
val map = singleMapEntryFromProto(entry.getEntryDataList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData])
|
|
ORMap.PutDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map.head, zeroTagFromCode(entry.getZeroTag))
|
|
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemove) {
|
|
ORMap.RemoveDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), zeroTagFromCode(entry.getZeroTag))
|
|
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemoveKey) {
|
|
val elem = singleKeyEntryFromProto(entry.getEntryDataList.asScala.headOption)
|
|
ORMap.RemoveKeyDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), elem, zeroTagFromCode(entry.getZeroTag))
|
|
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapUpdate) {
|
|
val map = mapTypeFromProto(entry.getEntryDataList, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedDelta])
|
|
ORMap.UpdateDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map, zeroTagFromCode(entry.getZeroTag))
|
|
} else
|
|
throw new NotSerializableException(s"Unknown ORMap delta operation ${entry.getOperation}")
|
|
}(collection.breakOut)
|
|
ops
|
|
}
|
|
|
|
private def ormapPutToProto(deltaOp: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = {
|
|
ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
|
|
}
|
|
|
|
private def ormapRemoveToProto(deltaOp: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = {
|
|
ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
|
|
}
|
|
|
|
private def ormapRemoveKeyToProto(deltaOp: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = {
|
|
ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
|
|
}
|
|
|
|
private def ormapUpdateToProto(deltaOp: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = {
|
|
ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
|
|
}
|
|
|
|
private def ormapDeltaGroupToProto(deltaGroup: ORMap.DeltaGroup[_, _]): rd.ORMapDeltaGroup = {
|
|
ormapDeltaGroupOpsToProto(deltaGroup.ops)
|
|
}
|
|
|
|
private def ormapDeltaGroupOpsToProto(deltaGroupOps: scala.collection.immutable.IndexedSeq[ORMap.DeltaOp]): rd.ORMapDeltaGroup = {
|
|
def createEntry(opType: rd.ORMapDeltaOp, u: ORSet[_], m: Map[_, _], zt: Int) = {
|
|
if (m.size > 1 && opType != rd.ORMapDeltaOp.ORMapUpdate)
|
|
throw new IllegalArgumentException("Invalid size of ORMap delta map")
|
|
else {
|
|
val builder = rd.ORMapDeltaGroup.Entry.newBuilder()
|
|
.setOperation(opType)
|
|
.setUnderlying(orsetToProto(u))
|
|
.setZeroTag(zt)
|
|
m.foreach {
|
|
case (key: String, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setStringKey(key).setValue(otherMessageToProto(value)).build())
|
|
case (key: Int, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setIntKey(key).setValue(otherMessageToProto(value)).build())
|
|
case (key: Long, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setLongKey(key).setValue(otherMessageToProto(value)).build())
|
|
case (key, value) ⇒ builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setOtherKey(otherMessageToProto(key)).setValue(otherMessageToProto(value)).build())
|
|
}
|
|
builder
|
|
}
|
|
}
|
|
|
|
def createEntryWithKey(opType: rd.ORMapDeltaOp, u: ORSet[_], k: Any, zt: Int) = {
|
|
val entryDataBuilder = rd.ORMapDeltaGroup.MapEntry.newBuilder()
|
|
k match {
|
|
case key: String ⇒ entryDataBuilder.setStringKey(key)
|
|
case key: Int ⇒ entryDataBuilder.setIntKey(key)
|
|
case key: Long ⇒ entryDataBuilder.setLongKey(key)
|
|
case key ⇒ entryDataBuilder.setOtherKey(otherMessageToProto(key))
|
|
}
|
|
val builder = rd.ORMapDeltaGroup.Entry.newBuilder()
|
|
.setOperation(opType)
|
|
.setUnderlying(orsetToProto(u))
|
|
.setZeroTag(zt)
|
|
builder.addEntryData(entryDataBuilder.build())
|
|
builder
|
|
}
|
|
|
|
val b = rd.ORMapDeltaGroup.newBuilder()
|
|
deltaGroupOps.foreach {
|
|
case ORMap.PutDeltaOp(op, pair, zt) ⇒
|
|
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapPut, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, Map(pair), zt.value))
|
|
case ORMap.RemoveDeltaOp(op, zt) ⇒
|
|
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemove, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map.empty, zt.value))
|
|
case ORMap.RemoveKeyDeltaOp(op, k, zt) ⇒
|
|
b.addEntries(createEntryWithKey(rd.ORMapDeltaOp.ORMapRemoveKey, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, k, zt.value))
|
|
case ORMap.UpdateDeltaOp(op, m, zt) ⇒
|
|
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapUpdate, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, m, zt.value))
|
|
case ORMap.DeltaGroup(u) ⇒
|
|
throw new IllegalArgumentException("ORMap.DeltaGroup should not be nested")
|
|
}
|
|
b.build()
|
|
}
|
|
|
|
def lwwmapToProto(lwwmap: LWWMap[_, _]): rd.LWWMap = {
|
|
val lwwmapBuilder = rd.LWWMap.newBuilder()
|
|
val entries: jl.Iterable[rd.LWWMap.Entry] = getEntries(lwwmap.underlying.entries, rd.LWWMap.Entry.newBuilder _, lwwRegisterToProto)
|
|
lwwmapBuilder.setKeys(orsetToProto(lwwmap.underlying.keys)).addAllEntries(entries).build()
|
|
}
|
|
|
|
def lwwmapFromBinary(bytes: Array[Byte]): LWWMap[Any, Any] =
|
|
lwwmapFromProto(rd.LWWMap.parseFrom(decompress(bytes)))
|
|
|
|
def lwwmapFromProto(lwwmap: rd.LWWMap): LWWMap[Any, Any] = {
|
|
val entries = mapTypeFromProto(lwwmap.getEntriesList, lwwRegisterFromProto)
|
|
new LWWMap(new ORMap(
|
|
keys = orsetFromProto(lwwmap.getKeys),
|
|
entries, LWWMap.LWWMapTag))
|
|
}
|
|
|
|
def pncountermapToProto(pncountermap: PNCounterMap[_]): rd.PNCounterMap = {
|
|
val pncountermapBuilder = rd.PNCounterMap.newBuilder()
|
|
val entries: jl.Iterable[rd.PNCounterMap.Entry] = getEntries(pncountermap.underlying.entries, rd.PNCounterMap.Entry.newBuilder _, pncounterToProto)
|
|
pncountermapBuilder.setKeys(orsetToProto(pncountermap.underlying.keys)).addAllEntries(entries).build()
|
|
}
|
|
|
|
def pncountermapFromBinary(bytes: Array[Byte]): PNCounterMap[_] =
|
|
pncountermapFromProto(rd.PNCounterMap.parseFrom(decompress(bytes)))
|
|
|
|
def pncountermapFromProto(pncountermap: rd.PNCounterMap): PNCounterMap[_] = {
|
|
val entries = mapTypeFromProto(pncountermap.getEntriesList, pncounterFromProto)
|
|
new PNCounterMap(new ORMap(
|
|
keys = orsetFromProto(pncountermap.getKeys),
|
|
entries, PNCounterMap.PNCounterMapTag))
|
|
}
|
|
|
|
def multimapToProto(multimap: ORMultiMap[_, _]): rd.ORMultiMap = {
|
|
val ormultimapBuilder = rd.ORMultiMap.newBuilder()
|
|
val entries: jl.Iterable[rd.ORMultiMap.Entry] = getEntries(multimap.underlying.entries, rd.ORMultiMap.Entry.newBuilder _, orsetToProto)
|
|
ormultimapBuilder.setKeys(orsetToProto(multimap.underlying.keys)).addAllEntries(entries)
|
|
if (multimap.withValueDeltas)
|
|
ormultimapBuilder.setWithValueDeltas(true)
|
|
ormultimapBuilder.build()
|
|
}
|
|
|
|
def multimapFromBinary(bytes: Array[Byte]): ORMultiMap[Any, Any] =
|
|
multimapFromProto(rd.ORMultiMap.parseFrom(decompress(bytes)))
|
|
|
|
def multimapFromProto(multimap: rd.ORMultiMap): ORMultiMap[Any, Any] = {
|
|
val entries = mapTypeFromProto(multimap.getEntriesList, orsetFromProto)
|
|
val withValueDeltas = if (multimap.hasWithValueDeltas)
|
|
multimap.getWithValueDeltas
|
|
else false
|
|
new ORMultiMap(
|
|
new ORMap(
|
|
keys = orsetFromProto(multimap.getKeys),
|
|
entries,
|
|
if (withValueDeltas)
|
|
ORMultiMap.ORMultiMapWithValueDeltasTag
|
|
else
|
|
ORMultiMap.ORMultiMapTag),
|
|
withValueDeltas)
|
|
}
|
|
|
|
def keyIdToBinary(id: String): Array[Byte] =
|
|
id.getBytes(UTF_8)
|
|
|
|
def keyIdFromBinary(bytes: Array[Byte]): String =
|
|
new String(bytes, UTF_8)
|
|
|
|
}
|
|
|
|
object OtherMessageComparator extends Comparator[dm.OtherMessage] {
|
|
override def compare(a: dm.OtherMessage, b: dm.OtherMessage): Int = {
|
|
val aByteString = a.getEnclosedMessage
|
|
val bByteString = b.getEnclosedMessage
|
|
val aSize = aByteString.size
|
|
val bSize = bByteString.size
|
|
if (aSize == bSize) {
|
|
val aIter = aByteString.iterator
|
|
val bIter = bByteString.iterator
|
|
@tailrec def findDiff(): Int = {
|
|
if (aIter.hasNext) {
|
|
val aByte = aIter.nextByte()
|
|
val bByte = bIter.nextByte()
|
|
if (aByte < bByte) -1
|
|
else if (aByte > bByte) 1
|
|
else findDiff()
|
|
} else 0
|
|
}
|
|
findDiff()
|
|
} else if (aSize < bSize) -1
|
|
else 1
|
|
}
|
|
}
|