delta-aggregation in the ORMap deltas (#22633)

This commit is contained in:
gosubpl 2017-03-24 16:18:01 +01:00
parent ecbcc56f28
commit 7c42627ea9
10 changed files with 728 additions and 231 deletions

View file

@ -8,6 +8,7 @@ import java.util.ArrayList
import java.util.Collections
import java.util.Comparator
import java.util.TreeSet
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.breakOut
@ -24,6 +25,7 @@ import akka.util.ByteString.UTF_8
import scala.collection.immutable.TreeMap
import akka.cluster.UniqueAddress
import java.io.NotSerializableException
import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage
import akka.cluster.ddata.ORSet.DeltaOp
@ -570,129 +572,134 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val entries = mapTypeFromProto(ormap.getEntriesList, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
new ORMap(
keys = orsetFromProto(ormap.getKeys),
entries)
entries,
ORMap.VanillaORMapTag)
}
def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](entry: PEntry, valueCreator: A B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
val elem = if (eh.hasStringKey(entry)) Some(eh.getStringKey(entry) valueCreator(eh.getValue(entry)))
else if (eh.hasIntKey(entry)) Some(eh.getIntKey(entry) valueCreator(eh.getValue(entry)))
else if (eh.hasLongKey(entry)) Some(eh.getLongKey(entry) valueCreator(eh.getValue(entry)))
else if (eh.hasOtherKey(entry)) Some(otherMessageFromProto(eh.getOtherKey(entry)) valueCreator(eh.getValue(entry)))
else None
elem match {
case Some(e) Map(e)
case _ Map.empty[Any, B]
def singleMapEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage, B <: ReplicatedData](input: util.List[PEntry], valueCreator: A B)(implicit eh: ProtoMapEntryReader[PEntry, A]): Map[Any, B] = {
val map = mapTypeFromProto(input, valueCreator)
if (map.size > 1)
throw new IllegalArgumentException(s"Can't deserialize the key/value pair in the ORMap delta - too many pairs on the wire")
else
map
}
def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entryOption: Option[PEntry])(implicit eh: ProtoMapEntryReader[PEntry, A]): Any =
entryOption match {
case Some(entry) if (eh.hasStringKey(entry)) eh.getStringKey(entry)
else if (eh.hasIntKey(entry)) eh.getIntKey(entry)
else if (eh.hasLongKey(entry)) eh.getLongKey(entry)
else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry))
else throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta")
case _ throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta")
}
}
def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entry: PEntry)(implicit eh: ProtoMapEntryReader[PEntry, A]): Any =
if (eh.hasStringKey(entry)) eh.getStringKey(entry)
else if (eh.hasIntKey(entry)) eh.getIntKey(entry)
else if (eh.hasLongKey(entry)) eh.getLongKey(entry)
else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry))
else throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta")
// wire protocol is always DeltaGroup
private def ormapPutFromBinary(bytes: Array[Byte]): ORMap.PutDeltaOp[Any, ReplicatedData] = {
val group = ormapDeltaGroupFromBinary(bytes)
if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.PutDeltaOp[_, _]])
group.ops.head.asInstanceOf[ORMap.PutDeltaOp[Any, ReplicatedData]]
val ops = ormapDeltaGroupOpsFromBinary(bytes)
if (ops.size == 1 && ops.head.isInstanceOf[ORMap.PutDeltaOp[_, _]])
ops.head.asInstanceOf[ORMap.PutDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta put operation size or kind")
}
// wire protocol is always delta group
private def ormapRemoveFromBinary(bytes: Array[Byte]): ORMap.RemoveDeltaOp[Any, ReplicatedData] = {
val group = ormapDeltaGroupFromBinary(bytes)
if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.RemoveDeltaOp[_, _]])
group.ops.head.asInstanceOf[ORMap.RemoveDeltaOp[Any, ReplicatedData]]
val ops = ormapDeltaGroupOpsFromBinary(bytes)
if (ops.size == 1 && ops.head.isInstanceOf[ORMap.RemoveDeltaOp[_, _]])
ops.head.asInstanceOf[ORMap.RemoveDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta remove operation size or kind")
}
// wire protocol is always delta group
private def ormapRemoveKeyFromBinary(bytes: Array[Byte]): ORMap.RemoveKeyDeltaOp[Any, ReplicatedData] = {
val group = ormapDeltaGroupFromBinary(bytes)
if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.RemoveKeyDeltaOp[_, _]])
group.ops.head.asInstanceOf[ORMap.RemoveKeyDeltaOp[Any, ReplicatedData]]
val ops = ormapDeltaGroupOpsFromBinary(bytes)
if (ops.size == 1 && ops.head.isInstanceOf[ORMap.RemoveKeyDeltaOp[_, _]])
ops.head.asInstanceOf[ORMap.RemoveKeyDeltaOp[Any, ReplicatedData]]
else
throw new NotSerializableException("Improper ORMap delta remove key operation size or kind")
}
// wire protocol is always delta group
private def ormapUpdateFromBinary(bytes: Array[Byte]): ORMap.UpdateDeltaOp[Any, ReplicatedDelta] = {
val group = ormapDeltaGroupFromBinary(bytes)
if (group.ops.size == 1 && group.ops.head.isInstanceOf[ORMap.UpdateDeltaOp[_, _]])
group.ops.head.asInstanceOf[ORMap.UpdateDeltaOp[Any, ReplicatedDelta]]
val ops = ormapDeltaGroupOpsFromBinary(bytes)
if (ops.size == 1 && ops.head.isInstanceOf[ORMap.UpdateDeltaOp[_, _]])
ops.head.asInstanceOf[ORMap.UpdateDeltaOp[Any, ReplicatedDelta]]
else
throw new NotSerializableException("Improper ORMap delta update operation size or kind")
}
// this can be made client-extendable in the same way as Http codes in Spray are
private def zeroTagFromCode(code: Int) = code match {
case ORMap.VanillaORMapTag.value ORMap.VanillaORMapTag
case ORMap.PNCounterMapTag.value ORMap.PNCounterMapTag
case ORMap.ORMultiMapTag.value ORMap.ORMultiMapTag
case ORMap.ORMultiMapWithValueDeltasTag.value ORMap.ORMultiMapWithValueDeltasTag
case ORMap.LWWMapTag.value ORMap.LWWMapTag
case _ throw new IllegalArgumentException("Invalid ZeroTag code")
case ORMap.VanillaORMapTag.value ORMap.VanillaORMapTag
case PNCounterMap.PNCounterMapTag.value PNCounterMap.PNCounterMapTag
case ORMultiMap.ORMultiMapTag.value ORMultiMap.ORMultiMapTag
case ORMultiMap.ORMultiMapWithValueDeltasTag.value ORMultiMap.ORMultiMapWithValueDeltasTag
case LWWMap.LWWMapTag.value LWWMap.LWWMapTag
case _ throw new IllegalArgumentException("Invalid ZeroTag code")
}
private def ormapDeltaGroupFromBinary(bytes: Array[Byte]): ORMap.DeltaGroup[Any, ReplicatedData] = {
ORMap.DeltaGroup(ormapDeltaGroupOpsFromBinary(bytes))
}
private def ormapDeltaGroupOpsFromBinary(bytes: Array[Byte]): scala.collection.immutable.IndexedSeq[ORMap.DeltaOp] = {
val deltaGroup = rd.ORMapDeltaGroup.parseFrom(bytes)
val ops: Vector[ORMap.DeltaOp] =
deltaGroup.getEntriesList.asScala.map { entry
if (entry.getOperation == rd.ORMapDeltaOp.ORMapPut) {
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
val map = singleMapEntryFromProto(entry.getEntryDataList, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
ORMap.PutDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map.head, zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemove) {
ORMap.RemoveDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemoveKey) {
val elem = singleKeyEntryFromProto(entry.getEntryData)
val elem = singleKeyEntryFromProto(entry.getEntryDataList.asScala.headOption)
ORMap.RemoveKeyDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), elem, zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapUpdate) {
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedDelta])
val map = mapTypeFromProto(entry.getEntryDataList, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedDelta])
ORMap.UpdateDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map, zeroTagFromCode(entry.getZeroTag))
} else
throw new NotSerializableException(s"Unknown ORMap delta operation ${entry.getOperation}")
}(collection.breakOut)
ORMap.DeltaGroup(ops)
ops
}
private def ormapPutToProto(deltaOp: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
}
private def ormapRemoveToProto(deltaOp: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
}
private def ormapRemoveKeyToProto(deltaOp: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
}
private def ormapUpdateToProto(deltaOp: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
ormapDeltaGroupOpsToProto(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))
}
private def ormapDeltaGroupToProto(deltaGroup: ORMap.DeltaGroup[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupOpsToProto(deltaGroup.ops)
}
private def ormapDeltaGroupOpsToProto(deltaGroupOps: scala.collection.immutable.IndexedSeq[ORMap.DeltaOp]): rd.ORMapDeltaGroup = {
def createEntry(opType: rd.ORMapDeltaOp, u: ORSet[_], m: Map[_, _], zt: Int) = {
if (m.size > 1)
if (m.size > 1 && opType != rd.ORMapDeltaOp.ORMapUpdate)
throw new IllegalArgumentException("Invalid size of ORMap delta map")
else {
val entryDataBuilder = rd.ORMapDeltaGroup.MapEntry.newBuilder()
m.headOption.map {
case (key: String, value) entryDataBuilder.setStringKey(key).setValue(otherMessageToProto(value))
case (key: Int, value) entryDataBuilder.setIntKey(key).setValue(otherMessageToProto(value))
case (key: Long, value) entryDataBuilder.setLongKey(key).setValue(otherMessageToProto(value))
case (key, value) entryDataBuilder.setOtherKey(otherMessageToProto(key)).setValue(otherMessageToProto(value))
}
val builder = rd.ORMapDeltaGroup.Entry.newBuilder()
.setOperation(opType)
.setUnderlying(orsetToProto(u))
.setZeroTag(zt)
if (m.size > 0)
builder.setEntryData(entryDataBuilder.build())
m.foreach {
case (key: String, value) builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setStringKey(key).setValue(otherMessageToProto(value)).build())
case (key: Int, value) builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setIntKey(key).setValue(otherMessageToProto(value)).build())
case (key: Long, value) builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setLongKey(key).setValue(otherMessageToProto(value)).build())
case (key, value) builder.addEntryData(rd.ORMapDeltaGroup.MapEntry.newBuilder().setOtherKey(otherMessageToProto(key)).setValue(otherMessageToProto(value)).build())
}
builder
}
}
@ -709,12 +716,12 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
.setOperation(opType)
.setUnderlying(orsetToProto(u))
.setZeroTag(zt)
builder.setEntryData(entryDataBuilder.build())
builder.addEntryData(entryDataBuilder.build())
builder
}
val b = rd.ORMapDeltaGroup.newBuilder()
deltaGroup.ops.foreach {
deltaGroupOps.foreach {
case ORMap.PutDeltaOp(op, pair, zt)
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapPut, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, Map(pair), zt.value))
case ORMap.RemoveDeltaOp(op, zt)
@ -742,7 +749,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val entries = mapTypeFromProto(lwwmap.getEntriesList, lwwRegisterFromProto)
new LWWMap(new ORMap(
keys = orsetFromProto(lwwmap.getKeys),
entries, ORMap.LWWMapTag))
entries, LWWMap.LWWMapTag))
}
def pncountermapToProto(pncountermap: PNCounterMap[_]): rd.PNCounterMap = {
@ -758,7 +765,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val entries = mapTypeFromProto(pncountermap.getEntriesList, pncounterFromProto)
new PNCounterMap(new ORMap(
keys = orsetFromProto(pncountermap.getKeys),
entries, ORMap.PNCounterMapTag))
entries, PNCounterMap.PNCounterMapTag))
}
def multimapToProto(multimap: ORMultiMap[_, _]): rd.ORMultiMap = {
@ -783,9 +790,9 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
keys = orsetFromProto(multimap.getKeys),
entries,
if (withValueDeltas)
ORMap.ORMultiMapWithValueDeltasTag
ORMultiMap.ORMultiMapWithValueDeltasTag
else
ORMap.ORMultiMapTag),
ORMultiMap.ORMultiMapTag),
withValueDeltas)
}